diff --git a/Ccs/DBus_Basic.py b/Ccs/DBus_Basic.py index 9be63bb0279f785e8fcfef35ca6921943fcbd01b..cc8dd3b521be317ddabe8a7e6ce2f55f07c402cf 100644 --- a/Ccs/DBus_Basic.py +++ b/Ccs/DBus_Basic.py @@ -54,7 +54,7 @@ class MessageListener(dbus.service.Object): try: self.win.set_title(str(project) + ': ' + str(self.win.get_title()) + ' ' + str(counting - 1)) except Exception as err: - print(err) + # print(err) # Looks like an odd title name but is reshaped in pus_datapool.py self.win.windowname = str(project) + ': @ ' + str(counting - 1) diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index dabef7ef2414488f784560b97aa1ac805b7749d9..681f094df31dddbf7fa46bfcceb4ebaffbc025e6 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -330,7 +330,6 @@ def dbus_connection(name, instance=1): dbuscon = dbus_type.get_object(Bus_Name, '/MessageListener') return dbuscon except: - # print('Connection to ' + str(name) + ' is not possible') # print('Please start ' + str(name) + ' if it is not running') logger.warning('Connection to ' + str(name) + ' is not possible.') return False @@ -1683,9 +1682,7 @@ def unpack_bytes(data, fmt, offbi=0): x = struct.unpack('>' + fmt, data)[0] return x ''' -def set_packet_list(packets): - packet_selection = packets - return packets + def show_extracted_packet(): """ @@ -1699,10 +1696,12 @@ def show_extracted_packet(): return eval(pv.Functions('selected_packet')) + def packet_selection(): """Alias for show_extracted_packet call""" return show_extracted_packet() + def get_module_handle(module_name, instance=1, timeout=5): """ Try getting the DBUS proxy object for the module_name module for timeout seconds. @@ -1722,19 +1721,21 @@ def get_module_handle(module_name, instance=1, timeout=5): if module: break else: - time.sleep(0.2) - except dbus.DBusException: + time.sleep(1.) + except dbus.DBusException as err: + logger.warning(err) module = False - time.sleep(0.2) + time.sleep(0.5) if module: return module else: - raise ValueError('No running {} instance found'.format(module_name.upper())) - return + logger.error('No running {} instance found'.format(module_name.upper())) + return False -def connect(pool_name, host, port, protocol='PUS'): #, return_socket=False, is_server=False, timeout=10, delete_abandoned=False, try_delete=True, pckt_filter=None, options='', drop_rx=False, drop_tx=False): +def connect(pool_name, host, port, protocol='PUS', is_server=False, timeout=10, delete_abandoned=False, try_delete=True, + pckt_filter=None, options='', drop_rx=False, drop_tx=False): """ Accessibility function for 'connect' in pus_datapool :param pool_name: @@ -1754,20 +1755,28 @@ def connect(pool_name, host, port, protocol='PUS'): #, return_socket=False, is_ """ pmgr = get_module_handle('poolmanager') - # None cannot be passed over DBUS - # if pckt_filter is None: - # pckt_filter = False + if not pmgr: + return + + kwarguments = str({'protocol': protocol, + 'is_server': is_server, + 'timeout': timeout, + 'delete_abandoned': delete_abandoned, + 'try_delete': try_delete, + 'pckt_filter': pckt_filter, + 'options': options, + 'drop_rx': drop_rx, + 'drop_tx': drop_tx}) # kwarguments = {'return_socket': return_socket, 'is_server': is_server, 'timeout': timeout, # 'delete_abandoned': delete_abandoned, 'try_delete': try_delete, 'pckt_filter': pckt_filter, # 'options': options, 'drop_rx': drop_rx, 'drop_tx': drop_tx, 'protocol': protocol} - #TODO: passing the whole kwarg dict over DBUS does not work yet - pmgr.Functions('connect', pool_name, host, port, {'kwargs': dbus.Dictionary({'protocol': protocol})}) - return + pmgr.Functions('connect', pool_name, host, port, {'kwargs': dbus.Dictionary({'options': kwarguments, + 'override_with_options': '1'})}) -def connect_tc(pool_name, host, port, protocol='PUS'): #, drop_rx=True, timeout=10, is_server=False, options=''): +def connect_tc(pool_name, host, port, protocol='PUS', drop_rx=True, timeout=10, is_server=False, options=''): """ Accessibility function for 'connect_tc' in pus_datapool :param pool_name: @@ -1782,11 +1791,17 @@ def connect_tc(pool_name, host, port, protocol='PUS'): #, drop_rx=True, timeout """ pmgr = get_module_handle('poolmanager') - # TODO: passing the whole kwarg dict over DBUS does not work yet - # kwarguments = {'is_server': is_server, 'timeout': timeout, 'options': options, 'drop_rx': drop_rx, 'protocol': protocol} + if not pmgr: + return - pmgr.Functions('connect_tc', pool_name, host, port, {'kwargs': dbus.Dictionary({'protocol': protocol})}) - return + kwarguments = str({'protocol': protocol, + 'is_server': is_server, + 'timeout': timeout, + 'options': options, + 'drop_rx': drop_rx}) + + pmgr.Functions('connect_tc', pool_name, host, port, {'kwargs': dbus.Dictionary({'options': kwarguments, + 'override_with_options': '1'})}) ## @@ -2457,6 +2472,18 @@ def get_last_pckt_time(pool_name='LIVE', string=True): return cuc +def _has_tc_connection(pool_name, pmgr_handle): + try: + if not pmgr_handle.Functions('_is_tc_connection_active', pool_name): + logger.error('"{}" is not connected to any TC socket!'.format(pool_name)) + return False + else: + return True + except Exception as err: + logger.error(err) + return False + + def Tcsend_bytes(tc_bytes, pool_name='LIVE'): pmgr = dbus_connection('poolmanager', communication['poolmanager']) @@ -2465,10 +2492,7 @@ def Tcsend_bytes(tc_bytes, pool_name='LIVE'): return False # check if pool is connected - try: - pmgr.Dictionaries('tc_connections', pool_name) - except (dbus.DBusException, KeyError): - logger.error('"{}" has no TC connection!'.format(pool_name)) + if not _has_tc_connection(pool_name, pmgr): return False # Tell dbus with signature = that you send a byte array (ay), otherwise does not allow null bytes @@ -2489,18 +2513,20 @@ def Tcsend_bytes(tc_bytes, pool_name='LIVE'): # @param pool_name Name of the pool bound to the socket for CnC/TC communication # @param cmd Command string to be sent to C&C socket def CnCsend(cmd, pool_name=None): - global counters # One can only Change variable as global since we are static + global counters # One can only Change variable as global since we are static pmgr = dbus_connection('poolmanager', communication['poolmanager']) if pool_name is None: pool_name = pmgr.Variables('tc_name') + packed_data = CnCpack(data=cmd, sc=counters.setdefault(1804, 1)) + + logger.info('[CNC sent:]' + str(packed_data)) received = pmgr.Functions('socket_send_packed_data', packed_data, pool_name, signature='says') if received is not None: counters[1804] += 1 - received = bytes(received) # convert dbus type to python type - # print('[CNC Response:]' + str(received)) - logger.info('[CNC Response:]' + str(received)) + received = bytes(received) # convert dbus type to python type + logger.info('[CNC response:]' + str(received)) return received @@ -2538,8 +2564,12 @@ def CnCpack(data=b'', version=0b011, typ=1, dhead=0, pid=112, cat=12, gflags=0b1 # @param pool_name Name of pool bound to Python socket for CnC/TC communication def Datasend(data, pool_name): pmgr = dbus_connection('poolmanager', communication['poolmanager']) - pmgr.Functions('tc_send', pool_name, data) - return + + if not pmgr: + return + + if _has_tc_connection(pool_name, pmgr): + pmgr.Functions('tc_send', pool_name, data) ## @@ -2625,7 +2655,6 @@ def tc_load_to_memory(data, memid, mempos, slicesize=1000, sleep=0., ack=None, p slices = [data[i:i + slicesize] for i in range(0, len(data), slicesize)] if slicesize > 1000: - # print('SLICESIZE > 1000 bytes, this is not gonna work!') logger.warning('SLICESIZE > 1000 bytes, this is not gonna work!') slicount = 1 @@ -2633,7 +2662,7 @@ def tc_load_to_memory(data, memid, mempos, slicesize=1000, sleep=0., ack=None, p t1 = time.time() parts = struct.unpack(len(sli) * 'B', sli) Tcsend_DB(cmd, memid, mempos, len(parts), *parts, ack=ack, pool_name=pool_name) - sys.stdout.write('%i / %i packets sentto {}\r'.format(slicount, len(slices), memid)) + sys.stdout.write('%i / %i packets sent to {}\r'.format(slicount, len(slices), memid)) logger.info('%i / %i packets sent to {}'.format(slicount, len(slices), memid)) slicount += 1 dt = time.time() - t1 diff --git a/Ccs/pus_datapool.py b/Ccs/pus_datapool.py index fb3b997d7e410e77a8cf01c6ec09aedc80324310..d73026ec811832e3a88950f98c49e2b8ae03ae8a 100644 --- a/Ccs/pus_datapool.py +++ b/Ccs/pus_datapool.py @@ -341,8 +341,24 @@ class DatapoolManager: self.loaded_pools[pool_name] = ActivePoolInfo(pool_name, timestamp, pool_name, True) self.logger.info('Resuming recording from {}:{}'.format(*sockfd.getpeername())) - def connect(self, pool_name, host, port, return_socket=False, is_server=False, timeout=10, delete_abandoned=False, - try_delete=True, pckt_filter=None, options='', drop_rx=False, drop_tx=False, protocol='PUS'): + def connect(self, pool_name, host, port, protocol='PUS', is_server=False, timeout=10, delete_abandoned=False, + try_delete=True, pckt_filter=None, options='', drop_rx=False, drop_tx=False, return_socket=False, + override_with_options=False): + + # override variables that are set in the options string + if bool(override_with_options): + self.logger.debug('Overriding kwargs with values from options string.') + override = eval(options) + protocol = override.get('protocol', protocol) + is_server = override.get('is_server', is_server) + timeout = override.get('timeout', timeout) + delete_abandoned = override.get('delete_abandoned', delete_abandoned) + try_delete = override.get('try_delete', try_delete) + pckt_filter = override.get('pckt_filter', pckt_filter) + drop_rx = override.get('drop_rx', drop_rx) + drop_tx = override.get('drop_tx', drop_tx) + return_socket = override.get('return_socket', return_socket) + # options = override.get('options', options) protocol = protocol.upper() @@ -390,7 +406,7 @@ class DatapoolManager: self.logger.warning('"{}" is not a supported protocol, aborting.'.format(protocol)) return - self.logger.info('Recording from new connection to ' + host + ':' + str(port) + '\n') + self.logger.info('Recording from new connection {}:{} to pool "{}" using {} protocol.'.format(host, port, protocol, pool_name)) new_session = self.session_factory_storage while True: dbrow = new_session.query(DbTelemetryPool).filter(DbTelemetryPool.pool_name == pool_name).first() @@ -432,23 +448,26 @@ class DatapoolManager: if self.own_gui: self.own_gui.disconnect_incoming_via_code(param=[pool_name, None, 'TM']) # Updates the gui - #Tell the Poolviewer to stop updating + # Tell the Poolviewer to stop updating if cfl.is_open('poolviewer', cfl.communication['poolviewer']): pv = cfl.dbus_connection('poolviewer', cfl.communication['poolviewer']) cfl.Functions(pv, 'stop_recording_info', str(pool_name)) return - def connect_tc(self, pool_name, host, port, drop_rx=True, protocol='PUS', timeout=10, is_server=False, options=''): - # self.logger.debug('connect_tc: type pool_name = {}'.format(type(pool_name))) - # self.logger.debug('connect_tc: type host = {}'.format(type(host))) - # self.logger.debug('connect_tc: type port = {}'.format(type(port))) - # if isinstance(pool_name, dbus.String): - # pool_name = str(pool_name) - # if isinstance(host, dbus.String): - # host = str(host) - # if isinstance(port, dbus.Int32): - # port = int(port) + def connect_tc(self, pool_name, host, port, protocol='PUS', drop_rx=True, timeout=10, is_server=False, options='', + override_with_options=False): + + # override variables that are set in the options string + if bool(override_with_options): + self.logger.debug('Overriding kwargs with values from options string.') + override = eval(options) + protocol = override.get('protocol', protocol) + drop_rx = override.get('drop_rx', drop_rx) + timeout = override.get('timeout', timeout) + is_server = override.get('is_server', is_server) + # options = override.get('options', options) + if is_server: socketserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socketserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -466,9 +485,7 @@ class DatapoolManager: sockfd.settimeout(timeout) if pool_name in self.tc_connections: - # self.logger.info(self.tc_connections[pool_name]) - # if self.tc_connections[pool_name]['recording']: - self.logger.warning('Pool "{}" already exists!'.format(pool_name)) + self.logger.warning('Pool "{}" already has TC connection to {}!'.format(pool_name, self.tc_connections[pool_name]['socket'].getpeername())) return try: sockfd.connect((host, port)) @@ -480,6 +497,8 @@ class DatapoolManager: self.tc_name = pool_name self.tc_connections[pool_name] = {'socket': sockfd, 'protocol': protocol} + self.logger.info('Established TC connection to {}, using {} protocol.'.format(sockfd.getpeername(), protocol)) + if pool_name not in self.loaded_pools: self.loaded_pools[pool_name] = ActivePoolInfo(pool_name, 0, pool_name, True) @@ -495,8 +514,9 @@ class DatapoolManager: # read data received on TC socket to prevent buffer overflow if drop_rx: tc_recv = threading.Thread(target=self.tc_receiver, kwargs={'sockfd': sockfd, 'protocol': protocol}) - tc_recv.setDaemon(True) - tc_recv.name = 'TC-drop_rx' + # tc_recv.setDaemon(True) + tc_recv.daemon = True + tc_recv.name = 'TC-drop_rx-{}'.format(pool_name) tc_recv.start() if self.own_gui and self.tc_name is not None: @@ -532,6 +552,15 @@ class DatapoolManager: return + def _is_tc_connection_active(self, pool_name): + """ + Utility function to check whether a pool has an active TC connection to report back via DBus + """ + if pool_name in self.tc_connections and not self.tc_connections[pool_name]['socket'].fileno() < 0: + return True + else: + return False + # Function will disconnect both TC/TM connection if they have the same name def disconnect(self, pool_name): @@ -703,7 +732,7 @@ class DatapoolManager: elif drop_tx is False and pkt.startswith(PLM_PKT_PREFIX_TC.decode()): tm = bytes.fromhex(pkt.split(' ')[-3]) else: - self.logger.info("Not a PUS packet: " + pkt) + self.logger.warning("Not a PUS packet: " + pkt) continue if self.crc_check(tm): self.logger.warning("Invalid CRC: " + pkt) @@ -777,17 +806,17 @@ class DatapoolManager: else: self.decode_tmdump_and_process_packets_internal(buf, process_tm, checkcrc=False) except socket.timeout as e: - self.logger.debug('Socket timeout') + self.logger.info('Socket timeout ({}:{})'.format(host, port)) new_session.commit() continue except socket.error as e: - self.logger.error('Socket error') + self.logger.error('Socket error ({}:{})'.format(host, port)) self.logger.exception(e) # self.logger.error('ERROR: socket error') self.connections[pool_name]['recording'] = False break except struct.error as e: - self.logger.error('Lost connection...') + self.logger.error('Lost connection to {}:{}'.format(host, port)) self.logger.exception(e) self.connections[pool_name]['recording'] = False break @@ -919,6 +948,7 @@ class DatapoolManager: with self.lock: self.databuflen += len(buf) except socket.timeout: + self.logger.info('Socket timeout {}:{} [TC RX]'.format(host, port)) continue except socket.error: self.logger.error('Socket error') @@ -926,7 +956,7 @@ class DatapoolManager: except struct.error: self.logger.error('Lost connection...') break - self.logger.warning('Disconnected TC_recvr: ' + str(host) + ':' + str(port)) + self.logger.warning('Disconnected TC RX: ' + str(host) + ':' + str(port)) sockfd.close() # def set_commit_interval(self, pool_name, commit_interval): @@ -940,9 +970,8 @@ class DatapoolManager: def tc_send(self, pool_name, buf): if pool_name not in self.tc_connections: - self.logger.error('"{}" has no TC connection!'.format(pool_name)) + self.logger.error('"{}" is not connected to any TC socket!'.format(pool_name)) return - # raise NameError('"{}" has no TC connection!'.format(pool_name)) # check protocol of TC socket to append headers and stuff, this has to happen here, not in Tcsend_DB if self.tc_connections[pool_name]['protocol'].upper() == 'PLMSIM': @@ -961,8 +990,8 @@ class DatapoolManager: try: self.tc_connections[pool_name]['socket'].send(buf_to_send) except Exception as err: - self.logger.error('Failed to send packet of length {} to {} [{}].'.format(len(buf_to_send), pool_name, - self.tc_connections[pool_name]['socket'].getpeername())) + self.logger.error('Failed to send packet of length {} to {} [{}].'.format( + len(buf_to_send), pool_name, self.tc_connections[pool_name]['socket'].getpeername())) return with self.lock: