Skip to content
Snippets Groups Projects
Commit 3c343373 authored by Marko Mecina's avatar Marko Mecina
Browse files

compatibility update of CHEOPS packet_config

+ bugfix in timestamp calculation
parent 361a02fa
No related branches found
No related tags found
No related merge requests found
......@@ -86,20 +86,37 @@ def timecal(data, string=False):
return coarse + fine
def calc_timestamp(time, sync=0):
def calc_timestamp(time, sync=0, return_bytes=False):
if isinstance(time, (float, int)):
ctime = int(time)
ftime = round(time % 1 * timepack[2])
if ftime == timepack[2]:
ctime += 1
ftime = 0
elif isinstance(time, str):
if time[-1].upper() in ['U', 'S']:
t = float(time[:-1])
else:
t = float(time)
ctime = int(t)
ftime = round(t % 1 * timepack[2])
if ftime == timepack[2]:
ctime += 1
ftime = 0
sync = 1 if time[-1].upper() == 'S' else 0
elif isinstance(time, bytes):
ctime = int.from_bytes(time[:4], 'big')
ftime = int.from_bytes(time[-2:], 'big') >> 1
ftime = int.from_bytes(time[4:6], 'big') >> 1
sync = time[-1] & 1
else:
raise TypeError('Unsupported input for time ({})'.format(type(time)))
if return_bytes:
if sync is None or not sync:
return ctime.to_bytes(4, 'big') + ((ftime << 1) & 0xFFFE).to_bytes(2, 'big')
else:
return ctime.to_bytes(4, 'big') + (((ftime << 1) & 0xFFFE) + 1).to_bytes(2, 'big')
else:
return ctime, ftime, sync
......@@ -158,13 +175,13 @@ class TCHeader(ctypes.Union):
CUC_OFFSET = TMHeaderBits.CTIME.offset
SPW_PROTOCOL_IDS = {
"RMAP": 0x01,
"FEEDATA": 0xF0,
"CCSDS": 0x02
}
class RawGetterSetter:
@property
......@@ -175,6 +192,7 @@ class RawGetterSetter:
def raw(self, rawdata):
self.bin[:] = rawdata
# RMAP packet structure definitions
RMAP_MAX_PKT_LEN = 2 ** 15
......@@ -251,29 +269,21 @@ FEEDATA_TRANSFER_HEADER = [
class RMapCommandHeaderBits(ctypes.BigEndianStructure):
_pack_ = 1
_fields_ = [(label, ctype, bits) for label, ctype, bits in RMAP_COMMAND_HEADER]
RMAP_COMMAND_HEADER_LEN = ctypes.sizeof(RMapCommandHeaderBits) # sum([x[2] for x in RMAP_COMMAND_HEADER]) // 8
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class RMapCommandHeader(ctypes.Union, RawGetterSetter):
_pack_ = 1
_fields_ = [
('bits', RMapCommandHeaderBits),
('bin', ctypes.c_ubyte * RMAP_COMMAND_HEADER_LEN)
]
def __init__(self, *args, **kw):
super(RMapCommandHeader, self).__init__(*args, **kw)
self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["RMAP"]
self.bits.PKT_TYPE = 1
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class RMapReplyWriteHeaderBits(ctypes.BigEndianStructure):
_pack_ = 1
_fields_ = [(label, ctype, bits) for label, ctype, bits in RMAP_REPLY_WRITE_HEADER]
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
RMAP_REPLY_WRITE_HEADER_LEN = ctypes.sizeof(
......@@ -281,72 +291,36 @@ RMAP_REPLY_WRITE_HEADER_LEN = ctypes.sizeof(
class RMapReplyWriteHeader(ctypes.Union, RawGetterSetter):
_pack_ = 1
_fields_ = [
('bits', RMapReplyWriteHeaderBits),
('bin', ctypes.c_ubyte * RMAP_REPLY_WRITE_HEADER_LEN)
]
def __init__(self, *args, **kw):
super(RMapReplyWriteHeader, self).__init__(*args, **kw)
self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["RMAP"]
self.bits.PKT_TYPE = 0
self.bits.WRITE = 1
self.bits.REPLY = 1
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class RMapReplyReadHeaderBits(ctypes.BigEndianStructure):
_pack_ = 1
_fields_ = [(label, ctype, bits) for label, ctype, bits in RMAP_REPLY_READ_HEADER]
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
RMAP_REPLY_READ_HEADER_LEN = ctypes.sizeof(RMapReplyReadHeaderBits) # sum([x[2] for x in RMAP_REPLY_READ_HEADER]) // 8
class RMapReplyReadHeader(ctypes.Union, RawGetterSetter):
_pack_ = 1
_fields_ = [
('bits', RMapReplyReadHeaderBits),
('bin', ctypes.c_ubyte * RMAP_REPLY_READ_HEADER_LEN)
]
def __init__(self, *args, **kw):
super(RMapReplyReadHeader, self).__init__(*args, **kw)
self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["RMAP"]
self.bits.PKT_TYPE = 0
self.bits.WRITE = 0
self.bits.VERIFY = 0
self.bits.REPLY = 1
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class FeeDataTransferHeaderBits(ctypes.BigEndianStructure):
_pack_ = 1
_fields_ = [(label, ctype, bits) for label, ctype, bits in FEEDATA_TRANSFER_HEADER]
FEE_DATA_TRANSFER_HEADER_LEN = ctypes.sizeof(
FeeDataTransferHeaderBits) # sum([x[2] for x in FEEDATA_TRANSFER_HEADER]) // 8
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class FeeDataTransferHeader(ctypes.Union):
_pack_ = 1
_fields_ = [
('bits', FeeDataTransferHeaderBits),
('bin', ctypes.c_ubyte * FEE_DATA_TRANSFER_HEADER_LEN)
]
def __init__(self, *args, **kw):
super(FeeDataTransferHeader, self).__init__()
self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["FEEDATA"]
@property
def raw(self):
return bytes(self.bin)
@property
def comptype(self):
"""Composite packet type used in DB storage, consists of sub-parameters"""
return int.from_bytes(self.bin[4:6], 'big')
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
##########################
......@@ -354,177 +328,30 @@ class FeeDataTransferHeader(ctypes.Union):
##########################
class RMapCommandWrite(RMapCommandHeader):
"""This is intended for building an RMap Write Command"""
def __init__(self, addr, data, verify=True, reply=True, incr=True, key=SPW_FEE_KEY,
initiator=SPW_DPU_LOGICAL_ADDRESS, tid=1, *args, **kwargs):
super(RMapCommandWrite, self).__init__(*args, **kwargs)
self.header = self.bits
self.data = data
self.data_crc = rmapcrc(self.data).to_bytes(RMAP_PEC_LEN, 'big')
self.bits.TARGET_LOGICAL_ADDR = SPW_FEE_LOGICAL_ADDRESS
self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS['RMAP']
self.bits.PKT_TYPE = 1
self.bits.WRITE = 1
self.bits.VERIFY = verify
self.bits.REPLY = reply
self.bits.INCREMENT = incr
self.bits.REPLY_ADDR_LEN = 0
self.bits.KEY = key
self.bits.INIT_LOGICAL_ADDR = initiator
self.bits.TRANSACTION_ID = tid
self.bits.EXT_ADDR = addr >> 32
self.bits.ADDR = addr
self.bits.DATA_LEN = len(self.data)
self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1]))
@property
def raw(self):
"""Return raw packet with updated CRCs"""
self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1]))
self.data_crc = rmapcrc(self.data).to_bytes(RMAP_PEC_LEN, 'big')
return bytes(self.bin) + self.data + self.data_crc
@raw.setter
def raw(self, rawdata):
self.bin[:] = rawdata[:RMAP_COMMAND_HEADER_LEN]
self.data = rawdata[RMAP_COMMAND_HEADER_LEN:-RMAP_PEC_LEN]
self.data_crc = rawdata[-RMAP_PEC_LEN:]
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class RMapCommandRead(RMapCommandHeader):
"""This is intended for building an RMap Read Command"""
def __init__(self, addr, datalen, incr=True, key=SPW_FEE_KEY, initiator=SPW_DPU_LOGICAL_ADDRESS, tid=1,
*args, **kwargs):
super(RMapCommandRead, self).__init__(*args, **kwargs)
self.header = self.bits
self.bits.TARGET_LOGICAL_ADDR = SPW_FEE_LOGICAL_ADDRESS
self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS['RMAP']
self.bits.PKT_TYPE = 1
self.bits.WRITE = 0
self.bits.VERIFY = 0
self.bits.REPLY = 1
self.bits.INCREMENT = incr
self.bits.REPLY_ADDR_LEN = 0
self.bits.KEY = key
self.bits.INIT_LOGICAL_ADDR = initiator
self.bits.TRANSACTION_ID = tid
self.bits.EXT_ADDR = addr >> 32
self.bits.ADDR = addr
self.bits.DATA_LEN = datalen
self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1]))
@property
def raw(self):
"""Return raw packet with updated CRCs"""
self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1]))
return bytes(self.bin)
@raw.setter
def raw(self, rawdata):
self.bin[:] = rawdata[:RMAP_COMMAND_HEADER_LEN]
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class FeeDataTransfer(FeeDataTransferHeader):
"""
Bytes 4 and 5 of the data-packet-header contains additional information about the packet-content. The type-field is defined in the following way:
- bits 15:12 = reserved for future usage
- bits 11:8 = See MSSL-IF-17
- bit 7 = last packet: 1 = last packet of this type in the current read-out-cycle
- bit 6:5 = CCD side: 0 = left side (side F), 1 = right side (side E), 2 = F&E interleaved
- bit 4 = CCD: 0 = CCD2, 1= CCD4
- bit 3:2 = reserved
- bits 1:0 = packet type: 0 = data packet, 1 = Event detection packet, 2 = housekeeping packet
"""
modes = {0: "On Mode",
1: "Frame Transfer Pattern",
2: "Stand-By-Mode",
3: "Frame Transfer",
4: "Full Frame",
5: "Parallel trap pumping mode 1",
6: "Parallel trap pumping mode 2",
7: "Serial trap pumping mode 1",
8: "Serial trap pumping mode 2"}
ccd_sides = {0: "left side (F)",
1: "right side (E)",
2: "F&E interleaved"}
ccds = {0: "CCD2",
1: "CCD4"}
pkt_types = {0: "Data",
1: "Event detection",
2: "Housekeeping"}
DATA_HK_STRUCT = []
def __init__(self, pkt=None):
super(FeeDataTransfer, self).__init__()
if pkt is not None:
self._raw = pkt
self.bin[:] = self._raw[:FEE_DATA_TRANSFER_HEADER_LEN]
self.data = self._raw[FEE_DATA_TRANSFER_HEADER_LEN:]
self.set_evt_data()
else:
self._raw = b''
self.set_evt_data()
self.set_type_details()
@property
def raw(self):
return self._raw
@raw.setter
def raw(self, rawdata):
self.bin[:] = rawdata[:FEE_DATA_TRANSFER_HEADER_LEN]
self.data = rawdata[FEE_DATA_TRANSFER_HEADER_LEN:]
self._raw = rawdata
self.set_type_details()
self.set_evt_data()
def set_type_details(self):
self.type_details = {"MODE": self.modes[self.bits.MODE] if self.bits.MODE in self.modes else self.bits.MODE,
"LAST_PKT": bool(self.bits.LAST_PKT),
"CCDSIDE": self.ccd_sides[
self.bits.CCDSIDE] if self.bits.CCDSIDE in self.ccd_sides else self.bits.CCDSIDE,
"CCD": self.ccds[self.bits.CCD] if self.bits.CCD in self.ccds else self.bits.CCD,
"PKT_TYPE": self.pkt_types[
self.bits.PKT_TYPE] if self.bits.PKT_TYPE in self.pkt_types else self.bits.PKT_TYPE}
def set_evt_data(self):
if self.bits.PKT_TYPE == 1:
evtdata = EventDetectionData()
evtdata.bin[:] = self.data
self.evt_data = {"COLUMN": evtdata.bits.column,
"ROW": evtdata.bits.row,
"IMAGE": np.array(evtdata.bits.array)[::-1]} # structure according to MSSL-SMILE-SXI-IRD-0001
else:
self.evt_data = None
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class EventDetectionFields(ctypes.BigEndianStructure):
_pack_ = 1
_fields_ = [
("column", ctypes.c_uint16),
("row", ctypes.c_uint16),
("array", (ctypes.c_uint16 * 5) * 5)
]
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
class EventDetectionData(ctypes.Union):
_pack_ = 1
_fields_ = [
("bits", EventDetectionFields),
("bin", ctypes.c_ubyte * ctypes.sizeof(EventDetectionFields))
]
def __init__(self):
raise NotImplementedError('Not available in project CHEOPS')
......@@ -66,7 +66,7 @@ TC_SECONDARY_HEADER = [
("SOURCE_ID", ctypes.c_uint8, 8)
]
# [format of time stamp, amount of bytes of time stamp including sync byte(s), fine time resolution, length of sync flag]
# [format of time stamp, amount of bytes of time stamp including sync byte(s), fine time resolution, length of extra sync flag in bytes]
timepack = [ptt[9][18], 8, 1e6, 1]
CUC_EPOCH = datetime.datetime(2018, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)
......@@ -106,30 +106,40 @@ def timecal(data, string=False):
return coarse + fine
def calc_timestamp(time, sync=0, return_bytes=False):
def calc_timestamp(time, sync=None, return_bytes=False):
if isinstance(time, (float, int)):
ctime = int(time)
ftime = round(time % 1 * timepack[2])
if ftime == timepack[2]:
ctime += 1
ftime = 0
elif isinstance(time, str):
if time[:-1] in ['U', 'S']:
if time[-1].upper() in ['U', 'S']:
t = float(time[:-1])
else:
t = float(time)
ctime = int(t)
ftime = round(t % 1 * timepack[2])
if ftime == timepack[2]:
ctime += 1
ftime = 0
sync = 0b101 if time[-1].upper() == 'S' else 0
elif isinstance(time, bytes):
ctime = int.from_bytes(time[:4], 'big')
ftime = int.from_bytes(time[4:-1], 'big')
ftime = int.from_bytes(time[4:7], 'big')
if len(time) == timepack[1]:
sync = time[-1]
else:
sync = None
else:
raise TypeError('Unsupported input for time ({})'.format(type(time)))
if return_bytes:
if sync is None:
if sync is None or sync is False:
return ctime.to_bytes(4, 'big') + ftime.to_bytes(3, 'big')
else:
return ctime.to_bytes(4, 'big') + ftime.to_bytes(3, 'big') + sync.to_bytes(1, 'big')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment