Skip to content
Snippets Groups Projects
Commit 81790f47 authored by Marko Mecina's avatar Marko Mecina
Browse files

add generic socket communication utilities and IWF EGSE command interface library

parent f6511466
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
"""
General purpose socket communication utility functions
"""
import queue
import select
import socket
import threading
import time
class Connector:
RECV_NBYTES = 4096
def __init__(self, host, port, is_server=False, response_to=2, recv_nbytes_min=0, save_to_file=None, msgdecoding='hex', resp_decoder=None):
self.sock_timeout = 10
self.response_to = response_to
self.host = host
self.port = port
self.isserver = is_server
self.recv_nbytes_min = recv_nbytes_min
self.msgdecoding = msgdecoding
self.resp_decoder = resp_decoder
self.conn = None
self.log = []
self._storagefd = None
self.receiver = None
self._startup(save_to_file)
def _startup(self, save_to):
self.setup_port()
if save_to is not None:
self.setup_storage(save_to)
def setup_storage(self, fname):
self._storagefd = open(fname, 'w')
def setup_port(self):
self.sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sockfd.settimeout(self.sock_timeout)
if self.isserver:
self.sockfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sockfd.bind((self.host, self.port))
self.sockfd.listen()
print('Listening for connections on {}:{}'.format(self.host, self.port))
def _connect(self):
if self.isserver:
self.conn, addr = self.sockfd.accept()
print('Got connection on {}:{}'.format(self.host, self.port))
else:
self.sockfd.connect((self.host, self.port))
self.conn = self.sockfd
print('Connected to {}:{}'.format(self.host, self.port))
self.conn.settimeout(self.response_to)
def _close(self, servershtdwn):
self.conn.close()
print('Closed connection to {}:{}'.format(self.host, self.port))
if servershtdwn:
print('Closing server {}'.format(self.sockfd.getsockname()))
self.sockfd.close()
def connect(self):
if self.conn is not None and self.conn.fileno() < 0:
self.setup_port()
self._connect()
def close(self):
self._close(False)
def close_server(self):
if self.isserver:
self._close(True)
else:
print('Not a server')
def close_storage(self):
if self._storagefd is None:
print('No file to close')
return
self._storagefd.close()
self._storagefd = None
def dump_log(self, fname, hexsep=''):
log = '\n'.join(['{:.3f}\t{}\t{}'.format(t, _msgdecoder(msg, self.msgdecoding, sep=hexsep), _msgdecoder(resp, self.msgdecoding, sep=hexsep)) for (t, msg, resp) in self.log])
with open(fname, 'w') as fd:
fd.write(log)
def send(self, msg, rx=True, output=False):
if hasattr(msg, 'raw'):
msg = msg.raw
if self.conn is not None:
self.conn.sendall(msg)
t = time.time()
resp = b''
if rx:
resp = self._recv_response()
self.log.append((t, msg, resp))
if self._storagefd is not None:
self._storagefd.write('{:.3f}\t{}\t{}\n'.format(t, _msgdecoder(msg, self.msgdecoding), _msgdecoder(resp, self.msgdecoding)))
self._storagefd.flush()
if output:
print('{:.3f}: SENT {} | RECV: {}'.format(t, _msgdecoder(msg, self.msgdecoding), _msgdecoder(resp, self.msgdecoding)))
return resp if self.resp_decoder is None else self.resp_decoder(resp)
else:
print('Not connected!')
def recv(self, nbytes=None):
if nbytes is None and self.recv_nbytes_min != 0:
nbytes = self.recv_nbytes_min
elif nbytes is None:
nbytes = self.RECV_NBYTES
return self.conn.recv(nbytes)
def _recv_response(self):
data = b''
try:
if self.recv_nbytes_min != 0:
while len(data) < self.recv_nbytes_min:
data += self.conn.recv(self.recv_nbytes_min - len(data))
else:
data += self.conn.recv(self.RECV_NBYTES)
except Exception as err:
print('No/invalid response ({})'.format(err))
finally:
return data
def set_response_to(self, seconds):
self.conn.settimeout(seconds)
self.response_to = seconds
def start_receiver(self, procfunc=None):
if self.conn is None:
print('No connection')
return
if self.receiver is None:
self.receiver = Receiver([self.conn], procfunc=procfunc)
self.receiver.start()
else:
print('Receiver already initialised')
class Receiver:
RECV_BYTES = 4096
SEL_TIMEOUT = 2
RECV_BUF_SIZE = 1024**3
def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE):
self.sockfds = sockfds
self.recvd_data_buf = queue.Queue(recv_buf_size)
self._procfunc = procfunc
self._recv_thread = None
self._proc_thread = None
self.proc_data = []
self._isrunning = False
def start(self):
if (self._recv_thread is None) or (not self._recv_thread.is_alive()):
self._start_recv()
else:
print('Already running!')
if self._procfunc is not None:
if (self._proc_thread is None) or (not self._proc_thread.is_alive()):
self._start_processing()
def stop(self):
self._isrunning = False
def _start_recv(self):
self._isrunning = True
self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker')
self._recv_thread.start()
def _recv_worker(self):
for sockfd in self.sockfds:
if sockfd is not None:
print('Receiving from socket {}:{}'.format(*sockfd.getpeername()))
else:
self.sockfds.remove(sockfd)
while self._isrunning:
try:
rd, wr, er = select.select(self.sockfds, [], self.sockfds, self.SEL_TIMEOUT)
for sock in rd:
self.recvd_data_buf.put((time.time(), sock.recv(self.RECV_BYTES)))
# print(self.recvd_data.get())
for sock in er:
print('Error in {}'.format(sock.getpeername()))
self.sockfds.remove(sock)
if not self.sockfds:
self.stop()
except socket.timeout:
continue
except (ValueError, OSError):
self.stop()
break
# for sockfd in self.sockfds:
# print('Stopped receiving from socket {}:{}'.format(*sockfd.getpeername()))
print('Receiving stopped')
def _start_processing(self):
self._proc_thread = threading.Thread(target=self._proc_worker, name='proc_worker')
self._proc_thread.daemon = True
self._proc_thread.start()
def _proc_worker(self):
while self._isrunning:
try:
t, data = self.recvd_data_buf.get(timeout=1)
self.proc_data += self._procfunc(data)
except queue.Empty:
continue
except Exception as err:
print(err)
print('Processing stopped')
break
def _msgdecoder(msg, fmt, sep=''):
if fmt == 'hex':
return hexify(msg, sep=sep)
elif fmt == 'ascii':
return toascii(msg)
else:
raise NotImplementedError('Unknown decoding style {}'.format(fmt))
def hexify(bs, sep=''):
if bs is None:
bs = b''
return bs.hex().upper() if sep == '' else bs.hex(sep).upper()
def toascii(bs, errors='replace'):
return bs.decode('ascii', errors=errors)
"""
IWF EGSE communication library
Ref: SMILE-IWF-PL-IF-048
"""
PORT = 8089
EOP = b'\x0D\x0A' # CR/LF
ERRORCODE = {
b'\x30': 'Command OK',
b'\x31': 'Parameter 1 NOT OK',
b'\x32': 'Parameter 2 NOT OK',
b'\x33': 'Parameter 3 NOT OK',
b'\x34': 'Parameter 4 NOT OK',
b'\x35': 'Parameter 5 NOT OK',
b'\x36': 'Parameter 6 NOT OK',
b'\x37': 'Command NOT ALLOWED',
b'\x38': 'Command lenght NOT OK',
b'\x39': 'Command UNKNOWN'
}
RESPONSE_ID = {
b's': ('currentStatus', 10),
b'x': ('execReset', 4),
b'a': ('settledNewDelay', 4),
b'b': ('handledErrorInjection', 4),
b'c': ('settledRSERegValue', 4),
b'd': ('receivedRSEData', 28),
b'e': ('changedPSUReportPeriod', 4),
b'f': ('settledPSUOK', 4),
b'g': ('settledPSUAnalogueValue', 4),
b'h': ('settledPWM', 4),
b'k': ('newPSUStatus', 26),
b'm': ('changedFEEReportPeriod', 4),
b'n': ('newFEEStatus', 13),
b'p': ('newFEEPWRStatus', 11),
b'q': ('settledMaxLoads', 4),
b'r': ('settledRSMEndSwitch', 4),
b't': ('newRSMStatus', 13),
b'u': ('newPSUEBOXStatus', 9),
b'v': ('changedFEEPWRReportPeriod', 4),
b'w': ('changedRSEReportPeriod', 4)
}
class Command:
# GENERAL
@staticmethod
def get_status():
"""
The general status of the IWF EGSE is requested with this command.
:return:
"""
return 'S'.encode('ascii') + EOP
@staticmethod
def reset():
"""
The FPGA in the IWF_EGSE is reset.
:return:
"""
return 'X'.encode('ascii') + EOP
### DPU EGSE Interface ###
# RSE
@staticmethod
def set_new_delay(delay):
"""
RSE Interface
With this command the delay between the received command and the generated response can be changed. (The default value after power-on is 2 baud)
:param delay: Contains the 2Byte Unsigned-Integer as ASCII coded hexadecimal value to set
:return:
"""
delay = _hexasciify(delay, 4)
return 'A'.encode('ascii') + delay.encode('ascii') + EOP
@staticmethod
def inject_errors(error_type, apply_to, num_errors, error_reg, error_resp, byte_sel):
"""
With this command errors can be injected to the serial communication RSE - DPU (it can be used to abort the error injection, too).
:param error_type: Selects the error type to inject
:param apply_to: Selects when to apply error injection
:param num_errors: Selects the number of errors to inject. Use '-1' to inject the error endless. Contains the 4Byte Signed-Integer as ASCII coded hexadecimal number.
:param error_reg: Contains the defined address as 1Byte Unsigned-Integer as ASCII coded hexadecimal. Only used, if ApplyTO is 'defined address only'
:param error_resp: Contains the error response as 1Byte Unsigned-Integer as ASCII coded hexadecimal. Only used, if ErrorType is “send an error response”
:param byte_sel: Selects the bytes to inject frame or parity errors
:return:
"""
error_type = _hexasciify(error_type, 1)
apply_to = _hexasciify(apply_to, 1)
num_errors = _hexasciify(num_errors, 4, signed=True)
error_reg = _hexasciify(error_reg, 2)
error_resp = _hexasciify(error_resp, 2)
byte_sel = _hexasciify(byte_sel, 1)
params = ''.join([error_type, apply_to, num_errors, error_reg, error_resp, byte_sel])
return 'B'.encode('ascii') + params.encode('ascii') + EOP
@staticmethod
def set_rse_reg_value(register_address, value):
"""
A new value is set to a register address in the myRIO FPGA.
:param register_address: Contains the 1Byte Unsigned-Integer as ASCII coded hexadecimal address of the register
:param value: Contains the 1Byte Unsigned-Integer as ASCII coded hexadecimal value to set
:return:
"""
register_address = _hexasciify(register_address, 2)
value = _hexasciify(value, 2)
params = ''.join([register_address, value])
return 'C'.encode('ascii') + params.encode('ascii') + EOP
# PSU
@staticmethod
def change_psu_report_period(newperiod):
"""
The default report period of 1 second can be changed with this command.
:param newperiod: Contains the new period in ms as 2Byte Unsigned-Integer as ASCII coded hexadecimal
:return:
"""
newperiod = _hexasciify(newperiod, 4)
return 'E'.encode('ascii') + newperiod.encode('ascii') + EOP
@staticmethod
def set_psu_ok_signal(ok_signal, output):
"""
The output of an IWF_EGSE_xxx_OK signal is set as selected with this command.
:param ok_signal: Selects the signal
:param output: Selects the value
:return:
"""
ok_signal = _hexasciify(ok_signal, 1)
output = _hexasciify(output, 1)
params = ''.join([ok_signal, output])
return 'F'.encode('ascii') + params.encode('ascii') + EOP
@staticmethod
def set_psu_analogue_value(i_signal, output):
"""
The analogue output IWF_EGSE_I_xxx is set as selected with this command.
:param i_signal: Selects the signal
:param output: Selects the value (lower 12Bits are used). Value as digital value in the range from 0 (≙ 0V) to 3276 (≙ 4V)
:return:
"""
ok_signal = _hexasciify(i_signal, 1)
output = _hexasciify(output, 4)
params = ''.join([ok_signal, output])
return 'G'.encode('ascii') + params.encode('ascii') + EOP
@staticmethod
def set_pwm(thermistor, value):
"""
The PWM for the OTA thermistor or CDD thermistor is as selected with this command. If no command is sent after a power-up, the PWM_MODE “automatic” is used by default.
:param thermistor: Selects the thermistor
:param value: Sets the current value for the PWM in thousandth (0 ≙ 0‰, 4000 ≙ 1000‰) as 2Byte Unsigned Integer, if mode is manual
:return:
"""
thermistor = _hexasciify(thermistor, 1)
spare = '0'
value = _hexasciify(value, 4)
params = ''.join([thermistor, spare, value])
return 'H'.encode('ascii') + params.encode('ascii') + EOP
# FEE
@staticmethod
def change_fee_report_period(new_period):
"""
The default report period of 1 second can be changed with this command.
:param new_period: Contains the new period in ms as 2Byte Unsigned Int as ASCII coded hexadecimal
:return:
"""
new_period = _hexasciify(new_period, 4)
return 'M'.encode('ascii') + new_period.encode('ascii') + EOP
### EBOX EGSE Interface ###
# FEE Power
@staticmethod
def set_max_loads(ccd_max, an1_max, an2_max, an3_max, clk_max, dig_spw_max, dig_fpga_max):
"""
The maximum loads at the LoadSim can be enabled or disabled with this command. If they are disabled, the nominal loads are active.
:param ccd_max: Enables or disables the maximum load for IWF_EGSE_FEE_CCD
:param an1_max: Enables or disables the maximum load for IWF_EGSE_FEE_AN1
:param an2_max: Enables or disables the maximum load for IWF_EGSE_FEE_AN2
:param an3_max: Enables or disables the maximum load for IWF_EGSE_FEE_AN3
:param clk_max: Enables or disables the maximum load for IWF_EGSE_FEE_CLK
:param dig_spw_max: Enables or disables the maximum load for IWF_EGSE_FEE_DIG_SPW
:param dig_fpga_max: Enables or disables the maximum load for IWF_EGSE_FEE_DIG_FPGA
:return:
"""
spare = '0'
ccd_max = _hexasciify(ccd_max, 1)
an1_max = _hexasciify(an1_max, 1)
an2_max = _hexasciify(an2_max, 1)
an3_max = _hexasciify(an3_max, 1)
clk_max = _hexasciify(clk_max, 1)
dig_spw_max = _hexasciify(dig_spw_max, 1)
dig_fpga_max = _hexasciify(dig_fpga_max, 1)
params = ''.join([spare, ccd_max, an1_max, an2_max, an3_max, clk_max, dig_spw_max, dig_fpga_max])
return 'Q'.encode('ascii') + params.encode('ascii') + EOP
@staticmethod
def change_fee_pwr_report_period(new_period):
"""
The default report period of 1 second can be changed with this command.
:param new_period: Contains the new period in ms as 2Byte Unsigned Int as ASCII coded hexadecimal
:return:
"""
new_period = _hexasciify(new_period, 4)
return 'V'.encode('ascii') + new_period.encode('ascii') + EOP
# RSM
@staticmethod
def change_rsm_report_period(new_period):
"""
The default report period of 1 second can be changed with this command.
:param new_period: Contains the new period in ms as 2Byte Unsigned Int as ASCII coded hexadecimal
:return:
"""
new_period = _hexasciify(new_period, 4)
return 'W'.encode('ascii') + new_period.encode('ascii') + EOP
@staticmethod
def set_rsm_end_switch(open_pos, close_pos):
"""
The end switches (open or close) can be set with this command.
:param open_pos: Enables or disables the signal IWF_EGSE_OPEN_POS
:param close_pos: Enables or disables the signal IWF_EGSE_CLOSE_POS
:return:
"""
open_pos = _hexasciify(open_pos, 1)
close_pos = _hexasciify(close_pos, 1)
params = ''.join([open_pos, close_pos])
return 'R'.encode('ascii') + params.encode('ascii') + EOP
class Response:
pass
def _hexasciify(value, nchars, signed=False):
"""
Returns an int as a hexadecimal string of length *nchars*
:param value:
:param nchars:
:return:
"""
if isinstance(value, int):
if signed:
return value.to_bytes(nchars // 2, 'big', signed=True).hex().upper()
else:
return '{:0{nc}X}'.format(value, nc=nchars)
else:
return value
def response_proc_func(rawdata):
pkts = rawdata.split(EOP)
pkts.remove(b'')
proc_pkts = [(RESPONSE_ID.get(pkt[0:1], 'UKNOWN'), pkt.decode('ascii', errors='replace')) for pkt in pkts]
return proc_pkts
import communication as com
import iwf_egse as iwf
econ = com.Connector('', iwf.PORT, msgdecoding='ascii')
econ.connect()
econ.start_receiver(procfunc=iwf.response_proc_func)
econ.send(iwf.Command.get_status(), rx=False)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment