From 81790f473b29f950f499b3bc6ca4ac417dd4c8da Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Tue, 4 Jul 2023 18:23:05 +0200 Subject: [PATCH] add generic socket communication utilities and IWF EGSE command interface library --- Ccs/communication.py | 271 +++++++++++++++++++++++++++ Ccs/iwf_egse.py | 310 +++++++++++++++++++++++++++++++ Ccs/scripts/iwf_egse_template.py | 9 + 3 files changed, 590 insertions(+) create mode 100644 Ccs/communication.py create mode 100644 Ccs/iwf_egse.py create mode 100644 Ccs/scripts/iwf_egse_template.py diff --git a/Ccs/communication.py b/Ccs/communication.py new file mode 100644 index 0000000..c02e20b --- /dev/null +++ b/Ccs/communication.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 + +""" +General purpose socket communication utility functions + +""" +import queue +import select +import socket +import threading +import time + + +class Connector: + RECV_NBYTES = 4096 + + def __init__(self, host, port, is_server=False, response_to=2, recv_nbytes_min=0, save_to_file=None, msgdecoding='hex', resp_decoder=None): + + self.sock_timeout = 10 + self.response_to = response_to + self.host = host + self.port = port + self.isserver = is_server + self.recv_nbytes_min = recv_nbytes_min + self.msgdecoding = msgdecoding + self.resp_decoder = resp_decoder + + self.conn = None + self.log = [] + self._storagefd = None + + self.receiver = None + + self._startup(save_to_file) + + def _startup(self, save_to): + + self.setup_port() + if save_to is not None: + self.setup_storage(save_to) + + def setup_storage(self, fname): + self._storagefd = open(fname, 'w') + + def setup_port(self): + + self.sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sockfd.settimeout(self.sock_timeout) + + if self.isserver: + self.sockfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sockfd.bind((self.host, self.port)) + self.sockfd.listen() + print('Listening for connections on {}:{}'.format(self.host, self.port)) + + def _connect(self): + + if self.isserver: + self.conn, addr = self.sockfd.accept() + print('Got connection on {}:{}'.format(self.host, self.port)) + else: + self.sockfd.connect((self.host, self.port)) + self.conn = self.sockfd + print('Connected to {}:{}'.format(self.host, self.port)) + + self.conn.settimeout(self.response_to) + + def _close(self, servershtdwn): + self.conn.close() + print('Closed connection to {}:{}'.format(self.host, self.port)) + + if servershtdwn: + print('Closing server {}'.format(self.sockfd.getsockname())) + self.sockfd.close() + + def connect(self): + if self.conn is not None and self.conn.fileno() < 0: + self.setup_port() + self._connect() + + def close(self): + self._close(False) + + def close_server(self): + if self.isserver: + self._close(True) + else: + print('Not a server') + + def close_storage(self): + if self._storagefd is None: + print('No file to close') + return + + self._storagefd.close() + self._storagefd = None + + def dump_log(self, fname, hexsep=''): + log = '\n'.join(['{:.3f}\t{}\t{}'.format(t, _msgdecoder(msg, self.msgdecoding, sep=hexsep), _msgdecoder(resp, self.msgdecoding, sep=hexsep)) for (t, msg, resp) in self.log]) + with open(fname, 'w') as fd: + fd.write(log) + + def send(self, msg, rx=True, output=False): + + if hasattr(msg, 'raw'): + msg = msg.raw + + if self.conn is not None: + self.conn.sendall(msg) + t = time.time() + + resp = b'' + if rx: + resp = self._recv_response() + + self.log.append((t, msg, resp)) + + if self._storagefd is not None: + self._storagefd.write('{:.3f}\t{}\t{}\n'.format(t, _msgdecoder(msg, self.msgdecoding), _msgdecoder(resp, self.msgdecoding))) + self._storagefd.flush() + + if output: + print('{:.3f}: SENT {} | RECV: {}'.format(t, _msgdecoder(msg, self.msgdecoding), _msgdecoder(resp, self.msgdecoding))) + + return resp if self.resp_decoder is None else self.resp_decoder(resp) + + else: + print('Not connected!') + + def recv(self, nbytes=None): + + if nbytes is None and self.recv_nbytes_min != 0: + nbytes = self.recv_nbytes_min + elif nbytes is None: + nbytes = self.RECV_NBYTES + + return self.conn.recv(nbytes) + + def _recv_response(self): + + data = b'' + + try: + if self.recv_nbytes_min != 0: + while len(data) < self.recv_nbytes_min: + data += self.conn.recv(self.recv_nbytes_min - len(data)) + else: + data += self.conn.recv(self.RECV_NBYTES) + except Exception as err: + print('No/invalid response ({})'.format(err)) + finally: + return data + + def set_response_to(self, seconds): + self.conn.settimeout(seconds) + self.response_to = seconds + + def start_receiver(self, procfunc=None): + if self.conn is None: + print('No connection') + return + + if self.receiver is None: + self.receiver = Receiver([self.conn], procfunc=procfunc) + self.receiver.start() + else: + print('Receiver already initialised') + + +class Receiver: + + RECV_BYTES = 4096 + SEL_TIMEOUT = 2 + RECV_BUF_SIZE = 1024**3 + + def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE): + + self.sockfds = sockfds + self.recvd_data_buf = queue.Queue(recv_buf_size) + self._procfunc = procfunc + self._recv_thread = None + self._proc_thread = None + self.proc_data = [] + + self._isrunning = False + + def start(self): + if (self._recv_thread is None) or (not self._recv_thread.is_alive()): + self._start_recv() + else: + print('Already running!') + + if self._procfunc is not None: + if (self._proc_thread is None) or (not self._proc_thread.is_alive()): + self._start_processing() + + def stop(self): + self._isrunning = False + + def _start_recv(self): + self._isrunning = True + self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker') + self._recv_thread.start() + + def _recv_worker(self): + + for sockfd in self.sockfds: + if sockfd is not None: + print('Receiving from socket {}:{}'.format(*sockfd.getpeername())) + else: + self.sockfds.remove(sockfd) + + while self._isrunning: + try: + rd, wr, er = select.select(self.sockfds, [], self.sockfds, self.SEL_TIMEOUT) + for sock in rd: + self.recvd_data_buf.put((time.time(), sock.recv(self.RECV_BYTES))) + # print(self.recvd_data.get()) + + for sock in er: + print('Error in {}'.format(sock.getpeername())) + self.sockfds.remove(sock) + if not self.sockfds: + self.stop() + + except socket.timeout: + continue + + except (ValueError, OSError): + self.stop() + break + + # for sockfd in self.sockfds: + # print('Stopped receiving from socket {}:{}'.format(*sockfd.getpeername())) + print('Receiving stopped') + + def _start_processing(self): + self._proc_thread = threading.Thread(target=self._proc_worker, name='proc_worker') + self._proc_thread.daemon = True + self._proc_thread.start() + + def _proc_worker(self): + while self._isrunning: + try: + t, data = self.recvd_data_buf.get(timeout=1) + self.proc_data += self._procfunc(data) + except queue.Empty: + continue + except Exception as err: + print(err) + print('Processing stopped') + break + + +def _msgdecoder(msg, fmt, sep=''): + if fmt == 'hex': + return hexify(msg, sep=sep) + elif fmt == 'ascii': + return toascii(msg) + else: + raise NotImplementedError('Unknown decoding style {}'.format(fmt)) + + +def hexify(bs, sep=''): + if bs is None: + bs = b'' + return bs.hex().upper() if sep == '' else bs.hex(sep).upper() + + +def toascii(bs, errors='replace'): + return bs.decode('ascii', errors=errors) diff --git a/Ccs/iwf_egse.py b/Ccs/iwf_egse.py new file mode 100644 index 0000000..de33510 --- /dev/null +++ b/Ccs/iwf_egse.py @@ -0,0 +1,310 @@ +""" +IWF EGSE communication library + +Ref: SMILE-IWF-PL-IF-048 +""" + +PORT = 8089 + +EOP = b'\x0D\x0A' # CR/LF + +ERRORCODE = { + b'\x30': 'Command OK', + b'\x31': 'Parameter 1 NOT OK', + b'\x32': 'Parameter 2 NOT OK', + b'\x33': 'Parameter 3 NOT OK', + b'\x34': 'Parameter 4 NOT OK', + b'\x35': 'Parameter 5 NOT OK', + b'\x36': 'Parameter 6 NOT OK', + b'\x37': 'Command NOT ALLOWED', + b'\x38': 'Command lenght NOT OK', + b'\x39': 'Command UNKNOWN' +} + +RESPONSE_ID = { + b's': ('currentStatus', 10), + b'x': ('execReset', 4), + b'a': ('settledNewDelay', 4), + b'b': ('handledErrorInjection', 4), + b'c': ('settledRSERegValue', 4), + b'd': ('receivedRSEData', 28), + b'e': ('changedPSUReportPeriod', 4), + b'f': ('settledPSUOK', 4), + b'g': ('settledPSUAnalogueValue', 4), + b'h': ('settledPWM', 4), + b'k': ('newPSUStatus', 26), + b'm': ('changedFEEReportPeriod', 4), + b'n': ('newFEEStatus', 13), + b'p': ('newFEEPWRStatus', 11), + b'q': ('settledMaxLoads', 4), + b'r': ('settledRSMEndSwitch', 4), + b't': ('newRSMStatus', 13), + b'u': ('newPSUEBOXStatus', 9), + b'v': ('changedFEEPWRReportPeriod', 4), + b'w': ('changedRSEReportPeriod', 4) +} + + +class Command: + + # GENERAL + @staticmethod + def get_status(): + """ + The general status of the IWF EGSE is requested with this command. + + :return: + """ + + return 'S'.encode('ascii') + EOP + + @staticmethod + def reset(): + """ + The FPGA in the IWF_EGSE is reset. + + :return: + """ + + return 'X'.encode('ascii') + EOP + + ### DPU EGSE Interface ### + # RSE + @staticmethod + def set_new_delay(delay): + """ + RSE Interface + With this command the delay between the received command and the generated response can be changed. (The default value after power-on is 2 baud) + + :param delay: Contains the 2Byte Unsigned-Integer as ASCII coded hexadecimal value to set + :return: + """ + + delay = _hexasciify(delay, 4) + + return 'A'.encode('ascii') + delay.encode('ascii') + EOP + + @staticmethod + def inject_errors(error_type, apply_to, num_errors, error_reg, error_resp, byte_sel): + """ + With this command errors can be injected to the serial communication RSE - DPU (it can be used to abort the error injection, too). + + :param error_type: Selects the error type to inject + :param apply_to: Selects when to apply error injection + :param num_errors: Selects the number of errors to inject. Use '-1' to inject the error endless. Contains the 4Byte Signed-Integer as ASCII coded hexadecimal number. + :param error_reg: Contains the defined address as 1Byte Unsigned-Integer as ASCII coded hexadecimal. Only used, if ApplyTO is 'defined address only' + :param error_resp: Contains the error response as 1Byte Unsigned-Integer as ASCII coded hexadecimal. Only used, if ErrorType is “send an error response” + :param byte_sel: Selects the bytes to inject frame or parity errors + :return: + """ + + error_type = _hexasciify(error_type, 1) + apply_to = _hexasciify(apply_to, 1) + num_errors = _hexasciify(num_errors, 4, signed=True) + error_reg = _hexasciify(error_reg, 2) + error_resp = _hexasciify(error_resp, 2) + byte_sel = _hexasciify(byte_sel, 1) + + params = ''.join([error_type, apply_to, num_errors, error_reg, error_resp, byte_sel]) + + return 'B'.encode('ascii') + params.encode('ascii') + EOP + + @staticmethod + def set_rse_reg_value(register_address, value): + """ + A new value is set to a register address in the myRIO FPGA. + + :param register_address: Contains the 1Byte Unsigned-Integer as ASCII coded hexadecimal address of the register + :param value: Contains the 1Byte Unsigned-Integer as ASCII coded hexadecimal value to set + :return: + """ + + register_address = _hexasciify(register_address, 2) + value = _hexasciify(value, 2) + + params = ''.join([register_address, value]) + + return 'C'.encode('ascii') + params.encode('ascii') + EOP + + # PSU + @staticmethod + def change_psu_report_period(newperiod): + """ + The default report period of 1 second can be changed with this command. + + :param newperiod: Contains the new period in ms as 2Byte Unsigned-Integer as ASCII coded hexadecimal + :return: + """ + + newperiod = _hexasciify(newperiod, 4) + + return 'E'.encode('ascii') + newperiod.encode('ascii') + EOP + + @staticmethod + def set_psu_ok_signal(ok_signal, output): + """ + The output of an IWF_EGSE_xxx_OK signal is set as selected with this command. + + :param ok_signal: Selects the signal + :param output: Selects the value + :return: + """ + + ok_signal = _hexasciify(ok_signal, 1) + output = _hexasciify(output, 1) + + params = ''.join([ok_signal, output]) + + return 'F'.encode('ascii') + params.encode('ascii') + EOP + + @staticmethod + def set_psu_analogue_value(i_signal, output): + """ + The analogue output IWF_EGSE_I_xxx is set as selected with this command. + + :param i_signal: Selects the signal + :param output: Selects the value (lower 12Bits are used). Value as digital value in the range from 0 (≙ 0V) to 3276 (≙ 4V) + :return: + """ + + ok_signal = _hexasciify(i_signal, 1) + output = _hexasciify(output, 4) + + params = ''.join([ok_signal, output]) + + return 'G'.encode('ascii') + params.encode('ascii') + EOP + + @staticmethod + def set_pwm(thermistor, value): + """ + The PWM for the OTA thermistor or CDD thermistor is as selected with this command. If no command is sent after a power-up, the PWM_MODE “automatic” is used by default. + + :param thermistor: Selects the thermistor + :param value: Sets the current value for the PWM in thousandth (0 ≙ 0‰, 4000 ≙ 1000‰) as 2Byte Unsigned Integer, if mode is manual + :return: + """ + + thermistor = _hexasciify(thermistor, 1) + spare = '0' + value = _hexasciify(value, 4) + + params = ''.join([thermistor, spare, value]) + + return 'H'.encode('ascii') + params.encode('ascii') + EOP + + # FEE + @staticmethod + def change_fee_report_period(new_period): + """ + The default report period of 1 second can be changed with this command. + + :param new_period: Contains the new period in ms as 2Byte Unsigned Int as ASCII coded hexadecimal + :return: + """ + + new_period = _hexasciify(new_period, 4) + + return 'M'.encode('ascii') + new_period.encode('ascii') + EOP + + ### EBOX EGSE Interface ### + # FEE Power + @staticmethod + def set_max_loads(ccd_max, an1_max, an2_max, an3_max, clk_max, dig_spw_max, dig_fpga_max): + """ + The maximum loads at the LoadSim can be enabled or disabled with this command. If they are disabled, the nominal loads are active. + + :param ccd_max: Enables or disables the maximum load for IWF_EGSE_FEE_CCD + :param an1_max: Enables or disables the maximum load for IWF_EGSE_FEE_AN1 + :param an2_max: Enables or disables the maximum load for IWF_EGSE_FEE_AN2 + :param an3_max: Enables or disables the maximum load for IWF_EGSE_FEE_AN3 + :param clk_max: Enables or disables the maximum load for IWF_EGSE_FEE_CLK + :param dig_spw_max: Enables or disables the maximum load for IWF_EGSE_FEE_DIG_SPW + :param dig_fpga_max: Enables or disables the maximum load for IWF_EGSE_FEE_DIG_FPGA + :return: + """ + + spare = '0' + + ccd_max = _hexasciify(ccd_max, 1) + an1_max = _hexasciify(an1_max, 1) + an2_max = _hexasciify(an2_max, 1) + an3_max = _hexasciify(an3_max, 1) + clk_max = _hexasciify(clk_max, 1) + dig_spw_max = _hexasciify(dig_spw_max, 1) + dig_fpga_max = _hexasciify(dig_fpga_max, 1) + + params = ''.join([spare, ccd_max, an1_max, an2_max, an3_max, clk_max, dig_spw_max, dig_fpga_max]) + + return 'Q'.encode('ascii') + params.encode('ascii') + EOP + + @staticmethod + def change_fee_pwr_report_period(new_period): + """ + The default report period of 1 second can be changed with this command. + + :param new_period: Contains the new period in ms as 2Byte Unsigned Int as ASCII coded hexadecimal + :return: + """ + + new_period = _hexasciify(new_period, 4) + + return 'V'.encode('ascii') + new_period.encode('ascii') + EOP + + # RSM + @staticmethod + def change_rsm_report_period(new_period): + """ + The default report period of 1 second can be changed with this command. + + :param new_period: Contains the new period in ms as 2Byte Unsigned Int as ASCII coded hexadecimal + :return: + """ + + new_period = _hexasciify(new_period, 4) + + return 'W'.encode('ascii') + new_period.encode('ascii') + EOP + + @staticmethod + def set_rsm_end_switch(open_pos, close_pos): + """ + The end switches (open or close) can be set with this command. + + :param open_pos: Enables or disables the signal IWF_EGSE_OPEN_POS + :param close_pos: Enables or disables the signal IWF_EGSE_CLOSE_POS + :return: + """ + + open_pos = _hexasciify(open_pos, 1) + close_pos = _hexasciify(close_pos, 1) + + params = ''.join([open_pos, close_pos]) + + return 'R'.encode('ascii') + params.encode('ascii') + EOP + + +class Response: + pass + + +def _hexasciify(value, nchars, signed=False): + """ + Returns an int as a hexadecimal string of length *nchars* + + :param value: + :param nchars: + :return: + """ + if isinstance(value, int): + if signed: + return value.to_bytes(nchars // 2, 'big', signed=True).hex().upper() + else: + return '{:0{nc}X}'.format(value, nc=nchars) + else: + return value + + +def response_proc_func(rawdata): + pkts = rawdata.split(EOP) + pkts.remove(b'') + proc_pkts = [(RESPONSE_ID.get(pkt[0:1], 'UKNOWN'), pkt.decode('ascii', errors='replace')) for pkt in pkts] + return proc_pkts diff --git a/Ccs/scripts/iwf_egse_template.py b/Ccs/scripts/iwf_egse_template.py new file mode 100644 index 0000000..a624f27 --- /dev/null +++ b/Ccs/scripts/iwf_egse_template.py @@ -0,0 +1,9 @@ +import communication as com +import iwf_egse as iwf + +econ = com.Connector('', iwf.PORT, msgdecoding='ascii') +econ.connect() + +econ.start_receiver(procfunc=iwf.response_proc_func) + +econ.send(iwf.Command.get_status(), rx=False) -- GitLab