Skip to content
Snippets Groups Projects
Commit 82bfeb74 authored by Marko Mecina's avatar Marko Mecina
Browse files

add preliminary WFI FPM telemetry decoding utilities

parent 99a06d92
No related branches found
No related tags found
No related merge requests found
...@@ -227,7 +227,7 @@ class Receiver: ...@@ -227,7 +227,7 @@ class Receiver:
SEL_TIMEOUT = 2 SEL_TIMEOUT = 2
RECV_BUF_SIZE = 1024**3 RECV_BUF_SIZE = 1024**3
def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w', pkt_parser_func=None): def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w', pkt_parser_func=None, extend_processed=True):
self.sockfds = sockfds self.sockfds = sockfds
self.recvd_data_buf = queue.Queue(recv_buf_size) self.recvd_data_buf = queue.Queue(recv_buf_size)
...@@ -235,6 +235,7 @@ class Receiver: ...@@ -235,6 +235,7 @@ class Receiver:
self._recv_thread = None self._recv_thread = None
self._proc_thread = None self._proc_thread = None
self.proc_data = [] self.proc_data = []
self.extend_processed = extend_processed
self._pkt_parser_func = pkt_parser_func self._pkt_parser_func = pkt_parser_func
if outfile is not None: if outfile is not None:
...@@ -257,6 +258,12 @@ class Receiver: ...@@ -257,6 +258,12 @@ class Receiver:
def stop(self): def stop(self):
self._isrunning = False self._isrunning = False
def set_procfunc(self, func):
self._procfunc = func
def set_pkt_parser_func(self, func):
self._pkt_parser_func = func
def _start_recv(self): def _start_recv(self):
self._isrunning = True self._isrunning = True
self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker') self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker')
...@@ -276,7 +283,7 @@ class Receiver: ...@@ -276,7 +283,7 @@ class Receiver:
rd, wr, er = select.select(self.sockfds, [], self.sockfds, self.SEL_TIMEOUT) rd, wr, er = select.select(self.sockfds, [], self.sockfds, self.SEL_TIMEOUT)
for sock in rd: for sock in rd:
if self._pkt_parser_func is not None: if self._pkt_parser_func is not None:
self.recvd_data_buf.put((time.time(), self._pkt_parser_func(sock))) self.recvd_data_buf.put(self._pkt_parser_func(sock))
else: else:
self.recvd_data_buf.put((time.time(), sock.recv(self.RECV_BYTES))) self.recvd_data_buf.put((time.time(), sock.recv(self.RECV_BYTES)))
...@@ -306,16 +313,32 @@ class Receiver: ...@@ -306,16 +313,32 @@ class Receiver:
def _proc_worker(self): def _proc_worker(self):
while self._isrunning: while self._isrunning:
try: try:
t, data = self.recvd_data_buf.get(timeout=1) if self._procfunc is None:
procdata = self._procfunc(data, ts=t) time.sleep(1)
continue
# t, data = self.recvd_data_buf.get(timeout=1)
procdata = self._procfunc(self.recvd_data_buf)
if procdata is not None:
if self.extend_processed:
self.proc_data.extend(procdata)
else:
self.proc_data.append(procdata) self.proc_data.append(procdata)
if self.proc_data_fd is not None: if self.proc_data_fd is not None:
try: try:
if self.proc_data_fd.mode.count('b'): if self.proc_data_fd.mode.count('b'):
self.proc_data_fd.write(procdata) wdata = procdata
else:
wdata = str(procdata)
if self.extend_processed:
for x in wdata:
self.proc_data_fd.write(x)
else: else:
self.proc_data_fd.write(str(procdata)) self.proc_data_fd.write(wdata)
except io.UnsupportedOperation as err: except io.UnsupportedOperation as err:
print(err) print(err)
break break
......
...@@ -8,6 +8,13 @@ Author: Marko Mecina (UVIE) ...@@ -8,6 +8,13 @@ Author: Marko Mecina (UVIE)
""" """
import ctypes import ctypes
import io
import os
import queue
import socket
import numpy as np
from astropy.io import fits
from packet_config_ATHENA import RawGetterSetter from packet_config_ATHENA import RawGetterSetter
...@@ -69,6 +76,7 @@ class CommandBaseStruct(ctypes.BigEndianStructure): ...@@ -69,6 +76,7 @@ class CommandBaseStruct(ctypes.BigEndianStructure):
super(CommandBaseStruct).__init__() super(CommandBaseStruct).__init__()
IF_LEN = 1
CMD_LEN = ctypes.sizeof(CommandBaseStruct) CMD_LEN = ctypes.sizeof(CommandBaseStruct)
ACK_LEN = 3 ACK_LEN = 3
SCI_CMD_LEN = 2 SCI_CMD_LEN = 2
...@@ -170,6 +178,10 @@ class HkPkt: ...@@ -170,6 +178,10 @@ class HkPkt:
# Science interface # Science interface
FRAME_SIZE_X = 512
FRAME_SIZE_Y = 512
EVT_PKT_ELEMENT_LEN = 6 # length of event packet element (timestamp, header, pixel, footer) in bytes EVT_PKT_ELEMENT_LEN = 6 # length of event packet element (timestamp, header, pixel, footer) in bytes
STRUCT_EVT_PIX = { # name, bitsize, bitoffset (LSB) STRUCT_EVT_PIX = { # name, bitsize, bitoffset (LSB)
"FRAME_N": (8, 40), "FRAME_N": (8, 40),
...@@ -289,7 +301,7 @@ class EvtPix(EvtPktBase): ...@@ -289,7 +301,7 @@ class EvtPix(EvtPktBase):
class EvtHeader(EvtPktBase): class EvtHeader(EvtPktBase):
""" """
*EventPacket* header *EventFrame* header
""" """
def __init__(self, raw): def __init__(self, raw):
...@@ -306,7 +318,7 @@ class EvtHeader(EvtPktBase): ...@@ -306,7 +318,7 @@ class EvtHeader(EvtPktBase):
class EvtFooter(EvtPktBase): class EvtFooter(EvtPktBase):
""" """
*EventPacket* footer *EventFrame* footer
""" """
def __init__(self, raw): def __init__(self, raw):
...@@ -319,7 +331,7 @@ class EvtFooter(EvtPktBase): ...@@ -319,7 +331,7 @@ class EvtFooter(EvtPktBase):
class EvtTimestamp(EvtPktBase): class EvtTimestamp(EvtPktBase):
""" """
*EventPacket* time stamp structure *EventFrame* time stamp structure
""" """
def __init__(self, raw): def __init__(self, raw):
...@@ -351,29 +363,74 @@ def _shift_mask(x, bs_os): ...@@ -351,29 +363,74 @@ def _shift_mask(x, bs_os):
return (x >> bs_os[1]) & (2 ** bs_os[0] - 1) return (x >> bs_os[1]) & (2 ** bs_os[0] - 1)
class EventPacket: class EventFrame:
""" """
FP science event packet FP frame containing event data
""" """
def __init__(self, raw, process=True): def __init__(self, raw, process=True, notime=False):
self._raw = raw self._raw = raw
self._len = len(raw) self._len = len(raw)
self.pixels_list_proc = None self.evt_list_proc = None
if notime:
hoff = 1
poff = 2
self.timestamp = None
else:
hoff = 1
poff = 2
self.timestamp = EvtTimestamp(raw[:EVT_PKT_ELEMENT_LEN]) self.timestamp = EvtTimestamp(raw[:EVT_PKT_ELEMENT_LEN])
self.header = EvtHeader(raw[EVT_PKT_ELEMENT_LEN:2 * EVT_PKT_ELEMENT_LEN])
self.header = EvtHeader(raw[hoff * EVT_PKT_ELEMENT_LEN:poff * EVT_PKT_ELEMENT_LEN])
# self._pixels_block = raw[2 * EVT_PKT_ELEMENT_LEN:-EVT_PKT_ELEMENT_LEN] # if footer is present # self._pixels_block = raw[2 * EVT_PKT_ELEMENT_LEN:-EVT_PKT_ELEMENT_LEN] # if footer is present
self._pixels_block = raw[2 * EVT_PKT_ELEMENT_LEN:] self._pixels_block = raw[poff * EVT_PKT_ELEMENT_LEN:]
self.pixels_list = [self._pixels_block[i:i + EVT_PKT_ELEMENT_LEN] for i in self.evt_list = [self._pixels_block[i:i + EVT_PKT_ELEMENT_LEN] for i in
range(0, len(self._pixels_block), EVT_PKT_ELEMENT_LEN)] range(0, len(self._pixels_block), EVT_PKT_ELEMENT_LEN)]
# self.footer = EvtFooter(raw[-EVT_PKT_ELEMENT_LEN:]) # self.footer = EvtFooter(raw[-EVT_PKT_ELEMENT_LEN:])
if process: if process:
self._process_pixels() self._process_evts()
def __str__(self):
return 'EventFrame (ID {}, {} events)'.format(self.frameid, self.nevts)
@property
def nevts(self):
return len(self.evt_list_proc)
@property
def frameid(self):
return self._raw[0]
def get_inv_evts(self):
return [x for x in self.evt_list if x[-1] != 0xFF]
def _process_evts(self):
# only include valid events (LSB == 0xFF)
self.evt_list_proc = [EvtPix(pix) for pix in self.evt_list if pix[-1] == 0xFF]
def as_array(self, nans=False):
# initialise empty array
if nans:
arr = np.zeros((FRAME_SIZE_Y, FRAME_SIZE_X))
arr[:] = np.NAN
else:
arr = np.zeros((FRAME_SIZE_Y, FRAME_SIZE_X), dtype=np.uint16)
def _process_pixels(self): for p in self.evt_list_proc:
self.pixels_list_proc = [EvtPix(pix) for pix in self.pixels_list] arr[p.line_n, p.pixel_n] = p.energy
return arr
def save_fits(self, fname, nans=False, overwrite=False):
hdu = fits.ImageHDU(data=self.as_array(nans=nans))
hdu.header['FRAMENUM'] = self.header.frame_n
hdu.header['NEVTS'] = len(self.evt_list)
hdu.writeto(fname, overwrite=overwrite)
return fname
# delay = 0xXX * sys_clk period # delay = 0xXX * sys_clk period
...@@ -408,42 +465,204 @@ class SciCmd: ...@@ -408,42 +465,204 @@ class SciCmd:
return self._raw[1] return self._raw[1]
def read_de_pkt(pkt, process=True): class FpmPktParser:
"""
Parses telemetry received from FPM
"""
def __init__(self, scibytes, echobytes=None, defaultbytes=1024):
self.scibytes = scibytes
self.echobytes = echobytes
self.defaultbytes = defaultbytes
self.lastpkt = None
def __call__(self, sock):
strict = True
msg = sock.recv(IF_LEN)
if not msg:
return b''
if msg[0] == IfAddr.CMD:
mlen = ACK_LEN - IF_LEN
elif msg[0] == IfAddr.SCI:
mlen = self.scibytes
elif msg[0] == IfAddr.HK:
mlen = ACK_LEN - IF_LEN
# TODO: HK interface
# msg += sock.recv(1)
# mlen = msg[1] # first byte after interface ID specifies HK length?
elif msg[0] == IfAddr.ECHO:
if self.echobytes is None:
print('WARNING: Echo length is not defined, reading {}!'.format(self.defaultbytes))
mlen = self.defaultbytes
strict = False
# raise ValueError('WARNING: Echo length is not defined, reading {}!'.format(self.defaultbytes))
else:
mlen = self.echobytes
else:
print('ERROR: Unknown interface ID {:02X}'.format(msg[0]))
mlen = self.defaultbytes
strict = False
assert isinstance(pkt, (bytes, bytearray)) msg += self.read_socket(sock, mlen, strict=strict)
p = None self.lastpkt = msg
if pkt[0] == IfAddr.HK: return msg
if len(pkt) == ACK_LEN:
p = Ack(pkt) def set_scibytes(self, nbytes):
elif len(pkt) == CMD_LEN: self.scibytes = nbytes
p = CommandBase(pkt)
def set_echobytes(self, nbytes):
self.echobytes = nbytes
def set_defaultbytes(self, nbytes):
self.defaultbytes = nbytes
def read_socket(self, sock, rlen, strict=True):
msg = b''
if strict:
while len(msg) < rlen:
try:
msg += sock.recv(rlen - len(msg))
except socket.timeout:
continue
else: else:
p = HkPkt(pkt) msg += sock.recv(rlen)
elif pkt[0] == IfAddr.CMD:
if len(pkt) == ACK_LEN: return msg
p = Ack(pkt)
elif len(pkt) == CMD_LEN:
p = CommandBase(pkt)
elif pkt[0] == IfAddr.SCI and len(pkt) == SCI_CMD_LEN:
p = SciCmd(pkt)
elif pkt[0] == IfAddr.ECHO:
p = pkt
elif pkt[0] == IfAddr.SCI and len(pkt[1:]) % EVT_PKT_ELEMENT_LEN == 0:
p = EventPacket(pkt[1:], process=process)
if p is None:
raise NotImplementedError('Unknown interface ID {}'.format(pkt[0]))
return p class FpmProcessor:
"""
Processes FPM packets and assembles event frames.
"""
def __init__(self, ashex=True, process=True, notime=False, tofile=None, queue_to=10):
self.ashex = ashex
self.process = process
self.notime = notime
def split_de_dump(data, de_pkt_size): self.cfdata = b''
self.curfrm = -1
self.framecnt = 0
assert isinstance(data, bytes) self.frames = []
self.tofile = tofile
return [data[i:i+de_pkt_size] for i in range(0, len(data), de_pkt_size)] self.queue_to = queue_to
if tofile is not None:
self.tofile = open(tofile, 'wb')
def __call__(self, buf, ts=None):
assert isinstance(buf, queue.Queue)
try:
pkt = buf.get(timeout=self.queue_to)
except queue.Empty:
return
self.frames.clear()
try:
if self.tofile is not None:
self.tofile.write(pkt)
self.tofile.flush()
# process non-SCI data
if pkt[0] != IfAddr.SCI:
return pkt.hex(sep=' ') if self.ashex else pkt
# process data from SCI itnerface
nevts = (len(pkt) - 1) // EVT_PKT_ELEMENT_LEN
buf = io.BytesIO(pkt[1:])
for i in range(nevts):
self.proc(buf)
if self.frames:
return self.frames
except Exception as err:
print(err)
def proc(self, buf, verbose=False):
ed = buf.read(EVT_PKT_ELEMENT_LEN)
if ed[0] == self.curfrm:
self.cfdata += ed
else:
if self.curfrm != -1:
self.frames.append(self.mk_evt_frame(verbose=verbose))
self.cfdata = ed
self.framecnt += 1
self.curfrm = ed[0]
def mk_evt_frame(self, verbose=False):
try:
frame = EventFrame(self.cfdata)
if verbose:
print(frame)
except Exception as err:
print(err)
return self.cfdata
return frame
def close_file(self):
self.tofile.close()
def proc_file(self, fname, npkts_sci=16, binary=False):
assert os.path.isfile(fname)
if binary:
buf = open(fname, 'rb')
else:
f = bytes.fromhex(open(fname, 'r').read())
buf = io.BytesIO(f)
while True:
pt = buf.read(IF_LEN)
if not pt:
print('END OF FILE.', buf.tell(), 'bytes read.')
break
if pt[0] == IfAddr.CMD:
msg = buf.read(ACK_LEN - IF_LEN)
print((pt + msg).hex(sep=' '))
elif pt[0] == IfAddr.SCI:
for i in range(npkts_sci):
self.proc(buf, verbose=True)
else:
print('ERROR', pt, buf.tell())
return self.frames
def reset(self):
self.cfdata = b''
self.curfrm = -1
self.framecnt = 0
self.frames.clear()
def filter_frames(objlist, empty_frames=True):
if not empty_frames:
return [x for x in objlist if isinstance(x, EventFrame)]
else:
return [x for x in objlist if isinstance(x, EventFrame) and x.nevts > 0]
# PCM Registers [TBC] # PCM Registers [TBC]
......
"""
Examples for Athena WFI DE communications
"""
import communication as com
import packet_config_ATHENA_DE as de
# set up socket and connect
decon = com.Connector('', 12345, msgdecoding='hex')
decon.connect()
# example commands
# test echo interface (0x20)
decon.send(b'\x20\xDE\xAD')
# HK interface 0x33
decon.send(de.HkCmdRead(0x1000)) # get PCM MODE register
decon.send(de.HkCmdWrite(0x1000, 0x0001)) # set PCM MODE register
# CMD interface 0x34
decon.send(de.CmdWrite(0x3C00, 0x0001)) # write sequencer register
decon.send(de.CmdWrite(0x3C00, 1), rx=False) # write sequencer register, but don't fetch cmd response from socket
decon.send(de.CmdRead(0x3C00)) # read sequencer register
# SCI interface 0x35
decon.send(de.SciCmd(100)) # set science data output rate
# dump cmd log (decon.log)
logfile = '/path/to/de_cmd.log'
decon.dump_log(logfile)
# automatically log to file
decon.setup_storage(logfile)
# run rx thread on socket, received data is put in recvd_data_buf queue
decon.start_receiver()
decon.receiver.recvd_data_buf
# custom TM processing function; must take bytestring as arg *data*, and timestamp kwarg *ts*
def msg_to_hex_string(data, ts=''):
try:
return '{}: {}\n'.format(ts, data.hex(' ', 1))
except Exception as err:
print(err)
return '# ERROR #\n'
# optionally, add custom TM processing
# this logs the received data hex-formatted in outfile
decon.start_receiver(procfunc=msg_to_hex_string, outfile='/path/to/de_rx.log', ofmode='w')
# processed data is also collected in
decon.receiver.proc_data
#!/usr/bin/env python3
"""
Extract FPM frames from telemetry file and save as FITS.
"""
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider, Button
sys.path.append('../../')
import packet_config_ATHENA_DE as de
NPKTS = 16 # number of event packets (6 bytes) per SpW packet
def run(fname, binary=False):
fproc = de.FpmProcessor()
results = fproc.proc_file(fname, npkts_sci=NPKTS, binary=binary)
return results
def save_frames(frames, outfile, nans=False):
from astropy.io import fits
hdl = fits.HDUList()
empty_frames = 0
for frame in frames:
if len(frame.evt_list_proc) == 0:
empty_frames += 1
continue
hdu = fits.ImageHDU(data=frame.as_array(nans=nans))
hdu.header['FRAMENUM'] = frame.header.frame_n
hdu.header['NEVTS'] = len(frame.evt_list_proc)
hdl.append(hdu)
print('Processed {} frames.'.format(len(frames)))
if empty_frames != 0:
print('There were {} frames with no valid event data!'.format(empty_frames))
hdl.writeto(outfile, overwrite=True)
print('Results written to', outfile)
class FrameViewer:
def __init__(self, framelist, show_empty=False, nans=False):
self.update(framelist, show_empty=show_empty, nans=nans)
def update(self, framelist, show_empty=False, nans=False):
if show_empty:
self.frames = framelist
else:
self.frames = [frame for frame in framelist if frame.nevts > 0]
self.data = np.zeros((len(self.frames), de.FRAME_SIZE_Y, de.FRAME_SIZE_X))
self.nans = nans
for i, frame in enumerate(self.frames):
self.data[i, :, :] = frame.as_array(nans=self.nans)
def show(self, idx=0, cmap='inferno', interpolation='none'):
idx = int(idx)
fig, ax = plt.subplots(figsize=(7, 8))
fig.subplots_adjust(bottom=0.25)
im = ax.imshow(self.data[idx, :, :], origin='lower', vmin=np.nanmin(self.data), vmax=np.nanmax(self.data), cmap=cmap, interpolation=interpolation)
ax.set_title('Frame ID {} (#{})'.format(self.frames[idx].frameid, idx))
ax_frm = fig.add_axes([0.15, 0.1, 0.73, 0.03])
# create the sliders
sfrm = Slider(ax_frm, "Frame #", 0, self.data.shape[0] - 1, valinit=idx, valstep=range(self.data.shape[0]), color="tab:blue")
def _update(val):
im.set_data(self.data[val, :, :])
ax.set_title('Frame ID {} (#{})'.format(self.frames[val].frameid, val))
fig.canvas.draw_idle()
sfrm.on_changed(_update)
plt.show()
if __name__ == '__main__':
if len(sys.argv) < 3:
print('USAGE: ./athena_de_frame_proc.py <EVT_FILENAME> <FITS_FILE>')
sys.exit()
if '-b' in sys.argv:
binary = True
sys.argv.remove('-b')
else:
binary = False
fname, fitsfile = sys.argv[1:3]
fitsfile = os.path.abspath(fitsfile)
ow = ''
if os.path.isfile(fitsfile):
while ow.lower() not in ['y', 'n']:
ow = input('File {} already exists, overwrite? (y/n) '.format(fitsfile))
if ow.lower() == 'n':
sys.exit()
frames = run(fname, binary=binary)
save_frames(frames, fitsfile, nans=False)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment