diff --git a/Ccs/actions/action1.py b/Ccs/actions/action1.py index 3c18a4437d0fcca050c06b7bfd245f44d426ef34..f278101f41033e92a8c6f39efb247c33d8f35dd9 100644 --- a/Ccs/actions/action1.py +++ b/Ccs/actions/action1.py @@ -1,66 +1,4 @@ -# Prologue -#if (('initialization' not in dir()) or (initialization != "DB")) : -# exec(open('../acceptance_tests/v0.5/TestSetup_DB_05.py').read()) - -import time - - -#ccs.CnCsend('TC_CSR_OTA_S_HTR_N_ILIM 0.3') -#ccs.CnCsend('TC_OTA_S_HTR_N_SET_MODE 0') - -############################## -# allow several heater lines # -############################## - -#ccs.CnCsend('TC_CSR_OTA_S_HTR_N_ON') -ccs.CnCsend('TC_CSR_FEE_S_HTR_N_ON') -#ccs.CnCsend('TC_CSR_FPA_S_HTR_N_ON') -#ccs.CnCsend('TC_CSR_FPA_ANN_HTR_N_ON') - -#################### -# POWER ON THE BEE # -#################### - -# LCL/ARM the BEE power supply -ccs.CnCsend('TC_CSR_PS_ARM') -time.sleep(1) - -# set BEE PSN OVP and Voltage -ccs.CnCsend('TC_CSR_PSN_OVP 32.0') -ccs.CnCsend('TC_CSR_PSN_VSET 31.0') - -# switch ON the BEE power supply [NOMINAL] -ccs.CnCsend('TC_CSR_PSN_ON') -time.sleep(0.5) - -# set heater PS OCP limit -ccs.CnCsend('TC_CSR_PSH_OVP 32.0') -ccs.CnCsend('TC_CSR_PSH_ILIM 2.0') -ccs.CnCsend('TC_CSR_PSH_VSET 31.0') - -# switch ON the BEE Heater Output power supply [NOMINAL] -ccs.CnCsend('TC_CSR_PSH_ON') -time.sleep(0.5) - -# set ILIM of OTA Output to 0.5 (default is 0.2) -ccs.CnCsend('TC_CSR_P28V_OTA_N_ILIM 2.0') -time.sleep(0.25) - -# switch ON the OTA Output (also necessary for heaters) -ccs.CnCsend('TC_CSR_P28V_OTA_N_ON') -time.sleep(1) - - -######################### -# SEND THE BEE ON PULSE # -######################### - -# ARM the SHP pulse -ccs.CnCsend('TC_CSR_SHP_ARM 1') -time.sleep(0.5) - -# activate BEE [NOMINAL] via SHP output -ccs.CnCsend('TC_CSR_BEE_ON_N') - -# DISARM the SHP pulse -# ccs.CnCsend('TC_CSR_SHP_ARM 0',socket_name='TC') +# TC(3,5): SASW EnbHkCmd [KSC50057] +# Enable Periodic Generation of a Housekeeping Parameter Report St +SidNoCal = None # KSP50181 +cfl.Tcsend_DB('SASW EnbHkCmd', SidNoCal, pool_name='LIVE') \ No newline at end of file diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index 7dc4edf5f33b2eacdf7b416236e3a321d496c8e5..fda779d5840f24aedb85ec357784049e11725762 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -4266,6 +4266,23 @@ def _get_upload_service_info(tcname=None): return apid, memid_ref, fmt, endspares +""" +Test Function to get tm and tc from database tm +""" +def get_acute_tm_tc(description=None): + + if description is None: + test = scoped_session_idb.execute("SELECT * FROM smile_data_storage.tm " + "WHERE smile_data_storage.tm.pool_id = 40;").fetchall() + else: + test = scoped_session_idb.execute("SELECT * FROM smile_data_storage.tm " + "WHERE smile_data_storage.tm.pool_id = 40;").fetchall() + +""" +Test function ends +""" + + def get_tc_list(ccf_descr=None): """ diff --git a/Ccs/editor.py b/Ccs/editor.py index c7655993f0d2ce6b15c3041fb459c73daa52b804..ff9e48f9d2989d17309e46c947f948f757c9d592 100644 --- a/Ccs/editor.py +++ b/Ccs/editor.py @@ -23,6 +23,9 @@ import DBus_Basic # import config_dialog import ccs_function_lib as cfl +sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) +import tm + cfg = confignator.get_config() pixmap_folder = cfg.get('ccs-paths', 'pixmap') diff --git a/Tst/__init__.py b/Tst/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/Tst/codeblockreusefeature/codeblockreusefeature.cfg b/Tst/codeblockreusefeature/codeblockreusefeature.cfg index d4f5beb1a6d84b3dc9f413260a4da44203f1fcf9..10a0ed13793c712a244fa0940bb5f0e85a72d985 100644 --- a/Tst/codeblockreusefeature/codeblockreusefeature.cfg +++ b/Tst/codeblockreusefeature/codeblockreusefeature.cfg @@ -25,6 +25,6 @@ threadname = True main-window-height = 791 main-window-width = 612 paned-position-search = 406 -paned-position-command-code = 68 +paned-position-command-code = 28 paned-position-comment = 462 diff --git a/Tst/testing_library/testlib/tm.py b/Tst/testing_library/testlib/tm.py index 767c5e8ee54fd5d4209dfaec9ffe720f70402cd0..4ad37b9a875f3cd781535051c3221399c70dce4e 100644 --- a/Tst/testing_library/testlib/tm.py +++ b/Tst/testing_library/testlib/tm.py @@ -61,8 +61,10 @@ import ccs_function_lib as cfl from database import tm_db -from . import idb -from . import tools +# from . import idb +# from . import tools +import tools +import idb # create a logger logger = logging.getLogger(__name__) @@ -112,13 +114,14 @@ def filter_chain(query, pool_name, is_tm=True, st=None, sst=None, apid=None, seq # ToDo database has the CUC timestamp as string. Here the timestamps are floats. # Does this comparison operations work? t_from_string = str(t_from) + 'U' # the timestamps in the database are saved as string - query = query.filter(tm_db.DbTelemetry.timestamp >= t_from_string) # ToDo check if the change from > to >= breaks something! + query = query.filter(tm_db.DbTelemetry.timestamp >= t_from) # ToDo check if the change from > to >= breaks something! + # ToDo: Funktion Cast verwenden um String auf Double zu casten # query = query.filter(tm_db.DbTelemetry.timestamp > t_from) # <- comparison with float if t_to is not None: # ToDo database has the CUC timestamp as string. Here the timestamps are floats. # Does this comparison operations work? t_to_string = str(t_to) + 'U' # the timestamps in the database are saved as string - query = query.filter(tm_db.DbTelemetry.timestamp <= t_to_string) + query = query.filter(tm_db.DbTelemetry.timestamp <= t_to) # ToDo: Funktion Cast verwenden um String auf Double zu casten # query = query.filter(tm_db.DbTelemetry.timestamp <= end) # <- comparison with float if dest_id is not None: query = query.filter(tm_db.DbTelemetry.destID == dest_id) @@ -136,6 +139,7 @@ def highest_cuc_timestamp(tm_list): :rtype: PUS packet || None """ highest = None + timestamp = None if isinstance(tm_list, list) and len(tm_list) > 0: cuc = 0 for i in range(len(tm_list)): @@ -147,7 +151,8 @@ def highest_cuc_timestamp(tm_list): if tstamp > cuc: cuc = tstamp highest = tm_list[i] - return highest + timestamp = tstamp + return highest, timestamp def lowest_cuc_timestamp(pool_name, tm_list): @@ -622,11 +627,13 @@ def get_tm(pool_name, st=None, sst=None, apid=None, ssc=None, duration=5, t_from of decoded telemetry packets or [] """ + # set the time interval t_from, t_to = set_time_interval(pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) # for the case that t_to is in future, wait current_cuc = cfl.get_last_pckt_time(pool_name=pool_name, string=False) + if t_to > current_cuc: difference = t_to - current_cuc time.sleep(difference) @@ -972,14 +979,15 @@ def extract_apid_from_packetid(packet_id): return apid -def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): + +def get_tc_acknow(pool_name="LIVE", tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None): """ Check if for the TC acknowledgement packets can be found in the database. This function makes a single database query. :param pool_name: str Name of the TM pool in the database :param t_tc_sent: float - CUC timestamp of the telecommand + CUC timestamp from which search for telecommand should start :param tc_apid: int or str Application process ID of the sent TC. Can be provided as integer or hexadecimal string :param tc_ssc: int @@ -993,16 +1001,128 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): [] if no acknowledgement TM packets could be found in the database """ result = None + + tc_data = get_data_of_last_tc(pool_name) + t_tc_sent = tc_data[0] + tc_iid = tc_data[1] + tc_bytes = tc_data[2] + assert isinstance(pool_name, str) assert isinstance(tc_apid, (int, str)) assert isinstance(t_tc_sent, (float, int)) + assert isinstance(tc_iid, int) + assert isinstance(tc_bytes, (int, str)) # if the tc_apid is provided as hexadecimal number, convert it to and integer tc_apid = tools.convert_apid_to_int(apid=tc_apid) # make database query packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent - 1) + # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) + # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field + ack_tms = [] + for i in range(len(packets)): + if packets[i][1] is not None and packets[i][1][0] is not None: + # get the data entries for APID and SSC + #pac_apid = packets[i][0][3] # TODO: not compatible anymore + pac_apid = packets[i][0][0].APID + first_tm_bytes = packets[i][0][1].hex()[0:4] + + if pac_apid == 961: # for acknowledgements from SEM + name_apid = 'PAR_CMD_APID' + name_psc = 'PAR_CMD_SEQUENCE_COUNT' + else: + name_apid = 'TcPcktId' + name_psc = 'TcPcktSeqCtrl' + para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) + if name_apid in para and name_psc in para: + # extract the SSC from the PSC + ssc = extract_ssc_from_psc(psc=para[name_psc]) + apid = extract_apid_from_packetid(packet_id=para[name_apid]) + if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID' + tc_pid = tools.extract_pid_from_apid(tc_apid) + if apid == tc_pid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + if apid == tc_apid and ssc == tc_ssc: + ack_tms.append(packets[i]) + else: + logger.debug('get_tc_acknow: could not read the data from the TM packet') + + # treat with the result from the database query + if len(ack_tms) > 0: + # get the ST and SST of the TC for logging purposes + tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, + apid=tc_apid, + ssc=tc_ssc, + is_tm=False, + t_from=t_tc_sent) + logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:' + .format(tc_st, tc_sst, tc_apid, tc_ssc)) + + # check if there was a failure, the result becomes False if a failure occurred + for i in range(len(ack_tms)): + head = ack_tms[i][0][0] + data = ack_tms[i][1] + if result is not False: + if head.SERV_SUB_TYPE in [1, 3, 7] and (first_tm_bytes == tc_bytes): + logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + result = True + # print(head.SERV_TYPE, head.SERV_SUB_TYPE) + elif head.SERV_SUB_TYPE in [2, 4, 8] and (first_tm_bytes == tc_bytes): + if head.SERV_SUB_TYPE == 2: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 4: + logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + if head.SERV_SUB_TYPE == 8: + logger.info( + 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' + .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) + logger.debug('Data of the TM packet: {}'.format(data)) + result = False + else: + logger.info('No TM found with matching pattern {}'.format(tc_bytes)) + + return result, ack_tms + + + +def get_tc_acknow_original(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None): + """ + Check if for the TC acknowledgement packets can be found in the database. + This function makes a single database query. + :param pool_name: str + Name of the TM pool in the database + :param t_tc_sent: float + CUC timestamp from which search for telecommand should start + :param tc_apid: int or str + Application process ID of the sent TC. Can be provided as integer or hexadecimal string + :param tc_ssc: int + Source sequence counter of the sent TC + :return: (boolean, list) + boolean: + True if one or up to all acknowledgement packets TM(1,1), TM(1,3), TM(1,7) were found + False if one or all of TM(1,2), TM(1,4), TM(1,8) were found + list: + List of the acknowledgement TM packets for the TC, + [] if no acknowledgement TM packets could be found in the database + """ + result = None + assert isinstance(pool_name, str) + assert isinstance(tc_apid, (int, str)) + assert isinstance(t_tc_sent, (float, int)) + + # if the tc_apid is provided as hexadecimal number, convert it to and integer + tc_apid = tools.convert_apid_to_int(apid=tc_apid) + + # make database query + packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent - 1) + # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field ack_tms = [] for i in range(len(packets)): @@ -1050,6 +1170,7 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): if head.SERV_SUB_TYPE in [1, 3, 7]: logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) result = True + print(head.SERV_TYPE, head.SERV_SUB_TYPE) elif head.SERV_SUB_TYPE in [2, 4, 8]: if head.SERV_SUB_TYPE == 2: logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' @@ -1069,7 +1190,7 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): return result, ack_tms -def await_tc_acknow(pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None): +def await_tc_acknow(tc_identifier, pool_name="LIVE", duration=10, tm_st=1, tm_sst=None): """ Waiting to receive the acknowledgement packet of a sent telecommand (TC) for a given duration. As soon as acknowledgement packets were found the function returns. @@ -1149,6 +1270,201 @@ def await_tc_acknow(pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None) return result, ack_list +def get_5_1_tc_acknow(pool_name="LIVE", tm_name="name", parameter_1=5, parameter_2=3): + """ + Check if for the TC acknowledgement packets can be found in the database. + This function makes a single database query. + :param pool_name: str + Name of the TM pool in the database + :param tm_name: string + Description of TM as found in TM Table + :param parameter_1: int + Value of the second parameter of the TM + :param parameter_2: int + Value of the third parameter of the TM + :return: (boolean) + boolean: + True if the 5_1 packet was found with the given parameters + False if no such packet was found + """ + + assert isinstance(pool_name, str) + assert isinstance(tm_name, str) + assert isinstance(parameter_1, int) + assert isinstance((parameter_2, int)) + + + time_of_packet = get_time_of_last_tc(pool_name) + packets = fetch_packets(pool_name=pool_name, is_tm=True, st=5, sst=None, apid=321, ssc=None, t_from=time_of_packet, + t_to=time_of_packet+30 , dest_id=None, not_apid=None, decode=True, silent=False) + + if tm_name == "SASW Rep1_EVT_IASW_TR" and parameter_1 == 5 and parameter_2 == 3: + assert packets[0][1][0][1][4][0] == parameter_1 + assert packets[0][1][0][2][4][0] == parameter_2 + return True + + elif tm_name == "SASW Rep1_EVT_FEE_TR": + + + if parameter_1 == 3 and parameter_2 == 2: + assert packets[1][1][0][1][4][0] == parameter_1 + assert packets[1][1][0][2][4][0] == parameter_2 + return True + elif parameter_1 == 2 and parameter_2 == 7: + assert packets[2][1][0][1][4][0] == parameter_1 + assert packets[2][1][0][2][4][0] == parameter_2 + return True + elif parameter_1 == 7 and parameter_2 == 6: + assert packets[3][1][0][1][4][0] == parameter_1 + assert packets[3][1][0][2][4][0] == parameter_2 + return True + else: + print("Packet does not exist. SASW Rep1_EVT_FEE_TR") + return False + + else: + print("Packet does not exist. ASW Rep1_EVT_IASW_TR") + return False + + + + + # return packets + + # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) + # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field + + + + +def check_if_packet_is_received(ST=1,SST=7,pool_name="LIVE"): + """ + Checks if a certain type of packet has been received + :param ST: int + The subtype of the packet that should be checked for + :param SST: int + The subsubtype of the packet that should be checked for + :param pool_name: str + Name of the TM pool in the database + """ + + assert isinstance(pool_name, str) + assert isinstance(ST, int) + assert isinstance(SST, int) + + + raw_packet_list, data_packet_list = get_last_100_packets(pool_name=pool_name) + # print(raw_packet_list) + + + # ST 10 Stelle + # SST 11 Stelle + for i in range(len(raw_packet_list)): + packet = cfl.get_header_parameters_detailed(raw_packet_list[i]) + + # print(packet) + + current_packet_ST = packet[10][1] + current_packet_SST = packet[11][1] + + + + try: + current_timestamp = cfl.get_cuctime(raw_packet_list[i]) + except: + current_timestamp = "" + + # print(current_packet_ST, current_packet_SST, current_timestamp) + + if current_packet_ST == ST and current_packet_SST == SST: + print("Match") + print(ST, SST, current_timestamp) + return True + else: + continue + + # print(time) + + if current_packet_ST == ST and current_packet_SST == SST: + print("Found it") + + + + + +def get_sid_of_tm(packet): + """ + Gets SID from raw packet data. + :param packet: bytes + Packet whose SID should be extracted + """ + + decoded_packet = packet.hex() + header = decoded_packet[36:124] + return int(header[3]) + + + + +def get_frequency_of_hk(duration=60., frequency=20, SID=1, pool_name="LIVE"): + """ + Check the time passing between TM packets during a given duration. + This function makes a multiple database queries. + :param pool_name: str + Name of the TM pool in the database + :param duration: float + timeslot in which the incoming packages are checked for their cuc-timestamp + """ + + + assert isinstance(duration, (float, int)) + assert isinstance(frequency, int) + assert isinstance(SID, int) + + + list_of_timestamps = [] + last_telemetry_packet = cfl.get_pool_rows(pool_name)[-1].raw + timestamp = cfl.get_cuctime(last_telemetry_packet) + # last_packet_sid = get_sid_of_tm(last_telemetry_packet) + current_timestamp = 0.0 + end_time = timestamp + duration + + while current_timestamp <= end_time: + + current_telemetry_packet = cfl.get_pool_rows(pool_name)[-1].raw + current_timestamp = cfl.get_cuctime(current_telemetry_packet) + current_packet_sid = get_sid_of_tm((current_telemetry_packet)) + if current_timestamp not in list_of_timestamps and current_packet_sid == SID: + list_of_timestamps.append(current_timestamp) + else: + continue + + list_of_differences = [j-i for i, j in zip(list_of_timestamps[:-1], list_of_timestamps[1:])] + + first_element = round(list_of_differences[0]) + chk = True + + for item in list_of_differences: + if first_element != round(item): + chk = False + break + + if chk == True: + # print(round(list_of_differences[0])) + print(first_element) + if first_element == frequency: + print(True) + return True + else: + print("Didn't work") + return False + + + + + +# get_frequency_of_hk() + def check_acknowledgement(pool_name, tc_identifier, duration=10): """ Check that for a sent TC the acknowledgement packets were received (assuming the acknowledgement was enabled) @@ -1531,6 +1847,225 @@ def get_acquisition(pool_name, tm_21_3): logger.info(meta_data[i]) return result + +def get_data_of_last_tc(pool_name="LIVE"): + """ + Finds the newest Telecommand in database with the Telemetry just before. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :return: str + timestamp of packet, IID of packet, First four bytes of packet data + """ + + + tm_pool = cfl.get_pool_rows(pool_name) + # last_telecommand = None + tm_before_tc = None + for i in range(-1, -100, -1): + if tm_pool[i].timestamp == "": + # last_telecommand = tm_pool[i] + tm_before_tc = tm_pool[i-1] + tc_id = tm_pool[i].iid + tc_bytes = tm_pool[i].raw.hex()[:4] + + break + timestamp = tm_before_tc.timestamp + timestamp_length = len(timestamp) + timestamp = float(timestamp[:timestamp_length -1]) + return timestamp, tc_id, tc_bytes + + +def verify_no_more_hk(wait_time=5, pool_name="LIVE"): + """ + Function waits for a spcified time and the searches the data pool for HK telemetry. + Returns True if there is no HK. + Returns False if there is HK. + :param wait_time: int + timespan in which the program checks if a hk has appeard + :param pool_name: str + Name of the pool for TM/TC packets in the database + :return: boolean + True if there is no HK, False if there is a HK + """ + assert isinstance(SID, int) + assert isinstance(pool_name, str) + + hk_signature = "0001050000000000000000000000000000000000000000000000000000000000000000000000000000000112" + time.sleep(wait_time) + # TODO incorporate function to find last 100 packets + list_of_packets = [] + for i in range (1,1000): + current_telemetry_packet = cfl.get_pool_rows(pool_name)[-i] + current_raw = current_telemetry_packet.raw + current_data = current_telemetry_packet.data + list_of_packets.append(current_data.hex()) + + try: + current_timestamp = cfl.get_cuctime(current_raw) + print(current_timestamp) + except: + break + + if hk_signature in list_of_packets: + return False + else: + return True + + +def get_tm_timestamp(packet, pool_name="LIVE"): + """ + gets timestamp from packet irregardless of data type + """ + pass + + +def get_last_100_packets(pool_name="Live"): + +# TODO Fix function to work with any number of packets + raw_packets = [] + data_packets = [] + data_pool = cfl.get_pool_rows(pool_name) + + + + for i in range(1, 101): + try: + current_packet = cfl.get_pool_rows(pool_name)[-i] + current_raw = current_packet.raw + current_data = current_packet.data + raw_packets.append(current_raw) + data_packets.append(current_data) + except IndexError: + break + + + return raw_packets, data_packets + + + + + + + +def get_dpu_mode(pool_name="LIVE"): + """ + Get the data from the last entry in the database and check its DPU Mode. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param dpu_mode: dict + DPU Mode of the packet + + """ + + data_packet = cfl.get_pool_rows(pool_name)[-1].raw + dpu_mode = get_tm_data_entries(data_packet,"DpuMode") + print(dpu_mode) + +def get_packet_length(pool_name="LIVE"): + """ + Get the data from the last entry in the database and check its length. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param data_packet: bit + data in the last entry in database + :param packet_length: int + Length of data_packet + + """ + + data_packet = cfl.get_pool_rows(pool_name)[-1].raw + packet_length = len(data_packet) + print(packet_length) + +def get_version_number(version_number="0x0112", pool_name="LIVE"): + """ + Get the version number of the newest TM packet. + :param version_number: str + version number which should be verified + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param data_packet: bytes + Data in Packet + :param packet_version_number_dic: dic + dictionary containing the version number + :param packet_version_number: str + version number converted into hex + """ + + assert isinstance(version_number, str) + + data_packet = cfl.get_pool_rows(pool_name)[-1].raw + packet_version_number_dic = get_tm_data_entries(data_packet, "VersionNumber") + # print(packet_version_number) + packet_version_number = hex(packet_version_number_dic["VersionNumber"]) + if packet_version_number == version_number: + print(True) + return True + else: + print(False) + return False + + +def await_tc_identifier(pool_name, tc_apid, tc_ssc): + """ + Gather the data necessary to identify the newest telecommand and put it in tuple. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param tc_apid: int or str + APID of the TC + :param tc_ssc: int + Sequence number of TC + :param last_tm_timestamp: float + Timestamp of the latest TM in datapool + :return: tc_identifier: tuple + Tuple with data to identify TC + + """ + + assert isinstance(pool_name, str) + assert isinstance(tc_apid, int) or isinstance(tc_apid, str) + assert isinstance(tc_ssc, int) + + last_tm_timestamp = cfl.get_pool_rows(pool_name)[-1].timestamp + length_last_tm_timestamp = len(last_tm_timestamp) + last_tm_timestamp = last_tm_timestamp[:length_last_tm_timestamp - 1] + last_tm_timestamp = float(last_tm_timestamp) + + tc_identifier = (tc_apid, tc_ssc, last_tm_timestamp) + + return tc_identifier + + +def get_tc_identifier(pool_name="LIVE", tc_apid=321, tc_ssc=1, tc_time=0.): + """ + Gather the data necessary to identify the newest telecommand and put it in tuple. + :param pool_name: str + Name of the pool for TM/TC packets in the database + :param tc_apid: int or str + APID of the TC + :param tc_ssc: int + Sequence number of TC + :param tc_time: float or int + Timestamp of the latest TM in datapool + :return: tc_identifier: tuple + Tuple with data to identify TC + + """ + assert isinstance(pool_name, str) + assert isinstance(tc_apid, int) or isinstance(tc_apid, str) + assert isinstance(tc_ssc, int) + assert isinstance(tc_time, int) or isinstance(tc_time, float) + + tc_time = float(tc_time) + + tc_identifier = (tc_apid, tc_ssc, tc_time) + + return tc_identifier + + + + + if __name__ == '__main__': sys.path.append('.') r=get_tc_acknow('PLM',0.,321,5) diff --git a/Tst/testing_library/testlib/tm_test.py b/Tst/testing_library/testlib/tm_test.py new file mode 100644 index 0000000000000000000000000000000000000000..95c89e889a085466586f5812e7e1b33e7003d648 --- /dev/null +++ b/Tst/testing_library/testlib/tm_test.py @@ -0,0 +1,232 @@ +import tm +import ccs_function_lib as cfl +import confignator +import matplotlib +matplotlib.use('Gtk3Cairo') +import confignator +import sys +sys.path.append(confignator.get_option('paths', 'ccs')) + +from confignator import config +check_cfg = config.get_config(file_path=confignator.get_option('config-files', 'ccs')) + +import inspect + + + +telemetry = cfl.get_pool_rows("PLM") +telemetry_packet_1_1 = cfl.get_pool_rows("PLM")[0].iid +telemetry_packet_1 = cfl.get_pool_rows("PLM")[0].raw +telemetry_packet_2 = cfl.get_pool_rows("PLM")[1].raw + + +""" +To test PI1VALUE on 5_3 1, 3, 4 +""" + +packet_1 = telemetry[1] +packet_2 = telemetry[3] +packet_3 = telemetry[4] +packet_4 = telemetry[5] + +print(packet_1.iid, packet_1.raw.hex()) +print(packet_2.iid, packet_2.raw.hex()) +print(packet_3.iid, packet_3.raw.hex()) + +# print(tm.decode_single_tm_packet(packet_4.raw)[1]) + +decoded_packet = tm.decode_single_tm_packet(packet_4.raw) + +print(decoded_packet) + +print(decoded_packet[1]) +print(type(decoded_packet[1][1])) +print(decoded_packet[1][1]) +string_packet = str(decoded_packet[1][1]) +string_packet = string_packet[2:25] +print("string: ", string_packet) +# new_packet = decoded_packet[1][1](1-25) +# print(new_packet) +print(type(decoded_packet[1][0][0][4][0])) + +wanted_data = string_packet, decoded_packet[1][0][0][4][0] + +print(wanted_data) + + + +# dictionary_of_tms = cfl.get_tm_id() + +# tm_list = list(dictionary_of_tms.keys()) + +# print(tm_list) + +# print(telemetry_packet_1) + +# print(cfl.Tmdata(telemetry_packet_3_1)) + +# print(telemetry_packet_3_1.raw) +# print(telemetry_packet_3_1.timestamp) + +# print(tm.get_data_of_last_tc("PLM")) +# print(telemetry_packet_3_1.iid) + +# test_packet = database.tm_db.DBTelemetry() +# print(test_packet) + + +# print(telemetry_packet_3.stc) +# print(cfl.get_cuc_now()) + +list_of_tm_packets = [telemetry_packet_1, telemetry_packet_2] + +# package_id = telemetry_packet_3.iid +# print(package_id) +# package_time = cfl.get_cuctime(telemetry_packet_3) +# print(package_time) + + +# packet_list = tm.fetch_packets("PLM", is_tm=True, st=5, sst=None, apid=321, ssc=None, t_from=5224., t_to=5240., +# dest_id=None, not_apid=None, decode=True, silent=False) + + + +# print(tm.get_tc_acknow(pool_name="PLM", tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None)) + +# print(packet_list[0]) +# print(packet_list[0][0][0]) +# print(packet_list[0][1][0][0]) # EvtId +# print(packet_list[0][1][0][1]) # SrcIaswSt +# print(packet_list[0][1][0][2]) # DestIaswSt + +# print(packet_list[1]) +# print(packet_list[1][1][0][0]) +# print(packet_list[1][1][0][1]) +# print(str(packet_list[1][1][0][1][0]) + " " + str(packet_list[1][1][0][1][4][0])) +# print(packet_list[1][1][0][2]) + + + +# print(tm.get_tm_data_entries(telemetry_packet_3, "SrcIaswSt")) +# print(tm.get_tm_data_entries(packet_list[0][0][0], "SrcIaswSt")) + + +# important tm.check_if_packet_is_received(pool_name="PLM") +# tm.get_last_100_packets("PLM") + + + + +# header = cfl.Tmread(telemetry_packet_1) + + + + + + + +# print(get_time_of_last_tc(pool_name="PLM")) + + +# decode_list = tm.decode_single_tm_packet(telemetry_packet_3)[1][0] + + + + + + + + + +""" +test_1 = cfl.get_header_parameters_detailed(telemetry_packet_1) +test_2 = cfl.get_header_parameters_detailed(telemetry_packet_2) + + +print(test_1) +print(test_2) +""" + +# y = tm.get_tc_acknow(pool_name='PLM', t_tc_sent=60.0, tc_apid=321, tc_ssc=9, tm_st=3, tm_sst=1) +# print(y) +""" +x = tm.get_5_1_tc_acknow(pool_name='PLM', t_tc_sent=get_time_of_last_tc("PLM"), tc_apid=321, tc_ssc=1, tm_st=5, tm_sst=None) +print(x[0]) +print(x[0][0][0]) +print(x[0][0][1].hex()) +print(x[1][0][1].hex()) +print(x[2][0][1].hex()) +print(x[3][0][1].hex()) +""" + +# print(cfl.Tmdata(x[0][0][1])) +# read = cfl.Tmread(x[0][0][1]) +# print(read) +# for i in x: + # print(i[0][1].hex()) + # print(i) + # print(tm.get_tm_data_entries(i, "EvtId")) + # tm.decode_single_tm_packet(i) + +# get_list_from_d b = tm.fetch_packets(pool_name="PLM", is_tm=True, st=None, sst=None, apid=None, ssc=None, t_from=0., t_to=1401., +# dest_id=None, not_apid=None, decode=False, silent=False) + + +# test = tm.get_tc_acknow(pool_name="PLM", t_tc_sent=1980., tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=1) +# test = tm.get_5_1_tc_acknow(pool_name='PLM', t_tc_sent=0., tc_apid=321, tc_ssc=0, tm_st=5, tm_sst=1) + +# print(test) +# print(test[0]) +# print(test[1][0][0][0].CTIME + (test[1][0][0][0].FTIME/1000000)) + + +# print(tm.get_tc_acknow.__globals__["get_tc_acknow"]) + +# print(inspect.signature(tm.get_tc_acknow)) + + + +# identified_tc = tm.get_tc_identifier(pool_name="PLM",tc_apid=321,tc_ssc=1, tc_time=100) + + + +# acknowledgement = tm.await_tc_acknow(pool_name="PLM", tc_identifier=identified_tc, duration=10, tm_st=1, tm_sst=7) + + +# print(acknowledgement[1][0][0][0].CTIME + (acknowledgement[1][0][0][0].FTIME/1000000)) + + + + + + + +""" +# fetch funktion, demonstration von .CTIME und FTIME + +test = tm.fetch_packets(pool_name="PLM", is_tm=True, st=3, sst=25, apid=321, ssc=None, t_from=0.00, t_to=21.0, + dest_id=None, not_apid=None, decode=True, silent=False) + + +# print(test) +# print(test[0][0][1]) +# print("Test: ", test[0][0][1]) +header = test[0][0][0] +# print(header.CTIME) +# print(header.FTIME) + +print(header.CTIME + (header.FTIME/1000000)) + +test_liste = [] + +for i in test: + test_liste.append(i[0][0]) + +print(len(test_liste)) +print(test_liste) + +""" + + + + diff --git a/Tst/tst/tst.cfg b/Tst/tst/tst.cfg index 3b780ad8e0b33b12adb304bc98a28888b1c39917..73eb13057cb58dbbabd506a874e35eab947fd4ef 100644 --- a/Tst/tst/tst.cfg +++ b/Tst/tst/tst.cfg @@ -14,11 +14,11 @@ output-file-path = ${paths:tst}/logs_test_runs/output_files/ [tst-preferences] show-json-view = True -main-window-height = 1076 -main-window-width = 1920 -paned-position = 953 +main-window-height = 1016 +main-window-width = 1848 +paned-position = 1019 paned-position-codeblockreuse = 520 [tst-history] -last-folder = /home/marko/space/smile/OBSW/Documentation/testspec/tst +last-folder = /home/sebastian/OBSW/Documentation/testspec/tst/derived_specs diff --git a/Tst/tst/tst.py b/Tst/tst/tst.py index 4128a5f033a22673805eefac313c71fafc633413..dd5d03f0c892f787e0e118263b4c5568919d7395 100755 --- a/Tst/tst/tst.py +++ b/Tst/tst/tst.py @@ -26,6 +26,7 @@ import toolbox import tc_management as tcm import tm_management as tmm import data_pool_tab as dpt +import verification_tab as vt import json_to_barescript import json_to_csv @@ -391,6 +392,12 @@ class TstAppWindow(Gtk.ApplicationWindow): self.label_widget_data_pool.set_text('Data Pool') self.feature_area.insert_page(child=self.data_pool_tab, tab_label=self.label_widget_data_pool, position=3) + # verification tab + self.verification_tab = vt.VerificationTable() + self.label_widget_verification = Gtk.Label() + self.label_widget_verification.set_text("Verification") + self.feature_area.insert_page(child=self.verification_tab, tab_label=self.label_widget_verification, position=4) + self.box.pack_start(self.work_desk, True, True, 0) # # panes for the step grid an so on diff --git a/Tst/tst/verification.py b/Tst/tst/verification.py new file mode 100644 index 0000000000000000000000000000000000000000..143c90850a339de196d946e015aae72804dce18f --- /dev/null +++ b/Tst/tst/verification.py @@ -0,0 +1,390 @@ +import os +import json +import struct +import threading +import subprocess +import time +import sys +import dbus +import dbus.service +from dbus.mainloop.glib import DBusGMainLoop +import DBus_Basic + +import ccs_function_lib as cfl + +from typing import NamedTuple +import confignator +import gi + +import matplotlib +matplotlib.use('Gtk3Cairo') + + +# from sqlalchemy.sql.expression import func, distinct +from sqlalchemy.orm import load_only +from database.tm_db import DbTelemetryPool, DbTelemetry, RMapTelemetry, FEEDataTelemetry, scoped_session_maker + +import importlib +from confignator import config +check_cfg = config.get_config(file_path=confignator.get_option('config-files', 'ccs')) +# import tm + + +# test = tm.get_tc_acknow(pool_name="PLM", t_tc_sent=1020., tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None) + +""" +def verification_template(verification_descr, pool_name="LIVE", ST_1=None, ST_2=None, SST_1=None, SST_2=None, time=2, + comment="", preamble="Ver.verification", add_parcfg=False): + + if comment: + commentstr = "# TC({}, {}): {} [{}]\n# {}\n".format(*cmd[3:], cmd[1], cmd[0], cmd[2]) + newline = "\n" + else: + commentstr = "" + newline = "" + + parcfg = '' + if add_parcfg: + for par in pars: + if par[2] == 'E': + if par[4] is not None: + if par[5] == 'E': + parval = '"{}"'.format(par[4]) + elif par[6] == 'H': + parval = '0x{}'.format(par[4]) + else: + parval = par[4] + else: + parval = par[4] + line = '{} = {} # {}\n'.format(par[0], parval, par[3]) + elif par[2] == 'F': + line = '# {} = {} # {} [NOT EDITABLE]\n'.format(par[0], par[4], par[3]) + else: + line = '' + parcfg += line + + parstr = ', '.join(parsinfo_to_str(pars)) + if len(parstr) > 0: + parstr = ', ' + parstr + + + exe = "{}('{}', ST_1={}, SST_1={}, ST_2={}, SST_2={}, pool_name='{}', time={})".format(preamble, verification_descr, + ST_1, SST_1, ST_2, SST_2, + pool_name, time) + return commentstr + exe + newline +""" + +# verification template nach Stefan +""" +def verification_template(cmd, pars, pool_name='LIVE', preamble='cfl.Tcsend_DB', options='', comment=True, add_parcfg=False): + + if comment: + commentstr = "# TC({}, {}): {} [{}]\n# {}\n".format(*cmd[3:], cmd[1], cmd[0], cmd[2]) + newline = "\n" + else: + commentstr = "" + newline = "" + + parcfg = '' + if add_parcfg: + for par in pars: + if par[2] == 'E': + if par[4] is not None: + if par[5] == 'E': + parval = '"{}"'.format(par[4]) + elif par[6] == 'H': + parval = '0x{}'.format(par[4]) + else: + parval = par[4] + else: + parval = par[4] + line = '{} = {} # {}\n'.format(par[0], parval, par[3]) + elif par[2] == 'F': + line = '# {} = {} # {} [NOT EDITABLE]\n'.format(par[0], par[4], par[3]) + else: + line = '' + parcfg += line + + parstr = ', '.join(parsinfo_to_str(pars)) + if len(parstr) > 0: + parstr = ', ' + parstr + + + exe = "{}('{}'{}, pool_name='{}'{})".format(preamble, cmd[1], parstr, pool_name, options) + return commentstr + parcfg + exe + newline + +""" + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +""" + +def verification_template(verification_descr, pool_name="LIVE", ST_1=None, ST_2=None, SST_1=None, SST_2=None, time=2, + comment="", preamble="Ver.verification"): + if comment: + commentstr = "# TC({}, {}): {} [{}]\n# {}\n".format(*cmd[3:], cmd[1], cmd[0], cmd[2]) + newline = "\n" + else: + commentstr = "" + newline = "" + + exe = "{}('{}', ST_1={}, SST_1={}, ST_2={}, SST_2={}, pool_name='{}', time={})".format(preamble, verification_descr, + ST_1, SST_1, ST_2, SST_2, + pool_name, time) + return commentstr + exe + newline + +""" +# def new_verification_template(verification_descr, pool_name="LIVE", comment="", preamble=): + + +# reads in verification as soon as exec button in step widget is pressed +def read_verification(verification_string): + + + first_split = verification_string.split("(") + + + second_split = "" + for i in first_split[1]: + if i != ")": + second_split += i + + + third_split = second_split.split(",") + + + st_1 = third_split[1] + sst_1 = third_split[2] + st_2 = third_split[3] + sst_2 = third_split[4] + pool_name = third_split[5] + time_duration = third_split[6] + + + st_1_value = st_1.split("=")[1] + sst_1_value = sst_1.split("=")[1] + st_2_value = st_2.split("=")[1] + sst_2_value = sst_2.split("=")[1] + pool_name_value = pool_name.split("=")[1] + time_duration_value = time_duration.split("=")[1] + + st_sst_list = [st_1_value, sst_1_value, st_2_value, sst_2_value] + + + + value_list = [] + + for element in st_sst_list: + # print(element) + if element == "None": + element = None + else: + try: + element = int(element) + except: + raise AssertionError("ST and SST values have to be None or int") + + value_list.append(element) + + + if type(pool_name_value) == str: + # print("if pool name: ", pool_name_value) + value_list.append(pool_name_value) + else: + raise AssertionError("Pool Name needs to be str") + + # time_duration_value_checker = isinstance(time_duration_value, int) + # print("Time instance: ", time_duration_value_checker) + try: + time_duration_value = int(time_duration_value) + value_list.append(time_duration_value) + except: + raise AssertionError("Time has to be int") + + Verification(value_list[0], value_list[1], value_list[2], value_list[3], value_list[4], value_list[5]) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +""" + +def type_comparison(comparison_data, sst_1=1, sst_2=7, st_=1, st_2=1,): + pool_rows = cfl.get_pool_rows("PLM") + + st_list = [] + sst_list = [] + x = 0 + header_counter = 0 + while header_counter < 2: + x += 1 + entry = pool_rows.all()[-x] + + if entry.data.hex() == comparison_data: + + st_list.append(entry.stc) + sst_list.append(entry.sst) + + # print("ST Entry_" + str(x) + ": ", entry.stc) + # print("SST Entry_" + str(x) + ": ", entry.sst) + # print("Timestamp entry_" + str(x) + ": ", entry.timestamp) + header_counter += 1 + + + st_list_reverse = [st_list[1], st_list[0]] + sst_list_reverse = [sst_list[1], sst_list[0]] + + + if sst_list_reverse == [sst_1, sst_2]: + print("Verification successful") + else: + print("Verification unsuccessful") + + return False + + +def Verification( st_1=1, sst_1=1, st_2=1, sst_2=7, pool_name="PLM", verification_duration=2): + verification_running = True + + print("RUNNING!!") + + print("Variables: ") + print(st_1) + print(sst_1) + print(st_2) + print(sst_2) + print(pool_name) + print(verification_duration) + + + while verification_running == True: + + # while running the script checks the last three entries of the database and keeps them up to date + # to recognize a tc it checks the time + + pool_rows = cfl.get_pool_rows(pool_name) + + system_time = time.clock_gettime(0) + + entry_1_data = pool_rows.all()[-1] + # entry_2_data = pool_rows.all()[-2] + # entry_3_data = pool_rows.all()[-3] + + time_1 = entry_1_data.timestamp + + if time_1 == "": + + first_raw_digits = "" # this string will contain the first bytes of raw data + + telecommand = entry_1_data + telecommand_time = telecommand.timestamp + telecommand_raw = telecommand.raw.hex() + # telecommand_data = telecommand.data.hex() + # Variable to generate new telecommand timestamp, other than telecommand_time + telecommand_verification_timestamp = time.clock_gettime(0) + verification_time = telecommand_verification_timestamp + verification_duration + + for i in telecommand_raw: + first_raw_digits += str(i) + if len(first_raw_digits) > 7: + break + + # print("After Loop telecommand_first_digits: ", first_raw_digits) + + + while system_time < verification_time and system_time != verification_time: + system_time = time.clock_gettime(0) + + if system_time >= verification_time: + + verification_running = type_comparison(first_raw_digits, sst_1, sst_2) + + + +""" + + + diff --git a/Tst/tst/verification_tab.py b/Tst/tst/verification_tab.py new file mode 100644 index 0000000000000000000000000000000000000000..8e6b1101500e721b6bae00d05d77f8a43578cbe9 --- /dev/null +++ b/Tst/tst/verification_tab.py @@ -0,0 +1,215 @@ +# !/usr/bin/env python3 +import gi +import os + +gi.require_version("Gtk", "3.0") +gi.require_version("GtkSource", "3.0") +from gi.repository import Gtk, Gdk, GtkSource +from gi.repository.GdkPixbuf import Pixbuf +import confignator +import sys +sys.path.append(confignator.get_option('paths', 'ccs')) +sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) +sys.path.append('/home/sebastian/CCS/Tst/testing_library/testlib') # notwendig damit tm als Modul erkannt wird +import ccs_function_lib as cfl +import s2k_partypes as s2k + +import time +import sys +import dbus +import dbus.service +from dbus.mainloop.glib import DBusGMainLoop +import DBus_Basic + + +from typing import NamedTuple +import confignator +import gi +# print(sys.path) +import inspect + + +import tm + +Verification_1 = "tm." + tm.get_tc_acknow.__name__ + str(inspect.signature((tm.get_tc_acknow))) +Verification_2 = "tm." + tm.await_tc_acknow.__name__ + str(inspect.signature((tm.await_tc_acknow))) +Verification_3 = "tm." + tm.get_5_1_tc_acknow.__name__ + str(inspect.signature((tm.get_5_1_tc_acknow))) +tc_identifier = "identified_tc = tm." + tm.get_tc_identifier.__name__ + str(inspect.signature((tm.get_tc_identifier))) +Verification_4 = "tm." + tm.get_frequency_of_hk.__name__ + str(inspect.signature((tm.get_frequency_of_hk))) +Verification_5 = "tm." + tm.get_dpu_mode.__name__ + str(inspect.signature((tm.get_dpu_mode))) +Verification_6 = "tm." + tm.get_packet_length.__name__ + str(inspect.signature((tm.get_packet_length))) +Verification_7 = "tm." + tm.get_version_number.__name__ + str(inspect.signature((tm.get_version_number))) +Verification_8 = "tm." + tm.get_data_of_last_tc.__name__ + str(inspect.signature((tm.get_data_of_last_tc))) +Verification_9 = "tm." + tm.verify_no_more_hk.__name__ + str(inspect.signature((tm.verify_no_more_hk))) + + + + +# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +# Descriptions + +descr_8 = "Get Timestamp of TM before last TC, get IID of last TC, get first 4 bytes of TC raw data." +descr_9 = "Check if there are no more HK packets" + + + +verification_list = [ + ("Get TC Verification", 1, 7, "descr", Verification_1), + ("Await TC Verification", 1, 7, "descr", tc_identifier + "\n" + Verification_2), + ("Get TC Verification", 5, 1, "descr", Verification_3), + ("Get HK frequency", None, None, "descr", Verification_4), + ("Get DPU Mode", None, None, "descr", Verification_5), + ("Get Packet Length", None, None, "descr", Verification_6), + ("Get Version Number", None, None, "descr", Verification_7), + ("Get Data of last TC", None, None, descr_8, Verification_8), + ("Test if there are no more HK packets", None, None, descr_9, Verification_9) +] + + + + + + +def translate_drag_data(data): + + + translated = "kla" + + return translated + + +class VerificationTable(Gtk.Grid): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.set_size_request(500,500) + + + self.grid = Gtk.Grid() + self.grid.set_column_homogeneous(True) + self.grid.set_row_homogeneous(True) + self.add(self.grid) + + # Creating ListStore model + self.verification_liststore = Gtk.ListStore(str, int, int, str, str) + for verification_ref in verification_list: + self. verification_liststore.append(list(verification_ref)) + self.current_filter_verification = None + + # Creating filter, feeding it with liststore model + self.verification_filter = self.verification_liststore.filter_new() + # setting the filter function + self.verification_filter.set_visible_func(self.verification_filter_func) + + # Creating treeview + self.treeview = Gtk.TreeView(model=self.verification_filter) + for i, column_title in enumerate( + ["Verification", "ST", "SST", "Description"] + ): + renderer = Gtk.CellRendererText() + column = Gtk.TreeViewColumn(column_title, renderer, text=i) + self.treeview.append_column(column) + + # setting up layout + self.scrollable_treelist = Gtk.ScrolledWindow() + self.scrollable_treelist.set_vexpand(True) + self.scrollable_treelist.set_hexpand(True) + self.grid.attach(self.scrollable_treelist, 0, 0, 8, 10) + self.scrollable_treelist.add(self.treeview) + + + # handle selection + self.selected_row = self.treeview.get_selection() + self.selected_row.connect("changed", self.item_selected) + + + # Set up Drag and Drop + self.treeview.drag_source_set(Gdk.ModifierType.BUTTON1_MASK, [], Gdk.DragAction.COPY) + self.treeview.drag_source_set_target_list(None) + self.treeview.drag_source_add_text_targets() + + self.treeview.connect("drag-data-get", self.on_drag_data_get) + self.treeview.connect("drag-begin", self.on_drag_begin) + + self.show_all() + + def verification_filter_func(self, model, iter, data): + if( + self.current_filter_verification is None + or self.current_filter_verification == "None" + ): + return True + else: + return model[iter][2] == self.current_filter_verification + + + + + + + # drag and drop + + def item_selected(self, selection): + model, row = selection.get_selected() + if row is not None: + + global verification_name + global ST + global SST + global descr + global name_string + + verification_name = model[row][0] + ST = model[row][1] + SST = model[row][2] + descr = model[row][3] + name_string = model[row][4] + + global selected_data_for_drag_drop + # print(verification_name) + # print(name_string) + + selected_data_for_drag_drop = name_string + # ToDo: selected_data_for_drag_drop = "result = " + name_string + # selected_data_for_drag_drop = cfl.verification_template(name_string) + # str(verification_name) + "\n ST = " + str(ST) + "\n SST = " + str(SST) + "\n Time = 2" + # selected_data_for_drag_drop = "{} ({}, {})".format((name, ST, SST)) + + else: + pass + + + + + def on_drag_data_get(self, treeview, drag_context, selection_data, info, time, *args): + treeselection = treeview.get_selection() + model, my_iter = treeselection.get_selected() + global selected_data_for_drag_drop + selection_data.set_text(selected_data_for_drag_drop, -1) + + + + + def on_drag_begin(self, *args): + pass + + + + + + + + + + + + + + + + + + + + diff --git a/Tst/tst/view.py b/Tst/tst/view.py index ddaca9f063a2f569d346f44f75d96ab269737e6f..e7955a38d5b99b59e67fa97f5399fdbf2590e065 100644 --- a/Tst/tst/view.py +++ b/Tst/tst/view.py @@ -775,7 +775,7 @@ class StepWidget(Gtk.EventBox): # self.lbl_box_verification.pack_start(self.btn_exec_verification, False, False, 0) # self.detail_box.pack_start(self.lbl_box_verification, True, True, 0) self.verification_scrolled_window = Gtk.ScrolledWindow() - # self.verification_scrolled_window.set_size_request(50, 100) + #self.verification_scrolled_window.set_size_request(50, 100) self.verification_view = GtkSource.View() self.verification_view.set_auto_indent(True) self.verification_view.set_show_line_numbers(True) diff --git a/egse.cfg b/egse.cfg index 7818b3dbe944350a1d639f5defe70603d46bbf51..e89ec8216a1f682188abff6fde363001dea875b1 100644 --- a/egse.cfg +++ b/egse.cfg @@ -11,8 +11,8 @@ start-simulator-log = ${logging:log-dir}/simulators/sim.log name = SMILE [database] -user = egse -password = xrayvision +user = sebastian +password = Ego,ich1 host = 127.0.0.1 mib-schema = mib_smile_sxi datapool-items =