diff --git a/Ccs/packet_config_CHEOPS.py b/Ccs/packet_config_CHEOPS.py index 6195dd425a38644271ec3bdc4185f3758247f3a0..357c05f53c07e4800cd8b3a219e59e8831dc9fa7 100644 --- a/Ccs/packet_config_CHEOPS.py +++ b/Ccs/packet_config_CHEOPS.py @@ -62,7 +62,7 @@ TC_SECONDARY_HEADER = [ ("SOURCE_ID", ctypes.c_uint8, 8) ] # [Format of time Packet, Amount of Bytes in Time Packet, Factor for Finetime, length of extra sync flag -timepack = [ptt[9][17], 6, 2**15, 0] +timepack = [ptt[9][17], 6, 2 ** 15, 0] CUC_EPOCH = datetime.datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc) @@ -86,21 +86,38 @@ def timecal(data, string=False): return coarse + fine -def calc_timestamp(time, sync=0): +def calc_timestamp(time, sync=0, return_bytes=False): if isinstance(time, (float, int)): ctime = int(time) ftime = round(time % 1 * timepack[2]) + if ftime == timepack[2]: + ctime += 1 + ftime = 0 elif isinstance(time, str): - t = float(time[:-1]) + if time[-1].upper() in ['U', 'S']: + t = float(time[:-1]) + else: + t = float(time) ctime = int(t) ftime = round(t % 1 * timepack[2]) + if ftime == timepack[2]: + ctime += 1 + ftime = 0 sync = 1 if time[-1].upper() == 'S' else 0 elif isinstance(time, bytes): ctime = int.from_bytes(time[:4], 'big') - ftime = int.from_bytes(time[-2:], 'big') >> 1 + ftime = int.from_bytes(time[4:6], 'big') >> 1 sync = time[-1] & 1 + else: + raise TypeError('Unsupported input for time ({})'.format(type(time))) - return ctime, ftime, sync + if return_bytes: + if sync is None or not sync: + return ctime.to_bytes(4, 'big') + ((ftime << 1) & 0xFFFE).to_bytes(2, 'big') + else: + return ctime.to_bytes(4, 'big') + (((ftime << 1) & 0xFFFE) + 1).to_bytes(2, 'big') + else: + return ctime, ftime, sync # P_HEADER_LEN = sum([x[2] for x in PRIMARY_HEADER]) // 8 @@ -158,13 +175,13 @@ class TCHeader(ctypes.Union): CUC_OFFSET = TMHeaderBits.CTIME.offset - SPW_PROTOCOL_IDS = { "RMAP": 0x01, "FEEDATA": 0xF0, "CCSDS": 0x02 } + class RawGetterSetter: @property @@ -175,6 +192,7 @@ class RawGetterSetter: def raw(self, rawdata): self.bin[:] = rawdata + # RMAP packet structure definitions RMAP_MAX_PKT_LEN = 2 ** 15 @@ -251,29 +269,21 @@ FEEDATA_TRANSFER_HEADER = [ class RMapCommandHeaderBits(ctypes.BigEndianStructure): - _pack_ = 1 - _fields_ = [(label, ctype, bits) for label, ctype, bits in RMAP_COMMAND_HEADER] - -RMAP_COMMAND_HEADER_LEN = ctypes.sizeof(RMapCommandHeaderBits) # sum([x[2] for x in RMAP_COMMAND_HEADER]) // 8 + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class RMapCommandHeader(ctypes.Union, RawGetterSetter): - _pack_ = 1 - _fields_ = [ - ('bits', RMapCommandHeaderBits), - ('bin', ctypes.c_ubyte * RMAP_COMMAND_HEADER_LEN) - ] - def __init__(self, *args, **kw): - super(RMapCommandHeader, self).__init__(*args, **kw) - self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["RMAP"] - self.bits.PKT_TYPE = 1 + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class RMapReplyWriteHeaderBits(ctypes.BigEndianStructure): - _pack_ = 1 - _fields_ = [(label, ctype, bits) for label, ctype, bits in RMAP_REPLY_WRITE_HEADER] + + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') RMAP_REPLY_WRITE_HEADER_LEN = ctypes.sizeof( @@ -281,72 +291,36 @@ RMAP_REPLY_WRITE_HEADER_LEN = ctypes.sizeof( class RMapReplyWriteHeader(ctypes.Union, RawGetterSetter): - _pack_ = 1 - _fields_ = [ - ('bits', RMapReplyWriteHeaderBits), - ('bin', ctypes.c_ubyte * RMAP_REPLY_WRITE_HEADER_LEN) - ] - def __init__(self, *args, **kw): - super(RMapReplyWriteHeader, self).__init__(*args, **kw) - self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["RMAP"] - self.bits.PKT_TYPE = 0 - self.bits.WRITE = 1 - self.bits.REPLY = 1 + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class RMapReplyReadHeaderBits(ctypes.BigEndianStructure): - _pack_ = 1 - _fields_ = [(label, ctype, bits) for label, ctype, bits in RMAP_REPLY_READ_HEADER] + + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') RMAP_REPLY_READ_HEADER_LEN = ctypes.sizeof(RMapReplyReadHeaderBits) # sum([x[2] for x in RMAP_REPLY_READ_HEADER]) // 8 class RMapReplyReadHeader(ctypes.Union, RawGetterSetter): - _pack_ = 1 - _fields_ = [ - ('bits', RMapReplyReadHeaderBits), - ('bin', ctypes.c_ubyte * RMAP_REPLY_READ_HEADER_LEN) - ] - def __init__(self, *args, **kw): - super(RMapReplyReadHeader, self).__init__(*args, **kw) - self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["RMAP"] - self.bits.PKT_TYPE = 0 - self.bits.WRITE = 0 - self.bits.VERIFY = 0 - self.bits.REPLY = 1 + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class FeeDataTransferHeaderBits(ctypes.BigEndianStructure): - _pack_ = 1 - _fields_ = [(label, ctype, bits) for label, ctype, bits in FEEDATA_TRANSFER_HEADER] - -FEE_DATA_TRANSFER_HEADER_LEN = ctypes.sizeof( - FeeDataTransferHeaderBits) # sum([x[2] for x in FEEDATA_TRANSFER_HEADER]) // 8 + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class FeeDataTransferHeader(ctypes.Union): - _pack_ = 1 - _fields_ = [ - ('bits', FeeDataTransferHeaderBits), - ('bin', ctypes.c_ubyte * FEE_DATA_TRANSFER_HEADER_LEN) - ] - - def __init__(self, *args, **kw): - super(FeeDataTransferHeader, self).__init__() - self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS["FEEDATA"] - - @property - def raw(self): - return bytes(self.bin) - @property - def comptype(self): - """Composite packet type used in DB storage, consists of sub-parameters""" - return int.from_bytes(self.bin[4:6], 'big') + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') ########################## @@ -354,177 +328,30 @@ class FeeDataTransferHeader(ctypes.Union): ########################## class RMapCommandWrite(RMapCommandHeader): - """This is intended for building an RMap Write Command""" - - def __init__(self, addr, data, verify=True, reply=True, incr=True, key=SPW_FEE_KEY, - initiator=SPW_DPU_LOGICAL_ADDRESS, tid=1, *args, **kwargs): - super(RMapCommandWrite, self).__init__(*args, **kwargs) - - self.header = self.bits - self.data = data - self.data_crc = rmapcrc(self.data).to_bytes(RMAP_PEC_LEN, 'big') - - self.bits.TARGET_LOGICAL_ADDR = SPW_FEE_LOGICAL_ADDRESS - self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS['RMAP'] - - self.bits.PKT_TYPE = 1 - self.bits.WRITE = 1 - self.bits.VERIFY = verify - self.bits.REPLY = reply - self.bits.INCREMENT = incr - self.bits.REPLY_ADDR_LEN = 0 - self.bits.KEY = key - - self.bits.INIT_LOGICAL_ADDR = initiator - self.bits.TRANSACTION_ID = tid - self.bits.EXT_ADDR = addr >> 32 - self.bits.ADDR = addr - self.bits.DATA_LEN = len(self.data) - self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1])) - @property - def raw(self): - """Return raw packet with updated CRCs""" - self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1])) - self.data_crc = rmapcrc(self.data).to_bytes(RMAP_PEC_LEN, 'big') - return bytes(self.bin) + self.data + self.data_crc - - @raw.setter - def raw(self, rawdata): - self.bin[:] = rawdata[:RMAP_COMMAND_HEADER_LEN] - self.data = rawdata[RMAP_COMMAND_HEADER_LEN:-RMAP_PEC_LEN] - self.data_crc = rawdata[-RMAP_PEC_LEN:] + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class RMapCommandRead(RMapCommandHeader): - """This is intended for building an RMap Read Command""" - def __init__(self, addr, datalen, incr=True, key=SPW_FEE_KEY, initiator=SPW_DPU_LOGICAL_ADDRESS, tid=1, - *args, **kwargs): - super(RMapCommandRead, self).__init__(*args, **kwargs) - - self.header = self.bits - - self.bits.TARGET_LOGICAL_ADDR = SPW_FEE_LOGICAL_ADDRESS - self.bits.PROTOCOL_ID = SPW_PROTOCOL_IDS['RMAP'] - - self.bits.PKT_TYPE = 1 - self.bits.WRITE = 0 - self.bits.VERIFY = 0 - self.bits.REPLY = 1 - self.bits.INCREMENT = incr - self.bits.REPLY_ADDR_LEN = 0 - self.bits.KEY = key - - self.bits.INIT_LOGICAL_ADDR = initiator - self.bits.TRANSACTION_ID = tid - self.bits.EXT_ADDR = addr >> 32 - self.bits.ADDR = addr - self.bits.DATA_LEN = datalen - self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1])) - - @property - def raw(self): - """Return raw packet with updated CRCs""" - self.bits.HEADER_CRC = rmapcrc(bytes(self.bin[:-1])) - return bytes(self.bin) - - @raw.setter - def raw(self, rawdata): - self.bin[:] = rawdata[:RMAP_COMMAND_HEADER_LEN] + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class FeeDataTransfer(FeeDataTransferHeader): - """ - Bytes 4 and 5 of the data-packet-header contains additional information about the packet-content. The type-field is defined in the following way: - - bits 15:12 = reserved for future usage - - bits 11:8 = See MSSL-IF-17 - - bit 7 = last packet: 1 = last packet of this type in the current read-out-cycle - - bit 6:5 = CCD side: 0 = left side (side F), 1 = right side (side E), 2 = F&E interleaved - - bit 4 = CCD: 0 = CCD2, 1= CCD4 - - bit 3:2 = reserved - - bits 1:0 = packet type: 0 = data packet, 1 = Event detection packet, 2 = housekeeping packet - """ - modes = {0: "On Mode", - 1: "Frame Transfer Pattern", - 2: "Stand-By-Mode", - 3: "Frame Transfer", - 4: "Full Frame", - 5: "Parallel trap pumping mode 1", - 6: "Parallel trap pumping mode 2", - 7: "Serial trap pumping mode 1", - 8: "Serial trap pumping mode 2"} - ccd_sides = {0: "left side (F)", - 1: "right side (E)", - 2: "F&E interleaved"} - ccds = {0: "CCD2", - 1: "CCD4"} - pkt_types = {0: "Data", - 1: "Event detection", - 2: "Housekeeping"} - - DATA_HK_STRUCT = [] - - def __init__(self, pkt=None): - super(FeeDataTransfer, self).__init__() - - if pkt is not None: - self._raw = pkt - self.bin[:] = self._raw[:FEE_DATA_TRANSFER_HEADER_LEN] - self.data = self._raw[FEE_DATA_TRANSFER_HEADER_LEN:] - - self.set_evt_data() - else: - self._raw = b'' - self.set_evt_data() - - self.set_type_details() - - @property - def raw(self): - return self._raw - - @raw.setter - def raw(self, rawdata): - self.bin[:] = rawdata[:FEE_DATA_TRANSFER_HEADER_LEN] - self.data = rawdata[FEE_DATA_TRANSFER_HEADER_LEN:] - self._raw = rawdata - self.set_type_details() - self.set_evt_data() - - def set_type_details(self): - self.type_details = {"MODE": self.modes[self.bits.MODE] if self.bits.MODE in self.modes else self.bits.MODE, - "LAST_PKT": bool(self.bits.LAST_PKT), - "CCDSIDE": self.ccd_sides[ - self.bits.CCDSIDE] if self.bits.CCDSIDE in self.ccd_sides else self.bits.CCDSIDE, - "CCD": self.ccds[self.bits.CCD] if self.bits.CCD in self.ccds else self.bits.CCD, - "PKT_TYPE": self.pkt_types[ - self.bits.PKT_TYPE] if self.bits.PKT_TYPE in self.pkt_types else self.bits.PKT_TYPE} - - def set_evt_data(self): - if self.bits.PKT_TYPE == 1: - evtdata = EventDetectionData() - evtdata.bin[:] = self.data - self.evt_data = {"COLUMN": evtdata.bits.column, - "ROW": evtdata.bits.row, - "IMAGE": np.array(evtdata.bits.array)[::-1]} # structure according to MSSL-SMILE-SXI-IRD-0001 - else: - self.evt_data = None + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class EventDetectionFields(ctypes.BigEndianStructure): - _pack_ = 1 - _fields_ = [ - ("column", ctypes.c_uint16), - ("row", ctypes.c_uint16), - ("array", (ctypes.c_uint16 * 5) * 5) - ] + + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') class EventDetectionData(ctypes.Union): - _pack_ = 1 - _fields_ = [ - ("bits", EventDetectionFields), - ("bin", ctypes.c_ubyte * ctypes.sizeof(EventDetectionFields)) - ] + + def __init__(self): + raise NotImplementedError('Not available in project CHEOPS') diff --git a/Ccs/packet_config_SMILE.py b/Ccs/packet_config_SMILE.py index 4587729fce88620195e60db15896c1c12e110181..f43de1e432cb0c84dbe0e3cb310877536da73dc7 100644 --- a/Ccs/packet_config_SMILE.py +++ b/Ccs/packet_config_SMILE.py @@ -66,7 +66,7 @@ TC_SECONDARY_HEADER = [ ("SOURCE_ID", ctypes.c_uint8, 8) ] -# [format of time stamp, amount of bytes of time stamp including sync byte(s), fine time resolution, length of sync flag] +# [format of time stamp, amount of bytes of time stamp including sync byte(s), fine time resolution, length of extra sync flag in bytes] timepack = [ptt[9][18], 8, 1e6, 1] CUC_EPOCH = datetime.datetime(2018, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc) @@ -106,30 +106,40 @@ def timecal(data, string=False): return coarse + fine -def calc_timestamp(time, sync=0, return_bytes=False): +def calc_timestamp(time, sync=None, return_bytes=False): + if isinstance(time, (float, int)): ctime = int(time) ftime = round(time % 1 * timepack[2]) + if ftime == timepack[2]: + ctime += 1 + ftime = 0 elif isinstance(time, str): - if time[:-1] in ['U', 'S']: + if time[-1].upper() in ['U', 'S']: t = float(time[:-1]) else: t = float(time) ctime = int(t) ftime = round(t % 1 * timepack[2]) + if ftime == timepack[2]: + ctime += 1 + ftime = 0 sync = 0b101 if time[-1].upper() == 'S' else 0 elif isinstance(time, bytes): ctime = int.from_bytes(time[:4], 'big') - ftime = int.from_bytes(time[4:-1], 'big') - sync = time[-1] + ftime = int.from_bytes(time[4:7], 'big') + if len(time) == timepack[1]: + sync = time[-1] + else: + sync = None else: raise TypeError('Unsupported input for time ({})'.format(type(time))) if return_bytes: - if sync is None: + if sync is None or sync is False: return ctime.to_bytes(4, 'big') + ftime.to_bytes(3, 'big') else: return ctime.to_bytes(4, 'big') + ftime.to_bytes(3, 'big') + sync.to_bytes(1, 'big')