Skip to content
Snippets Groups Projects
Commit d5aae4fb authored by Sebastian's avatar Sebastian
Browse files

added verification functions

parent c9ec464e
No related branches found
No related tags found
No related merge requests found
...@@ -979,7 +979,120 @@ def extract_apid_from_packetid(packet_id): ...@@ -979,7 +979,120 @@ def extract_apid_from_packetid(packet_id):
return apid return apid
def get_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None):
def get_tc_acknow(pool_name="LIVE", tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None):
"""
Check if for the TC acknowledgement packets can be found in the database.
This function makes a single database query.
:param pool_name: str
Name of the TM pool in the database
:param t_tc_sent: float
CUC timestamp from which search for telecommand should start
:param tc_apid: int or str
Application process ID of the sent TC. Can be provided as integer or hexadecimal string
:param tc_ssc: int
Source sequence counter of the sent TC
:return: (boolean, list)
boolean:
True if one or up to all acknowledgement packets TM(1,1), TM(1,3), TM(1,7) were found
False if one or all of TM(1,2), TM(1,4), TM(1,8) were found
list:
List of the acknowledgement TM packets for the TC,
[] if no acknowledgement TM packets could be found in the database
"""
result = None
tc_data = get_data_of_last_tc(pool_name)
t_tc_sent = tc_data[0]
tc_iid = tc_data[1]
tc_bytes = tc_data[2]
assert isinstance(pool_name, str)
assert isinstance(tc_apid, (int, str))
assert isinstance(t_tc_sent, (float, int))
assert isinstance(tc_iid, int)
assert isinstance(tc_bytes, (int, str))
# if the tc_apid is provided as hexadecimal number, convert it to and integer
tc_apid = tools.convert_apid_to_int(apid=tc_apid)
# make database query
packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent - 1)
# print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000))
# filter for TM packets with the correct APID and source sequence counter (SSC) in the data field
ack_tms = []
for i in range(len(packets)):
if packets[i][1] is not None and packets[i][1][0] is not None:
# get the data entries for APID and SSC
#pac_apid = packets[i][0][3] # TODO: not compatible anymore
pac_apid = packets[i][0][0].APID
first_tm_bytes = packets[i][0][1].hex()[0:4]
if pac_apid == 961: # for acknowledgements from SEM
name_apid = 'PAR_CMD_APID'
name_psc = 'PAR_CMD_SEQUENCE_COUNT'
else:
name_apid = 'TcPcktId'
name_psc = 'TcPcktSeqCtrl'
para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc])
if name_apid in para and name_psc in para:
# extract the SSC from the PSC
ssc = extract_ssc_from_psc(psc=para[name_psc])
apid = extract_apid_from_packetid(packet_id=para[name_apid])
if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID'
tc_pid = tools.extract_pid_from_apid(tc_apid)
if apid == tc_pid and ssc == tc_ssc:
ack_tms.append(packets[i])
else:
if apid == tc_apid and ssc == tc_ssc:
ack_tms.append(packets[i])
else:
logger.debug('get_tc_acknow: could not read the data from the TM packet')
# treat with the result from the database query
if len(ack_tms) > 0:
# get the ST and SST of the TC for logging purposes
tc_st, tc_sst = get_st_and_sst(pool_name=pool_name,
apid=tc_apid,
ssc=tc_ssc,
is_tm=False,
t_from=t_tc_sent)
logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:'
.format(tc_st, tc_sst, tc_apid, tc_ssc))
# check if there was a failure, the result becomes False if a failure occurred
for i in range(len(ack_tms)):
head = ack_tms[i][0][0]
data = ack_tms[i][1]
if result is not False:
if head.SERV_SUB_TYPE in [1, 3, 7] and (first_tm_bytes == tc_bytes):
logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
result = True
# print(head.SERV_TYPE, head.SERV_SUB_TYPE)
elif head.SERV_SUB_TYPE in [2, 4, 8] and (first_tm_bytes == tc_bytes):
if head.SERV_SUB_TYPE == 2:
logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.'
.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
logger.debug('Data of the TM packet: {}'.format(data))
if head.SERV_SUB_TYPE == 4:
logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.'
.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
logger.debug('Data of the TM packet: {}'.format(data))
if head.SERV_SUB_TYPE == 8:
logger.info(
'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.'
.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
logger.debug('Data of the TM packet: {}'.format(data))
result = False
else:
logger.info('No TM found with matching pattern {}'.format(tc_bytes))
return result, ack_tms
def get_tc_acknow_original(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None):
""" """
Check if for the TC acknowledgement packets can be found in the database. Check if for the TC acknowledgement packets can be found in the database.
This function makes a single database query. This function makes a single database query.
...@@ -1057,6 +1170,7 @@ def get_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1, ...@@ -1057,6 +1170,7 @@ def get_tc_acknow(pool_name="LIVE", t_tc_sent=0, tc_apid=321, tc_ssc=1, tm_st=1,
if head.SERV_SUB_TYPE in [1, 3, 7]: if head.SERV_SUB_TYPE in [1, 3, 7]:
logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head))) logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
result = True result = True
print(head.SERV_TYPE, head.SERV_SUB_TYPE)
elif head.SERV_SUB_TYPE in [2, 4, 8]: elif head.SERV_SUB_TYPE in [2, 4, 8]:
if head.SERV_SUB_TYPE == 2: if head.SERV_SUB_TYPE == 2:
logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.' logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.'
...@@ -1156,116 +1270,117 @@ def await_tc_acknow(tc_identifier, pool_name="LIVE", duration=10, tm_st=1, tm_ss ...@@ -1156,116 +1270,117 @@ def await_tc_acknow(tc_identifier, pool_name="LIVE", duration=10, tm_st=1, tm_ss
return result, ack_list return result, ack_list
def get_5_1_tc_acknow(pool_name="LIVE", t_tc_sent=0.0, tc_apid=321, tc_ssc=1, tm_st=5, tm_sst=None): def get_5_1_tc_acknow(pool_name="LIVE", tm_name="name", parameter_1=5, parameter_2=3):
""" """
Check if for the TC acknowledgement packets can be found in the database. Check if for the TC acknowledgement packets can be found in the database.
This function makes a single database query. This function makes a single database query.
:param pool_name: str :param pool_name: str
Name of the TM pool in the database Name of the TM pool in the database
:param t_tc_sent: float :param tm_name: string
CUC timestamp from which search for telecommand should start Description of TM as found in TM Table
:param tc_apid: int or str :param parameter_1: int
Application process ID of the sent TC. Can be provided as integer or hexadecimal string Value of the second parameter of the TM
:param tc_ssc: int :param parameter_2: int
Source sequence counter of the sent TC Value of the third parameter of the TM
:return: (boolean, list) :return: (boolean)
boolean: boolean:
True if one or up to all acknowledgement packets TM(5,1), TM(5,3), TM(5,7) were found True if the 5_1 packet was found with the given parameters
False if one or all of TM(5,2), TM(5,4), TM(5,8) were found False if no such packet was found
list:
List of the acknowledgement TM packets for the TC,
[] if no acknowledgement TM packets could be found in the database
""" """
result = None
assert isinstance(pool_name, str) assert isinstance(pool_name, str)
assert isinstance(tc_apid, (int, str)) assert isinstance(tm_name, str)
assert isinstance(t_tc_sent, (float, int)) assert isinstance(parameter_1, int)
assert isinstance((parameter_2, int))
time_of_packet = get_time_of_last_tc(pool_name)
packets = fetch_packets(pool_name=pool_name, is_tm=True, st=5, sst=None, apid=321, ssc=None, t_from=time_of_packet,
t_to=time_of_packet+30 , dest_id=None, not_apid=None, decode=True, silent=False)
if tm_name == "SASW Rep1_EVT_IASW_TR" and parameter_1 == 5 and parameter_2 == 3:
assert packets[0][1][0][1][4][0] == parameter_1
assert packets[0][1][0][2][4][0] == parameter_2
return True
elif tm_name == "SASW Rep1_EVT_FEE_TR":
if parameter_1 == 3 and parameter_2 == 2:
assert packets[1][1][0][1][4][0] == parameter_1
assert packets[1][1][0][2][4][0] == parameter_2
return True
elif parameter_1 == 2 and parameter_2 == 7:
assert packets[2][1][0][1][4][0] == parameter_1
assert packets[2][1][0][2][4][0] == parameter_2
return True
elif parameter_1 == 7 and parameter_2 == 6:
assert packets[3][1][0][1][4][0] == parameter_1
assert packets[3][1][0][2][4][0] == parameter_2
return True
else:
print("Packet does not exist. SASW Rep1_EVT_FEE_TR")
return False
else:
print("Packet does not exist. ASW Rep1_EVT_IASW_TR")
return False
# return packets
# if the tc_apid is provided as hexadecimal number, convert it to and integer
tc_apid = tools.convert_apid_to_int(apid=tc_apid)
evtid_list = []
data_list = []
# make database query
packets = fetch_packets(pool_name=pool_name, st=tm_st, sst=tm_sst, t_from=t_tc_sent)
return packets
"""
for packet in packets:
evtid = get_tm_data_entries(packet, "EvtId")
evtid_list.append(evtid)
data = packet[0][1].hex()
data_list.append(data)
print(evtid_list)
print(data_list)
"""
# print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000)) # print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000))
# filter for TM packets with the correct APID and source sequence counter (SSC) in the data field # filter for TM packets with the correct APID and source sequence counter (SSC) in the data field
"""
ack_tms = []
for i in range(len(packets)):
if packets[i][1] is not None and packets[i][1][0] is not None: def check_if_packet_is_received(ST=1,SST=7,pool_name="LIVE"):
# get the data entries for APID and SSC assert isinstance(pool_name, str)
# pac_apid = packets[i][0][3] # TODO: not compatible anymore assert isinstance(ST, int)
pac_apid = packets[i][0][0].APID assert isinstance(SST, int)
if pac_apid == 961: # for acknowledgements from SEM
name_apid = 'PAR_CMD_APID'
name_psc = 'PAR_CMD_SEQUENCE_COUNT' raw_packet_list, data_packet_list = get_last_100_packets(pool_name=pool_name)
else: # print(raw_packet_list)
name_apid = 'TcPcktId'
name_psc = 'TcPcktSeqCtrl'
para = get_tm_data_entries(tm_packet=packets[i], data_entry_names=[name_apid, name_psc]) # ST 10 Stelle
if name_apid in para and name_psc in para: # SST 11 Stelle
# extract the SSC from the PSC for i in range(len(raw_packet_list)):
ssc = extract_ssc_from_psc(psc=para[name_psc]) packet = cfl.get_header_parameters_detailed(raw_packet_list[i])
apid = extract_apid_from_packetid(packet_id=para[name_apid])
if pac_apid == 961: # acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID' print(packet)
tc_pid = tools.extract_pid_from_apid(tc_apid)
if apid == tc_pid and ssc == tc_ssc: current_packet_ST = packet[10][1]
ack_tms.append(packets[i]) current_packet_SST = packet[11][1]
else:
if apid == tc_apid and ssc == tc_ssc:
ack_tms.append(packets[i]) try:
current_timestamp = cfl.get_cuctime(raw_packet_list[i])
except:
current_timestamp = ""
if current_packet_ST == ST and current_packet_SST == SST:
print("Match")
print(ST, SST, current_timestamp)
return True
else: else:
logger.debug('get_tc_acknow: could not read the data from the TM packet') continue
# print(time)
if current_packet_ST == ST and current_packet_SST == SST:
print("Found it")
# treat with the result from the database query
if len(ack_tms) > 0:
# get the ST and SST of the TC for logging purposes
tc_st, tc_sst = get_st_and_sst(pool_name=pool_name,
apid=tc_apid,
ssc=tc_ssc,
is_tm=False,
t_from=t_tc_sent)
logger.info('Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:'
.format(tc_st, tc_sst, tc_apid, tc_ssc))
# check if there was a failure, the result becomes False if a failure occurred
for i in range(len(ack_tms)):
head = ack_tms[i][0][0]
data = ack_tms[i][1]
if result is not False:
if head.SERV_SUB_TYPE in [1, 3, 7]:
logger.info('TM({},{}) @ {}'.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
result = True
elif head.SERV_SUB_TYPE in [2, 4, 8]:
if head.SERV_SUB_TYPE == 2:
logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.'
.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
logger.debug('Data of the TM packet: {}'.format(data))
if head.SERV_SUB_TYPE == 4:
logger.info('TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.'
.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
logger.debug('Data of the TM packet: {}'.format(data))
if head.SERV_SUB_TYPE == 8:
logger.info(
'TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.'
.format(head.SERV_TYPE, head.SERV_SUB_TYPE, cfl.get_cuctime(head)))
logger.debug('Data of the TM packet: {}'.format(data))
result = False
return result, ack_tms
pass
"""
def get_sid_of_tm(packet): def get_sid_of_tm(packet):
...@@ -1273,11 +1388,6 @@ def get_sid_of_tm(packet): ...@@ -1273,11 +1388,6 @@ def get_sid_of_tm(packet):
Gets SID from raw packet data. Gets SID from raw packet data.
:param packet: bytes :param packet: bytes
Packet whose SID should be extracted Packet whose SID should be extracted
:param decoded_packet: str
Decoded information of packet
:param header: str
extract header from raw data
""" """
decoded_packet = packet.hex() decoded_packet = packet.hex()
...@@ -1729,11 +1839,13 @@ def get_acquisition(pool_name, tm_21_3): ...@@ -1729,11 +1839,13 @@ def get_acquisition(pool_name, tm_21_3):
return result return result
def get_time_of_last_tc(pool_name="LIVE"): def get_data_of_last_tc(pool_name="LIVE"):
""" """
Finds the newest Telecommand in database with the Telemetry just before. Finds the newest Telecommand in database with the Telemetry just before.
param pool_name: str :param pool_name: str
Name of the pool for TM/TC packets in the database Name of the pool for TM/TC packets in the database
:return: str
timestamp of packet, IID of packet, First four bytes of packet data
""" """
...@@ -1744,11 +1856,86 @@ def get_time_of_last_tc(pool_name="LIVE"): ...@@ -1744,11 +1856,86 @@ def get_time_of_last_tc(pool_name="LIVE"):
if tm_pool[i].timestamp == "": if tm_pool[i].timestamp == "":
# last_telecommand = tm_pool[i] # last_telecommand = tm_pool[i]
tm_before_tc = tm_pool[i-1] tm_before_tc = tm_pool[i-1]
tc_id = tm_pool[i].iid
tc_bytes = tm_pool[i].raw.hex()[:4]
break break
timestamp = tm_before_tc.timestamp timestamp = tm_before_tc.timestamp
timestamp_length = len(timestamp) timestamp_length = len(timestamp)
timestamp = float(timestamp[:timestamp_length -1]) timestamp = float(timestamp[:timestamp_length -1])
return timestamp return timestamp, tc_id, tc_bytes
def verify_no_more_hk(wait_time=5, pool_name="LIVE"):
"""
Function waits for a spcified time and the searches the data pool for HK telemetry.
Returns True if there is no HK.
Returns False if there is HK.
:param wait_time: int
timespan in which the program checks if a hk has appeard
:param pool_name: str
Name of the pool for TM/TC packets in the database
:return: boolean
True if there is no HK, False if there is a HK
"""
assert isinstance(SID, int)
assert isinstance(pool_name, str)
hk_signature = "0001050000000000000000000000000000000000000000000000000000000000000000000000000000000112"
time.sleep(wait_time)
# TODO incorporate function to find last 100 packets
list_of_packets = []
for i in range (1,1000):
current_telemetry_packet = cfl.get_pool_rows(pool_name)[-i]
current_raw = current_telemetry_packet.raw
current_data = current_telemetry_packet.data
list_of_packets.append(current_data.hex())
try:
current_timestamp = cfl.get_cuctime(current_raw)
print(current_timestamp)
except:
break
if hk_signature in list_of_packets:
return False
else:
return True
def get_tm_timestamp(packet, pool_name="LIVE"):
"""
gets timestamp from packet irregardless of data type
"""
pass
def get_last_100_packets(pool_name="Live"):
# TODO Fix function to work with any number of packets
raw_packets = []
data_packets = []
data_pool = cfl.get_pool_rows(pool_name)
for i in range(1, 101):
try:
current_packet = cfl.get_pool_rows(pool_name)[-i]
current_raw = current_packet.raw
current_data = current_packet.data
raw_packets.append(current_raw)
data_packets.append(current_data)
except IndexError:
break
return raw_packets, data_packets
def get_dpu_mode(pool_name="LIVE"): def get_dpu_mode(pool_name="LIVE"):
......
...@@ -14,11 +14,11 @@ output-file-path = ${paths:tst}/logs_test_runs/output_files/ ...@@ -14,11 +14,11 @@ output-file-path = ${paths:tst}/logs_test_runs/output_files/
[tst-preferences] [tst-preferences]
show-json-view = True show-json-view = True
main-window-height = 1090 main-window-height = 1016
main-window-width = 1920 main-window-width = 1848
paned-position = 953 paned-position = 1019
paned-position-codeblockreuse = 520 paned-position-codeblockreuse = 520
[tst-history] [tst-history]
last-folder = /home/marko/space/smile/OBSW/Documentation/testspec/tst last-folder = /home/sebastian/OBSW/Documentation/testspec/tst/derived_specs
...@@ -10,6 +10,7 @@ import confignator ...@@ -10,6 +10,7 @@ import confignator
import sys import sys
sys.path.append(confignator.get_option('paths', 'ccs')) sys.path.append(confignator.get_option('paths', 'ccs'))
sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib"))
sys.path.append('/home/sebastian/CCS/Tst/testing_library/testlib') # notwendig damit tm als Modul erkannt wird
import ccs_function_lib as cfl import ccs_function_lib as cfl
import s2k_partypes as s2k import s2k_partypes as s2k
...@@ -38,11 +39,21 @@ Verification_4 = "tm." + tm.get_frequency_of_hk.__name__ + str(inspect.signature ...@@ -38,11 +39,21 @@ Verification_4 = "tm." + tm.get_frequency_of_hk.__name__ + str(inspect.signature
Verification_5 = "tm." + tm.get_dpu_mode.__name__ + str(inspect.signature((tm.get_dpu_mode))) Verification_5 = "tm." + tm.get_dpu_mode.__name__ + str(inspect.signature((tm.get_dpu_mode)))
Verification_6 = "tm." + tm.get_packet_length.__name__ + str(inspect.signature((tm.get_packet_length))) Verification_6 = "tm." + tm.get_packet_length.__name__ + str(inspect.signature((tm.get_packet_length)))
Verification_7 = "tm." + tm.get_version_number.__name__ + str(inspect.signature((tm.get_version_number))) Verification_7 = "tm." + tm.get_version_number.__name__ + str(inspect.signature((tm.get_version_number)))
Verification_8 = "tm." + tm.get_time_of_last_tc.__name__ + str(inspect.signature((tm.get_time_of_last_tc))) Verification_8 = "tm." + tm.get_data_of_last_tc.__name__ + str(inspect.signature((tm.get_data_of_last_tc)))
Verification_9 = "tm." + tm.verify_no_more_hk.__name__ + str(inspect.signature((tm.verify_no_more_hk)))
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# Descriptions
descr_8 = "Get Timestamp of TM before last TC, get IID of last TC, get first 4 bytes of TC raw data."
descr_9 = "Check if there are no more HK packets"
verification_list = [ verification_list = [
("Get TC Verification", 1, 7, "descr", Verification_1), ("Get TC Verification", 1, 7, "descr", Verification_1),
("Await TC Verification", 1, 7, "descr", tc_identifier + "\n" + Verification_2), ("Await TC Verification", 1, 7, "descr", tc_identifier + "\n" + Verification_2),
...@@ -51,7 +62,8 @@ verification_list = [ ...@@ -51,7 +62,8 @@ verification_list = [
("Get DPU Mode", None, None, "descr", Verification_5), ("Get DPU Mode", None, None, "descr", Verification_5),
("Get Packet Length", None, None, "descr", Verification_6), ("Get Packet Length", None, None, "descr", Verification_6),
("Get Version Number", None, None, "descr", Verification_7), ("Get Version Number", None, None, "descr", Verification_7),
("Get Time before last TC", None, None, "descr", Verification_8) ("Get Data of last TC", None, None, descr_8, Verification_8),
("Test if there are no more HK packets", None, None, descr_9, Verification_9)
] ]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment