From 8b4ded7ff5be7cc50595040a739431f1af5b9582 Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Tue, 8 Nov 2022 16:00:46 +0100 Subject: [PATCH] fix handling of RMAP packets over SPW protocol - align RMAP read reply format to FEEsim behaviour (i.e. no data checksum) - avoid concurrent access SQL pool_row handle --- Ccs/pus_datapool.py | 132 ++++++++++++++------------------------------ 1 file changed, 41 insertions(+), 91 deletions(-) diff --git a/Ccs/pus_datapool.py b/Ccs/pus_datapool.py index a2aa5ae..d6ecda9 100644 --- a/Ccs/pus_datapool.py +++ b/Ccs/pus_datapool.py @@ -408,7 +408,7 @@ class DatapoolManager: self.logger.warning('"{}" is not a supported protocol, aborting.'.format(protocol)) return - self.logger.info('Recording from new connection {}:{} to pool "{}" using {} protocol.'.format(host, port, protocol, pool_name)) + self.logger.info('Recording from new connection {}:{} to pool "{}" using {} protocol.'.format(host, port, pool_name, protocol)) new_session = self.session_factory_storage while True: dbrow = new_session.query(DbTelemetryPool).filter(DbTelemetryPool.pool_name == pool_name).first() @@ -999,26 +999,12 @@ class DatapoolManager: else: buf_to_send = buf - self.logger.debug('tc_send: pool_name = {}'.format(pool_name)) - self.logger.debug('tc_send: buf = {}'.format(buf_to_send)) - if pool_name not in self.loaded_pools: self.logger.warning("Cannot add TC to {}. Pool not loaded.".format(pool_name)) return # self.logger.debug('tc_send: tc_connections = {}'.format(self.tc_connections)) - try: - self.tc_connections[pool_name]['socket'].send(buf_to_send) - except Exception as err: - self.logger.error('Failed to send packet of length {} to {} [{}].'.format( - len(buf_to_send), pool_name, self.tc_connections[pool_name]['socket'].getpeername())) - return - - with self.lock: - self.tc_databuflen += len(buf_to_send) - new_session = self.session_factory_storage - pool_row = new_session.query(DbTelemetryPool).filter(DbTelemetryPool.pool_name == pool_name).first() # TC normally just take the information which pool it is from the first Row, But if a Pool is given with only @@ -1043,6 +1029,19 @@ class DatapoolManager: self.state[pool_name] = 1 self.last_commit_time = time.time() + try: + self.tc_connections[pool_name]['socket'].send(buf_to_send) + except Exception as err: + self.logger.error('Failed to send packet of length {} to {} [{}].'.format( + len(buf_to_send), pool_name, self.tc_connections[pool_name]['socket'].getpeername())) + return + + self.logger.debug('tc_send: pool_name = {}'.format(pool_name)) + self.logger.debug('tc_send: buf = {}'.format(buf_to_send)) + + with self.lock: + self.tc_databuflen += len(buf_to_send) + def process_tm(tmd, tm_raw): tm = tmd[0] @@ -1072,9 +1071,20 @@ class DatapoolManager: self.state[pool_row.pool_name] += 1 new_session.commit() - self.decode_tmdump_and_process_packets_internal(buf, process_tm) + if self.tc_connections[pool_name]['protocol'].upper() in ('PUS', 'PLMSIM'): + checkcrc = True + self.decode_tmdump_and_process_packets_internal(buf, process_tm, checkcrc=checkcrc) + + # if TC is not PUS, also handle it properly + elif self.tc_connections[pool_name]['protocol'].upper() == 'SPW': + buf = io.BytesIO(buf) + headers, pkts, _ = self.extract_spw(buf) + self.logger.critical(str(headers)+str(pkts)) + for header, pkt in zip(headers, pkts): + self.process_rmap(header, pkt, pool_name, pool_row=pool_row) + else: + self.logger.warning('Unknown TC protocol, cannot store {} in DB'.format(buf)) new_session.close() - return def crc_check(self, pckt): # return bool(self.crcfunc(pckt)) @@ -1221,6 +1231,7 @@ class DatapoolManager: pckts = self.extract_pus(buf) for pckt in pckts: + # this CRC only works for PUS packets if checkcrc: if self.crc_check(pckt): if self.pecmode == 'warn': @@ -1673,7 +1684,6 @@ class DatapoolManager: return pid, header, buf, pkt_size_stream = self.read_spw_from_socket(sockfd, pkt_size_stream) - if buf is None and pkt_size_stream is not None: self.trashbytes[pool_name] += 1 continue @@ -1752,7 +1762,7 @@ class DatapoolManager: header.bits.PKT_TYPE == 0 and header.bits.WRITE == 1): pktsize = hsize else: - pktsize = hsize + header.bits.DATA_LEN + RMAP_PEC_LEN + pktsize = hsize + header.bits.DATA_LEN# + RMAP_PEC_LEN # TODO: data CRC from FEEsim? while len(buf) < pktsize: d = sockfd.recv(pktsize - len(buf)) @@ -1765,10 +1775,15 @@ class DatapoolManager: return pid, header, buf, pkt_size_stream - def process_rmap(self, header, raw, pool_name, db_insert=True): + def process_rmap(self, header, raw, pool_name, pool_row=None, db_insert=True): + dbsession = self.session_factory_storage pkt = header.bits + if pool_row is not None: + pool_id = pool_row.iid + else: + pool_id = self.pool_rows[pool_name].iid newdbrow = RMapTelemetry( - pool_id=self.pool_rows[pool_name].iid, + pool_id=pool_id, idx=self.state[pool_name], cmd=pkt.PKT_TYPE, write=pkt.WRITE, @@ -1785,11 +1800,12 @@ class DatapoolManager: self.state[pool_name] += 1 return newdbrow - self.session_factory_storage.add(newdbrow) + dbsession.add(newdbrow) self.state[pool_name] += 1 + self.logger.debug('feed: {}'.format(newdbrow.raw.hex())) now = time.time() if (now - self.last_commit_time) > self.commit_interval: - self.session_factory_storage.commit() + dbsession.commit() self.last_commit_time = now def process_feedata(self, header, raw, pool_name): @@ -1827,7 +1843,7 @@ class DatapoolManager: break tla, pid = pkt_size_stream[:2] - if (tla == self.TLA) and (pid in self.PROTOCOL_IDS): + if pid in self.PROTOCOL_IDS: buf = pkt_size_stream else: pkt_size_stream = pkt_size_stream[1:] @@ -1869,7 +1885,7 @@ class DatapoolManager: while len(buf) < pktsize: data = stream.read(pktsize - len(buf)) if not data: - return pckts, pkt_size_stream + return headers, pckts, pkt_size_stream buf += data buf = buf[:pktsize] @@ -2653,69 +2669,3 @@ if __name__ == "__main__": # If no poolmanager with given instance name is found start a new poolmanager if startnew: run() - -''' - # Check if Poolmanager is already running - if cfl.is_open('poolmanager', cfl.communication['poolmanager']): - pmgr = cfl.dbus_connection('poolmanager', cfl.communication['poolmanager']) - gui = pmgr.Variables('gui_running') - running = True - else: - running = False - #try: - # dbus_type = dbus.SessionBus() - # Bus_Name = cfg.get('ccs-dbus_names', 'poolmanager') - # dbus_type.get_object(Bus_Name, '/MessageListener') - # running = True - #except: - # running = False - - # If argument --gui is given and if poolmanager is not running start manager with GUI - if not '--nogui' in sys.argv and not running: - #sys.argv.remove('--gui') - pm = PUSDatapoolManager() - - # pm.connect_to(label='new tm', host='127.0.0.1', port=5570, kind='TM') - # pm.connect_to(label='new tc', host='127.0.0.1', port=5571, kind='TC') - #signal.signal(signal.SIGINT, signal.SIG_DFL) - - Bus_Name = cfg.get('ccs-dbus_names', 'poolmanager') - #DBusGMainLoop(set_as_default=True) - DBus_Basic.MessageListener(pm, Bus_Name, *sys.argv) - - pm.start_gui() - - Gtk.main() - - # If Manager is not running start it without a GUI - elif not running: - Bus_Name = cfg.get('ccs-dbus_names', 'poolmanager') - #DBusGMainLoop(set_as_default=True) - pv = PUSDatapoolManager() - DBus_Basic.MessageListener(pv, Bus_Name, *sys.argv) - - Gtk.main() - - # If Manager is running and argument --background is given do nothing and keep Poolmanager running without a GUI - # Do the same if a GUI is already running (prevents 2 GUIs) - elif running and gui: - #sys.argv.remove('--gui') - pm = PUSDatapoolManager() - - # pm.connect_to(label='new tm', host='127.0.0.1', port=5570, kind='TM') - # pm.connect_to(label='new tc', host='127.0.0.1', port=5571, kind='TC') - #signal.signal(signal.SIGINT, signal.SIG_DFL) - - Bus_Name = cfg.get('ccs-dbus_names', 'poolmanager') - #DBusGMainLoop(set_as_default=True) - DBus_Basic.MessageListener(pm, Bus_Name, *sys.argv) - - pm.start_gui() - - Gtk.main() - - # If Manager is Running and nothing else is given open the GUI - else: - pmgr = cfl.dbus_connection('poolmanager', cfl.communication['poolmanager']) - pmgr.Functions('start_gui') -''' -- GitLab