diff --git a/Ccs/actions/action1.py b/Ccs/actions/action1.py index 3c18a4437d0fcca050c06b7bfd245f44d426ef34..f278101f41033e92a8c6f39efb247c33d8f35dd9 100644 --- a/Ccs/actions/action1.py +++ b/Ccs/actions/action1.py @@ -1,66 +1,4 @@ -# Prologue -#if (('initialization' not in dir()) or (initialization != "DB")) : -# exec(open('../acceptance_tests/v0.5/TestSetup_DB_05.py').read()) - -import time - - -#ccs.CnCsend('TC_CSR_OTA_S_HTR_N_ILIM 0.3') -#ccs.CnCsend('TC_OTA_S_HTR_N_SET_MODE 0') - -############################## -# allow several heater lines # -############################## - -#ccs.CnCsend('TC_CSR_OTA_S_HTR_N_ON') -ccs.CnCsend('TC_CSR_FEE_S_HTR_N_ON') -#ccs.CnCsend('TC_CSR_FPA_S_HTR_N_ON') -#ccs.CnCsend('TC_CSR_FPA_ANN_HTR_N_ON') - -#################### -# POWER ON THE BEE # -#################### - -# LCL/ARM the BEE power supply -ccs.CnCsend('TC_CSR_PS_ARM') -time.sleep(1) - -# set BEE PSN OVP and Voltage -ccs.CnCsend('TC_CSR_PSN_OVP 32.0') -ccs.CnCsend('TC_CSR_PSN_VSET 31.0') - -# switch ON the BEE power supply [NOMINAL] -ccs.CnCsend('TC_CSR_PSN_ON') -time.sleep(0.5) - -# set heater PS OCP limit -ccs.CnCsend('TC_CSR_PSH_OVP 32.0') -ccs.CnCsend('TC_CSR_PSH_ILIM 2.0') -ccs.CnCsend('TC_CSR_PSH_VSET 31.0') - -# switch ON the BEE Heater Output power supply [NOMINAL] -ccs.CnCsend('TC_CSR_PSH_ON') -time.sleep(0.5) - -# set ILIM of OTA Output to 0.5 (default is 0.2) -ccs.CnCsend('TC_CSR_P28V_OTA_N_ILIM 2.0') -time.sleep(0.25) - -# switch ON the OTA Output (also necessary for heaters) -ccs.CnCsend('TC_CSR_P28V_OTA_N_ON') -time.sleep(1) - - -######################### -# SEND THE BEE ON PULSE # -######################### - -# ARM the SHP pulse -ccs.CnCsend('TC_CSR_SHP_ARM 1') -time.sleep(0.5) - -# activate BEE [NOMINAL] via SHP output -ccs.CnCsend('TC_CSR_BEE_ON_N') - -# DISARM the SHP pulse -# ccs.CnCsend('TC_CSR_SHP_ARM 0',socket_name='TC') +# TC(3,5): SASW EnbHkCmd [KSC50057] +# Enable Periodic Generation of a Housekeeping Parameter Report St +SidNoCal = None # KSP50181 +cfl.Tcsend_DB('SASW EnbHkCmd', SidNoCal, pool_name='LIVE') \ No newline at end of file diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index b80210b17eec9a3d9a91553c9c499b8da72f786b..73999423fd6eed248eeabe0228bda669e1433935 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -2347,6 +2347,7 @@ def get_last_pckt_time(pool_name='LIVE', string=True): DbTelemetry.idx.desc() ).first() dbcon.close() + if row is not None: packet = row.raw else: @@ -2653,6 +2654,21 @@ def source_to_srec(data, outfile, memaddr=0x40180000, header=None, bytes_per_lin # print('Data written to file: "{}"'.format(outfile)) logger.info('Data written to file: "{}"'.format(outfile)) +""" +Test Function to get tm and tc from database tm +""" +def get_acute_tm_tc(description=None): + + if description is None: + test = scoped_session_idb.execute("SELECT * FROM smile_data_storage.tm " + "WHERE smile_data_storage.tm.pool_id = 40;").fetchall() + else: + test = scoped_session_idb.execute("SELECT * FROM smile_data_storage.tm " + "WHERE smile_data_storage.tm.pool_id = 40;").fetchall() + +""" +Test function ends +""" def get_tc_list(ccf_descr=None): diff --git a/Tst/__init__.py b/Tst/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/Tst/testing_library/testlib/tm.py b/Tst/testing_library/testlib/tm.py index 767c5e8ee54fd5d4209dfaec9ffe720f70402cd0..e66c416a7574294c409e02efa5782bb3bd7f1159 100644 --- a/Tst/testing_library/testlib/tm.py +++ b/Tst/testing_library/testlib/tm.py @@ -61,8 +61,10 @@ import ccs_function_lib as cfl from database import tm_db -from . import idb -from . import tools +# from . import idb +# from . import tools +import tools +import idb # create a logger logger = logging.getLogger(__name__) @@ -112,13 +114,14 @@ def filter_chain(query, pool_name, is_tm=True, st=None, sst=None, apid=None, seq # ToDo database has the CUC timestamp as string. Here the timestamps are floats. # Does this comparison operations work? t_from_string = str(t_from) + 'U' # the timestamps in the database are saved as string - query = query.filter(tm_db.DbTelemetry.timestamp >= t_from_string) # ToDo check if the change from > to >= breaks something! + query = query.filter(tm_db.DbTelemetry.timestamp >= t_from) # ToDo check if the change from > to >= breaks something! + # ToDo: Funktion Cast verwenden um String auf Double zu casten # query = query.filter(tm_db.DbTelemetry.timestamp > t_from) # <- comparison with float if t_to is not None: # ToDo database has the CUC timestamp as string. Here the timestamps are floats. # Does this comparison operations work? t_to_string = str(t_to) + 'U' # the timestamps in the database are saved as string - query = query.filter(tm_db.DbTelemetry.timestamp <= t_to_string) + query = query.filter(tm_db.DbTelemetry.timestamp <= t_to) # ToDo: Funktion Cast verwenden um String auf Double zu casten # query = query.filter(tm_db.DbTelemetry.timestamp <= end) # <- comparison with float if dest_id is not None: query = query.filter(tm_db.DbTelemetry.destID == dest_id) @@ -136,6 +139,7 @@ def highest_cuc_timestamp(tm_list): :rtype: PUS packet || None """ highest = None + timestamp = None if isinstance(tm_list, list) and len(tm_list) > 0: cuc = 0 for i in range(len(tm_list)): @@ -147,7 +151,8 @@ def highest_cuc_timestamp(tm_list): if tstamp > cuc: cuc = tstamp highest = tm_list[i] - return highest + timestamp = tstamp + return highest, timestamp def lowest_cuc_timestamp(pool_name, tm_list): @@ -622,11 +627,13 @@ def get_tm(pool_name, st=None, sst=None, apid=None, ssc=None, duration=5, t_from of decoded telemetry packets or [] """ + # set the time interval t_from, t_to = set_time_interval(pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) # for the case that t_to is in future, wait current_cuc = cfl.get_last_pckt_time(pool_name=pool_name, string=False) + if t_to > current_cuc: difference = t_to - current_cuc time.sleep(difference) @@ -972,14 +979,14 @@ def extract_apid_from_packetid(packet_id): return apid -def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): +def get_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None): """ Check if for the TC acknowledgement packets can be found in the database. This function makes a single database query. :param pool_name: str Name of the TM pool in the database :param t_tc_sent: float - CUC timestamp of the telecommand + CUC timestamp from which search for telecommand should start :param tc_apid: int or str Application process ID of the sent TC. Can be provided as integer or hexadecimal string :param tc_ssc: int @@ -1002,7 +1009,7 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): # make database query packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent - 1) - + # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field ack_tms = [] for i in range(len(packets)): @@ -1069,7 +1076,7 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): return result, ack_tms -def await_tc_acknow(pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None): +def await_tc_acknow(tc_identifier, pool_name="LIVE", duration=10, tm_st=1, tm_sst=None): """ Waiting to receive the acknowledgement packet of a sent telecommand (TC) for a given duration. As soon as acknowledgement packets were found the function returns. @@ -1149,6 +1156,279 @@ def await_tc_acknow(pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None) return result, ack_list +def get_5_1_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=5, tm_sst=None): + """ + Check if for the TC acknowledgement packets can be found in the database. + This function makes a single database query. + :param pool_name: str + Name of the TM pool in the database + :param t_tc_sent: float + CUC timestamp from which search for telecommand should start + :param tc_apid: int or str + Application process ID of the sent TC. Can be provided as integer or hexadecimal string + :param tc_ssc: int + Source sequence counter of the sent TC + :return: (boolean, list) + boolean: + True if one or up to all acknowledgement packets TM(5,1), TM(5,3), TM(5,7) were found + False if one or all of TM(5,2), TM(5,4), TM(5,8) were found + list: + List of the acknowledgement TM packets for the TC, + [] if no acknowledgement TM packets could be found in the database + """ + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + result = None + assert isinstance(pool_name, str) + assert isinstance(tc_apid, (int, str)) + assert isinstance(t_tc_sent, (float, int)) + + # if the tc_apid is provided as hexadecimal number, convert it to and integer + tc_apid = tools.convert_apid_to_int(apid=tc_apid) + + # make database query + packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent) + # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) + # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field + ack_tms = [] + for i in range(len(packets)): + if packets[i][1] is not None and packets[i][1][0] is not None: + # get the data entries for APID and SSC + # pac_apid = packets[i][0][3] # TODO: not compatible anymore + pac_apid = packets[i][0][0].APID + if pac_apid == 961: # for acknowledgements from SEM + name_apid = 'PAR_CMD_APID' + name_psc = 'PAR_CMD_SEQUENCE_COUNT' + else: + name_apid = 'TcPcktId' + name_psc = 'TcPcktSeqCtrl' + para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) + if name_apid in para and name_psc in para: + # extract the SSC from the PSC + ssc = extract_ssc_from_psc(psc=para[name_psc]) + apid = extract_apid_from_packetid(packet_id=para[name_apid]) + if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID' + tc_pid = tools.extract_pid_from_apid(tc_apid) + if apid == tc_pid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + if apid == tc_apid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + logger.debug('get_tc_acknow: could not read the data from the TM packet') + + # treat with the result from the database query + if len(ack_tms) > 0: + # get the ST and SST of the TC for logging purposes + tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, + apid=tc_apid, + ssc=tc_ssc, + is_tm=False, + t_from=t_tc_sent) + logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:' + .format(tc_st, tc_sst, tc_apid, tc_ssc)) + + # check if there was a failure, the result becomes False if a failure occurred + for i in range(len(ack_tms)): + head = ack_tms[i][0][0] + data = ack_tms[i][1] + if result is not False: + if head.SERV_SUB_TYPE in [1, 3, 7]: + logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + result = True + elif head.SERV_SUB_TYPE in [2, 4, 8]: + if head.SERV_SUB_TYPE == 2: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 4: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 8: + logger.info( + 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + result = False + + return result, ack_tms + pass + +def get_8_1_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=8, tm_sst=None): + """ + Check if for the TC acknowledgement packets can be found in the database. + This function makes a single database query. + :param pool_name: str + Name of the TM pool in the database + :param t_tc_sent: float + CUC timestamp from which search for telecommand should start + :param tc_apid: int or str + Application process ID of the sent TC. Can be provided as integer or hexadecimal string + :param tc_ssc: int + Source sequence counter of the sent TC + :return: (boolean, list) + boolean: + True if one or up to all acknowledgement packets TM(5,1), TM(5,3), TM(5,7) were found + False if one or all of TM(5,2), TM(5,4), TM(5,8) were found + list: + List of the acknowledgement TM packets for the TC, + [] if no acknowledgement TM packets could be found in the database + """ + result = None + assert isinstance(pool_name, str) + assert isinstance(tc_apid, (int, str)) + assert isinstance(t_tc_sent, (float, int)) + + # if the tc_apid is provided as hexadecimal number, convert it to and integer + tc_apid = tools.convert_apid_to_int(apid=tc_apid) + + # make database query + packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent) + # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) + # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field + ack_tms = [] + for i in range(len(packets)): + if packets[i][1] is not None and packets[i][1][0] is not None: + # get the data entries for APID and SSC + # pac_apid = packets[i][0][3] # TODO: not compatible anymore + pac_apid = packets[i][0][0].APID + if pac_apid == 961: # for acknowledgements from SEM + name_apid = 'PAR_CMD_APID' + name_psc = 'PAR_CMD_SEQUENCE_COUNT' + else: + name_apid = 'TcPcktId' + name_psc = 'TcPcktSeqCtrl' + para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) + if name_apid in para and name_psc in para: + # extract the SSC from the PSC + ssc = extract_ssc_from_psc(psc=para[name_psc]) + apid = extract_apid_from_packetid(packet_id=para[name_apid]) + if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID' + tc_pid = tools.extract_pid_from_apid(tc_apid) + if apid == tc_pid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + if apid == tc_apid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + logger.debug('get_tc_acknow: could not read the data from the TM packet') + + # treat with the result from the database query + if len(ack_tms) > 0: + # get the ST and SST of the TC for logging purposes + tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, + apid=tc_apid, + ssc=tc_ssc, + is_tm=False, + t_from=t_tc_sent) + logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:' + .format(tc_st, tc_sst, tc_apid, tc_ssc)) + + # check if there was a failure, the result becomes False if a failure occurred + for i in range(len(ack_tms)): + head = ack_tms[i][0][0] + data = ack_tms[i][1] + if result is not False: + if head.SERV_SUB_TYPE in [1, 3, 7]: + logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + result = True + elif head.SERV_SUB_TYPE in [2, 4, 8]: + if head.SERV_SUB_TYPE == 2: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 4: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 8: + logger.info( + 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + result = False + + return result, ack_tms + pass + +def get_sid_of_tm(packet): + """ + Gets SID from raw packet data. + :param packet: bytes + Packet whose SID should be extracted + :param decoded_packet: str + Decoded information of packet + :param header: str + extract header from raw data + + """ + + decoded_packet = packet.hex() + header = decoded_packet[36:124] + return int(header[3]) + + + + +def get_frequency_of_hk(duration=60., frequency=20, SID=1, pool_name="LIVE"): + """ + Check the time passing between TM packets during a given duration. + This function makes a multiple database queries. + :param pool_name: str + Name of the TM pool in the database + :param duration: float + timeslot in which the incoming packages are checked for their cuc-timestamp + """ + + + assert isinstance(duration, (float, int)) + assert isinstance(frequency, int) + assert isinstance(SID, int) + + + list_of_timestamps = [] + last_telemetry_packet = cfl.get_pool_rows(pool_name)[-1].raw + timestamp = cfl.get_cuctime(last_telemetry_packet) + # last_packet_sid = get_sid_of_tm(last_telemetry_packet) + current_timestamp = 0.0 + end_time = timestamp + duration + + while current_timestamp <= end_time: + + current_telemetry_packet = cfl.get_pool_rows(pool_name)[-1].raw + current_timestamp = cfl.get_cuctime(current_telemetry_packet) + current_packet_sid = get_sid_of_tm((current_telemetry_packet)) + if current_timestamp not in list_of_timestamps and current_packet_sid == SID: + list_of_timestamps.append(current_timestamp) + else: + continue + + list_of_differences = [j-i for i, j in zip(list_of_timestamps[:-1], list_of_timestamps[1:])] + + first_element = round(list_of_differences[0]) + chk = True + + for item in list_of_differences: + if first_element != round(item): + chk = False + break + + if chk == True: + # print(round(list_of_differences[0])) + print(first_element) + if first_element == frequency: + print(True) + return True + else: + print("Didn't work") + return False + + + + + +# get_frequency_of_hk() + def check_acknowledgement(pool_name, tc_identifier, duration=10): """ Check that for a sent TC the acknowledgement packets were received (assuming the acknowledgement was enabled) @@ -1531,6 +1811,125 @@ def get_acquisition(pool_name, tm_21_3): logger.info(meta_data[i]) return result +def get_dpu_mode(pool_name="LIVE"): + """ + Get the data from the last entry in the database and check its DPU Mode. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param dpu_mode: dict + DPU Mode of the packet + + """ + + data_packet = cfl.get_pool_rows(pool_name)[-1].raw + dpu_mode = get_tm_data_entries(data_packet,"DpuMode") + print(dpu_mode) + +def get_packet_length(pool_name="LIVE"): + """ + Get the data from the last entry in the database and check its length. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param data_packet: bit + data in the last entry in database + :param packet_length: int + Length of data_packet + + """ + + data_packet = cfl.get_pool_rows(pool_name)[-1].raw + packet_length = len(data_packet) + print(packet_length) + +def get_version_number(version_number="0x0112", pool_name="LIVE"): + """ + Get the version number of the newest TM packet. + :param version_number: str + version number which should be verified + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param data_packet: bytes + Data in Packet + :param packet_version_number_dic: dic + dictionary containing the version number + :param packet_version_number: str + version number converted into hex + """ + + assert isinstance(version_number, str) + + data_packet = cfl.get_pool_rows(pool_name)[-1].raw + packet_version_number_dic = get_tm_data_entries(data_packet, "VersionNumber") + # print(packet_version_number) + packet_version_number = hex(packet_version_number_dic["VersionNumber"]) + if packet_version_number == version_number: + print(True) + return True + else: + print(False) + return False + + +def await_tc_identifier(pool_name, tc_apid, tc_ssc): + """ + Gather the data necessary to identify the newest telecommand and put it in tuple. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param tc_apid: int or str + APID of the TC + :param tc_ssc: int + Sequence number of TC + :param last_tm_timestamp: float + Timestamp of the latest TM in datapool + :return: tc_identifier: tuple + Tuple with data to identify TC + + """ + + assert isinstance(pool_name, str) + assert isinstance(tc_apid, int) or isinstance(tc_apid, str) + assert isinstance(tc_ssc, int) + + last_tm_timestamp = cfl.get_pool_rows(pool_name)[-1].timestamp + length_last_tm_timestamp = len(last_tm_timestamp) + last_tm_timestamp = last_tm_timestamp[:length_last_tm_timestamp - 1] + last_tm_timestamp = float(last_tm_timestamp) + + tc_identifier = (tc_apid, tc_ssc, last_tm_timestamp) + + return tc_identifier + + +def get_tc_identifier(pool_name="LIVE", tc_apid=321, tc_ssc=1, tc_time=0.): + """ + Gather the data necessary to identify the newest telecommand and put it in tuple. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param tc_apid: int or str + APID of the TC + :param tc_ssc: int + Sequence number of TC + :param tc_time: float or int + Timestamp of the latest TM in datapool + :return: tc_identifier: tuple + Tuple with data to identify TC + + """ + assert isinstance(pool_name, str) + assert isinstance(tc_apid, int) or isinstance(tc_apid, str) + assert isinstance(tc_ssc, int) + assert isinstance(tc_time, int) or isinstance(tc_time, float) + + tc_time = float(tc_time) + + tc_identifier = (tc_apid, tc_ssc, tc_time) + + return tc_identifier + + + + + if __name__ == '__main__': sys.path.append('.') r=get_tc_acknow('PLM',0.,321,5) diff --git a/Tst/testing_library/testlib/tm_test.py b/Tst/testing_library/testlib/tm_test.py new file mode 100644 index 0000000000000000000000000000000000000000..43f10ce8a7af8829f695c5ce88031c5fb823c1cf --- /dev/null +++ b/Tst/testing_library/testlib/tm_test.py @@ -0,0 +1,93 @@ +import tm +import ccs_function_lib as cfl +import confignator +import matplotlib +matplotlib.use('Gtk3Cairo') +from confignator import config +check_cfg = config.get_config(file_path=confignator.get_option('config-files', 'ccs')) + + +telemetry = cfl.get_pool_rows("PLM") +telemetry_packet_1 = cfl.get_pool_rows("PLM")[0].raw +telemetry_packet_2 = cfl.get_pool_rows("PLM")[1].raw + +list_of_tm_packets = [telemetry_packet_1, telemetry_packet_2] + +telemetry_raw = telemetry.all()[-1].raw + + +timestamp = cfl.get_cuctime(telemetry_packet_1) +# test = tm.get_tm_data_entries(telemetry_packet_1, "EvtId") +# print(test) +timestamp_test = tm.highest_cuc_timestamp(list_of_tm_packets) + + +# baba = telemetry[-1].timestamp +# print(baba) + +# print("Telemetry Packet: ", telemetry_packet_1) +# print("Telemetry Packet: ", telemetry_packet_2) + + +# print(tm.get_tm_data_entries(telemetry_packet_1, "DpuMode")) + + +# get_list_from_db = tm.fetch_packets(pool_name="PLM", is_tm=True, st=None, sst=None, apid=None, ssc=None, t_from=0., t_to=1401., +# dest_id=None, not_apid=None, decode=False, silent=False) + + + + +identified_tc = tm.get_tc_identifier(pool_name="PLM",tc_apid=321,tc_ssc=1, tc_time=100) + + + + + + + + + + + + + + + +acknowledgement = tm.await_tc_acknow(pool_name="PLM", tc_identifier=identified_tc, duration=10, tm_st=1, tm_sst=7) + + +print(acknowledgement[1][0][0][0].CTIME + (acknowledgement[1][0][0][0].FTIME/1000000)) + + + + +""" +# fetch funktion, demonstration von .CTIME und FTIME + +test = tm.fetch_packets(pool_name="PLM", is_tm=True, st=3, sst=25, apid=321, ssc=None, t_from=0.00, t_to=21.0, + dest_id=None, not_apid=None, decode=True, silent=False) + + +# print(test) +# print(test[0][0][1]) +# print("Test: ", test[0][0][1]) +header = test[0][0][0] +# print(header.CTIME) +# print(header.FTIME) + +print(header.CTIME + (header.FTIME/1000000)) + +test_liste = [] + +for i in test: + test_liste.append(i[0][0]) + +print(len(test_liste)) +print(test_liste) + +""" + + + + diff --git a/Tst/tst/tst.cfg b/Tst/tst/tst.cfg index 3b780ad8e0b33b12adb304bc98a28888b1c39917..fc1645fa0a73456856f85ae3b3ce0faf9d4b4086 100644 --- a/Tst/tst/tst.cfg +++ b/Tst/tst/tst.cfg @@ -14,7 +14,7 @@ output-file-path = ${paths:tst}/logs_test_runs/output_files/ [tst-preferences] show-json-view = True -main-window-height = 1076 +main-window-height = 1090 main-window-width = 1920 paned-position = 953 paned-position-codeblockreuse = 520 diff --git a/Tst/tst/tst.py b/Tst/tst/tst.py index 67455c4ce9f16cd5874ef00cb12424859ddf7452..8dda45f38447b94a3e4a4fa9dbca1e2b311f5fe7 100755 --- a/Tst/tst/tst.py +++ b/Tst/tst/tst.py @@ -26,6 +26,7 @@ import toolbox import tc_management as tcm import tm_management as tmm import data_pool_tab as dpt +import verification_tab as vt import json_to_barescript import json_to_csv @@ -373,6 +374,12 @@ class TstAppWindow(Gtk.ApplicationWindow): self.label_widget_data_pool.set_text('Data Pool') self.feature_area.insert_page(child=self.data_pool_tab, tab_label=self.label_widget_data_pool, position=3) + # verification tab + self.verification_tab = vt.VerificationTable() + self.label_widget_verification = Gtk.Label() + self.label_widget_verification.set_text("Verification") + self.feature_area.insert_page(child=self.verification_tab, tab_label=self.label_widget_verification, position=4) + self.box.pack_start(self.work_desk, True, True, 0) # # panes for the step grid an so on diff --git a/Tst/tst/verification.py b/Tst/tst/verification.py new file mode 100644 index 0000000000000000000000000000000000000000..802718b988b39b70ec70cf038e4e5c7a251d4ceb --- /dev/null +++ b/Tst/tst/verification.py @@ -0,0 +1,218 @@ +import os +import json +import struct +import threading +import subprocess +import time +import sys +import dbus +import dbus.service +from dbus.mainloop.glib import DBusGMainLoop +import DBus_Basic + +import ccs_function_lib as cfl + +from typing import NamedTuple +import confignator +import gi + +import matplotlib +matplotlib.use('Gtk3Cairo') + + +# from sqlalchemy.sql.expression import func, distinct +from sqlalchemy.orm import load_only +from database.tm_db import DbTelemetryPool, DbTelemetry, RMapTelemetry, FEEDataTelemetry, scoped_session_maker + +import importlib +from confignator import config +check_cfg = config.get_config(file_path=confignator.get_option('config-files', 'ccs')) + + +def type_comparison(comparison_data, sst_1=1, sst_2=7, st_=1, st_2=1,): + pool_rows = cfl.get_pool_rows("PLM") + + st_list = [] + sst_list = [] + x = 0 + header_counter = 0 + while header_counter < 2: + x += 1 + entry = pool_rows.all()[-x] + + if entry.data.hex() == comparison_data: + + st_list.append(entry.stc) + sst_list.append(entry.sst) + + # print("ST Entry_" + str(x) + ": ", entry.stc) + # print("SST Entry_" + str(x) + ": ", entry.sst) + # print("Timestamp entry_" + str(x) + ": ", entry.timestamp) + header_counter += 1 + + + st_list_reverse = [st_list[1], st_list[0]] + sst_list_reverse = [sst_list[1], sst_list[0]] + + + if sst_list_reverse == [sst_1, sst_2]: + print("Verification successful") + else: + print("Verification unsuccessful") + + return False + + +def Verification( st_1=1, sst_1=1, st_2=1, sst_2=7, pool_name="PLM", verification_duration=2): + verification_running = True + + print("RUNNING!!") + + print("Variables: ") + print(st_1) + print(sst_1) + print(st_2) + print(sst_2) + print(pool_name) + print(verification_duration) + + + while verification_running == True: + + # while running the script checks the last three entries of the database and keeps them up to date + # to recognize a tc it checks the time + + pool_rows = cfl.get_pool_rows(pool_name) + + system_time = time.clock_gettime(0) + + entry_1_data = pool_rows.all()[-1] + # entry_2_data = pool_rows.all()[-2] + # entry_3_data = pool_rows.all()[-3] + + time_1 = entry_1_data.timestamp + # time_2 = entry_2_data.timestamp + # time_3 = entry_3_data.timestamp + + # in this script the number 1 after a variable name always refers to data from the last entry + # number 2 refers to second last entry, number 3 to third last entry and so on + # this part triggers as soon as a tc has arrived in the database + + if time_1 == "": + + first_raw_digits = "" # this string will contain the first bytes of raw data + + telecommand = entry_1_data + telecommand_time = telecommand.timestamp + telecommand_raw = telecommand.raw.hex() + # telecommand_data = telecommand.data.hex() + # Variable to generate new telecommand timestamp, other than telecommand_time + telecommand_verification_timestamp = time.clock_gettime(0) + verification_time = telecommand_verification_timestamp + verification_duration + + for i in telecommand_raw: + first_raw_digits += str(i) + if len(first_raw_digits) > 7: + break + + # print("After Loop telecommand_first_digits: ", first_raw_digits) + + + while system_time < verification_time and system_time != verification_time: + system_time = time.clock_gettime(0) + + if system_time >= verification_time: + + verification_running = type_comparison(first_raw_digits, sst_1, sst_2) + + + +def verification_template(verification_descr, pool_name="LIVE", ST_1=None, ST_2=None, SST_1=None, SST_2=None, time=2, + comment="", preamble="Ver.verification"): + if comment: + commentstr = "# TC({}, {}): {} [{}]\n# {}\n".format(*cmd[3:], cmd[1], cmd[0], cmd[2]) + newline = "\n" + else: + commentstr = "" + newline = "" + + exe = "{}('{}', ST_1={}, SST_1={}, ST_2={}, SST_2={}, pool_name='{}', time={})".format(preamble, verification_descr, + ST_1, SST_1, ST_2, SST_2, + pool_name, time) + return commentstr + exe + newline + + +# def new_verification_template(verification_descr, pool_name="LIVE", comment="", preamble=): + + +# reads in verification as soon as exec button in step widget is pressed +def read_verification(verification_string): + + + first_split = verification_string.split("(") + + + second_split = "" + for i in first_split[1]: + if i != ")": + second_split += i + + + third_split = second_split.split(",") + + + st_1 = third_split[1] + sst_1 = third_split[2] + st_2 = third_split[3] + sst_2 = third_split[4] + pool_name = third_split[5] + time_duration = third_split[6] + + + st_1_value = st_1.split("=")[1] + sst_1_value = sst_1.split("=")[1] + st_2_value = st_2.split("=")[1] + sst_2_value = sst_2.split("=")[1] + pool_name_value = pool_name.split("=")[1] + time_duration_value = time_duration.split("=")[1] + + st_sst_list = [st_1_value, sst_1_value, st_2_value, sst_2_value] + + + + value_list = [] + + for element in st_sst_list: + # print(element) + if element == "None": + element = None + else: + try: + element = int(element) + except: + raise AssertionError("ST and SST values have to be None or int") + + value_list.append(element) + + + if type(pool_name_value) == str: + # print("if pool name: ", pool_name_value) + value_list.append(pool_name_value) + else: + raise AssertionError("Pool Name needs to be str") + + # time_duration_value_checker = isinstance(time_duration_value, int) + # print("Time instance: ", time_duration_value_checker) + try: + time_duration_value = int(time_duration_value) + value_list.append(time_duration_value) + except: + raise AssertionError("Time has to be int") + + Verification(value_list[0], value_list[1], value_list[2], value_list[3], value_list[4], value_list[5]) + + + + + + diff --git a/Tst/tst/verification_tab.py b/Tst/tst/verification_tab.py new file mode 100644 index 0000000000000000000000000000000000000000..dc189cc84ae151bc6d7e0d77c70cf18d781e6449 --- /dev/null +++ b/Tst/tst/verification_tab.py @@ -0,0 +1,200 @@ +# !/usr/bin/env python3 +import gi +import os + +gi.require_version("Gtk", "3.0") +gi.require_version("GtkSource", "3.0") +from gi.repository import Gtk, Gdk, GtkSource +from gi.repository.GdkPixbuf import Pixbuf +import confignator +import sys +sys.path.append(confignator.get_option('paths', 'ccs')) +sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) +import ccs_function_lib as cfl +import s2k_partypes as s2k + +import time +import sys +import dbus +import dbus.service +from dbus.mainloop.glib import DBusGMainLoop +import DBus_Basic + + +from typing import NamedTuple +import confignator +import gi +# print(sys.path) +import inspect + + +import tm + +Verification_1 = "tm." + tm.get_tc_acknow.__name__ + str(inspect.signature((tm.get_tc_acknow))) +Verification_2 = "tm." + tm.await_tc_acknow.__name__ + str(inspect.signature((tm.await_tc_acknow))) +Verification_3 = "tm." + tm.get_5_1_tc_acknow.__name__ + str(inspect.signature((tm.get_5_1_tc_acknow))) +tc_identifier = "identified_tc = tm." + tm.get_tc_identifier.__name__ + str(inspect.signature((tm.get_tc_identifier))) +Verification_4 = "tm." + tm.get_frequency_of_hk.__name__ + str(inspect.signature((tm.get_frequency_of_hk))) +Verification_5 = "tm." + tm.get_dpu_mode.__name__ + str(inspect.signature((tm.get_dpu_mode))) +Verification_6 = "tm." + tm.get_packet_length.__name__ + str(inspect.signature((tm.get_packet_length))) +Verification_7 = "tm." + tm.get_version_number.__name__ + str(inspect.signature((tm.get_version_number))) + + + + +verification_list = [ + ("Get TC Verification", 1, 7, "descr", Verification_1), + ("Await TC Verification", 1, 7, "descr", tc_identifier + "\n" + Verification_2), + ("Get TC Verification", 5, 1, "descr", Verification_3), + ("Get HK frequency", None, None, "descr", Verification_4), + ("Get DPU Mode", None, None, "descr", Verification_5), + ("Get Packet Length", None, None, "descr", Verification_6), + ("Get Version Number", None, None, "descr", Verification_7) +] + + + + + + +def translate_drag_data(data): + + + translated = "kla" + + return translated + + +class VerificationTable(Gtk.Grid): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.set_size_request(500,500) + + + self.grid = Gtk.Grid() + self.grid.set_column_homogeneous(True) + self.grid.set_row_homogeneous(True) + self.add(self.grid) + + # Creating ListStore model + self.verification_liststore = Gtk.ListStore(str, int, int, str, str) + for verification_ref in verification_list: + self. verification_liststore.append(list(verification_ref)) + self.current_filter_verification = None + + # Creating filter, feeding it with liststore model + self.verification_filter = self.verification_liststore.filter_new() + # setting the filter function + self.verification_filter.set_visible_func(self.verification_filter_func) + + # Creating treeview + self.treeview = Gtk.TreeView(model=self.verification_filter) + for i, column_title in enumerate( + ["Verification", "ST", "SST", "Description"] + ): + renderer = Gtk.CellRendererText() + column = Gtk.TreeViewColumn(column_title, renderer, text=i) + self.treeview.append_column(column) + + # setting up layout + self.scrollable_treelist = Gtk.ScrolledWindow() + self.scrollable_treelist.set_vexpand(True) + self.scrollable_treelist.set_hexpand(True) + self.grid.attach(self.scrollable_treelist, 0, 0, 8, 10) + self.scrollable_treelist.add(self.treeview) + + + # handle selection + self.selected_row = self.treeview.get_selection() + self.selected_row.connect("changed", self.item_selected) + + + # Set up Drag and Drop + self.treeview.drag_source_set(Gdk.ModifierType.BUTTON1_MASK, [], Gdk.DragAction.COPY) + self.treeview.drag_source_set_target_list(None) + self.treeview.drag_source_add_text_targets() + + self.treeview.connect("drag-data-get", self.on_drag_data_get) + self.treeview.connect("drag-begin", self.on_drag_begin) + + self.show_all() + + def verification_filter_func(self, model, iter, data): + if( + self.current_filter_verification is None + or self.current_filter_verification == "None" + ): + return True + else: + return model[iter][2] == self.current_filter_verification + + + + + + + # drag and drop + + def item_selected(self, selection): + model, row = selection.get_selected() + if row is not None: + + global verification_name + global ST + global SST + global descr + global name_string + + verification_name = model[row][0] + ST = model[row][1] + SST = model[row][2] + descr = model[row][3] + name_string = model[row][4] + + global selected_data_for_drag_drop + # print(verification_name) + # print(name_string) + + selected_data_for_drag_drop = name_string + # selected_data_for_drag_drop = cfl.verification_template(name_string) + # str(verification_name) + "\n ST = " + str(ST) + "\n SST = " + str(SST) + "\n Time = 2" + # selected_data_for_drag_drop = "{} ({}, {})".format((name, ST, SST)) + + else: + pass + + + + + def on_drag_data_get(self, treeview, drag_context, selection_data, info, time, *args): + treeselection = treeview.get_selection() + model, my_iter = treeselection.get_selected() + global selected_data_for_drag_drop + selection_data.set_text(selected_data_for_drag_drop, -1) + + + + + def on_drag_begin(self, *args): + pass + + + + + + + + + + + + + + + + + + + + diff --git a/Tst/tst/view.py b/Tst/tst/view.py index da845933c0ecd02ef924471332e5b2e9c2de54f2..4df4e384b7014f033b55e744fe9c3299eb396697 100644 --- a/Tst/tst/view.py +++ b/Tst/tst/view.py @@ -15,6 +15,7 @@ import dnd_data_parser import toolbox import cairo import sys +import verification import confignator ccs_path = confignator.get_option('paths', 'ccs') @@ -1316,12 +1317,23 @@ class StepWidget(Gtk.EventBox): return commands = str(self.get_commands_from_widget()) + commands2 = str(self.get_verification_from_widget()) + if len(commands) == 0: return + # if len(commands2) == 0: + # return + # print("Commands: ", commands) + # print("Verification: ", commands2) + # print(type(commands2)) + + # verification.read_verification(commands2) + ed = cfl.dbus_connection('editor') cfl.Functions(ed, '_to_console_via_socket', commands) + # cfl.Functions(ed, '_to_console_via_socket', commands2) def on_toggle_detail(self, toolbutton, *args): """ diff --git a/egse.cfg b/egse.cfg index 7818b3dbe944350a1d639f5defe70603d46bbf51..e89ec8216a1f682188abff6fde363001dea875b1 100644 --- a/egse.cfg +++ b/egse.cfg @@ -11,8 +11,8 @@ start-simulator-log = ${logging:log-dir}/simulators/sim.log name = SMILE [database] -user = egse -password = xrayvision +user = sebastian +password = Ego,ich1 host = 127.0.0.1 mib-schema = mib_smile_sxi datapool-items =