From 82bfeb74a94392e2500b18b64e68b5255051b8b3 Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Fri, 3 May 2024 15:40:40 +0200 Subject: [PATCH] add preliminary WFI FPM telemetry decoding utilities --- Ccs/communication.py | 63 ++-- Ccs/packet_config_ATHENA_DE.py | 301 +++++++++++++++--- Ccs/scripts/wfi_de_communication.py | 55 ---- .../dataprocessing/athena_de_frame_proc.py | 125 ++++++++ 4 files changed, 428 insertions(+), 116 deletions(-) delete mode 100644 Ccs/scripts/wfi_de_communication.py create mode 100755 Ccs/tools/dataprocessing/athena_de_frame_proc.py diff --git a/Ccs/communication.py b/Ccs/communication.py index ad4e4cf..024d35e 100644 --- a/Ccs/communication.py +++ b/Ccs/communication.py @@ -227,7 +227,7 @@ class Receiver: SEL_TIMEOUT = 2 RECV_BUF_SIZE = 1024**3 - def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w', pkt_parser_func=None): + def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w', pkt_parser_func=None, extend_processed=True): self.sockfds = sockfds self.recvd_data_buf = queue.Queue(recv_buf_size) @@ -235,6 +235,7 @@ class Receiver: self._recv_thread = None self._proc_thread = None self.proc_data = [] + self.extend_processed = extend_processed self._pkt_parser_func = pkt_parser_func if outfile is not None: @@ -257,6 +258,12 @@ class Receiver: def stop(self): self._isrunning = False + def set_procfunc(self, func): + self._procfunc = func + + def set_pkt_parser_func(self, func): + self._pkt_parser_func = func + def _start_recv(self): self._isrunning = True self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker') @@ -276,7 +283,7 @@ class Receiver: rd, wr, er = select.select(self.sockfds, [], self.sockfds, self.SEL_TIMEOUT) for sock in rd: if self._pkt_parser_func is not None: - self.recvd_data_buf.put((time.time(), self._pkt_parser_func(sock))) + self.recvd_data_buf.put(self._pkt_parser_func(sock)) else: self.recvd_data_buf.put((time.time(), sock.recv(self.RECV_BYTES))) @@ -306,24 +313,40 @@ class Receiver: def _proc_worker(self): while self._isrunning: try: - t, data = self.recvd_data_buf.get(timeout=1) - procdata = self._procfunc(data, ts=t) - self.proc_data.append(procdata) - - if self.proc_data_fd is not None: - try: - if self.proc_data_fd.mode.count('b'): - self.proc_data_fd.write(procdata) - else: - self.proc_data_fd.write(str(procdata)) - except io.UnsupportedOperation as err: - print(err) - break - except Exception as err: - self.proc_data_fd.write('# {} #\n'.format(err)) - continue - finally: - self.proc_data_fd.flush() + if self._procfunc is None: + time.sleep(1) + continue + + # t, data = self.recvd_data_buf.get(timeout=1) + procdata = self._procfunc(self.recvd_data_buf) + + if procdata is not None: + if self.extend_processed: + self.proc_data.extend(procdata) + else: + self.proc_data.append(procdata) + + if self.proc_data_fd is not None: + try: + if self.proc_data_fd.mode.count('b'): + wdata = procdata + else: + wdata = str(procdata) + + if self.extend_processed: + for x in wdata: + self.proc_data_fd.write(x) + else: + self.proc_data_fd.write(wdata) + + except io.UnsupportedOperation as err: + print(err) + break + except Exception as err: + self.proc_data_fd.write('# {} #\n'.format(err)) + continue + finally: + self.proc_data_fd.flush() except queue.Empty: continue diff --git a/Ccs/packet_config_ATHENA_DE.py b/Ccs/packet_config_ATHENA_DE.py index 65f7bb2..e42e576 100644 --- a/Ccs/packet_config_ATHENA_DE.py +++ b/Ccs/packet_config_ATHENA_DE.py @@ -8,6 +8,13 @@ Author: Marko Mecina (UVIE) """ import ctypes +import io +import os +import queue +import socket + +import numpy as np +from astropy.io import fits from packet_config_ATHENA import RawGetterSetter @@ -69,6 +76,7 @@ class CommandBaseStruct(ctypes.BigEndianStructure): super(CommandBaseStruct).__init__() +IF_LEN = 1 CMD_LEN = ctypes.sizeof(CommandBaseStruct) ACK_LEN = 3 SCI_CMD_LEN = 2 @@ -170,6 +178,10 @@ class HkPkt: # Science interface + +FRAME_SIZE_X = 512 +FRAME_SIZE_Y = 512 + EVT_PKT_ELEMENT_LEN = 6 # length of event packet element (timestamp, header, pixel, footer) in bytes STRUCT_EVT_PIX = { # name, bitsize, bitoffset (LSB) "FRAME_N": (8, 40), @@ -289,7 +301,7 @@ class EvtPix(EvtPktBase): class EvtHeader(EvtPktBase): """ - *EventPacket* header + *EventFrame* header """ def __init__(self, raw): @@ -306,7 +318,7 @@ class EvtHeader(EvtPktBase): class EvtFooter(EvtPktBase): """ - *EventPacket* footer + *EventFrame* footer """ def __init__(self, raw): @@ -319,7 +331,7 @@ class EvtFooter(EvtPktBase): class EvtTimestamp(EvtPktBase): """ - *EventPacket* time stamp structure + *EventFrame* time stamp structure """ def __init__(self, raw): @@ -351,29 +363,74 @@ def _shift_mask(x, bs_os): return (x >> bs_os[1]) & (2 ** bs_os[0] - 1) -class EventPacket: +class EventFrame: """ - FP science event packet + FP frame containing event data """ - def __init__(self, raw, process=True): + def __init__(self, raw, process=True, notime=False): self._raw = raw self._len = len(raw) - self.pixels_list_proc = None + self.evt_list_proc = None + + if notime: + hoff = 1 + poff = 2 + self.timestamp = None + else: + hoff = 1 + poff = 2 + self.timestamp = EvtTimestamp(raw[:EVT_PKT_ELEMENT_LEN]) - self.timestamp = EvtTimestamp(raw[:EVT_PKT_ELEMENT_LEN]) - self.header = EvtHeader(raw[EVT_PKT_ELEMENT_LEN:2 * EVT_PKT_ELEMENT_LEN]) + self.header = EvtHeader(raw[hoff * EVT_PKT_ELEMENT_LEN:poff * EVT_PKT_ELEMENT_LEN]) # self._pixels_block = raw[2 * EVT_PKT_ELEMENT_LEN:-EVT_PKT_ELEMENT_LEN] # if footer is present - self._pixels_block = raw[2 * EVT_PKT_ELEMENT_LEN:] - self.pixels_list = [self._pixels_block[i:i + EVT_PKT_ELEMENT_LEN] for i in - range(0, len(self._pixels_block), EVT_PKT_ELEMENT_LEN)] + self._pixels_block = raw[poff * EVT_PKT_ELEMENT_LEN:] + self.evt_list = [self._pixels_block[i:i + EVT_PKT_ELEMENT_LEN] for i in + range(0, len(self._pixels_block), EVT_PKT_ELEMENT_LEN)] # self.footer = EvtFooter(raw[-EVT_PKT_ELEMENT_LEN:]) if process: - self._process_pixels() + self._process_evts() + + def __str__(self): + return 'EventFrame (ID {}, {} events)'.format(self.frameid, self.nevts) + + @property + def nevts(self): + return len(self.evt_list_proc) + + @property + def frameid(self): + return self._raw[0] + + def get_inv_evts(self): + return [x for x in self.evt_list if x[-1] != 0xFF] + + def _process_evts(self): + # only include valid events (LSB == 0xFF) + self.evt_list_proc = [EvtPix(pix) for pix in self.evt_list if pix[-1] == 0xFF] + + def as_array(self, nans=False): + + # initialise empty array + if nans: + arr = np.zeros((FRAME_SIZE_Y, FRAME_SIZE_X)) + arr[:] = np.NAN + else: + arr = np.zeros((FRAME_SIZE_Y, FRAME_SIZE_X), dtype=np.uint16) + + for p in self.evt_list_proc: + arr[p.line_n, p.pixel_n] = p.energy + + return arr + + def save_fits(self, fname, nans=False, overwrite=False): + hdu = fits.ImageHDU(data=self.as_array(nans=nans)) + hdu.header['FRAMENUM'] = self.header.frame_n + hdu.header['NEVTS'] = len(self.evt_list) - def _process_pixels(self): - self.pixels_list_proc = [EvtPix(pix) for pix in self.pixels_list] + hdu.writeto(fname, overwrite=overwrite) + return fname # delay = 0xXX * sys_clk period @@ -408,42 +465,204 @@ class SciCmd: return self._raw[1] -def read_de_pkt(pkt, process=True): +class FpmPktParser: + """ + Parses telemetry received from FPM + """ + + def __init__(self, scibytes, echobytes=None, defaultbytes=1024): + + self.scibytes = scibytes + self.echobytes = echobytes + self.defaultbytes = defaultbytes - assert isinstance(pkt, (bytes, bytearray)) + self.lastpkt = None - p = None + def __call__(self, sock): + + strict = True + msg = sock.recv(IF_LEN) + + if not msg: + return b'' + + if msg[0] == IfAddr.CMD: + mlen = ACK_LEN - IF_LEN + elif msg[0] == IfAddr.SCI: + mlen = self.scibytes + elif msg[0] == IfAddr.HK: + mlen = ACK_LEN - IF_LEN + # TODO: HK interface + # msg += sock.recv(1) + # mlen = msg[1] # first byte after interface ID specifies HK length? + elif msg[0] == IfAddr.ECHO: + if self.echobytes is None: + print('WARNING: Echo length is not defined, reading {}!'.format(self.defaultbytes)) + mlen = self.defaultbytes + strict = False + # raise ValueError('WARNING: Echo length is not defined, reading {}!'.format(self.defaultbytes)) + else: + mlen = self.echobytes - if pkt[0] == IfAddr.HK: - if len(pkt) == ACK_LEN: - p = Ack(pkt) - elif len(pkt) == CMD_LEN: - p = CommandBase(pkt) else: - p = HkPkt(pkt) - elif pkt[0] == IfAddr.CMD: - if len(pkt) == ACK_LEN: - p = Ack(pkt) - elif len(pkt) == CMD_LEN: - p = CommandBase(pkt) - elif pkt[0] == IfAddr.SCI and len(pkt) == SCI_CMD_LEN: - p = SciCmd(pkt) - elif pkt[0] == IfAddr.ECHO: - p = pkt - elif pkt[0] == IfAddr.SCI and len(pkt[1:]) % EVT_PKT_ELEMENT_LEN == 0: - p = EventPacket(pkt[1:], process=process) + print('ERROR: Unknown interface ID {:02X}'.format(msg[0])) + mlen = self.defaultbytes + strict = False + + msg += self.read_socket(sock, mlen, strict=strict) - if p is None: - raise NotImplementedError('Unknown interface ID {}'.format(pkt[0])) + self.lastpkt = msg - return p + return msg + def set_scibytes(self, nbytes): + self.scibytes = nbytes -def split_de_dump(data, de_pkt_size): + def set_echobytes(self, nbytes): + self.echobytes = nbytes + + def set_defaultbytes(self, nbytes): + self.defaultbytes = nbytes + + def read_socket(self, sock, rlen, strict=True): + msg = b'' + + if strict: + while len(msg) < rlen: + try: + msg += sock.recv(rlen - len(msg)) + except socket.timeout: + continue + else: + msg += sock.recv(rlen) - assert isinstance(data, bytes) + return msg - return [data[i:i+de_pkt_size] for i in range(0, len(data), de_pkt_size)] + +class FpmProcessor: + """ + Processes FPM packets and assembles event frames. + """ + + def __init__(self, ashex=True, process=True, notime=False, tofile=None, queue_to=10): + self.ashex = ashex + self.process = process + self.notime = notime + + self.cfdata = b'' + self.curfrm = -1 + self.framecnt = 0 + + self.frames = [] + self.tofile = tofile + + self.queue_to = queue_to + + if tofile is not None: + self.tofile = open(tofile, 'wb') + + def __call__(self, buf, ts=None): + + assert isinstance(buf, queue.Queue) + + try: + pkt = buf.get(timeout=self.queue_to) + except queue.Empty: + return + + self.frames.clear() + + try: + if self.tofile is not None: + self.tofile.write(pkt) + self.tofile.flush() + + # process non-SCI data + if pkt[0] != IfAddr.SCI: + return pkt.hex(sep=' ') if self.ashex else pkt + + # process data from SCI itnerface + nevts = (len(pkt) - 1) // EVT_PKT_ELEMENT_LEN + buf = io.BytesIO(pkt[1:]) + for i in range(nevts): + self.proc(buf) + + if self.frames: + return self.frames + + except Exception as err: + print(err) + + def proc(self, buf, verbose=False): + + ed = buf.read(EVT_PKT_ELEMENT_LEN) + + if ed[0] == self.curfrm: + self.cfdata += ed + + else: + + if self.curfrm != -1: + self.frames.append(self.mk_evt_frame(verbose=verbose)) + + self.cfdata = ed + self.framecnt += 1 + self.curfrm = ed[0] + + def mk_evt_frame(self, verbose=False): + try: + frame = EventFrame(self.cfdata) + if verbose: + print(frame) + except Exception as err: + print(err) + return self.cfdata + + return frame + + def close_file(self): + self.tofile.close() + + def proc_file(self, fname, npkts_sci=16, binary=False): + + assert os.path.isfile(fname) + + if binary: + buf = open(fname, 'rb') + else: + f = bytes.fromhex(open(fname, 'r').read()) + buf = io.BytesIO(f) + + while True: + pt = buf.read(IF_LEN) + if not pt: + print('END OF FILE.', buf.tell(), 'bytes read.') + break + if pt[0] == IfAddr.CMD: + msg = buf.read(ACK_LEN - IF_LEN) + print((pt + msg).hex(sep=' ')) + elif pt[0] == IfAddr.SCI: + for i in range(npkts_sci): + self.proc(buf, verbose=True) + else: + print('ERROR', pt, buf.tell()) + + return self.frames + + def reset(self): + self.cfdata = b'' + self.curfrm = -1 + self.framecnt = 0 + + self.frames.clear() + + +def filter_frames(objlist, empty_frames=True): + + if not empty_frames: + return [x for x in objlist if isinstance(x, EventFrame)] + else: + return [x for x in objlist if isinstance(x, EventFrame) and x.nevts > 0] # PCM Registers [TBC] diff --git a/Ccs/scripts/wfi_de_communication.py b/Ccs/scripts/wfi_de_communication.py deleted file mode 100644 index e556ac4..0000000 --- a/Ccs/scripts/wfi_de_communication.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -Examples for Athena WFI DE communications -""" - -import communication as com -import packet_config_ATHENA_DE as de - -# set up socket and connect -decon = com.Connector('', 12345, msgdecoding='hex') -decon.connect() - -# example commands -# test echo interface (0x20) -decon.send(b'\x20\xDE\xAD') - -# HK interface 0x33 -decon.send(de.HkCmdRead(0x1000)) # get PCM MODE register -decon.send(de.HkCmdWrite(0x1000, 0x0001)) # set PCM MODE register - -# CMD interface 0x34 -decon.send(de.CmdWrite(0x3C00, 0x0001)) # write sequencer register -decon.send(de.CmdWrite(0x3C00, 1), rx=False) # write sequencer register, but don't fetch cmd response from socket -decon.send(de.CmdRead(0x3C00)) # read sequencer register - -# SCI interface 0x35 -decon.send(de.SciCmd(100)) # set science data output rate - -# dump cmd log (decon.log) -logfile = '/path/to/de_cmd.log' -decon.dump_log(logfile) - -# automatically log to file -decon.setup_storage(logfile) - -# run rx thread on socket, received data is put in recvd_data_buf queue -decon.start_receiver() -decon.receiver.recvd_data_buf - - -# custom TM processing function; must take bytestring as arg *data*, and timestamp kwarg *ts* -def msg_to_hex_string(data, ts=''): - try: - return '{}: {}\n'.format(ts, data.hex(' ', 1)) - except Exception as err: - print(err) - return '# ERROR #\n' - - -# optionally, add custom TM processing -# this logs the received data hex-formatted in outfile -decon.start_receiver(procfunc=msg_to_hex_string, outfile='/path/to/de_rx.log', ofmode='w') - -# processed data is also collected in -decon.receiver.proc_data - diff --git a/Ccs/tools/dataprocessing/athena_de_frame_proc.py b/Ccs/tools/dataprocessing/athena_de_frame_proc.py new file mode 100755 index 0000000..c629e1b --- /dev/null +++ b/Ccs/tools/dataprocessing/athena_de_frame_proc.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 + +""" +Extract FPM frames from telemetry file and save as FITS. + +""" + +import os +import sys + +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.widgets import Slider, Button + +sys.path.append('../../') +import packet_config_ATHENA_DE as de + + +NPKTS = 16 # number of event packets (6 bytes) per SpW packet + + +def run(fname, binary=False): + + fproc = de.FpmProcessor() + + results = fproc.proc_file(fname, npkts_sci=NPKTS, binary=binary) + + return results + + +def save_frames(frames, outfile, nans=False): + + from astropy.io import fits + + hdl = fits.HDUList() + empty_frames = 0 + + for frame in frames: + + if len(frame.evt_list_proc) == 0: + empty_frames += 1 + continue + + hdu = fits.ImageHDU(data=frame.as_array(nans=nans)) + hdu.header['FRAMENUM'] = frame.header.frame_n + hdu.header['NEVTS'] = len(frame.evt_list_proc) + + hdl.append(hdu) + + print('Processed {} frames.'.format(len(frames))) + + if empty_frames != 0: + print('There were {} frames with no valid event data!'.format(empty_frames)) + + hdl.writeto(outfile, overwrite=True) + print('Results written to', outfile) + + +class FrameViewer: + + def __init__(self, framelist, show_empty=False, nans=False): + + self.update(framelist, show_empty=show_empty, nans=nans) + + def update(self, framelist, show_empty=False, nans=False): + + if show_empty: + self.frames = framelist + else: + self.frames = [frame for frame in framelist if frame.nevts > 0] + + self.data = np.zeros((len(self.frames), de.FRAME_SIZE_Y, de.FRAME_SIZE_X)) + self.nans = nans + + for i, frame in enumerate(self.frames): + self.data[i, :, :] = frame.as_array(nans=self.nans) + + def show(self, idx=0, cmap='inferno', interpolation='none'): + + idx = int(idx) + + fig, ax = plt.subplots(figsize=(7, 8)) + fig.subplots_adjust(bottom=0.25) + im = ax.imshow(self.data[idx, :, :], origin='lower', vmin=np.nanmin(self.data), vmax=np.nanmax(self.data), cmap=cmap, interpolation=interpolation) + ax.set_title('Frame ID {} (#{})'.format(self.frames[idx].frameid, idx)) + + ax_frm = fig.add_axes([0.15, 0.1, 0.73, 0.03]) + + # create the sliders + sfrm = Slider(ax_frm, "Frame #", 0, self.data.shape[0] - 1, valinit=idx, valstep=range(self.data.shape[0]), color="tab:blue") + + def _update(val): + im.set_data(self.data[val, :, :]) + ax.set_title('Frame ID {} (#{})'.format(self.frames[val].frameid, val)) + fig.canvas.draw_idle() + + sfrm.on_changed(_update) + + plt.show() + + +if __name__ == '__main__': + if len(sys.argv) < 3: + print('USAGE: ./athena_de_frame_proc.py <EVT_FILENAME> <FITS_FILE>') + sys.exit() + + if '-b' in sys.argv: + binary = True + sys.argv.remove('-b') + else: + binary = False + + fname, fitsfile = sys.argv[1:3] + fitsfile = os.path.abspath(fitsfile) + + ow = '' + if os.path.isfile(fitsfile): + while ow.lower() not in ['y', 'n']: + ow = input('File {} already exists, overwrite? (y/n) '.format(fitsfile)) + + if ow.lower() == 'n': + sys.exit() + + frames = run(fname, binary=binary) + save_frames(frames, fitsfile, nans=False) -- GitLab