From 7d3497372c5125fa59720e1479a03e0c129adb34 Mon Sep 17 00:00:00 2001 From: Sebastian <seb.miksch@aon.at> Date: Wed, 21 Sep 2022 15:10:29 +0200 Subject: [PATCH] Added verification button in view.py. Changed and added functions in tm.py --- Ccs/ccs_function_lib.py | 1 + Ccs/editor.py | 3 + .../codeblockreusefeature.cfg | 2 +- Tst/testing_library/testlib/tm.py | 113 +------ Tst/testing_library/testlib/tm_test.py | 59 +++- Tst/tst/verification.py | 304 ++++++++++++++---- Tst/tst/view.py | 41 ++- 7 files changed, 336 insertions(+), 187 deletions(-) diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index 7399942..6a48bcb 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -2845,6 +2845,7 @@ def get_data_pool_items(pcf_descr=None, src_file=None): def make_tc_template(ccf_descr, pool_name='LIVE', preamble='cfl.Tcsend_DB', options='', comment=True, add_parcfg=False): try: cmd, pars = list(get_tc_list(ccf_descr).items())[0] + # print("pars: ", pars) except IndexError: raise IndexError('"{}" not found in IDB.'.format(ccf_descr)) # print(tc_template(cmd, pars, pool_name=pool_name, preamble=preamble, options=options, comment=True)) diff --git a/Ccs/editor.py b/Ccs/editor.py index 0e45700..d3387d9 100644 --- a/Ccs/editor.py +++ b/Ccs/editor.py @@ -23,6 +23,9 @@ import DBus_Basic # import config_dialog import ccs_function_lib as cfl +sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) +import tm + cfg = confignator.get_config() pixmap_folder = cfg.get('ccs-paths', 'pixmap') diff --git a/Tst/codeblockreusefeature/codeblockreusefeature.cfg b/Tst/codeblockreusefeature/codeblockreusefeature.cfg index d4f5beb..10a0ed1 100644 --- a/Tst/codeblockreusefeature/codeblockreusefeature.cfg +++ b/Tst/codeblockreusefeature/codeblockreusefeature.cfg @@ -25,6 +25,6 @@ threadname = True main-window-height = 791 main-window-width = 612 paned-position-search = 406 -paned-position-command-code = 68 +paned-position-command-code = 28 paned-position-comment = 462 diff --git a/Tst/testing_library/testlib/tm.py b/Tst/testing_library/testlib/tm.py index e66c416..173a2db 100644 --- a/Tst/testing_library/testlib/tm.py +++ b/Tst/testing_library/testlib/tm.py @@ -1156,7 +1156,7 @@ def await_tc_acknow(tc_identifier, pool_name="LIVE", duration=10, tm_st=1, tm_ss return result, ack_list -def get_5_1_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=5, tm_sst=None): +def get_5_1_tc_acknow(pool_name="LIVE", t_tc_sent=0.0, tc_apid=321, tc_ssc=1, tm_st=5, tm_sst=None): """ Check if for the TC acknowledgement packets can be found in the database. This function makes a single database query. @@ -1176,7 +1176,6 @@ def get_5_1_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_s List of the acknowledgement TM packets for the TC, [] if no acknowledgement TM packets could be found in the database """ - print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") result = None assert isinstance(pool_name, str) assert isinstance(tc_apid, (int, str)) @@ -1184,108 +1183,24 @@ def get_5_1_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_s # if the tc_apid is provided as hexadecimal number, convert it to and integer tc_apid = tools.convert_apid_to_int(apid=tc_apid) - + evtid_list = [] + data_list = [] # make database query packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent) + return packets + """ + for packet in packets: + evtid = get_tm_data_entries(packet, "EvtId") + evtid_list.append(evtid) + data = packet[0][1].hex() + data_list.append(data) + print(evtid_list) + print(data_list) + """ # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field - ack_tms = [] - for i in range(len(packets)): - if packets[i][1] is not None and packets[i][1][0] is not None: - # get the data entries for APID and SSC - # pac_apid = packets[i][0][3] # TODO: not compatible anymore - pac_apid = packets[i][0][0].APID - if pac_apid == 961: # for acknowledgements from SEM - name_apid = 'PAR_CMD_APID' - name_psc = 'PAR_CMD_SEQUENCE_COUNT' - else: - name_apid = 'TcPcktId' - name_psc = 'TcPcktSeqCtrl' - para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) - if name_apid in para and name_psc in para: - # extract the SSC from the PSC - ssc = extract_ssc_from_psc(psc=para[name_psc]) - apid = extract_apid_from_packetid(packet_id=para[name_apid]) - if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID' - tc_pid = tools.extract_pid_from_apid(tc_apid) - if apid == tc_pid and ssc == tc_ssc: - ack_tms.append(packets[i]) - else: - if apid == tc_apid and ssc == tc_ssc: - ack_tms.append(packets[i]) - else: - logger.debug('get_tc_acknow: could not read the data from the TM packet') - - # treat with the result from the database query - if len(ack_tms) > 0: - # get the ST and SST of the TC for logging purposes - tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, - apid=tc_apid, - ssc=tc_ssc, - is_tm=False, - t_from=t_tc_sent) - logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:' - .format(tc_st, tc_sst, tc_apid, tc_ssc)) - - # check if there was a failure, the result becomes False if a failure occurred - for i in range(len(ack_tms)): - head = ack_tms[i][0][0] - data = ack_tms[i][1] - if result is not False: - if head.SERV_SUB_TYPE in [1, 3, 7]: - logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - result = True - elif head.SERV_SUB_TYPE in [2, 4, 8]: - if head.SERV_SUB_TYPE == 2: - logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' - .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - logger.debug('Data of the TM packet: {}'.format(data)) - if head.SERV_SUB_TYPE == 4: - logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' - .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - logger.debug('Data of the TM packet: {}'.format(data)) - if head.SERV_SUB_TYPE == 8: - logger.info( - 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' - .format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) - logger.debug('Data of the TM packet: {}'.format(data)) - result = False - - return result, ack_tms - pass -def get_8_1_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=8, tm_sst=None): """ - Check if for the TC acknowledgement packets can be found in the database. - This function makes a single database query. - :param pool_name: str - Name of the TM pool in the database - :param t_tc_sent: float - CUC timestamp from which search for telecommand should start - :param tc_apid: int or str - Application process ID of the sent TC. Can be provided as integer or hexadecimal string - :param tc_ssc: int - Source sequence counter of the sent TC - :return: (boolean, list) - boolean: - True if one or up to all acknowledgement packets TM(5,1), TM(5,3), TM(5,7) were found - False if one or all of TM(5,2), TM(5,4), TM(5,8) were found - list: - List of the acknowledgement TM packets for the TC, - [] if no acknowledgement TM packets could be found in the database - """ - result = None - assert isinstance(pool_name, str) - assert isinstance(tc_apid, (int, str)) - assert isinstance(t_tc_sent, (float, int)) - - # if the tc_apid is provided as hexadecimal number, convert it to and integer - tc_apid = tools.convert_apid_to_int(apid=tc_apid) - - # make database query - packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent) - # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) - # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field ack_tms = [] for i in range(len(packets)): if packets[i][1] is not None and packets[i][1][0] is not None: @@ -1350,6 +1265,8 @@ def get_8_1_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_s return result, ack_tms pass + """ + def get_sid_of_tm(packet): """ diff --git a/Tst/testing_library/testlib/tm_test.py b/Tst/testing_library/testlib/tm_test.py index 43f10ce..fd2e406 100644 --- a/Tst/testing_library/testlib/tm_test.py +++ b/Tst/testing_library/testlib/tm_test.py @@ -5,59 +5,84 @@ import matplotlib matplotlib.use('Gtk3Cairo') from confignator import config check_cfg = config.get_config(file_path=confignator.get_option('config-files', 'ccs')) +import inspect telemetry = cfl.get_pool_rows("PLM") +telemetry_packet_1_1 = cfl.get_pool_rows("PLM")[0].data telemetry_packet_1 = cfl.get_pool_rows("PLM")[0].raw telemetry_packet_2 = cfl.get_pool_rows("PLM")[1].raw +telemetry_packet_3 = cfl.get_pool_rows("PLM")[-7] + +# print(telemetry_packet_3.timestamp) +# print(telemetry_packet_3.stc) + list_of_tm_packets = [telemetry_packet_1, telemetry_packet_2] -telemetry_raw = telemetry.all()[-1].raw + +# decode_list = tm.decode_single_tm_packet(telemetry_packet_3)[1][0] + +# print(tm.get_tm_data_entries(telemetry_packet_3, "EvtId")) +# print(type(len(telemetry_packet_1))) + timestamp = cfl.get_cuctime(telemetry_packet_1) # test = tm.get_tm_data_entries(telemetry_packet_1, "EvtId") # print(test) timestamp_test = tm.highest_cuc_timestamp(list_of_tm_packets) +""" +test_1 = cfl.get_header_parameters_detailed(telemetry_packet_1) +test_2 = cfl.get_header_parameters_detailed(telemetry_packet_2) -# baba = telemetry[-1].timestamp -# print(baba) - -# print("Telemetry Packet: ", telemetry_packet_1) -# print("Telemetry Packet: ", telemetry_packet_2) - - -# print(tm.get_tm_data_entries(telemetry_packet_1, "DpuMode")) +print(test_1) +print(test_2) +""" -# get_list_from_db = tm.fetch_packets(pool_name="PLM", is_tm=True, st=None, sst=None, apid=None, ssc=None, t_from=0., t_to=1401., +# y = tm.get_tc_acknow(pool_name='PLM', t_tc_sent=60.0, tc_apid=321, tc_ssc=9, tm_st=3, tm_sst=1) +# print(y) +x = tm.get_5_1_tc_acknow(pool_name='PLM', t_tc_sent=82.0, tc_apid=321, tc_ssc=1, tm_st=5, tm_sst=None) +print(x[0][0][1]) +# read = cfl.Tmread(x[0][0][1]) +# print(read) +# for i in x: + # print(i[0][1].hex()) + # print(i) + # print(tm.get_tm_data_entries(i, "EvtId")) + # tm.decode_single_tm_packet(i) + +# get_list_from_d b = tm.fetch_packets(pool_name="PLM", is_tm=True, st=None, sst=None, apid=None, ssc=None, t_from=0., t_to=1401., # dest_id=None, not_apid=None, decode=False, silent=False) +# test = tm.get_tc_acknow(pool_name="PLM", t_tc_sent=1980., tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=1) +# test = tm.get_5_1_tc_acknow(pool_name='PLM', t_tc_sent=0., tc_apid=321, tc_ssc=0, tm_st=5, tm_sst=1) - -identified_tc = tm.get_tc_identifier(pool_name="PLM",tc_apid=321,tc_ssc=1, tc_time=100) - - - +# print(test) +# print(test[0]) +# print(test[1][0][0][0].CTIME + (test[1][0][0][0].FTIME/1000000)) +# print(tm.get_tc_acknow.__globals__["get_tc_acknow"]) +# print(inspect.signature(tm.get_tc_acknow)) +# identified_tc = tm.get_tc_identifier(pool_name="PLM",tc_apid=321,tc_ssc=1, tc_time=100) +# acknowledgement = tm.await_tc_acknow(pool_name="PLM", tc_identifier=identified_tc, duration=10, tm_st=1, tm_sst=7) +# print(acknowledgement[1][0][0][0].CTIME + (acknowledgement[1][0][0][0].FTIME/1000000)) -acknowledgement = tm.await_tc_acknow(pool_name="PLM", tc_identifier=identified_tc, duration=10, tm_st=1, tm_sst=7) -print(acknowledgement[1][0][0][0].CTIME + (acknowledgement[1][0][0][0].FTIME/1000000)) diff --git a/Tst/tst/verification.py b/Tst/tst/verification.py index 802718b..143c908 100644 --- a/Tst/tst/verification.py +++ b/Tst/tst/verification.py @@ -27,105 +27,136 @@ from database.tm_db import DbTelemetryPool, DbTelemetry, RMapTelemetry, FEEDataT import importlib from confignator import config check_cfg = config.get_config(file_path=confignator.get_option('config-files', 'ccs')) +# import tm -def type_comparison(comparison_data, sst_1=1, sst_2=7, st_=1, st_2=1,): - pool_rows = cfl.get_pool_rows("PLM") +# test = tm.get_tc_acknow(pool_name="PLM", t_tc_sent=1020., tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None) - st_list = [] - sst_list = [] - x = 0 - header_counter = 0 - while header_counter < 2: - x += 1 - entry = pool_rows.all()[-x] - - if entry.data.hex() == comparison_data: +""" +def verification_template(verification_descr, pool_name="LIVE", ST_1=None, ST_2=None, SST_1=None, SST_2=None, time=2, + comment="", preamble="Ver.verification", add_parcfg=False): - st_list.append(entry.stc) - sst_list.append(entry.sst) + if comment: + commentstr = "# TC({}, {}): {} [{}]\n# {}\n".format(*cmd[3:], cmd[1], cmd[0], cmd[2]) + newline = "\n" + else: + commentstr = "" + newline = "" - # print("ST Entry_" + str(x) + ": ", entry.stc) - # print("SST Entry_" + str(x) + ": ", entry.sst) - # print("Timestamp entry_" + str(x) + ": ", entry.timestamp) - header_counter += 1 + parcfg = '' + if add_parcfg: + for par in pars: + if par[2] == 'E': + if par[4] is not None: + if par[5] == 'E': + parval = '"{}"'.format(par[4]) + elif par[6] == 'H': + parval = '0x{}'.format(par[4]) + else: + parval = par[4] + else: + parval = par[4] + line = '{} = {} # {}\n'.format(par[0], parval, par[3]) + elif par[2] == 'F': + line = '# {} = {} # {} [NOT EDITABLE]\n'.format(par[0], par[4], par[3]) + else: + line = '' + parcfg += line + + parstr = ', '.join(parsinfo_to_str(pars)) + if len(parstr) > 0: + parstr = ', ' + parstr - st_list_reverse = [st_list[1], st_list[0]] - sst_list_reverse = [sst_list[1], sst_list[0]] + exe = "{}('{}', ST_1={}, SST_1={}, ST_2={}, SST_2={}, pool_name='{}', time={})".format(preamble, verification_descr, + ST_1, SST_1, ST_2, SST_2, + pool_name, time) + return commentstr + exe + newline +""" +# verification template nach Stefan +""" +def verification_template(cmd, pars, pool_name='LIVE', preamble='cfl.Tcsend_DB', options='', comment=True, add_parcfg=False): - if sst_list_reverse == [sst_1, sst_2]: - print("Verification successful") + if comment: + commentstr = "# TC({}, {}): {} [{}]\n# {}\n".format(*cmd[3:], cmd[1], cmd[0], cmd[2]) + newline = "\n" else: - print("Verification unsuccessful") + commentstr = "" + newline = "" + + parcfg = '' + if add_parcfg: + for par in pars: + if par[2] == 'E': + if par[4] is not None: + if par[5] == 'E': + parval = '"{}"'.format(par[4]) + elif par[6] == 'H': + parval = '0x{}'.format(par[4]) + else: + parval = par[4] + else: + parval = par[4] + line = '{} = {} # {}\n'.format(par[0], parval, par[3]) + elif par[2] == 'F': + line = '# {} = {} # {} [NOT EDITABLE]\n'.format(par[0], par[4], par[3]) + else: + line = '' + parcfg += line + + parstr = ', '.join(parsinfo_to_str(pars)) + if len(parstr) > 0: + parstr = ', ' + parstr + + + exe = "{}('{}'{}, pool_name='{}'{})".format(preamble, cmd[1], parstr, pool_name, options) + return commentstr + parcfg + exe + newline + +""" + + + + + + + + + + + + - return False -def Verification( st_1=1, sst_1=1, st_2=1, sst_2=7, pool_name="PLM", verification_duration=2): - verification_running = True - print("RUNNING!!") - print("Variables: ") - print(st_1) - print(sst_1) - print(st_2) - print(sst_2) - print(pool_name) - print(verification_duration) - while verification_running == True: - # while running the script checks the last three entries of the database and keeps them up to date - # to recognize a tc it checks the time - pool_rows = cfl.get_pool_rows(pool_name) - system_time = time.clock_gettime(0) - entry_1_data = pool_rows.all()[-1] - # entry_2_data = pool_rows.all()[-2] - # entry_3_data = pool_rows.all()[-3] - time_1 = entry_1_data.timestamp - # time_2 = entry_2_data.timestamp - # time_3 = entry_3_data.timestamp - # in this script the number 1 after a variable name always refers to data from the last entry - # number 2 refers to second last entry, number 3 to third last entry and so on - # this part triggers as soon as a tc has arrived in the database - if time_1 == "": - first_raw_digits = "" # this string will contain the first bytes of raw data - telecommand = entry_1_data - telecommand_time = telecommand.timestamp - telecommand_raw = telecommand.raw.hex() - # telecommand_data = telecommand.data.hex() - # Variable to generate new telecommand timestamp, other than telecommand_time - telecommand_verification_timestamp = time.clock_gettime(0) - verification_time = telecommand_verification_timestamp + verification_duration - for i in telecommand_raw: - first_raw_digits += str(i) - if len(first_raw_digits) > 7: - break - # print("After Loop telecommand_first_digits: ", first_raw_digits) - while system_time < verification_time and system_time != verification_time: - system_time = time.clock_gettime(0) - if system_time >= verification_time: - verification_running = type_comparison(first_raw_digits, sst_1, sst_2) + + + + + + +""" def verification_template(verification_descr, pool_name="LIVE", ST_1=None, ST_2=None, SST_1=None, SST_2=None, time=2, comment="", preamble="Ver.verification"): @@ -141,7 +172,7 @@ def verification_template(verification_descr, pool_name="LIVE", ST_1=None, ST_2= pool_name, time) return commentstr + exe + newline - +""" # def new_verification_template(verification_descr, pool_name="LIVE", comment="", preamble=): @@ -216,3 +247,144 @@ def read_verification(verification_string): + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +""" + +def type_comparison(comparison_data, sst_1=1, sst_2=7, st_=1, st_2=1,): + pool_rows = cfl.get_pool_rows("PLM") + + st_list = [] + sst_list = [] + x = 0 + header_counter = 0 + while header_counter < 2: + x += 1 + entry = pool_rows.all()[-x] + + if entry.data.hex() == comparison_data: + + st_list.append(entry.stc) + sst_list.append(entry.sst) + + # print("ST Entry_" + str(x) + ": ", entry.stc) + # print("SST Entry_" + str(x) + ": ", entry.sst) + # print("Timestamp entry_" + str(x) + ": ", entry.timestamp) + header_counter += 1 + + + st_list_reverse = [st_list[1], st_list[0]] + sst_list_reverse = [sst_list[1], sst_list[0]] + + + if sst_list_reverse == [sst_1, sst_2]: + print("Verification successful") + else: + print("Verification unsuccessful") + + return False + + +def Verification( st_1=1, sst_1=1, st_2=1, sst_2=7, pool_name="PLM", verification_duration=2): + verification_running = True + + print("RUNNING!!") + + print("Variables: ") + print(st_1) + print(sst_1) + print(st_2) + print(sst_2) + print(pool_name) + print(verification_duration) + + + while verification_running == True: + + # while running the script checks the last three entries of the database and keeps them up to date + # to recognize a tc it checks the time + + pool_rows = cfl.get_pool_rows(pool_name) + + system_time = time.clock_gettime(0) + + entry_1_data = pool_rows.all()[-1] + # entry_2_data = pool_rows.all()[-2] + # entry_3_data = pool_rows.all()[-3] + + time_1 = entry_1_data.timestamp + + if time_1 == "": + + first_raw_digits = "" # this string will contain the first bytes of raw data + + telecommand = entry_1_data + telecommand_time = telecommand.timestamp + telecommand_raw = telecommand.raw.hex() + # telecommand_data = telecommand.data.hex() + # Variable to generate new telecommand timestamp, other than telecommand_time + telecommand_verification_timestamp = time.clock_gettime(0) + verification_time = telecommand_verification_timestamp + verification_duration + + for i in telecommand_raw: + first_raw_digits += str(i) + if len(first_raw_digits) > 7: + break + + # print("After Loop telecommand_first_digits: ", first_raw_digits) + + + while system_time < verification_time and system_time != verification_time: + system_time = time.clock_gettime(0) + + if system_time >= verification_time: + + verification_running = type_comparison(first_raw_digits, sst_1, sst_2) + + + +""" + + + diff --git a/Tst/tst/view.py b/Tst/tst/view.py index 4df4e38..0ea3e58 100644 --- a/Tst/tst/view.py +++ b/Tst/tst/view.py @@ -15,13 +15,15 @@ import dnd_data_parser import toolbox import cairo import sys -import verification + import confignator ccs_path = confignator.get_option('paths', 'ccs') sys.path.append(ccs_path) +sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) import ccs_function_lib as cfl +import tm lm = GtkSource.LanguageManager() lngg = lm.get_language('python') @@ -632,6 +634,12 @@ class StepWidget(Gtk.EventBox): self.btn_execute_step.set_tooltip_text(_('Execute step')) self.btn_execute_step.connect('clicked', self.on_execute_step) + # TODO: find better suited icon for execute verification button + self.btn_execute_verification = Gtk.ToolButton() + self.btn_execute_verification.set_icon_name("media-playback-start-symbolic-down") + self.btn_execute_verification.set_tooltip_text(_("Execute Verification")) + self.btn_execute_verification.connect("clicked", self.on_execute_verification) + self.text_label_event_box = Gtk.EventBox() self.text_label_event_box.set_visible_window(False) self.text_label_event_box.add_events(Gdk.EventMask.BUTTON_PRESS_MASK) @@ -651,6 +659,7 @@ class StepWidget(Gtk.EventBox): self.tbox.pack_end(self.btn_delete_step, False, False, 0) self.tbox.pack_end(self.is_active, False, False, 0) self.tbox.pack_end(self.btn_execute_step, False, False, 0) + self.tbox.pack_end(self.btn_execute_verification, False, False, 0) self.vbox.pack_start(self.tbox, True, True, 0) @@ -771,7 +780,7 @@ class StepWidget(Gtk.EventBox): # self.lbl_box_verification.pack_start(self.btn_exec_verification, False, False, 0) #self.detail_box.pack_start(self.lbl_box_verification, True, True, 0) self.verification_scrolled_window = Gtk.ScrolledWindow() - #self.verification_scrolled_window.set_size_request(50, 100) + self.verification_scrolled_window.set_size_request(50, 200) self.verification_view = GtkSource.View() self.verification_view.set_auto_indent(True) self.verification_view.set_show_line_numbers(True) @@ -1300,6 +1309,7 @@ class StepWidget(Gtk.EventBox): # Connect to the editor and send the commands to the terminal via D-Bus ed = cfl.dbus_connection('editor') cfl.Functions(ed, '_to_console_via_socket', commands) + #import editor #x = editor.CcsEditor() #x._to_console_via_socket(commands) @@ -1307,9 +1317,33 @@ class StepWidget(Gtk.EventBox): def on_exec_verification(self, button): # get the code of the commands out of the buffer of the widget verification = self.get_verification_from_widget() + + # Check if CCS is open + if not cfl.is_open('editor'): + print('CCS-Editor has to be started first') + logger.info('CCS-Editor has to be running if a step should be executed') + return #ack = misc.to_console_via_socket(verification) #print(ack) + def on_execute_verification(self, *args): + if not cfl.is_open("editor"): + print('CCS-Editor has to be started first') + logger.info('CCS-Editor has to be running if a step should be executed') + return + + commands = str(self.get_verification_from_widget()) + + if len(commands) == 0: + return + + # execute_function = exec(commands) + # print(execute_function) + + + ed = cfl.dbus_connection("editor") + cfl.Functions(ed, "_to_console_via_socket", commands) + def on_execute_step(self, *args): if not cfl.is_open('editor'): print('CCS-Editor has to be started first') @@ -1325,9 +1359,6 @@ class StepWidget(Gtk.EventBox): # if len(commands2) == 0: # return - # print("Commands: ", commands) - # print("Verification: ", commands2) - # print(type(commands2)) # verification.read_verification(commands2) -- GitLab