diff --git a/Tst/testing_library/testlib/tm.py b/Tst/testing_library/testlib/tm.py index be2abb9169d7a93a73aff98c107430e6f2a9e036..767c5e8ee54fd5d4209dfaec9ffe720f70402cd0 100644 --- a/Tst/testing_library/testlib/tm.py +++ b/Tst/testing_library/testlib/tm.py @@ -67,42 +67,6 @@ from . import tools # create a logger logger = logging.getLogger(__name__) -''' -def sessionfactory(ccs): - """ - Creates a sessionmaker - - :param ccs: Instance of the class packets.CCScom - :type ccs: packets.CCScom - - :return: Sessionmaker object - :rtype: sqlalchemy.orm.session.sessionmaker - """ - if ccs.session is None: - engine = None - s_factory = None - if engine is None: - engine = create_engine( - config_db.mysql_connection_string, - echo="-v" in sys.argv) - s_factory = sessionmaker(bind=engine) - else: - s_factory = ccs.session - return s_factory - - -def new_database_session(ccs): - """ - Creates a new session. - :param ccs: packets.CCScom - Instance of the class packets.CCScom - :return: session - """ - session_maker = sessionfactory(ccs=ccs) - session = session_maker() - return session -''' - def filter_chain(query, pool_name, is_tm=True, st=None, sst=None, apid=None, seq=None, t_from=None, t_to=None, dest_id=None, not_apid=None): """ @@ -338,10 +302,10 @@ def decode_single_tm_packet(packet): header = cfl.Tmread(packet) if header is not None: # the packet is a TC - if header[1] == 1: + if header[0].PKT_TYPE == 1: result = header, None # the packet is a TM - elif header[1] == 0: + elif header[0].PKT_TYPE == 0: data = cfl.Tmdata(packet) if data != (None, None): # data field could be decoded result = header, data @@ -951,9 +915,11 @@ def get_st_and_sst(pool_name, apid, ssc, is_tm=False, t_from=None): # if the packet was found read the header and extract the ST and SST if len(tc_list) == 1: if tc_list[0][0] is not None: - header = tc_list[0][0] - tc_st = header[10] - tc_sst = header[11] + header = tc_list[0][0][0] + #tc_st = header[10] + #tc_sst = header[11] + tc_st = header.SERV_TYPE + tc_sst = header.SERV_SUB_TYPE elif len(tc_list) < 1: logger.warning('get_st_and_sst: TC packet with apid {} and source sequence counter {} could not be found in the ' 'database'.format(apid, ssc)) @@ -1028,8 +994,8 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): """ result = None assert isinstance(pool_name, str) - assert isinstance(tc_apid, int) or isinstance(tc_apid, str) - assert isinstance(t_tc_sent, float) + assert isinstance(tc_apid, (int, str)) + assert isinstance(t_tc_sent, (float, int)) # if the tc_apid is provided as hexadecimal number, convert it to and integer tc_apid = tools.convert_apid_to_int(apid=tc_apid) @@ -1042,13 +1008,14 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): for i in range(len(packets)): if packets[i][1] is not None and packets[i][1][0] is not None: # get the data entries for APID and SSC - pac_apid = packets[i][0][3] + #pac_apid = packets[i][0][3] # TODO: not compatible anymore + pac_apid = packets[i][0][0].APID if pac_apid == 961: # for acknowledgements from SEM name_apid = 'PAR_CMD_APID' name_psc = 'PAR_CMD_SEQUENCE_COUNT' else: - name_apid = 'TcPacketId' - name_psc = 'TcPacketSeqCtrl' + name_apid = 'TcPcktId' + name_psc = 'TcPcktSeqCtrl' para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) if name_apid in para and name_psc in para: # extract the SSC from the PSC @@ -1077,25 +1044,25 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): # check if there was a failure, the result becomes False if a failure occurred for i in range(len(ack_tms)): - head = ack_tms[i][0] + head = ack_tms[i][0][0] data = ack_tms[i][1] if result is not False: - if head[11] == 1 or head[11] == 3 or head[11] == 7: - logger.info('TM({},{}) @ {}'.format(head[10], head[11], cfl.get_cuctime(head))) + if head.SERV_SUB_TYPE in [1, 3, 7]: + logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) result = True - if head[11] == 2 or head[11] == 4 or head[11] == 8: - if head[11] == 2: + elif head.SERV_SUB_TYPE in [2, 4, 8]: + if head.SERV_SUB_TYPE == 2: logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' - .format(head[10], head[11], cfl.get_cuctime(head))) + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) logger.debug('Data of the TM packet: {}'.format(data)) - if head[11] == 4: + if head.SERV_SUB_TYPE == 4: logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' - .format(head[10], head[11], cfl.get_cuctime(head))) + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) logger.debug('Data of the TM packet: {}'.format(data)) - if head[11] == 8: + if head.SERV_SUB_TYPE == 8: logger.info( 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' - .format(head[10], head[11], cfl.get_cuctime(head))) + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) logger.debug('Data of the TM packet: {}'.format(data)) result = False @@ -1564,3 +1531,7 @@ def get_acquisition(pool_name, tm_21_3): logger.info(meta_data[i]) return result +if __name__ == '__main__': + sys.path.append('.') + r=get_tc_acknow('PLM',0.,321,5) + print(r) \ No newline at end of file diff --git a/Tst/testing_library/testlib/tools.py b/Tst/testing_library/testlib/tools.py index 0c57a3702b58ac6532883c628260e2a4f47df72e..b1104bd27e5385f697c7ecffc450643c4537f8d5 100644 --- a/Tst/testing_library/testlib/tools.py +++ b/Tst/testing_library/testlib/tools.py @@ -146,19 +146,15 @@ def convert_apid_to_int(apid): """ assert isinstance(apid, int) or isinstance(apid, str) - res = None if isinstance(apid, str): try: - if 'x' in apid: - res = int(apid, 16) - else: - res = int(apid) + res = int(apid, 0) except ValueError as err: logger.exception(err) - if isinstance(apid, int): + res = None + + elif isinstance(apid, int): res = apid - if res is None: - logger.error('convert_apid_to_int: failed to convert the APID {} into an integer'.format(apid)) return res @@ -167,7 +163,6 @@ def convert_apid_to_int(apid): # @param apid: <int> application process identifier # @result: <int> PID, None if no valid apid is provided def extract_pid_from_apid(apid): - result = None """ Since commands ('0x14C' = 332 and '0x3CC' = 972) are not in the pid table, this check is not used # query for the existing apids self.c.execute('SELECT PID_APID FROM dabys_mib_cheops.pid') @@ -183,21 +178,22 @@ def extract_pid_from_apid(apid): # if the apid is provided as a string try to convert it to a integer if isinstance(apid, str): try: - apid = int(apid) + apid = int(apid, 0) except ValueError as error: - try: - apid = int(apid, 16) - except ValueError as error: - logger.exception(error) + logger.exception(error) + raise error - if isinstance(apid, int): + # if isinstance(apid, int): # convert the apid into hexadecimal system and slice the last character (=PID), then convert it back to an int - apid_as_hex = hex(apid) - pid_as_hex = apid_as_hex[:4] - pid_as_dez = int(pid_as_hex, 16) - result = pid_as_dez + # apid_as_hex = hex(apid) + # pid_as_hex = apid_as_hex[:4] + # pid_as_dez = int(pid_as_hex, 16) + # result = pid_as_dez # TODO: this is the PCAT, not the PID!? - return result + pid = apid & 0xf + pcat = (apid >> 4) & 0x7f + + return pcat, pid def print_apids():