From be54b76421c9e3078763484e2641adea5dc83a66 Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Wed, 26 May 2021 16:37:57 +0200 Subject: [PATCH] remove/replace dependencies on instances of packets.CCS class --- Tst/testing_library/testlib/idb.py | 27 +- Tst/testing_library/testlib/precond.py | 154 ++++----- Tst/testing_library/testlib/report.py | 24 +- Tst/testing_library/testlib/sim.py | 3 + Tst/testing_library/testlib/tc.py | 45 ++- Tst/testing_library/testlib/testing_logger.py | 8 +- Tst/testing_library/testlib/tm.py | 314 ++++++++---------- Tst/testing_library/testlib/tools.py | 103 +++--- 8 files changed, 314 insertions(+), 364 deletions(-) diff --git a/Tst/testing_library/testlib/idb.py b/Tst/testing_library/testlib/idb.py index 59438da..658e4b6 100644 --- a/Tst/testing_library/testlib/idb.py +++ b/Tst/testing_library/testlib/idb.py @@ -4,6 +4,11 @@ Functions to use the instrument database ======================================== """ import logging +import sys + +import confignator +sys.path.append(confignator.get_option('paths', 'ccs')) +import ccs_function_lib as cfl # create a logger logger = logging.getLogger(__name__) @@ -11,13 +16,11 @@ logger = logging.getLogger(__name__) # Return the sid of a housekeeping as integer. Sid names of housekeepings are translated into a interger via # instrument data base (IDB) -# @param ccs: instance of the CCScom class # @param sid: sid of housekeepings <str> # @return: sid <int> -def convert_hk_sid(ccs, sid): +def convert_hk_sid(sid): """ Convert the SID of housekeeping reports in both ways: int to str and str to int - :param ccs: packets.CCScom: instance of the class CCScom :param sid: int or str: SID of a housekeeping as string or as integer :return: str or int: SID of the housekeeping as string or integer @@ -25,13 +28,13 @@ def convert_hk_sid(ccs, sid): assert isinstance(sid, int) or isinstance(sid, str), logger.error('convert_hk_sid: argument sid has to be a integer or string') result = None if isinstance(sid, str): - query = ccs.dbcon.execute('SELECT txp_from FROM txp WHERE txp_altxt="{}"'.format(sid)) + query = cfl.scoped_session_idb().execute('SELECT txp_from FROM txp WHERE txp_altxt="{}"'.format(sid)) fetch = query.fetchall() if len(fetch) != 0: result = int(fetch[0][0]) if isinstance(sid, int): # ToDo: replace hardcoded DPKT7030 - query = ccs.dbcon.execute('SELECT txp_altxt FROM txp WHERE txp_numbr="DPKT7030" AND txp_from="{}"'.format(sid)) + query = cfl.scoped_session_idb().execute('SELECT txp_altxt FROM txp WHERE txp_numbr="DPKT7030" AND txp_from="{}"'.format(sid)) fetch = query.fetchall() if len(fetch) != 0: result = str(fetch[0][0]) @@ -71,22 +74,20 @@ class DataPoolParameter: } self.possible_values.append(entry) - def assign_value(self, ccs, value): - self.value = ccs.get_calibrated(pcf_name=self.name, rawval=value) + def assign_value(self, value): + self.value = cfl.get_calibrated(pcf_name=self.name, rawval=value) def log_par(self): logger.info('name = {}; width = {}'.format(self.name, self.width)) -def get_info_of_data_pool_parameter(ccs, name): +def get_info_of_data_pool_parameter(name): """ from testlib import idb - x = idb.get_info_of_data_pool_parameter(ccs=ccs, name='sdu2State') + x = idb.get_info_of_data_pool_parameter(name='sdu2State') Fetching all information from the instrument database about data pool parameter names. Knowing only the name of the parameter, all other information should be collected by database queries. - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param name: str Name of the parameter. :return: idb.data_pool_parameter @@ -96,7 +97,7 @@ def get_info_of_data_pool_parameter(ccs, name): # get information from pcf query = 'SELECT * from pcf where pcf_descr="{}"'.format(name) - dbres = ccs.dbcon.execute(query) + dbres = cfl.scoped_session_idb().execute(query) result = dbres.fetchall() if len(result) == 1: @@ -110,7 +111,7 @@ def get_info_of_data_pool_parameter(ccs, name): if txp_number is not None: # get the possible values query = 'SELECT * from txp where txp_numbr="{}"'.format(txp_number) - dbres = ccs.dbcon.execute(query) + dbres = cfl.scoped_session_idb().execute(query) values = dbres.fetchall() if len(values) > 0: for val in values: diff --git a/Tst/testing_library/testlib/precond.py b/Tst/testing_library/testlib/precond.py index 6df2ad2..597f8e0 100644 --- a/Tst/testing_library/testlib/precond.py +++ b/Tst/testing_library/testlib/precond.py @@ -10,15 +10,17 @@ from . import sim from . import tm from . import tools +import confignator +sys.path.append(confignator.get_option('paths', 'ccs')) +import ccs_function_lib as cfl + # create logger logger = logging.getLogger(__name__) -def get_states(ccs, pool_name, silent=False): +def get_states(pool_name, silent=False): """ Returns the states of IASW, SEM and SEM_Operational state machine - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the pool for TM/TCs in the database :return: str, str, str @@ -33,7 +35,7 @@ def get_states(ccs, pool_name, silent=False): if state_names is not None: # fetch the house keeping report entries - states = tm.get_hk_entry(ccs=ccs, pool_name=pool_name, hk_name=hk_name, name=state_names, silent=silent) + states = tm.get_hk_entry(pool_name=pool_name, hk_name=hk_name, name=state_names, silent=silent) # check if the states could be found if states is not None and 'iaswState' in states[0]: @@ -48,17 +50,16 @@ def get_states(ccs, pool_name, silent=False): return state_iasw, state_sem, state_sem_oper -def iasw_standby(ccs, pool_name, silent=False): +def iasw_standby(pool_name, silent=False): """ precondition: 'Nominal operation in STANDBY mode' - :param ccs: instance of packets.CCScom :param pool_name: <str>: name of the pool :return: <boolean>: True if the precondition are fulfilled """ success = False # get states - iasw, state_sem, state_sem_oper = get_states(ccs=ccs, pool_name=pool_name, silent=silent) + iasw, state_sem, state_sem_oper = get_states(pool_name=pool_name, silent=silent) # if the IASW is a other mode: shut down the SEM (if running) and command IASW into STANDBY if iasw == 'SEM_OFFLINE' or iasw == 'PRE_SCIENCE' or iasw == 'SCIENCE': @@ -66,12 +67,12 @@ def iasw_standby(ccs, pool_name, silent=False): sem_runs = sim.sem_runs() if sem_runs: # command SEM to shut of - switch_off_sem(ccs=ccs, pool_name=pool_name) + switch_off_sem(pool_name=pool_name) # command IASW into Standby - ccs.TcStopSem() + cfl.TcStopSem() #TODO: project specific command -- has to be done another way logger.info('command IASW into STANDBY') # get states again - iasw, state_sem, state_sem_oper = get_states(ccs=ccs, pool_name=pool_name) + iasw, state_sem, state_sem_oper = get_states(pool_name=pool_name) # check if the conditions are fulfilled if iasw == 'STANDBY': @@ -80,12 +81,10 @@ def iasw_standby(ccs, pool_name, silent=False): return success -def sem_safe_mode(ccs, pool_name): +def sem_safe_mode(pool_name): """ Verify that the IASW is in SAFE. If IASW is in a other mode, command it to SAFE. - :param ccs: packets.CCScom - instance of packets.CCScom :param pool_name: str Name of the pool for telemetry and telecommand packets in the database :return: boolean @@ -95,7 +94,7 @@ def sem_safe_mode(ccs, pool_name): crsem = None # check the current states - iasw, state_sem, state_sem_oper = get_states(ccs=ccs, pool_name=pool_name) + iasw, state_sem, state_sem_oper = get_states(pool_name=pool_name) # if SEM is not in mode SAFE, do the steps to bring it into it if state_sem != 'SAFE': @@ -103,46 +102,44 @@ def sem_safe_mode(ccs, pool_name): sem_runs = sim.sem_runs() if not sem_runs: # send TC to switch on the SEM - tc_on = ccs.Tcsend_DB('DPU_IFSW_START_OFFLINE_O', ack='0b1011', pool_name=pool_name) - t_tc_on = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=tc_on) + tc_on = cfl.Tcsend_DB('DPU_IFSW_START_OFFLINE_O', ack='0b1011', pool_name=pool_name) + t_tc_on = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=tc_on) # switch on the SEM simulator crsem = sim.start_sem_w_fits() # wait for the event when the SEM to enter INIT trans_1 = {'SrcSemSt': 'OFF', 'DestSemSt': 'INIT'} - evt_init = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_1, + evt_init = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_1, t_from=t_tc_on, duration=20) # wait for the event when the SEM enters OPER trans_2 = {'SrcSemSt': 'INIT', 'DestSemSt': 'OPER'} - evt_oper = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_2, + evt_oper = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_2, t_from=t_tc_on, duration=20) # check if IASW is in OPER and go into SAFE - state_sem = tm.get_hk_entry(ccs=ccs, pool_name=pool_name, hk_name='IFSW_HK', name='semState', silent=True) + state_sem = tm.get_hk_entry(pool_name=pool_name, hk_name='IFSW_HK', name='semState', silent=True) if tools.entry_is_equal(entry=state_sem, key_value={'semState': 'OPER'}): # send TC(192,10) command to bring SEM into SAFE - tc_safe = ccs.Tcsend_DB('DPU_IFSW_GO_SAFE', ack='0b1011', pool_name=pool_name) - t_tc_save = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=tc_safe) + tc_safe = cfl.Tcsend_DB('DPU_IFSW_GO_SAFE', ack='0b1011', pool_name=pool_name) + t_tc_save = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=tc_safe) # wait for the event when the SEM to enter SAFE trans_3 = {'SrcSemSt': 'OPER', 'DestSemSt': 'SAFE'} - evt_init = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_3, + evt_init = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_3, t_from=t_tc_save, duration=20) # verify that the SEM is now in SAFE expected = {'semState': 'SAFE'} - state_sem = tm.get_hk_entry(ccs=ccs, pool_name=pool_name, hk_name='IFSW_HK', name='semState', silent=True) + state_sem = tm.get_hk_entry(pool_name=pool_name, hk_name='IFSW_HK', name='semState', silent=True) if tools.entry_is_equal(entry=state_sem, key_value=expected): # log the current states - iasw, state_sem, state_sem_oper = get_states(ccs=ccs, pool_name=pool_name) + iasw, state_sem, state_sem_oper = get_states(pool_name=pool_name) success = True return success -def any_iasw_state(ccs, pool_name): +def any_iasw_state(pool_name): """ Precondition: 'Nominal operation in any IASW mode'. - :param ccs: packets.CCScom - Instance of packets.CCScom :param pool_name: str Name of the pool for TM/TCs in the database :return: bool @@ -151,7 +148,7 @@ def any_iasw_state(ccs, pool_name): success = False # get the current states - iasw, state_sem, state_sem_oper = get_states(ccs=ccs, pool_name=pool_name) + iasw, state_sem, state_sem_oper = get_states(pool_name=pool_name) # check if preconditions are fulfilled if iasw == 'SEM_OFFLINE' or iasw == 'STANDBY' or iasw == 'PRE_SCIENCE' or iasw == 'SCIENCE': @@ -160,14 +157,12 @@ def any_iasw_state(ccs, pool_name): return success -def switch_off_sem(ccs, pool_name): +def switch_off_sem(pool_name): """ Send the TC() to switch off the SEM. Wait for the transition events EVT_SEM_TR: * 'SrcSemSt': 'OPER', 'DestSemSt': 'SHUTDOWN' * 'SrcSemSt': 'SHUTDOWN', 'DestSemSt': 'OFF' - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the pool for TC/TMs in the database :return: bool @@ -179,35 +174,35 @@ def switch_off_sem(ccs, pool_name): sem_state = None # switch off the SEM - sem_off = ccs.Tcsend_DB('DPU_IFSW_SWCH_OFF_SEM', ack='0b1011', pool_name=pool_name) + sem_off = cfl.Tcsend_DB('DPU_IFSW_SWCH_OFF_SEM', ack='0b1011', pool_name=pool_name) logger.info('Terminating SEM simulator process') sim.stop_sem(None) - tm.check_acknowledgement(ccs=ccs, pool_name=pool_name, tc_identifier=sem_off) + tm.check_acknowledgement(pool_name=pool_name, tc_identifier=sem_off) # check if a TM(1,7) was received for this TC - suc, acknow = tm.await_tc_acknow(ccs=ccs, pool_name=pool_name, tc_identifier=sem_off, tm_st=1, tm_sst=7) + suc, acknow = tm.await_tc_acknow(pool_name=pool_name, tc_identifier=sem_off, tm_st=1, tm_sst=7) if len(acknow) > 0: logger.info('Command was accepted, started, terminated. Received TM(1,7)') - t_sem_off = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=sem_off) + t_sem_off = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=sem_off) # get event EVT_SEM_TR with transistion_1 = {'SrcSemSt': 'OPER', 'DestSemSt': 'SHUTDOWN'} - event_shutdown = tm.await_event(ccs=ccs, severity=ccs.EVENT_SEVERITY_NORMAL, event_id='EVT_SEM_TR', + event_shutdown = tm.await_event(severity=cfl.EVENT_SEVERITY_NORMAL, event_id='EVT_SEM_TR',# TODO: EVENT_SEVERITY has to be defined elsewhere pool_name=pool_name, t_from=t_sem_off, entries=transistion_1) # get event EVT_SEM_TR with transistion_2 = {'SrcSemSt': 'SHUTDOWN', 'DestSemSt': 'OFF'} - event_off = tm.await_event(ccs=ccs, severity=ccs.EVENT_SEVERITY_NORMAL, event_id='EVT_SEM_TR', + event_off = tm.await_event(severity=cfl.EVENT_SEVERITY_NORMAL, event_id='EVT_SEM_TR',# TODO: EVENT_SEVERITY has to be defined elsewhere pool_name=pool_name, t_from=t_sem_off, entries=transistion_2) if len(event_shutdown) > 0 and len(event_off) > 0: - t_event_1 = ccs.get_cuctime(event_shutdown) - t_event_2 = ccs.get_cuctime(event_off) - time_diff = t_event_2 - ccs.get_cuctime(t_sem_off) + t_event_1 = cfl.get_cuctime(event_shutdown) + t_event_2 = cfl.get_cuctime(event_off) + time_diff = t_event_2 - cfl.get_cuctime(t_sem_off) logger.info('Time difference between TC acknowledgement and event EVT_SEM_TR (OFF): {}s'.format(time_diff)) # verify that the SEM state machine is in state OFF - sem_state_machine = tm.get_hk_entry(ccs=ccs, pool_name=pool_name, hk_name='IFSW_HK', name='semState', + sem_state_machine = tm.get_hk_entry(pool_name=pool_name, hk_name='IFSW_HK', name='semState', t_from=t_event_2) if sem_state_machine is not None and 'semState' in sem_state_machine[0]: if sem_state_machine[0]['semState'] == 'OFF': @@ -221,7 +216,7 @@ def switch_off_sem(ccs, pool_name): return result -def iasw_semoffline_semoper_standby(ccs, pool_name): +def iasw_semoffline_semoper_standby(pool_name): """ Establish the precondition: Nominal operation with * IASW state machine in 'SEM_OFFLINE' @@ -238,8 +233,6 @@ def iasw_semoffline_semoper_standby(ccs, pool_name): Waiting for the SEM Operational event when entering 'STANDBY'. * Check the states again to verify if the preconditions are satisfied. - :param ccs: instance of packets.CCScom - Instance of packets.CCScom :param pool_name: str Name of the pool in the database where TC/TMs are stored :return: bool, subprocess.Popen @@ -248,7 +241,7 @@ def iasw_semoffline_semoper_standby(ccs, pool_name): success = False # get the states - state_iasw, state_sem, state_sem_oper = get_states(ccs=ccs, pool_name=pool_name) + state_iasw, state_sem, state_sem_oper = get_states(pool_name=pool_name) # check if the preconditions are already fulfilled if state_iasw == 'SEM_OFFLINE' and state_sem == 'OPER' and state_sem_oper == 'STANDBY': @@ -259,11 +252,11 @@ def iasw_semoffline_semoper_standby(ccs, pool_name): if state_iasw != 'STANDBY': # command IASW into Standby logger.info('Command IASW into STANDBY') - tc_stop = ccs.TcStopSem() - t_tc_stop = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=tc_stop) + tc_stop = cfl.TcStopSem() # TODO: too project specific -- replace + t_tc_stop = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=tc_stop) # wait for the event when the IASW enters STANDBY trans = {'DestIaswSt': 'STANDBY'} - evt_stop = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_IASW_TR', + evt_stop = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_IASW_TR', entries=trans, t_from=t_tc_stop, duration=10) # check if the CrSem process is already running @@ -275,36 +268,36 @@ def iasw_semoffline_semoper_standby(ccs, pool_name): logger.info('CrSem is already running') # check if the state of the SEM is OFF - sem_state = tm.get_hk_entry(ccs=ccs, pool_name=pool_name, hk_name='IFSW_HK', name='semState', silent=True) + sem_state = tm.get_hk_entry(pool_name=pool_name, hk_name='IFSW_HK', name='semState', silent=True) expect = {'semState': 'OFF'} if tools.entry_is_equal(entry=sem_state, key_value=expect): # command IASW into SEM_OFFLINE if it is in STANDBY expected = {'iaswState': 'STANDBY'} - iasw_state = tm.get_hk_entry(ccs=ccs, pool_name=pool_name, hk_name='IFSW_HK', name=expected, silent=True) + iasw_state = tm.get_hk_entry(pool_name=pool_name, hk_name='IFSW_HK', name=expected, silent=True) if tools.entry_is_equal(entry=iasw_state, key_value=expected): logger.info('Command IASW into SEM_OFFLINE') - tc_on = ccs.Tcsend_DB('DPU_IFSW_START_OFFLINE_O', ack='0b1011', pool_name=pool_name) + tc_on = cfl.Tcsend_DB('DPU_IFSW_START_OFFLINE_O', ack='0b1011', pool_name=pool_name) # check that the command was successful accepted, started, terminated - tm.check_acknowledgement(ccs=ccs, pool_name=pool_name, tc_identifier=tc_on) - t_tc_on = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=tc_on) + tm.check_acknowledgement(pool_name=pool_name, tc_identifier=tc_on) + t_tc_on = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=tc_on) # wait for the events of the SEM start up events if t_tc_on is not None: logger.info('Waiting for the events of the SEM to enter INIT -> OPER and for SemOp to enter STANDBY') # wait for the event when the SEM to enter INIT trans_1 = {'SrcSemSt': 'OFF', 'DestSemSt': 'INIT'} - evt_init = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', + evt_init = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_1, t_from=t_tc_on, duration=2) # wait for the event when the SEM enters OPER trans_2 = {'SrcSemSt': 'INIT', 'DestSemSt': 'OPER'} - evt_oper = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', + evt_oper = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=trans_2, t_from=t_tc_on, duration=60) # wait for the event when the SEM operational enters STANDBY trans_3 = {'DestSemOpSt': 'STANDBY'} - evt_stan = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_SEMOP_TR', + evt_stan = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEMOP_TR', entries=trans_3, t_from=t_tc_on, duration=2) # get the states again - state_iasw, state_sem, state_sem_oper = get_states(ccs=ccs, pool_name=pool_name) + state_iasw, state_sem, state_sem_oper = get_states(pool_name=pool_name) # check if preconditions are fulfilled if state_iasw == 'SEM_OFFLINE' and state_sem == 'OPER' and state_sem_oper == 'STANDBY': success = True @@ -312,11 +305,9 @@ def iasw_semoffline_semoper_standby(ccs, pool_name): return success -def sem_oper_go_stabilize(ccs, pool_name, await_event=True): +def sem_oper_go_stabilize(pool_name, await_event=True): """ bring the SEM Operational State Machine into STABILIZE - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the pool for TC/TM in the database :param await_event: bool @@ -328,16 +319,15 @@ def sem_oper_go_stabilize(ccs, pool_name, await_event=True): success = False logger.info('bring SEM Operational State Machine into STABILIZE') # send TC(192,3) to order the SEM Operational State Machine into STABILIZE - tc = ccs.Tcsend_DB('DPU_IFSW_GO_STAB', ack='0b1011', pool_name=pool_name) - t_tc = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=tc) + tc = cfl.Tcsend_DB('DPU_IFSW_GO_STAB', ack='0b1011', pool_name=pool_name) + t_tc = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=tc) # check if the command was successful by looking for acknowledgement packets - ack = tm.check_acknowledgement(ccs=ccs, pool_name=pool_name, tc_identifier=tc) + ack = tm.check_acknowledgement(pool_name=pool_name, tc_identifier=tc) # verify the transition of the SEM Operational State Machine into STABILIZE if await_event is True: par = {'SrcSemOpSt': 'TR_STABILIZE', 'DestSemOpSt': 'STABILIZE'} - event = tm.await_event(ccs=ccs, - pool_name=pool_name, + event = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEMOP_TR', entries=par, @@ -348,7 +338,7 @@ def sem_oper_go_stabilize(ccs, pool_name, await_event=True): return success -def pre_science_stabilize(ccs, pool_name): +def pre_science_stabilize(pool_name): """ This function sends the command to enter PRE_SCIENCE. The SEM event forwarding is enabled and at the end of this function disabled again. @@ -360,8 +350,6 @@ def pre_science_stabilize(ccs, pool_name): Then the command to go into PRE_SCIENCE is sent. The events of entering PRE_SCIENCE and STABILIZE are awaited. If they are received the function returns True. - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the data pool for TM/TCs in the database :return: bool @@ -376,18 +364,18 @@ def pre_science_stabilize(ccs, pool_name): # enable SEM event forwarding logger.info('pre_science_stabilize: enable the SEM event forwarding') - tc_for = ccs.Tcsend_DB('DPU_IFSW_UPDT_PAR_BOOL', 4, + tc_for = cfl.Tcsend_DB('DPU_IFSW_UPDT_PAR_BOOL', 4, 'SEM_SERV5_1_FORWARD', 'TYPE_BOOL', 0, 1, 'SEM_SERV5_2_FORWARD', 'TYPE_BOOL', 0, 1, 'SEM_SERV5_3_FORWARD', 'TYPE_BOOL', 0, 1, 'SEM_SERV5_4_FORWARD', 'TYPE_BOOL', 0, 1, ack='0b1011') - tm.check_acknowledgement(ccs=ccs, pool_name=pool_name, tc_identifier=tc_for) + tm.check_acknowledgement(pool_name=pool_name, tc_identifier=tc_for) if not sim.sem_runs(): # start the CrSem (with fits or with the data simulator) logger.info('pre_science_stabilize: starting CrSem ...') - t_sem_start = ccs.get_last_pckt_time(pool_name=pool_name, string=False) + t_sem_start = cfl.get_last_pckt_time(pool_name=pool_name, string=False) sim.start_sem_w_fits() # wait for the CrSem to boot and load the configuration @@ -395,9 +383,9 @@ def pre_science_stabilize(ccs, pool_name): sem_event_2 = 'EVT_PRG_CFG_LD' logger.info('pre_science_stabilize: waiting for the CrSem to boot and load the configuration (events {} and {})' .format(sem_event_1, sem_event_2)) - event_1 = tm.await_event(ccs=ccs, severity=ccs.EVENT_SEVERITY_NORMAL, event_id=sem_event_1, + event_1 = tm.await_event(severity=cfl.EVENT_SEVERITY_NORMAL, event_id=sem_event_1, #TODO: EVENT_SEVERITY not in cfl anymore pool_name=pool_name, duration=wait, t_from=t_sem_start) - event_2 = tm.await_event(ccs=ccs, severity=ccs.EVENT_SEVERITY_NORMAL, event_id=sem_event_2, + event_2 = tm.await_event(severity=cfl.EVENT_SEVERITY_NORMAL, event_id=sem_event_2, #TODO: EVENT_SEVERITY not in cfl anymore pool_name=pool_name, duration=wait, t_from=t_sem_start) # check if the IASW, SEM and SEM operational are in the correct states to command them into PRE_SCIENCE @@ -407,25 +395,25 @@ def pre_science_stabilize(ccs, pool_name): 'semOperState': 'STOPPED' } logger.info('pre_science_stabilize: current states are') - entries = tm.get_hk_entry(ccs=ccs, pool_name=pool_name, hk_name='IFSW_HK', name=expected_states, silent=True) + entries = tm.get_hk_entry(pool_name=pool_name, hk_name='IFSW_HK', name=expected_states, silent=True) ready = tools.entry_is_equal(entry=entries, key_value=expected_states) # if the CrSem is ready, send the TC to prepare for science if ready: # a) command the IASW into PRE_SCIENCE and the SEM into STABILIZE with TC(193,1) logger.info('pre_science_stabilize: command IASW into PRE_SCIENCE...') - prepare_tc = ccs.Tcsend_DB('DPU_IFSW_PREPARE_SCI', ack='0b1011', pool_name=pool_name) - tm.check_acknowledgement(ccs=ccs, pool_name=pool_name, tc_identifier=prepare_tc) - t_tc_presci = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=prepare_tc) + prepare_tc = cfl.Tcsend_DB('DPU_IFSW_PREPARE_SCI', ack='0b1011', pool_name=pool_name) + tm.check_acknowledgement(pool_name=pool_name, tc_identifier=prepare_tc) + t_tc_presci = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=prepare_tc) # b) wait for the event: IASW is in state PRE_SCIENCE req_event_i = 'EVT_IASW_TR' req_state_i = {'DestIaswSt': 'PRE_SCIENCE'} - event_iasw = tm.await_event(ccs=ccs, severity=ccs.EVENT_SEVERITY_NORMAL, event_id=req_event_i, + event_iasw = tm.await_event(severity=cfl.EVENT_SEVERITY_NORMAL, event_id=req_event_i, #TODO: EVENT_SEVERITY not in cfl anymore pool_name=pool_name, duration=wait, t_from=t_tc_presci, entries=req_state_i) # log the event TM packet if len(event_iasw) > 0: - report.print_event_data_tuple(ccs=ccs, tm_packets=event_iasw) + report.print_event_data_tuple(tm_packets=event_iasw) else: logger.warning('pre_science_stabilize: waited for IASW to go into the state PRE_SCIENCE for {}s.' 'No event report was received.'.format(wait)) @@ -433,11 +421,11 @@ def pre_science_stabilize(ccs, pool_name): # c) wait for the event: SEM is in state STABILIZE req_event_s = 'EVT_SEMOP_TR' req_state_s = {'DestSemOpSt': 'STABILIZE'} - event_sem_op = tm.await_event(ccs=ccs, severity=ccs.EVENT_SEVERITY_NORMAL, event_id=req_event_s, + event_sem_op = tm.await_event(severity=cfl.EVENT_SEVERITY_NORMAL, event_id=req_event_s, #TODO: EVENT_SEVERITY not in cfl anymore pool_name=pool_name, duration=wait, t_from=t_tc_presci, entries=req_state_s) # log the event TM packet if len(event_sem_op) > 0: - report.print_event_data_tuple(ccs=ccs, tm_packets=event_sem_op) + report.print_event_data_tuple(tm_packets=event_sem_op) else: logger.warning('pre_science_stabilize: waited for SEM to go into the state STABILIZE for {}s.' 'No event report was received.'.format(wait)) @@ -448,13 +436,13 @@ def pre_science_stabilize(ccs, pool_name): # disable SEM event forwarding logger.info('pre_science_stabilize: disable the SEM event forwarding') - tc_dis = ccs.Tcsend_DB('DPU_IFSW_UPDT_PAR_BOOL', 4, + tc_dis = cfl.Tcsend_DB('DPU_IFSW_UPDT_PAR_BOOL', 4, 'SEM_SERV5_1_FORWARD', 'TYPE_BOOL', 0, 0, 'SEM_SERV5_2_FORWARD', 'TYPE_BOOL', 0, 0, 'SEM_SERV5_3_FORWARD', 'TYPE_BOOL', 0, 0, 'SEM_SERV5_4_FORWARD', 'TYPE_BOOL', 0, 0, ack='0b1011') - tm.check_acknowledgement(ccs=ccs, pool_name=pool_name, tc_identifier=tc_for) + tm.check_acknowledgement(pool_name=pool_name, tc_identifier=tc_for) # if both events were received this procedure was successful if len(event_iasw) > 0 and len(event_sem_op) > 0: diff --git a/Tst/testing_library/testlib/report.py b/Tst/testing_library/testlib/report.py index c9e10b5..d0cdc4d 100644 --- a/Tst/testing_library/testlib/report.py +++ b/Tst/testing_library/testlib/report.py @@ -7,6 +7,11 @@ import datetime import logging import collections import json +import sys + +import confignator +sys.path.append(confignator.get_option('paths', 'ccs')) +import ccs_function_lib as cfl # create a logger logger = logging.getLogger(__name__) @@ -80,13 +85,12 @@ def parse_step_from_json_string(line, key_word): logger.error('parse_tc_id_from_json_string: parsing of the TC JSON string failed!') -def command_step_begin(step_param, script_version, ccs, pool_name, step_start_cuc): +def command_step_begin(step_param, script_version, pool_name, step_start_cuc): """ Builds a string and writes it into the logging file. A keyword is set to enable a machine read out of the log file. All information of the step is written in a JSON string. :param step_param: :param script_version: - :param ccs: :param pool_name: :param step_start_cuc: :return: @@ -111,7 +115,7 @@ def command_step_end(step_param, step_end_cuc): logger.info('{} {}\n'.format(cmd_step_keyword_done, encode_to_json_string(step_param['step_no'], step_end_cuc))) -def verification_step_begin(step_param, script_version, ccs, pool_name, step_start_cuc): +def verification_step_begin(step_param, script_version, pool_name, step_start_cuc): logger.info('{} {} {}'.format(vrc_step_keyword, step_param['step_no'], encode_to_json_string(step_number=step_param['step_no'], @@ -152,7 +156,7 @@ class StepSummary: # -------------------------------------------- -def write_log_step_header(step_param, ccs, pool_name, step_start_cuc): +def write_log_step_header(step_param, pool_name, step_start_cuc): logger.info('STEP {} (starting from {})' .format(step_param['step_no'], step_start_cuc)) logger.info(step_param['msg']) @@ -168,11 +172,11 @@ def write_log_step_footer(step_param, step_result): logger.warning('Step {} failed.'.format(step_param['step_no'])) -def write_log_test_header(test, ccs, pool_name): +def write_log_test_header(test, pool_name): logger.info('-------------------------------------------------------------------------------') logger.info('Running test {} version {}\n\t\t\t\t\tpoolname = {}\n\t\t\t\t\tCUC-timestamp of test ' 'start = {}\n\t\t\t\t\tlocal time = {}' - .format(test.id, test.version, pool_name, ccs.get_last_pckt_time(pool_name=pool_name, string=False), + .format(test.id, test.version, pool_name, cfl.get_last_pckt_time(pool_name=pool_name, string=False), datetime.datetime.now().isoformat())) logger.info('Description:\n\t\t\t\t\t {}'.format(test.description)) if test.comment: @@ -209,12 +213,12 @@ def write_log_test_footer(test): return successful_steps -def print_data_tuple(ccs, tm_packets): +def print_data_tuple(tm_packets): if not isinstance(tm_packets, list): tm_packets = [tm_packets] for packet in tm_packets: if isinstance(packet, bytes): - data = ccs.Tmdata(packet)[0] + data = cfl.Tmdata(packet)[0] elif isinstance(packet, tuple): data = packet[1][0] else: @@ -227,12 +231,12 @@ def print_data_tuple(ccs, tm_packets): logger.debug('{} = {}'.format(name, value)) -def print_event_data_tuple(ccs, tm_packets): +def print_event_data_tuple(tm_packets): if not isinstance(tm_packets, list): tm_packets = [tm_packets] for packet in tm_packets: if isinstance(packet, bytes): - data = ccs.Tmdata(packet)[0] + data = cfl.Tmdata(packet)[0] elif isinstance(packet, tuple): data = packet[1][0] else: diff --git a/Tst/testing_library/testlib/sim.py b/Tst/testing_library/testlib/sim.py index 6e4d6b9..ad77d59 100644 --- a/Tst/testing_library/testlib/sim.py +++ b/Tst/testing_library/testlib/sim.py @@ -3,6 +3,9 @@ Simulators - start/stop them ============================ """ + +# TODO: many project specific methods -- implement more general functions, if necessary + import logging import os import subprocess diff --git a/Tst/testing_library/testlib/tc.py b/Tst/testing_library/testlib/tc.py index 6f1e194..48eebc4 100644 --- a/Tst/testing_library/testlib/tc.py +++ b/Tst/testing_library/testlib/tc.py @@ -7,6 +7,12 @@ This module contains functions which have to do with sending telecommands. import logging import time +import sys + +import confignator +sys.path.append(confignator.get_option('paths', 'ccs')) +import ccs_function_lib as cfl + from . import tm @@ -14,11 +20,9 @@ from . import tm logger = logging.getLogger(__name__) -def generate_stream_of_tc_6_5(ccs, pool_name, duration=300): +def generate_stream_of_tc_6_5(pool_name, duration=300): """ Generating a stream of TC(6,5). Aim to to send one TC(6,5) per cycle (one cycle lasts for 0.125ms) - :param ccs: packets.CCScom - Instance of the class CCScom :param pool_name: str Name of the pool for TM/TCs in the database :param duration: int @@ -51,7 +55,7 @@ def generate_stream_of_tc_6_5(ccs, pool_name, duration=300): # send TC(6,5) and note the current time again current_time = time.time() - tc = ccs.Tcsend_DB('SES CMD_Memory_Dump', + tc = cfl.Tcsend_DB('SES CMD_Memory_Dump', PAR_MEMORY_ID_DUMP, PAR_START_ADDRESS_DUMP, PAR_BLOCK_LENGTH_DUMP, @@ -86,13 +90,11 @@ def generate_stream_of_tc_6_5(ccs, pool_name, duration=300): return in_time, sent_tcs -def reset_housekeeping(ccs, pool_name, name): +def reset_housekeeping(pool_name, name): """ Reset a housekeeping. Set its generation frequency back to its default value. Set back its enable-status back to its default value (by default only IFSW_HK is enabled) - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the pool for TM/TCs in the database :param name: str @@ -153,21 +155,21 @@ def reset_housekeeping(ccs, pool_name, name): # set the generation frequency to back to its default value logger.info('set the generation frequency of {} back to its default value of {} cycles ({}Hz)' .format(name, hk_freq, hk_freq/8)) - tc_freq = ccs.TcSetHkRepFreq(sid=hk_sid, period=hk_freq) - set_freq, ack_freq = tm.await_tc_acknow(ccs=ccs, pool_name=pool_name, tc_identifier=tc_freq, tm_sst=7) + tc_freq = cfl.TcSetHkRepFreq(sid=hk_sid, period=hk_freq) #TODO: not in cfl anymore + set_freq, ack_freq = tm.await_tc_acknow(pool_name=pool_name, tc_identifier=tc_freq, tm_sst=7) # disable or enable the housekeeping if hk_enabled is True: # send TC(3,5) to enable the housekeeping report logger.info('enable the {} housekeeping report'.format(name)) - tc_enb = ccs.Tcsend_DB('DPU_IFSW_ENB_HK_DR_GEN', hk_sid, ack='0b1011', pool_name=pool_name) - enabled, ack_enb = tm.await_tc_acknow(ccs=ccs, pool_name=pool_name, tc_identifier=tc_enb, tm_sst=7) + tc_enb = cfl.Tcsend_DB('DPU_IFSW_ENB_HK_DR_GEN', hk_sid, ack='0b1011', pool_name=pool_name) + enabled, ack_enb = tm.await_tc_acknow(pool_name=pool_name, tc_identifier=tc_enb, tm_sst=7) if hk_enabled is False: # send TC(3,5) to disable the housekeeping report logger.info('disable the {} housekeeping report'.format(name)) - tc_dis = ccs.Tcsend_DB('DPU_IFSW_DIS_HK_DR_GEN', hk_sid, ack='0b1011', pool_name=pool_name) - disabled, ack_dis = tm.await_tc_acknow(ccs=ccs, pool_name=pool_name, tc_identifier=tc_dis, tm_sst=7) + tc_dis = cfl.Tcsend_DB('DPU_IFSW_DIS_HK_DR_GEN', hk_sid, ack='0b1011', pool_name=pool_name) + disabled, ack_dis = tm.await_tc_acknow(pool_name=pool_name, tc_identifier=tc_dis, tm_sst=7) # evaluate if the function was successful if set_freq: @@ -181,13 +183,11 @@ def reset_housekeeping(ccs, pool_name, name): return success -def reset_all_housekeepings(ccs, pool_name): +def reset_all_housekeepings(pool_name): """ This function resets all housekeepings. The frequencies are set to their default value. The enabled status is set back to its default value. - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param poolname: str Name of the pool for TM/TCs in the database :return: bool @@ -209,7 +209,7 @@ def reset_all_housekeepings(ccs, pool_name): # reset all housekeepings result = [] for hk_name in housekeepings: - suc = reset_housekeeping(ccs=ccs, pool_name=pool_name, name=hk_name) + suc = reset_housekeeping(pool_name=pool_name, name=hk_name) result.append(suc) if suc: logger.debug('reset_all_housekeepings: reset of {} was successful'.format(hk_name)) @@ -229,26 +229,25 @@ def reset_all_housekeepings(ccs, pool_name): return success -def stop_sem(ccs, pool_name): +def stop_sem(pool_name): """ The TC (193,4) is sent to stop the SEM. Two events are awaited EVT_IASW_TR with DestIaswSt = STANDBY and EVT_SEM_TR with DestSemSt = OFF. - :param ccs: :param pool_name: :return: """ result = False # send TC(193,4) to stop SEM - tc_stop = ccs.Tcsend_DB('DPU_IFSW_STOP_SEM', ack='0b1011', pool_name=pool_name) - t_tc_stop = tm.time_tc_accepted(ccs=ccs, pool_name=pool_name, tc_identifier=tc_stop) + tc_stop = cfl.Tcsend_DB('DPU_IFSW_STOP_SEM', ack='0b1011', pool_name=pool_name) + t_tc_stop = tm.time_tc_accepted(pool_name=pool_name, tc_identifier=tc_stop) # sim.stop_sem(sem=None) ??? entry_iasw = {'DestIaswSt': 'STANDBY'} - evt_iasw = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_IASW_TR', entries=entry_iasw, + evt_iasw = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_IASW_TR', entries=entry_iasw, duration=20, t_from=t_tc_stop - 1) entry_sem = {'DestSemSt': 'OFF'} - evt_sem = tm.await_event(ccs=ccs, pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=entry_sem, + evt_sem = tm.await_event(pool_name=pool_name, severity=1, event_id='EVT_SEM_TR', entries=entry_sem, duration=20, t_from=t_tc_stop - 1) if len(evt_iasw) > 0 and len(evt_sem) > 0: result = True diff --git a/Tst/testing_library/testlib/testing_logger.py b/Tst/testing_library/testlib/testing_logger.py index c29be18..7f0d77b 100644 --- a/Tst/testing_library/testlib/testing_logger.py +++ b/Tst/testing_library/testlib/testing_logger.py @@ -7,8 +7,7 @@ import logging.config import os import datetime -from . import tools - +import confignator cmd_log_auxiliary = '_command.log' vrc_log_auxiliary = '_verification.log' @@ -23,9 +22,10 @@ def get_path_for_logs(module_name): :rtype: str """ # ToDo create the filename using the testing_logger.cmd_scrpt_auxiliary variable - cfg = tools.read_config() + # cfg = tools.read_config() # Fetch the path from the project config file - path = cfg.get('LOGGING', 'test_run') + # path = cfg.get('LOGGING', 'test_run') + path = confignator.get_config().get('tst-logging', 'test_run') # Create the directory for the logging files os.makedirs(path, mode=0o777, exist_ok=True) filename = path + module_name + '.log' diff --git a/Tst/testing_library/testlib/tm.py b/Tst/testing_library/testlib/tm.py index 446be07..3f23c6a 100644 --- a/Tst/testing_library/testlib/tm.py +++ b/Tst/testing_library/testlib/tm.py @@ -56,14 +56,11 @@ import sys import time import bitstring -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker import confignator -ccs_path = confignator.get_option('paths', 'ccs') -sys.path.append(ccs_path) +sys.path.append(confignator.get_option('paths', 'ccs')) +import ccs_function_lib as cfl -from database import config_db from database import tm_db from . import idb @@ -73,39 +70,39 @@ from . import tools logger = logging.getLogger(__name__) -def sessionfactory(ccs): - """ - Creates a sessionmaker - - :param ccs: Instance of the class packets.CCScom - :type ccs: packets.CCScom - - :return: Sessionmaker object - :rtype: sqlalchemy.orm.session.sessionmaker - """ - if ccs.session is None: - engine = None - s_factory = None - if engine is None: - engine = create_engine( - config_db.mysql_connection_string, - echo="-v" in sys.argv) - s_factory = sessionmaker(bind=engine) - else: - s_factory = ccs.session - return s_factory - - -def new_database_session(ccs): - """ - Creates a new session. - :param ccs: packets.CCScom - Instance of the class packets.CCScom - :return: session - """ - session_maker = sessionfactory(ccs=ccs) - session = session_maker() - return session +# def sessionfactory(ccs): +# """ +# Creates a sessionmaker +# +# :param ccs: Instance of the class packets.CCScom +# :type ccs: packets.CCScom +# +# :return: Sessionmaker object +# :rtype: sqlalchemy.orm.session.sessionmaker +# """ +# if ccs.session is None: +# engine = None +# s_factory = None +# if engine is None: +# engine = create_engine( +# config_db.mysql_connection_string, +# echo="-v" in sys.argv) +# s_factory = sessionmaker(bind=engine) +# else: +# s_factory = ccs.session +# return s_factory +# +# +# def new_database_session(ccs): +# """ +# Creates a new session. +# :param ccs: packets.CCScom +# Instance of the class packets.CCScom +# :return: session +# """ +# session_maker = sessionfactory(ccs=ccs) +# session = session_maker() +# return session def filter_chain(query, pool_name, is_tm=True, st=None, sst=None, apid=None, seq=None, t_from=None, t_to=None, dest_id=None, not_apid=None): @@ -167,11 +164,10 @@ def filter_chain(query, pool_name, is_tm=True, st=None, sst=None, apid=None, seq return query -def highest_cuc_timestamp(ccs, tm_list): +def highest_cuc_timestamp(tm_list): """ Get the TM packet with the highest CUC timestamp of the packet list - :param (packets.CCScom) ccs: Instance of the class packets.CCScom :param list tm_list: List of TM packets :return: The TM packet with the highest CUC timestamp (this is the one with the smallest difference to now). @@ -182,7 +178,7 @@ def highest_cuc_timestamp(ccs, tm_list): cuc = 0 for i in range(len(tm_list)): try: - tstamp = ccs.get_cuctime(tm_list[i]) + tstamp = cfl.get_cuctime(tm_list[i]) except Exception as unknown_error: logger.exception(unknown_error) continue @@ -192,11 +188,10 @@ def highest_cuc_timestamp(ccs, tm_list): return highest -def lowest_cuc_timestamp(ccs, pool_name, tm_list): +def lowest_cuc_timestamp(pool_name, tm_list): """ Get the TM packet with the lowest CUC timestamp of the packet list - :param (packets.CCScom) ccs: Instance of the class packets.CCScom :param (str) pool_name: name of the packet pool in the database :param (list) tm_list: List of TM packets @@ -205,10 +200,10 @@ def lowest_cuc_timestamp(ccs, pool_name, tm_list): """ lowest = None if isinstance(tm_list, list) and len(tm_list) > 0: - cuc = ccs.get_last_pckt_time(pool_name=pool_name, string=False) + cuc = cfl.get_last_pckt_time(pool_name=pool_name, string=False) for i in range(len(tm_list)): try: - tstamp = ccs.get_cuctime(tm_list[i]) + tstamp = cfl.get_cuctime(tm_list[i]) except Exception as unknown_error: logger.exception(unknown_error) continue @@ -218,11 +213,10 @@ def lowest_cuc_timestamp(ccs, pool_name, tm_list): return lowest -def time_tc_accepted(ccs, pool_name, tc_identifier): +def time_tc_accepted(pool_name, tc_identifier): """ Get the CUC timestamp of the command acceptance acknowledgement TM packet (or acceptance failure) - :param packets.CCScom ccs: Instance of the class packets.CCScom :param str pool_name: name of the packet pool in the database :param tuple tc_identifier: TC identifier is a tuple which consists out of apid, ssc, CUC-timestamp @@ -231,21 +225,21 @@ def time_tc_accepted(ccs, pool_name, tc_identifier): """ cuc = None # get the acknowledgement packets - suc, acknow = await_tc_acknow(ccs=ccs, pool_name=pool_name, tc_identifier=tc_identifier, tm_st=1) + suc, acknow = await_tc_acknow(pool_name=pool_name, tc_identifier=tc_identifier, tm_st=1) # filter for accepted TM(1,1) or acceptance failure TM(1,2) if len(acknow) > 0: for i in range(len(acknow)): subservicetype = acknow[i][0][11] if subservicetype == 1 or subservicetype == 2: # get the cuc timestamp of the acknowledgement packet - cuc = ccs.get_cuctime(acknow[i]) + cuc = cfl.get_cuctime(acknow[i]) break else: logger.warning('time_tc_accepted: no acknowledgement TM found for TM {}'.format(tc_identifier)) return cuc -def set_time_interval(ccs, pool_name, t_from, t_to, duration): +def set_time_interval(pool_name, t_from, t_to, duration): """ Calculate the time interval for given values t_from, t_to and duration. The time interval is used to for database queries. @@ -256,7 +250,6 @@ def set_time_interval(ccs, pool_name, t_from, t_to, duration): value in a other function. * If only a duration and no t_to is provided the upper boundary = lower boundary + duration - :param packets.CCScom ccs: Instance of the class packets.CCScom :param str pool_name: name of the packet pool in the database :param float t_from: CUC timestamp, lower boundary for the time interval. If t_from is None the current CUC timestamp is retrieved from the database by packets.get_last_pckt_time() :param float t_to: CUC timestamp, upper boundary for the time interval. @@ -274,7 +267,7 @@ def set_time_interval(ccs, pool_name, t_from, t_to, duration): # set the lower interval boundary, if no t_from is given, the current time will be used if t_from is None: # CUC timestamp of the last tm packet is used as "now" - last_packet_time = ccs.get_last_pckt_time(pool_name=pool_name, string=False) + last_packet_time = cfl.get_last_pckt_time(pool_name=pool_name, string=False) if last_packet_time: t_from = last_packet_time else: @@ -329,7 +322,7 @@ def set_query_interval(t_from, t_to): return interval -def decode_single_tm_packet(packet, ccs): +def decode_single_tm_packet(packet): """ Decodes a single TM packet. The packet has to be of the type bytes. If the packet is a TC the returned tuple consists out of (header, None) @@ -337,7 +330,6 @@ def decode_single_tm_packet(packet, ccs): For the case that the data field can not be read the tuple (header, None) is returned :param bytes packet: TM packet in byte-string format - :param packets.CCScom ccs: Instance of the class packets.CCScom :return: tuple or None :rtype: the decoded packet || None @@ -345,14 +337,14 @@ def decode_single_tm_packet(packet, ccs): assert isinstance(packet, bytes) result = None - header = ccs.Tmread(packet) + header = cfl.Tmread(packet) if header is not None: # the packet is a TC if header[1] == 1: result = header, None # the packet is a TM elif header[1] == 0: - data = ccs.Tmdata(packet) + data = cfl.Tmdata(packet) if data != (None, None): # data field could be decoded result = header, data else: # data field could not be decoded @@ -363,13 +355,12 @@ def decode_single_tm_packet(packet, ccs): return result -def decode_tm(tm_packets, ccs): +def decode_tm(tm_packets): """ Check if a TM packet or a list of TM packets are still bytes. If so, they are decoded, otherwise just pass the packets. If a failure occurs while unpacking return None :param list tm_packets: <list> of <bytes>: TM packet or a list of TM packets in byte format or as tm_db.DbTelemetry row - :param packets.CCScom ccs: Instance of the class packets.CCScom :return: list decoded TM packets (a TM packet is a tuple (header, data)) :rtype: list @@ -384,12 +375,12 @@ def decode_tm(tm_packets, ccs): for j in range(len(tm_packets)): t_start = time.time() if isinstance(tm_packets[j], bytes): - decoded.append(decode_single_tm_packet(packet=tm_packets[j], ccs=ccs)) + decoded.append(decode_single_tm_packet(packet=tm_packets[j])) elif isinstance(tm_packets[j], tuple): decoded.append(tm_packets[j]) elif isinstance(tm_packets[j], tm_db.DbTelemetry): row = tm_packets[j].raw - decoded.append(decode_single_tm_packet(packet=row, ccs=ccs)) + decoded.append(decode_single_tm_packet(packet=row)) else: logger.debug('decode_tm: data format for the TM packet is not known! Type of the packet is {}' .format(type(tm_packets[j]))) @@ -400,12 +391,12 @@ def decode_tm(tm_packets, ccs): else: if isinstance(tm_packets, bytes): - decoded.append(decode_single_tm_packet(packet=tm_packets, ccs=ccs)) + decoded.append(decode_single_tm_packet(packet=tm_packets)) elif isinstance(tm_packets, tuple): decoded.append(tm_packets) elif isinstance(tm_packets, tm_db.DbTelemetry): row = tm_packets.raw - decoded.append(decode_single_tm_packet(packet=row, ccs=ccs)) + decoded.append(decode_single_tm_packet(packet=row)) else: logger.debug('decode_tm: data format for the TM packet is not known! Type of the packet is {}' .format(type(tm_packets))) @@ -413,11 +404,10 @@ def decode_tm(tm_packets, ccs): return decoded -def get_tm_data_entries(ccs, tm_packet, data_entry_names): +def get_tm_data_entries(tm_packet, data_entry_names): """ For one TM packet the specified entries are extracted and returned. - :param packets.CCScom ccs: Instance of the class packets.CCScom :param PUS-packet tm_packet: TM packet which holds the desired parameter entries :param string-or-list data_entry_names: string or list of strings: this are the names/identifiers of the data entries @@ -427,7 +417,7 @@ def get_tm_data_entries(ccs, tm_packet, data_entry_names): values = {} keys = data_entry_names # if the TM packets are not decoded already, do it now - packet = decode_tm(tm_packets=tm_packet, ccs=ccs) + packet = decode_tm(tm_packets=tm_packet) # extract the required entries from the telemetry packet/packets if len(packet) == 1: @@ -459,20 +449,19 @@ def get_tm_data_entries(ccs, tm_packet, data_entry_names): # For every TM packet of the list the specified entries are extracted and returned. -# @param ccs: instance of the class CCScom # @param event_tms: <list> or single TM packet (expecting a event TM packet) # @param data_entry_names: <string> or <list of strings>: this are the names/identifiers of the data entry # @return: <list> of <dict>: key-value pairs of the data entries (as dict) or a empty array -def get_tm_list_data_entries(ccs, tm_packets, data_entry_names): +def get_tm_list_data_entries(tm_packets, data_entry_names): result = [] # if the TM packets are not decoded already, do it now - event_packets = decode_tm(tm_packets=tm_packets, ccs=ccs) + event_packets = decode_tm(tm_packets=tm_packets) # extract the required entries from the telemetry packet/packets if event_packets is not None: if isinstance(event_packets, list) and len(event_packets) > 0: for j in range(len(event_packets)): - result.append(get_tm_data_entries(tm_packet=event_packets[j], data_entry_names=data_entry_names, ccs=ccs)) + result.append(get_tm_data_entries(tm_packet=event_packets[j], data_entry_names=data_entry_names)) return result @@ -480,10 +469,9 @@ def get_tm_list_data_entries(ccs, tm_packets, data_entry_names): # @param packet: TM packet # @param entry_name: <str> name of the entry (the 3rd value of the tuple) # @param entry_value: value the entry (the 1st value of the tuple) -# @param ccs: instance of the class CCScom # @return: <boolean>: True if entry exists and has the correct value -def has_entry(packet, entry_name, entry_value, ccs): - item = get_tm_data_entries(tm_packet=packet, data_entry_names=entry_name, ccs=ccs) +def has_entry(packet, entry_name, entry_value): + item = get_tm_data_entries(tm_packet=packet, data_entry_names=entry_name) if item is not None: if entry_name in item and item[entry_name] == entry_value: return True @@ -492,14 +480,13 @@ def has_entry(packet, entry_name, entry_value, ccs): # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -def fetch_packets(ccs, pool_name, is_tm=True, st=None, sst=None, apid=None, ssc=None, t_from=None, t_to=None, +def fetch_packets(pool_name, is_tm=True, st=None, sst=None, apid=None, ssc=None, t_from=None, t_to=None, dest_id=None, not_apid=None, decode=True, silent=False): # ToDo: remove the agrument silent, as this functionality is covert by using log level INFO # (DEBUG still logs more information) """ Makes a single database query for packets from the pool for a fixed time interval. By using arguments, specific packets can be retrieved. - :param ccs: instance of packets.CCScom :param pool_name: str The name of the pool in the database :param is_tm: bool @@ -545,7 +532,7 @@ def fetch_packets(ccs, pool_name, is_tm=True, st=None, sst=None, apid=None, ssc= apid = tools.convert_apid_to_int(apid=apid) # make database query - session = new_database_session(ccs=ccs) + session = cfl.scoped_session_storage() query = session.query(tm_db.DbTelemetry) query = filter_chain(query, pool_name=pool_name, @@ -570,17 +557,15 @@ def fetch_packets(ccs, pool_name, is_tm=True, st=None, sst=None, apid=None, ssc= # decode the data if len(data) > 0 and decode is True: - data = decode_tm(tm_packets=data, ccs=ccs) + data = decode_tm(tm_packets=data) return data -def await_tm(ccs, pool_name, st, sst=None, apid=None, ssc=None, t_from=None, t_to=None, dest_id=None, not_apid=None, decode=True, duration=5, check_int=None): +def await_tm(pool_name, st, sst=None, apid=None, ssc=None, t_from=None, t_to=None, dest_id=None, not_apid=None, decode=True, duration=5, check_int=None): """ Waiting for a specific TM packet, if it is received the packet is returned immediately. The database queries are done in regular intervals. - :param ccs: instance of packets.CCScom - instance of packets.CCScom :param pool_name: str name of the pool in the database :param st: int @@ -612,7 +597,7 @@ def await_tm(ccs, pool_name, st, sst=None, apid=None, ssc=None, t_from=None, t_t result = [] # set time interval for the desired packets - t_from, t_to = set_time_interval(ccs=ccs, pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) + t_from, t_to = set_time_interval(pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) # set the interval of fetching packets from the pool if check_int is None: @@ -622,8 +607,7 @@ def await_tm(ccs, pool_name, st, sst=None, apid=None, ssc=None, t_from=None, t_t condition = True while condition is True: # get packets from the database - packets = fetch_packets(ccs=ccs, - pool_name=pool_name, + packets = fetch_packets(pool_name=pool_name, is_tm=True, st=st, sst=sst, @@ -636,7 +620,7 @@ def await_tm(ccs, pool_name, st, sst=None, apid=None, ssc=None, t_from=None, t_t decode=decode, silent=True) # check condition - if len(packets) > 0 or ccs.get_last_pckt_time(pool_name=pool_name, string=False) > t_to: + if len(packets) > 0 or cfl.get_last_pckt_time(pool_name=pool_name, string=False) > t_to: condition = False result = packets else: @@ -646,7 +630,7 @@ def await_tm(ccs, pool_name, st, sst=None, apid=None, ssc=None, t_from=None, t_t return result -def get_tm(ccs, pool_name, st=None, sst=None, apid=None, ssc=None, duration=5, t_from=None, t_to=None, +def get_tm(pool_name, st=None, sst=None, apid=None, ssc=None, duration=5, t_from=None, t_to=None, check_interval=0.2, decode=True): # ToDo: remove the argument check_interval """ @@ -654,8 +638,6 @@ def get_tm(ccs, pool_name, st=None, sst=None, apid=None, ssc=None, duration=5, t parameters. Time intervals: ]t_from, t_from+duration[ or ]now, now+duration[. "now" means the CUC timestamp of the last tm packet. - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the pool for TM and TC packets in the database. :param st: int @@ -679,17 +661,16 @@ def get_tm(ccs, pool_name, st=None, sst=None, apid=None, ssc=None, duration=5, t """ # set the time interval - t_from, t_to = set_time_interval(ccs=ccs, pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) + t_from, t_to = set_time_interval(pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) # for the case that t_to is in future, wait - current_cuc = ccs.get_last_pckt_time(pool_name=pool_name, string=False) + current_cuc = cfl.get_last_pckt_time(pool_name=pool_name, string=False) if t_to > current_cuc: difference = t_to - current_cuc time.sleep(difference) # get packets - tm_packets = fetch_packets(ccs=ccs, - pool_name=pool_name, + tm_packets = fetch_packets(pool_name=pool_name, st=st, sst=sst, apid=apid, @@ -716,12 +697,10 @@ def get_tm(ccs, pool_name, st=None, sst=None, apid=None, ssc=None, duration=5, t # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -def get_hk_tm(ccs, pool_name, hk_name, t_from=None, t_to=None, duration=5): +def get_hk_tm(pool_name, hk_name, t_from=None, t_to=None, duration=5): """ Fetches housekeeping reports TM(3,25) from the database and filter them by the housekeeping name (SID). - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the pool for TM/TC packets in the database :param hk_name: str @@ -738,24 +717,22 @@ def get_hk_tm(ccs, pool_name, hk_name, t_from=None, t_to=None, duration=5): hk_list = [] # set the time interval for the database query - t_from, t_to = set_time_interval(ccs=ccs, pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) + t_from, t_to = set_time_interval(pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) # get the TM packets from the database - data = get_tm(ccs=ccs, pool_name=pool_name, st=3, sst=25, t_from=t_from, t_to=t_to, duration=duration) + data = get_tm(pool_name=pool_name, st=3, sst=25, t_from=t_from, t_to=t_to, duration=duration) for packet in data: - if has_entry(packet=packet, entry_name='Sid', entry_value=hk_name, ccs=ccs): + if has_entry(packet=packet, entry_name='Sid', entry_value=hk_name): hk_list.append(packet) return hk_list -def await_hk_tm(ccs, pool_name, sid=None, t_from=None, duration=5): +def await_hk_tm(pool_name, sid=None, t_from=None, duration=5): """ Get the next housekeeping TM packet with a specific SID. If there are more packets of the same kind. The one with the highest CUC timestamp is returned. - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the TM/TC pool in the database :param sid: int or str @@ -774,15 +751,15 @@ def await_hk_tm(ccs, pool_name, sid=None, t_from=None, duration=5): # if the sid is a integer, get the name string from the instrument database if isinstance(sid, int): - sid = idb.convert_hk_sid(ccs=ccs, sid=sid) + sid = idb.convert_hk_sid(sid=sid) # get the housekeeping TM packets from the pool - tm_list = await_tm(ccs=ccs, pool_name=pool_name, st=3, sst=25, t_from=t_from, duration=duration) + tm_list = await_tm(pool_name=pool_name, st=3, sst=25, t_from=t_from, duration=duration) # filter for the correct housekeeping kind (SID) housekeepings = [] for packet in tm_list: - packet_sid = get_tm_data_entries(ccs=ccs, tm_packet=packet, data_entry_names='Sid') + packet_sid = get_tm_data_entries(tm_packet=packet, data_entry_names='Sid') if 'Sid' in packet_sid: if packet_sid['Sid'] == sid: housekeepings.append(packet) @@ -791,7 +768,7 @@ def await_hk_tm(ccs, pool_name, sid=None, t_from=None, duration=5): if len(housekeepings) > 0: logger.debug('await_hk_tm: found {} packets with SID {}'.format(len(housekeepings), sid)) # ToDo: change to the lowest_cuc_timestamp (the first HK with this SID), because of waiting for the next HK TM?! - youngest = highest_cuc_timestamp(ccs=ccs, tm_list=housekeepings) + youngest = highest_cuc_timestamp(tm_list=housekeepings) header = youngest[0] data = youngest[1] result = header, data @@ -801,7 +778,7 @@ def await_hk_tm(ccs, pool_name, sid=None, t_from=None, duration=5): return result -def get_self_def_hk_tm(ccs, pool_name, sid, format_string, t_from=None, t_to=None): +def get_self_def_hk_tm(pool_name, sid, format_string, t_from=None, t_to=None): """ Fetches TM(3,25) housekeeping packets for self defined housekeeping. In order to unpack the data field a format string is required. The packets from the pool are filtered, after unpacking, by the SID (which are the first @@ -809,8 +786,6 @@ def get_self_def_hk_tm(ccs, pool_name, sid, format_string, t_from=None, t_to=Non Parameters ---------- - :param ccs: packets.CCScom - instance of the class CCSCom :param pool_name: str pool name of the TM/TC packets pool :param sid: int @@ -837,12 +812,12 @@ def get_self_def_hk_tm(ccs, pool_name, sid, format_string, t_from=None, t_to=Non hk_list = [] # get the TM packets from the database - packets = get_tm(ccs=ccs, pool_name=pool_name, st=3, sst=25, t_from=t_from, t_to=t_to, decode=False) + packets = get_tm(pool_name=pool_name, st=3, sst=25, t_from=t_from, t_to=t_to, decode=False) # filter TM packets with the correct Sid for packet in packets: # read the header - header = ccs.Tmread(pckt=packet) + header = cfl.Tmread(pckt=packet) # extract the SID from the Bits-Field (the first 8 bits are the SID) packet_sid = header[-2][0:8].unpack('uint:8')[0] if packet_sid == sid: @@ -861,11 +836,10 @@ def get_self_def_hk_tm(ccs, pool_name, sid, format_string, t_from=None, t_to=Non return hk_list -def get_hk_entry(ccs, pool_name, hk_name, name=None, t_from=None, t_to=None, duration=5, silent=False): +def get_hk_entry(pool_name, hk_name, name=None, t_from=None, t_to=None, duration=5, silent=False): """ Get a specific entry of the youngest housekeeping report from the TM/TC database by name. - :param ccs: :param pool_name: :param hk_name: :param name: @@ -887,16 +861,16 @@ def get_hk_entry(ccs, pool_name, hk_name, name=None, t_from=None, t_to=None, dur name = new_names # get the TM packets from the database - hk_list = get_hk_tm(ccs=ccs, pool_name=pool_name, hk_name=hk_name, t_from=t_from, t_to=t_to, duration=duration) + hk_list = get_hk_tm(pool_name=pool_name, hk_name=hk_name, t_from=t_from, t_to=t_to, duration=duration) if len(hk_list) > 0: # take the youngest housekeeping report - hk_report = highest_cuc_timestamp(ccs=ccs, tm_list=hk_list) + hk_report = highest_cuc_timestamp(tm_list=hk_list) # get the requested housekeeping entry out of the TM packet - entries = get_tm_data_entries(tm_packet=hk_report, data_entry_names=name, ccs=ccs) + entries = get_tm_data_entries(tm_packet=hk_report, data_entry_names=name) # pick out the results if len(entries) > 0: - result = entries, ccs.get_cuctime(hk_report), hk_name + result = entries, cfl.get_cuctime(hk_report), hk_name # log the result if isinstance(entries, dict): keys = entries.keys() @@ -907,16 +881,16 @@ def get_hk_entry(ccs, pool_name, hk_name, name=None, t_from=None, t_to=None, dur logger.debug('get_hk_entry: UNDER CONSTRUCTION: HERE IS SOMETHING TO IMPLEMENT') if len(entries) < 1: logger.debug('No entry with name(s) {} found in the housekeeping {} with ' - 'CUC timestamp {}'.format(name, hk_name, ccs.get_cuctime(hk_report))) + 'CUC timestamp {}'.format(name, hk_name, cfl.get_cuctime(hk_report))) else: logger.warning('The required {} housekeeping report/entry could not be found in the database.'.format(hk_name)) return result -def await_hk_entries(ccs, pool_name, sid=None, name=None): # 2 usages IASW39 +def await_hk_entries(pool_name, sid=None, name=None): # 2 usages IASW39 result = None - hks = await_hk_tm(ccs, pool_name, sid=sid) + hks = await_hk_tm(pool_name, sid=sid) # extract the required entries from the housekeeping if hks is not None: @@ -943,13 +917,11 @@ def await_hk_entries(ccs, pool_name, sid=None, name=None): # 2 usages IASW39 # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -def get_st_and_sst(ccs, pool_name, apid, ssc, is_tm=False, t_from=None): +def get_st_and_sst(pool_name, apid, ssc, is_tm=False, t_from=None): """ Get the ST and SST of a packet by using the APID and SSC of the packet. Does a database query with the APID and SSC for a TM or a TC. For TC the t_from will be ignored, since TC does not have a valid CUC timestamp - :param ccs: packets.CCScom - instance of packets.CCScom :param pool_name: str name of the pool for TC/TMs in the database :param apid: str or int @@ -974,9 +946,9 @@ def get_st_and_sst(ccs, pool_name, apid, ssc, is_tm=False, t_from=None): # make a database query for the packet in order to retrieve the ST and SST for logging if is_tm is False: # for TC the timestamp is no valid CUC timestamp - tc_list = fetch_packets(ccs=ccs, pool_name=pool_name, is_tm=is_tm, apid=apid, ssc=ssc) + tc_list = fetch_packets(pool_name=pool_name, is_tm=is_tm, apid=apid, ssc=ssc) else: - tc_list = fetch_packets(ccs=ccs, pool_name=pool_name, is_tm=is_tm, apid=apid, ssc=ssc, t_from=t_from) + tc_list = fetch_packets(pool_name=pool_name, is_tm=is_tm, apid=apid, ssc=ssc, t_from=t_from) # if the packet was found read the header and extract the ST and SST if len(tc_list) == 1: @@ -1067,12 +1039,10 @@ def extract_apid_from_packetid(packet_id): return apid -def get_tc_acknow(ccs, pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): +def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None): """ Check if for the TC acknowledgement packets can be found in the database. This function makes a single database query. - :param ccs: packets.CCScom - instance of the class packets.CCScom :param pool_name: str Name of the TM pool in the database :param t_tc_sent: float @@ -1098,7 +1068,7 @@ def get_tc_acknow(ccs, pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=No tc_apid = tools.convert_apid_to_int(apid=tc_apid) # make database query - packets = fetch_packets(ccs=ccs, pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent - 1) + packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent - 1) # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field ack_tms = [] @@ -1112,7 +1082,7 @@ def get_tc_acknow(ccs, pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=No else: name_apid = 'TcPacketId' name_psc = 'TcPacketSeqCtrl' - para = get_tm_data_entries(ccs=ccs, tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) + para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) if name_apid in para and name_psc in para: # extract the SSC from the PSC ssc = extract_ssc_from_psc(psc=para[name_psc]) @@ -1130,8 +1100,7 @@ def get_tc_acknow(ccs, pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=No # treat with the result from the database query if len(ack_tms) > 0: # get the ST and SST of the TC for logging purposes - tc_st, tc_sst = get_st_and_sst(ccs=ccs, - pool_name=pool_name, + tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, apid=tc_apid, ssc=tc_ssc, is_tm=False, @@ -1145,33 +1114,31 @@ def get_tc_acknow(ccs, pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=No data = ack_tms[i][1] if result is not False: if head[11] == 1 or head[11] == 3 or head[11] == 7: - logger.info('TM({},{}) @ {}'.format(head[10], head[11], ccs.get_cuctime(head))) + logger.info('TM({},{}) @ {}'.format(head[10], head[11], cfl.get_cuctime(head))) result = True if head[11] == 2 or head[11] == 4 or head[11] == 8: if head[11] == 2: logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' - .format(head[10], head[11], ccs.get_cuctime(head))) + .format(head[10], head[11], cfl.get_cuctime(head))) logger.debug('Data of the TM packet: {}'.format(data)) if head[11] == 4: logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.' - .format(head[10], head[11], ccs.get_cuctime(head))) + .format(head[10], head[11], cfl.get_cuctime(head))) logger.debug('Data of the TM packet: {}'.format(data)) if head[11] == 8: logger.info( 'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.' - .format(head[10], head[11], ccs.get_cuctime(head))) + .format(head[10], head[11], cfl.get_cuctime(head))) logger.debug('Data of the TM packet: {}'.format(data)) result = False return result, ack_tms -def await_tc_acknow(ccs, pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None): +def await_tc_acknow(pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None): """ Waiting to receive the acknowledgement packet of a sent telecommand (TC) for a given duration. As soon as acknowledgement packets were found the function returns. - :param ccs: packets.CCScom - instance of the class packets.CCScom :param pool_name: str Name of the pool in the database :param tc_identifier: tuple @@ -1187,7 +1154,6 @@ def await_tc_acknow(ccs, pool_name, tc_identifier, duration=10, tm_st=1, tm_sst= list of the found acknowledgement packets """ - # assert isinstance(ccs, packets.CCScom) assert isinstance(pool_name, str) assert isinstance(tc_identifier, tuple) tc_apid = tc_identifier[0] @@ -1205,8 +1171,7 @@ def await_tc_acknow(ccs, pool_name, tc_identifier, duration=10, tm_st=1, tm_sst= finished = False while True: # get the acknowledgement packets for the TC - outcome, ack_list = get_tc_acknow(ccs=ccs, - pool_name=pool_name, + outcome, ack_list = get_tc_acknow(pool_name=pool_name, t_tc_sent=t_tc_sent, tc_apid=tc_apid, tc_ssc=tc_ssc, @@ -1221,8 +1186,7 @@ def await_tc_acknow(ccs, pool_name, tc_identifier, duration=10, tm_st=1, tm_sst= # if not all 3 TM(1,1), TM(1,3), TM(1,7) were received, wait 1s and query a last time if tm_sst is None and len(ack_list) < 3: time.sleep(1) - outcome, ack_list = get_tc_acknow(ccs=ccs, - pool_name=pool_name, + outcome, ack_list = get_tc_acknow(pool_name=pool_name, t_tc_sent=t_tc_sent, tc_apid=tc_apid, tc_ssc=tc_ssc, @@ -1241,8 +1205,7 @@ def await_tc_acknow(ccs, pool_name, tc_identifier, duration=10, tm_st=1, tm_sst= # if no acknowledgement packets were received at all after the loop if result is None: # get the ST and SST of the TC for logging purposes - tc_st, tc_sst = get_st_and_sst(ccs=ccs, - pool_name=pool_name, + tc_st, tc_sst = get_st_and_sst(pool_name=pool_name, apid=tc_apid, ssc=tc_ssc, is_tm=False, @@ -1252,14 +1215,12 @@ def await_tc_acknow(ccs, pool_name, tc_identifier, duration=10, tm_st=1, tm_sst= return result, ack_list -def check_acknowledgement(ccs, pool_name, tc_identifier, duration=10): +def check_acknowledgement(pool_name, tc_identifier, duration=10): """ Check that for a sent TC the acknowledgement packets were received (assuming the acknowledgement was enabled) Will return True when all tree acknowledgement TM packets (1,1), (1,3), (1,7) or (1,1), (1,3) or just (1,1) were received. - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the telemetry pool :param tc_identifier: tuple or list @@ -1274,13 +1235,13 @@ def check_acknowledgement(ccs, pool_name, tc_identifier, duration=10): outcome = None # check if there is a single or a list of TC for acknowledgements to check if isinstance(tc_identifier, tuple): - outcome, acks = await_tc_acknow(ccs=ccs, pool_name=pool_name, tc_identifier=tc_identifier, duration=duration) + outcome, acks = await_tc_acknow(pool_name=pool_name, tc_identifier=tc_identifier, duration=duration) if isinstance(tc_identifier, list): tc_res = [] # do the check for all commands for telecommand in tc_identifier: - outcome, acks = await_tc_acknow(ccs=ccs, pool_name=pool_name, tc_identifier=telecommand, duration=duration) + outcome, acks = await_tc_acknow(pool_name=pool_name, tc_identifier=telecommand, duration=duration) tc_res.append(outcome) # check if the TC were successful (by acknowledgement TM packets) @@ -1295,19 +1256,18 @@ def check_acknowledgement(ccs, pool_name, tc_identifier, duration=10): # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -def condition_event_id(ccs, tmpackets, event_id, data_entries=None): +def condition_event_id(tmpackets, event_id, data_entries=None): """ # checking the TM packets (list) if the event_id and, if given, the entries matches # @param tmpackets: <list> of TM packets (events) # returns a list of TM packets - :param ccs: :param tmpackets: :param event_id: :param data_entries: :return: """ found_packets = [] - tmpackets = decode_tm(tm_packets=tmpackets, ccs=ccs) + tmpackets = decode_tm(tm_packets=tmpackets) # compare the event identifier and if requested the data entries if len(tmpackets) > 0: @@ -1335,7 +1295,7 @@ def condition_event_id(ccs, tmpackets, event_id, data_entries=None): if isinstance(data_entries[k], dict): keys = data_entries[k].keys() for key in keys: - value = get_tm_data_entries(ccs=ccs, tm_packet=tmpackets[i], data_entry_names=key) + value = get_tm_data_entries(tm_packet=tmpackets[i], data_entry_names=key) if key in value: if value[key] == data_entries[k][key]: matches = True @@ -1353,15 +1313,13 @@ def condition_event_id(ccs, tmpackets, event_id, data_entries=None): return found_packets -def get_events(ccs, pool_name, severity, event_id, t_from=None, t_to=None, duration=None, entries=None): +def get_events(pool_name, severity, event_id, t_from=None, t_to=None, duration=None, entries=None): """ For a given duration all events with suiting severity are collected. Filtering for events with specific entries can be done by providing them in the argument entries. The function makes a single database query. If upper time interval boundary is the future, the function waits before doing the database query. - :param ccs: packets.CCScom - Instance of the class CCScom :param pool_name: str Name of the pool for TM packets in the database :param severity: int @@ -1383,19 +1341,19 @@ def get_events(ccs, pool_name, severity, event_id, t_from=None, t_to=None, durat result = [] # set the time interval - t_from, t_to = set_time_interval(ccs=ccs, pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) + t_from, t_to = set_time_interval(pool_name=pool_name, t_from=t_from, t_to=t_to, duration=duration) # for the case that t_to is in future, wait - current_cuc = ccs.get_last_pckt_time(pool_name=pool_name, string=False) + current_cuc = cfl.get_last_pckt_time(pool_name=pool_name, string=False) if t_to > current_cuc: difference = t_to - current_cuc time.sleep(difference) # get packets - tm_packets = fetch_packets(ccs=ccs, pool_name=pool_name, st=5, sst=severity, t_from=t_from, t_to=t_to) + tm_packets = fetch_packets(pool_name=pool_name, st=5, sst=severity, t_from=t_from, t_to=t_to) # check condition (if the event TM packets have been found) - tm_list = condition_event_id(ccs=ccs, tmpackets=tm_packets, event_id=event_id, data_entries=entries) + tm_list = condition_event_id(tmpackets=tm_packets, event_id=event_id, data_entries=entries) if len(tm_list) > 0: result = tm_list @@ -1414,12 +1372,11 @@ def get_events(ccs, pool_name, severity, event_id, t_from=None, t_to=None, durat return result -def await_event(ccs, pool_name, severity, event_id, entries=None, duration=10, t_from=None, check_period=1, decode=True): +def await_event(pool_name, severity, event_id, entries=None, duration=10, t_from=None, check_period=1, decode=True): """ Wait for a event to happen. When it is received the function returns. Database queries are done periodically till the duration is elapsed or the event is received. - :param packets.CCScom ccs: Instance of the class packets.CCScom :param str pool_name: Name of the TM pool in the database :param int severity: The severity of an event is equal with the Sub-Service Type of the TM packet :param str event_id: Event ID @@ -1436,7 +1393,7 @@ def await_event(ccs, pool_name, severity, event_id, entries=None, duration=10, t result = [] # set time interval for the desired packets - t_from, t_to = set_time_interval(ccs=ccs, pool_name=pool_name, t_from=t_from, t_to=None, duration=duration) + t_from, t_to = set_time_interval(pool_name=pool_name, t_from=t_from, t_to=None, duration=duration) # set the interval of fetching packets from the pool if check_period is None: @@ -1449,8 +1406,7 @@ def await_event(ccs, pool_name, severity, event_id, entries=None, duration=10, t condition = True while condition is True: # get packets from the database - packets = fetch_packets(ccs=ccs, - pool_name=pool_name, + packets = fetch_packets(pool_name=pool_name, is_tm=True, st=st, sst=sst, @@ -1460,9 +1416,9 @@ def await_event(ccs, pool_name, severity, event_id, entries=None, duration=10, t silent=True) # check condition (if the event TM packets have been found) - events = condition_event_id(ccs=ccs, tmpackets=packets, event_id=event_id, data_entries=entries) + events = condition_event_id(tmpackets=packets, event_id=event_id, data_entries=entries) - if len(events) > 0 or ccs.get_last_pckt_time(pool_name=pool_name, string=False) > t_to: + if len(events) > 0 or cfl.get_last_pckt_time(pool_name=pool_name, string=False) > t_to: condition = False result = events else: @@ -1484,14 +1440,12 @@ def await_event(ccs, pool_name, severity, event_id, entries=None, duration=10, t return result -def extract_status_data(ccs, tm_packet): +def extract_status_data(tm_packet): """ Extract status data from Service 21 DAT_CCD_Window packets. Science data blocks are ignored Not all parameters are decoded correctly: HK_STAT_DATA_ACQ_TYPE, HK_STAT_DATA_ACQ_SRC, HK_STAT_CCD_TIMING_SCRIP HK_STAT_DATA_ACQ_TIME - :param ccs: packet.CCScom - Instance of the class CCScom :param tm_packet: PUS packet A TM(21,3) :return: dict @@ -1499,7 +1453,7 @@ def extract_status_data(ccs, tm_packet): """ status_data = {} - header = ccs.Tmread(tm_packet) + header = cfl.Tmread(tm_packet) # check if it is a TM(21,3) if header[10] == 21 and header[11] == 3: data_field = header[-2] @@ -1553,14 +1507,12 @@ def extract_status_data(ccs, tm_packet): return status_data -def get_acquisition(ccs, pool_name, tm_21_3): +def get_acquisition(pool_name, tm_21_3): """ Get all packets with the same acquisition ID. Requirement: the packet of the acquisition with the lowest CUC timestamp For every acquisition type of the same ID, it is checked if all packets were found. - :param ccs: packets.CCScom - Instance of the class packets.CCScom :param pool_name: str Name of the pool for TM/TC packets in the database :param tm_21_3: PUS packet @@ -1571,22 +1523,22 @@ def get_acquisition(ccs, pool_name, tm_21_3): result = [] transmission_finished = False - t_first_received = ccs.get_cuctime(tml=tm_21_3) + t_first_received = cfl.get_cuctime(tml=tm_21_3) # get the acquisition ID - current_acq_id = extract_status_data(ccs=ccs, tm_packet=tm_21_3)['HK_STAT_DATA_ACQ_ID'] + current_acq_id = extract_status_data(tm_packet=tm_21_3)['HK_STAT_DATA_ACQ_ID'] # get TM(21,3), not decoded and check if the acquisition ID are the correct ones t_to = t_first_received + 3 logger.info(t_first_received) while not transmission_finished: - data = get_tm(ccs=ccs, pool_name=pool_name, st=21, sst=3, t_from=t_first_received, t_to=t_to, decode=False) + data = get_tm(pool_name=pool_name, st=21, sst=3, t_from=t_first_received, t_to=t_to, decode=False) # filter data for ACQ_ID data_acq_id = [] data_acq_types = [] for pac in data: # check if the packets has the correct acquisition ID - status_data = extract_status_data(ccs=ccs, tm_packet=pac) + status_data = extract_status_data(tm_packet=pac) if status_data['HK_STAT_DATA_ACQ_ID'] == current_acq_id: data_acq_id.append(pac) # add the acquisition type to a list @@ -1600,11 +1552,11 @@ def get_acquisition(ccs, pool_name, tm_21_3): # extract all packets with the current type (acquisition type) curr_type_packets = [] for item in data_acq_id: - status_data = extract_status_data(ccs=ccs, tm_packet=item) + status_data = extract_status_data(tm_packet=item) if status_data['HK_STAT_DATA_ACQ_TYPE'] == type: curr_type_packets.append(item) # check if all packets of this type are here - total_num = extract_status_data(ccs=ccs, tm_packet=curr_type_packets[0])['HK_STAT_TOTAL_PACKET_NUM'] + total_num = extract_status_data(tm_packet=curr_type_packets[0])['HK_STAT_TOTAL_PACKET_NUM'] if len(curr_type_packets) == total_num: found_all_packets.append(True) else: @@ -1631,8 +1583,8 @@ def get_acquisition(ccs, pool_name, tm_21_3): result = data_acq_id else: # set the new t_to and do another query with extended time interval - last = highest_cuc_timestamp(ccs=ccs, tm_list=data) - t_to = ccs.get_cuctime(last) + 3 + last = highest_cuc_timestamp(tm_list=data) + t_to = cfl.get_cuctime(last) + 3 logger.info(t_to) # after 5 min stop the loop, if not finished yet length_of_time = t_to - t_first_received diff --git a/Tst/testing_library/testlib/tools.py b/Tst/testing_library/testlib/tools.py index be20e89..d0c2897 100644 --- a/Tst/testing_library/testlib/tools.py +++ b/Tst/testing_library/testlib/tools.py @@ -5,9 +5,12 @@ Tools """ import logging import os -import configparser import math -from confignator import config +import sys + +import confignator +sys.path.append(confignator.get_option('paths', 'ccs')) +import ccs_function_lib as cfl # create a logger logger = logging.getLogger(__name__) @@ -32,56 +35,56 @@ logger = logging.getLogger(__name__) # ToDo: These sections should be separated in order to make unit-testing easy and a good readability and understanding. -def read_config(file_path=None): - print(os.getcwd()) - file_path = None - if os.path.isfile(os.path.abspath('../egse.cfg')): - file_path = '../egse.cfg' - elif os.path.isfile(os.path.abspath('egse.cfg')): - file_path = 'egse.cfg' - try: - #logger.info('read_config: Reading configuration file: {}'.format(os.path.abspath(file_path))) - config = config.Config(file_path=file_path) - return config - except Exception as exception: - pass - #logger.error('Could not find the configuration file: {}'.format(os.path.abspath(file_path))) - #logger.exception(exception) - - -def read_config_file(): - config = None - # load the config file - configuration_file = os.path.realpath('egse.cfg') - if os.path.isfile(os.path.abspath(configuration_file)): - try: - config = configparser.ConfigParser() - config.read(configuration_file) - config.source = configuration_file - except Exception as ex: - logger.critical('Configuration file {} could not be read!'.format(configuration_file)) - logger.exception(ex) - else: - configuration_file = os.path.realpath('../egse.cfg') - if os.path.isfile(os.path.abspath(configuration_file)): - try: - config = configparser.ConfigParser() - config.read(configuration_file) - config.source = configuration_file - except Exception as ex: - logger.critical('Configuration file {} could not be read!'.format(configuration_file)) - logger.exception(ex) - else: - logger.error('Could not find the configuration file: {}'.format(os.path.abspath(configuration_file))) - return config +# def read_config(): +# print(os.getcwd()) +# file_path = None +# if os.path.isfile(os.path.abspath('../egse.cfg')): +# file_path = '../egse.cfg' +# elif os.path.isfile(os.path.abspath('egse.cfg')): +# file_path = 'egse.cfg' +# try: +# #logger.info('read_config: Reading configuration file: {}'.format(os.path.abspath(file_path))) +# config = config.Config(file_path=file_path) +# return config +# except Exception as exception: +# pass +# #logger.error('Could not find the configuration file: {}'.format(os.path.abspath(file_path))) +# #logger.exception(exception) + + +# def read_config_file(): +# config = None +# # load the config file +# configuration_file = os.path.realpath('egse.cfg') +# if os.path.isfile(os.path.abspath(configuration_file)): +# try: +# config = configparser.ConfigParser() +# config.read(configuration_file) +# config.source = configuration_file +# except Exception as ex: +# logger.critical('Configuration file {} could not be read!'.format(configuration_file)) +# logger.exception(ex) +# else: +# configuration_file = os.path.realpath('../egse.cfg') +# if os.path.isfile(os.path.abspath(configuration_file)): +# try: +# config = configparser.ConfigParser() +# config.read(configuration_file) +# config.source = configuration_file +# except Exception as ex: +# logger.critical('Configuration file {} could not be read!'.format(configuration_file)) +# logger.exception(ex) +# else: +# logger.error('Could not find the configuration file: {}'.format(os.path.abspath(configuration_file))) +# return config # read the path for the logging files from the configuration file egse.cfg. # if the directory does not exist it is created. -# @param self Reference to the current instance of CCScom -def get_path_for_testing_logs(ccs): +def get_path_for_testing_logs(): # fetch the path from the project config file - path = ccs.cfg.get('LOGGING', 'test_run') + # path = cfl.cfg.get('LOGGING', 'test_run') + path = confignator.get_config().get('tst-logging', 'test_run') # create the directory for the logging files if it does not exist os.makedirs(path, mode=0o777, exist_ok=True) return path @@ -215,11 +218,11 @@ def print_apids(): # @param value1: <float> or <tmpacket> # @param value2: <float> or <tmpacket> # @return: <float> -def get_cuc_diff(ccs, value1, value2): +def get_cuc_diff(value1, value2): if not isinstance(value1, float): # if value1 is a TM packet - value1 = ccs.get_cuctime(value1) + value1 = cfl.get_cuctime(value1) if not isinstance(value2, float): # if value2 is a TM packet - value2 = ccs.get_cuctime(value2) + value2 = cfl.get_cuctime(value2) difference = math.fabs(value2-value1) return difference -- GitLab