From d5aae4fbd3d9aa26d402931ee90bcb1cb9278d55 Mon Sep 17 00:00:00 2001 From: Sebastian <seb.miksch@aon.at> Date: Wed, 15 Mar 2023 15:39:47 +0100 Subject: [PATCH] added verification functions --- Tst/testing_library/testlib/tm.py | 393 ++++++++++++++++++++++-------- Tst/tst/tst.cfg | 8 +- Tst/tst/verification_tab.py | 16 +- 3 files changed, 308 insertions(+), 109 deletions(-) diff --git a/Tst/testing_library/testlib/tm.py b/Tst/testing_library/testlib/tm.py index 0b387e2..fda5d20 100644 --- a/Tst/testing_library/testlib/tm.py +++ b/Tst/testing_library/testlib/tm.py @@ -979,7 +979,120 @@ def extract_apid_from_packetid(packet_id): return apid -def get_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None): + +def get_tc_acknow(pool_name="LIVE", tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None): + """ + Check if for the TC acknowledgement packets can be found in the database. + This function makes a single database query. + :param pool_name: str + Name of the TM pool in the database + :param t_tc_sent: float + CUC timestamp from which search for telecommand should start + :param tc_apid: int or str + Application process ID of the sent TC. Can be provided as integer or hexadecimal string + :param tc_ssc: int + Source sequence counter of the sent TC + :return: (boolean, list) + boolean: + True if one or up to all acknowledgement packets TM(1,1), TM(1,3), TM(1,7) were found + False if one or all of TM(1,2), TM(1,4), TM(1,8) were found + list: + List of the acknowledgement TM packets for the TC, + [] if no acknowledgement TM packets could be found in the database + """ + result = None + + tc_data = get_data_of_last_tc(pool_name) + t_tc_sent = tc_data[0] + tc_iid = tc_data[1] + tc_bytes = tc_data[2] + + assert isinstance(pool_name, str) + assert isinstance(tc_apid, (int, str)) + assert isinstance(t_tc_sent, (float, int)) + assert isinstance(tc_iid, int) + assert isinstance(tc_bytes, (int, str)) + + # if the tc_apid is provided as hexadecimal number, convert it to and integer + tc_apid = tools.convert_apid_to_int(apid=tc_apid) + + # make database query + packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent - 1) + # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) + # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field + ack_tms = [] + for i in range(len(packets)): + if packets[i][1] is not None and packets[i][1][0] is not None: + # get the data entries for APID and SSC + #pac_apid = packets[i][0][3] # TODO: not compatible anymore + pac_apid = packets[i][0][0].APID + first_tm_bytes = packets[i][0][1].hex()[0:4] + + if pac_apid == 961: # for acknowledgements from SEM + name_apid = 'PAR_CMD_APID' + name_psc = 'PAR_CMD_SEQUENCE_COUNT' + else: + name_apid = 'TcPcktId' + name_psc = 'TcPcktSeqCtrl' + para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) + if name_apid in para and name_psc in para: + # extract the SSC from the PSC + ssc = extract_ssc_from_psc(psc=para[name_psc]) + apid = extract_apid_from_packetid(packet_id=para[name_apid]) + if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID' + tc_pid = tools.extract_pid_from_apid(tc_apid) + if apid == tc_pid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + if apid == tc_apid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + logger.debug('get_tc_acknow: could not read the data from the TM packet') + + # treat with the result from the database query + if len(ack_tms) > 0: + # get the ST and SST of the TC for logging purposes + tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, + apid=tc_apid, + ssc=tc_ssc, + is_tm=False, + t_from=t_tc_sent) + logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:' + .format(tc_st, tc_sst, tc_apid, tc_ssc)) + + # check if there was a failure, the result becomes False if a failure occurred + for i in range(len(ack_tms)): + head = ack_tms[i][0][0] + data = ack_tms[i][1] + if result is not False: + if head.SERV_SUB_TYPE in [1, 3, 7] and (first_tm_bytes == tc_bytes): + logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + result = True + # print(head.SERV_TYPE, head.SERV_SUB_TYPE) + elif head.SERV_SUB_TYPE in [2, 4, 8] and (first_tm_bytes == tc_bytes): + if head.SERV_SUB_TYPE == 2: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 4: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 8: + logger.info( + 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + result = False + else: + logger.info('No TM found with matching pattern {}'.format(tc_bytes)) + + return result, ack_tms + + + + +def get_tc_acknow_original(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None): """ Check if for the TC acknowledgement packets can be found in the database. This function makes a single database query. @@ -1057,6 +1170,7 @@ def get_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, if head.SERV_SUB_TYPE in [1, 3, 7]: logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) result = True + print(head.SERV_TYPE, head.SERV_SUB_TYPE) elif head.SERV_SUB_TYPE in [2, 4, 8]: if head.SERV_SUB_TYPE == 2: logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' @@ -1156,116 +1270,117 @@ def await_tc_acknow(tc_identifier, pool_name="LIVE", duration=10, tm_st=1, tm_ss return result, ack_list -def get_5_1_tc_acknow(pool_name="LIVE", t_tc_sent=0.0, tc_apid=321, tc_ssc=1, tm_st=5, tm_sst=None): +def get_5_1_tc_acknow(pool_name="LIVE", tm_name="name", parameter_1=5, parameter_2=3): """ Check if for the TC acknowledgement packets can be found in the database. This function makes a single database query. :param pool_name: str Name of the TM pool in the database - :param t_tc_sent: float - CUC timestamp from which search for telecommand should start - :param tc_apid: int or str - Application process ID of the sent TC. Can be provided as integer or hexadecimal string - :param tc_ssc: int - Source sequence counter of the sent TC - :return: (boolean, list) + :param tm_name: string + Description of TM as found in TM Table + :param parameter_1: int + Value of the second parameter of the TM + :param parameter_2: int + Value of the third parameter of the TM + :return: (boolean) boolean: - True if one or up to all acknowledgement packets TM(5,1), TM(5,3), TM(5,7) were found - False if one or all of TM(5,2), TM(5,4), TM(5,8) were found - list: - List of the acknowledgement TM packets for the TC, - [] if no acknowledgement TM packets could be found in the database + True if the 5_1 packet was found with the given parameters + False if no such packet was found """ - result = None + assert isinstance(pool_name, str) - assert isinstance(tc_apid, (int, str)) - assert isinstance(t_tc_sent, (float, int)) + assert isinstance(tm_name, str) + assert isinstance(parameter_1, int) + assert isinstance((parameter_2, int)) + + + time_of_packet = get_time_of_last_tc(pool_name) + packets = fetch_packets(pool_name=pool_name, is_tm=True, st=5, sst=None, apid=321, ssc=None, t_from=time_of_packet, + t_to=time_of_packet+30 , dest_id=None, not_apid=None, decode=True, silent=False) + + if tm_name == "SASW Rep1_EVT_IASW_TR" and parameter_1 == 5 and parameter_2 == 3: + assert packets[0][1][0][1][4][0] == parameter_1 + assert packets[0][1][0][2][4][0] == parameter_2 + return True + + elif tm_name == "SASW Rep1_EVT_FEE_TR": + + + if parameter_1 == 3 and parameter_2 == 2: + assert packets[1][1][0][1][4][0] == parameter_1 + assert packets[1][1][0][2][4][0] == parameter_2 + return True + elif parameter_1 == 2 and parameter_2 == 7: + assert packets[2][1][0][1][4][0] == parameter_1 + assert packets[2][1][0][2][4][0] == parameter_2 + return True + elif parameter_1 == 7 and parameter_2 == 6: + assert packets[3][1][0][1][4][0] == parameter_1 + assert packets[3][1][0][2][4][0] == parameter_2 + return True + else: + print("Packet does not exist. SASW Rep1_EVT_FEE_TR") + return False + + else: + print("Packet does not exist. ASW Rep1_EVT_IASW_TR") + return False + + + + + # return packets - # if the tc_apid is provided as hexadecimal number, convert it to and integer - tc_apid = tools.convert_apid_to_int(apid=tc_apid) - evtid_list = [] - data_list = [] - # make database query - packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent) - return packets - """ - for packet in packets: - evtid = get_tm_data_entries(packet, "EvtId") - evtid_list.append(evtid) - data = packet[0][1].hex() - data_list.append(data) - print(evtid_list) - print(data_list) - """ # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field - """ - ack_tms = [] - for i in range(len(packets)): - if packets[i][1] is not None and packets[i][1][0] is not None: - # get the data entries for APID and SSC - # pac_apid = packets[i][0][3] # TODO: not compatible anymore - pac_apid = packets[i][0][0].APID - if pac_apid == 961: # for acknowledgements from SEM - name_apid = 'PAR_CMD_APID' - name_psc = 'PAR_CMD_SEQUENCE_COUNT' - else: - name_apid = 'TcPcktId' - name_psc = 'TcPcktSeqCtrl' - para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) - if name_apid in para and name_psc in para: - # extract the SSC from the PSC - ssc = extract_ssc_from_psc(psc=para[name_psc]) - apid = extract_apid_from_packetid(packet_id=para[name_apid]) - if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID' - tc_pid = tools.extract_pid_from_apid(tc_apid) - if apid == tc_pid and ssc == tc_ssc: - ack_tms.append(packets[i]) - else: - if apid == tc_apid and ssc == tc_ssc: - ack_tms.append(packets[i]) + + + +def check_if_packet_is_received(ST=1,SST=7,pool_name="LIVE"): + assert isinstance(pool_name, str) + assert isinstance(ST, int) + assert isinstance(SST, int) + + + raw_packet_list, data_packet_list = get_last_100_packets(pool_name=pool_name) + # print(raw_packet_list) + + + # ST 10 Stelle + # SST 11 Stelle + for i in range(len(raw_packet_list)): + packet = cfl.get_header_parameters_detailed(raw_packet_list[i]) + + print(packet) + + current_packet_ST = packet[10][1] + current_packet_SST = packet[11][1] + + + try: + current_timestamp = cfl.get_cuctime(raw_packet_list[i]) + except: + current_timestamp = "" + + if current_packet_ST == ST and current_packet_SST == SST: + print("Match") + print(ST, SST, current_timestamp) + return True else: - logger.debug('get_tc_acknow: could not read the data from the TM packet') + continue + + + + + + # print(time) + + if current_packet_ST == ST and current_packet_SST == SST: + print("Found it") - # treat with the result from the database query - if len(ack_tms) > 0: - # get the ST and SST of the TC for logging purposes - tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, - apid=tc_apid, - ssc=tc_ssc, - is_tm=False, - t_from=t_tc_sent) - logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:' - .format(tc_st, tc_sst, tc_apid, tc_ssc)) - # check if there was a failure, the result becomes False if a failure occurred - for i in range(len(ack_tms)): - head = ack_tms[i][0][0] - data = ack_tms[i][1] - if result is not False: - if head.SERV_SUB_TYPE in [1, 3, 7]: - logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - result = True - elif head.SERV_SUB_TYPE in [2, 4, 8]: - if head.SERV_SUB_TYPE == 2: - logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' - .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - logger.debug('Data of the TM packet: {}'.format(data)) - if head.SERV_SUB_TYPE == 4: - logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' - .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - logger.debug('Data of the TM packet: {}'.format(data)) - if head.SERV_SUB_TYPE == 8: - logger.info( - 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' - .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - logger.debug('Data of the TM packet: {}'.format(data)) - result = False - return result, ack_tms - pass - """ def get_sid_of_tm(packet): @@ -1273,11 +1388,6 @@ def get_sid_of_tm(packet): Gets SID from raw packet data. :param packet: bytes Packet whose SID should be extracted - :param decoded_packet: str - Decoded information of packet - :param header: str - extract header from raw data - """ decoded_packet = packet.hex() @@ -1729,11 +1839,13 @@ def get_acquisition(pool_name, tm_21_3): return result -def get_time_of_last_tc(pool_name="LIVE"): +def get_data_of_last_tc(pool_name="LIVE"): """ Finds the newest Telecommand in database with the Telemetry just before. - param pool_name: str + :param pool_name: str Name of the pool for TM/TC packets in the database + :return: str + timestamp of packet, IID of packet, First four bytes of packet data """ @@ -1744,11 +1856,86 @@ def get_time_of_last_tc(pool_name="LIVE"): if tm_pool[i].timestamp == "": # last_telecommand = tm_pool[i] tm_before_tc = tm_pool[i-1] + tc_id = tm_pool[i].iid + tc_bytes = tm_pool[i].raw.hex()[:4] + break timestamp = tm_before_tc.timestamp timestamp_length = len(timestamp) timestamp = float(timestamp[:timestamp_length -1]) - return timestamp + return timestamp, tc_id, tc_bytes + + +def verify_no_more_hk(wait_time=5, pool_name="LIVE"): + """ + Function waits for a spcified time and the searches the data pool for HK telemetry. + Returns True if there is no HK. + Returns False if there is HK. + :param wait_time: int + timespan in which the program checks if a hk has appeard + :param pool_name: str + Name of the pool for TM/TC packets in the database + :return: boolean + True if there is no HK, False if there is a HK + """ + assert isinstance(SID, int) + assert isinstance(pool_name, str) + + hk_signature = "0001050000000000000000000000000000000000000000000000000000000000000000000000000000000112" + time.sleep(wait_time) + # TODO incorporate function to find last 100 packets + list_of_packets = [] + for i in range (1,1000): + current_telemetry_packet = cfl.get_pool_rows(pool_name)[-i] + current_raw = current_telemetry_packet.raw + current_data = current_telemetry_packet.data + list_of_packets.append(current_data.hex()) + + try: + current_timestamp = cfl.get_cuctime(current_raw) + print(current_timestamp) + except: + break + + if hk_signature in list_of_packets: + return False + else: + return True + + +def get_tm_timestamp(packet, pool_name="LIVE"): + """ + gets timestamp from packet irregardless of data type + """ + pass + + +def get_last_100_packets(pool_name="Live"): + +# TODO Fix function to work with any number of packets + raw_packets = [] + data_packets = [] + data_pool = cfl.get_pool_rows(pool_name) + + + + for i in range(1, 101): + try: + current_packet = cfl.get_pool_rows(pool_name)[-i] + current_raw = current_packet.raw + current_data = current_packet.data + raw_packets.append(current_raw) + data_packets.append(current_data) + except IndexError: + break + + + return raw_packets, data_packets + + + + + def get_dpu_mode(pool_name="LIVE"): diff --git a/Tst/tst/tst.cfg b/Tst/tst/tst.cfg index fc1645f..73eb130 100644 --- a/Tst/tst/tst.cfg +++ b/Tst/tst/tst.cfg @@ -14,11 +14,11 @@ output-file-path = ${paths:tst}/logs_test_runs/output_files/ [tst-preferences] show-json-view = True -main-window-height = 1090 -main-window-width = 1920 -paned-position = 953 +main-window-height = 1016 +main-window-width = 1848 +paned-position = 1019 paned-position-codeblockreuse = 520 [tst-history] -last-folder = /home/marko/space/smile/OBSW/Documentation/testspec/tst +last-folder = /home/sebastian/OBSW/Documentation/testspec/tst/derived_specs diff --git a/Tst/tst/verification_tab.py b/Tst/tst/verification_tab.py index bb6e368..202182a 100644 --- a/Tst/tst/verification_tab.py +++ b/Tst/tst/verification_tab.py @@ -10,6 +10,7 @@ import confignator import sys sys.path.append(confignator.get_option('paths', 'ccs')) sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) +sys.path.append('/home/sebastian/CCS/Tst/testing_library/testlib') # notwendig damit tm als Modul erkannt wird import ccs_function_lib as cfl import s2k_partypes as s2k @@ -38,11 +39,21 @@ Verification_4 = "tm." + tm.get_frequency_of_hk.__name__ + str(inspect.signature Verification_5 = "tm." + tm.get_dpu_mode.__name__ + str(inspect.signature((tm.get_dpu_mode))) Verification_6 = "tm." + tm.get_packet_length.__name__ + str(inspect.signature((tm.get_packet_length))) Verification_7 = "tm." + tm.get_version_number.__name__ + str(inspect.signature((tm.get_version_number))) -Verification_8 = "tm." + tm.get_time_of_last_tc.__name__ + str(inspect.signature((tm.get_time_of_last_tc))) +Verification_8 = "tm." + tm.get_data_of_last_tc.__name__ + str(inspect.signature((tm.get_data_of_last_tc))) +Verification_9 = "tm." + tm.verify_no_more_hk.__name__ + str(inspect.signature((tm.verify_no_more_hk))) +# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +# Descriptions + +descr_8 = "Get Timestamp of TM before last TC, get IID of last TC, get first 4 bytes of TC raw data." +descr_9 = "Check if there are no more HK packets" + + + verification_list = [ ("Get TC Verification", 1, 7, "descr", Verification_1), ("Await TC Verification", 1, 7, "descr", tc_identifier + "\n" + Verification_2), @@ -51,7 +62,8 @@ verification_list = [ ("Get DPU Mode", None, None, "descr", Verification_5), ("Get Packet Length", None, None, "descr", Verification_6), ("Get Version Number", None, None, "descr", Verification_7), - ("Get Time before last TC", None, None, "descr", Verification_8) + ("Get Data of last TC", None, None, descr_8, Verification_8), + ("Test if there are no more HK packets", None, None, descr_9, Verification_9) ] -- GitLab