Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
CCS
Manage
Activity
Members
Plan
Wiki
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Package registry
Model registry
Operate
Terraform modules
Analyze
Contributor analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Marko Mecina
CCS
Commits
9d4fc5c3
Commit
9d4fc5c3
authored
2 years ago
by
Sebastian
Browse files
Options
Downloads
Patches
Plain Diff
added functions in verification_tab.py and used said functions in verification_tab.py
parent
f2cd5814
No related branches found
No related tags found
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
Tst/testing_library/testlib/tm.py
+333
-3
333 additions, 3 deletions
Tst/testing_library/testlib/tm.py
Tst/testing_library/testlib/tm_test.py
+93
-0
93 additions, 0 deletions
Tst/testing_library/testlib/tm_test.py
Tst/tst/verification_tab.py
+44
-111
44 additions, 111 deletions
Tst/tst/verification_tab.py
with
470 additions
and
114 deletions
Tst/testing_library/testlib/tm.py
+
333
−
3
View file @
9d4fc5c3
...
@@ -979,7 +979,7 @@ def extract_apid_from_packetid(packet_id):
...
@@ -979,7 +979,7 @@ def extract_apid_from_packetid(packet_id):
return
apid
return
apid
def
get_tc_acknow
(
pool_name
,
t_tc_sent
,
tc_apid
,
tc_ssc
,
tm_st
=
1
,
tm_sst
=
None
):
def
get_tc_acknow
(
pool_name
=
"
LIVE
"
,
t_tc_sent
=
0
,
tc_apid
=
321
,
tc_ssc
=
1
,
tm_st
=
1
,
tm_sst
=
None
):
"""
"""
Check if for the TC acknowledgement packets can be found in the database.
Check if for the TC acknowledgement packets can be found in the database.
This function makes a single database query.
This function makes a single database query.
...
@@ -1076,7 +1076,7 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None):
...
@@ -1076,7 +1076,7 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None):
return
result
,
ack_tms
return
result
,
ack_tms
def
await_tc_acknow
(
pool_name
,
tc_identifier
,
duration
=
10
,
tm_st
=
1
,
tm_sst
=
None
):
def
await_tc_acknow
(
tc_identifier
,
pool_name
=
"
LIVE
"
,
duration
=
10
,
tm_st
=
1
,
tm_sst
=
None
):
"""
Waiting to receive the acknowledgement packet of a sent telecommand (TC) for a given duration.
"""
Waiting to receive the acknowledgement packet of a sent telecommand (TC) for a given duration.
As soon as acknowledgement packets were found the function returns.
As soon as acknowledgement packets were found the function returns.
...
@@ -1156,6 +1156,279 @@ def await_tc_acknow(pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None)
...
@@ -1156,6 +1156,279 @@ def await_tc_acknow(pool_name, tc_identifier, duration=10, tm_st=1, tm_sst=None)
return
result
,
ack_list
return
result
,
ack_list
def
get_5_1_tc_acknow
(
pool_name
=
"
LIVE
"
,
t_tc_sent
=
0
,
tc_apid
=
321
,
tc_ssc
=
1
,
tm_st
=
5
,
tm_sst
=
None
):
"""
Check if for the TC acknowledgement packets can be found in the database.
This function makes a single database query.
:param pool_name: str
Name of the TM pool in the database
:param t_tc_sent: float
CUC timestamp from which search for telecommand should start
:param tc_apid: int or str
Application process ID of the sent TC. Can be provided as integer or hexadecimal string
:param tc_ssc: int
Source sequence counter of the sent TC
:return: (boolean, list)
boolean:
True if one or up to all acknowledgement packets TM(5,1), TM(5,3), TM(5,7) were found
False if one or all of TM(5,2), TM(5,4), TM(5,8) were found
list:
List of the acknowledgement TM packets for the TC,
[] if no acknowledgement TM packets could be found in the database
"""
print
(
"
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
"
)
result
=
None
assert
isinstance
(
pool_name
,
str
)
assert
isinstance
(
tc_apid
,
(
int
,
str
))
assert
isinstance
(
t_tc_sent
,
(
float
,
int
))
# if the tc_apid is provided as hexadecimal number, convert it to and integer
tc_apid
=
tools
.
convert_apid_to_int
(
apid
=
tc_apid
)
# make database query
packets
=
fetch_packets
(
pool_name
=
pool_name
,
st
=
tm_st
,
sst
=
tm_sst
,
t_from
=
t_tc_sent
)
# print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000))
# filter for TM packets with the correct APID and source sequence counter (SSC) in the data field
ack_tms
=
[]
for
i
in
range
(
len
(
packets
)):
if
packets
[
i
][
1
]
is
not
None
and
packets
[
i
][
1
][
0
]
is
not
None
:
# get the data entries for APID and SSC
# pac_apid = packets[i][0][3] # TODO: not compatible anymore
pac_apid
=
packets
[
i
][
0
][
0
].
APID
if
pac_apid
==
961
:
# for acknowledgements from SEM
name_apid
=
'
PAR_CMD_APID
'
name_psc
=
'
PAR_CMD_SEQUENCE_COUNT
'
else
:
name_apid
=
'
TcPcktId
'
name_psc
=
'
TcPcktSeqCtrl
'
para
=
get_tm_data_entries
(
tm_packet
=
packets
[
i
],
data_entry_names
=
[
name_apid
,
name_psc
])
if
name_apid
in
para
and
name_psc
in
para
:
# extract the SSC from the PSC
ssc
=
extract_ssc_from_psc
(
psc
=
para
[
name_psc
])
apid
=
extract_apid_from_packetid
(
packet_id
=
para
[
name_apid
])
if
pac_apid
==
961
:
# acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID'
tc_pid
=
tools
.
extract_pid_from_apid
(
tc_apid
)
if
apid
==
tc_pid
and
ssc
==
tc_ssc
:
ack_tms
.
append
(
packets
[
i
])
else
:
if
apid
==
tc_apid
and
ssc
==
tc_ssc
:
ack_tms
.
append
(
packets
[
i
])
else
:
logger
.
debug
(
'
get_tc_acknow: could not read the data from the TM packet
'
)
# treat with the result from the database query
if
len
(
ack_tms
)
>
0
:
# get the ST and SST of the TC for logging purposes
tc_st
,
tc_sst
=
get_st_and_sst
(
pool_name
=
pool_name
,
apid
=
tc_apid
,
ssc
=
tc_ssc
,
is_tm
=
False
,
t_from
=
t_tc_sent
)
logger
.
info
(
'
Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:
'
.
format
(
tc_st
,
tc_sst
,
tc_apid
,
tc_ssc
))
# check if there was a failure, the result becomes False if a failure occurred
for
i
in
range
(
len
(
ack_tms
)):
head
=
ack_tms
[
i
][
0
][
0
]
data
=
ack_tms
[
i
][
1
]
if
result
is
not
False
:
if
head
.
SERV_SUB_TYPE
in
[
1
,
3
,
7
]:
logger
.
info
(
'
TM({},{}) @ {}
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
result
=
True
elif
head
.
SERV_SUB_TYPE
in
[
2
,
4
,
8
]:
if
head
.
SERV_SUB_TYPE
==
2
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
if
head
.
SERV_SUB_TYPE
==
4
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
if
head
.
SERV_SUB_TYPE
==
8
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
result
=
False
return
result
,
ack_tms
pass
def
get_8_1_tc_acknow
(
pool_name
=
"
LIVE
"
,
t_tc_sent
=
0
,
tc_apid
=
321
,
tc_ssc
=
1
,
tm_st
=
8
,
tm_sst
=
None
):
"""
Check if for the TC acknowledgement packets can be found in the database.
This function makes a single database query.
:param pool_name: str
Name of the TM pool in the database
:param t_tc_sent: float
CUC timestamp from which search for telecommand should start
:param tc_apid: int or str
Application process ID of the sent TC. Can be provided as integer or hexadecimal string
:param tc_ssc: int
Source sequence counter of the sent TC
:return: (boolean, list)
boolean:
True if one or up to all acknowledgement packets TM(5,1), TM(5,3), TM(5,7) were found
False if one or all of TM(5,2), TM(5,4), TM(5,8) were found
list:
List of the acknowledgement TM packets for the TC,
[] if no acknowledgement TM packets could be found in the database
"""
result
=
None
assert
isinstance
(
pool_name
,
str
)
assert
isinstance
(
tc_apid
,
(
int
,
str
))
assert
isinstance
(
t_tc_sent
,
(
float
,
int
))
# if the tc_apid is provided as hexadecimal number, convert it to and integer
tc_apid
=
tools
.
convert_apid_to_int
(
apid
=
tc_apid
)
# make database query
packets
=
fetch_packets
(
pool_name
=
pool_name
,
st
=
tm_st
,
sst
=
tm_sst
,
t_from
=
t_tc_sent
)
# print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000))
# filter for TM packets with the correct APID and source sequence counter (SSC) in the data field
ack_tms
=
[]
for
i
in
range
(
len
(
packets
)):
if
packets
[
i
][
1
]
is
not
None
and
packets
[
i
][
1
][
0
]
is
not
None
:
# get the data entries for APID and SSC
# pac_apid = packets[i][0][3] # TODO: not compatible anymore
pac_apid
=
packets
[
i
][
0
][
0
].
APID
if
pac_apid
==
961
:
# for acknowledgements from SEM
name_apid
=
'
PAR_CMD_APID
'
name_psc
=
'
PAR_CMD_SEQUENCE_COUNT
'
else
:
name_apid
=
'
TcPcktId
'
name_psc
=
'
TcPcktSeqCtrl
'
para
=
get_tm_data_entries
(
tm_packet
=
packets
[
i
],
data_entry_names
=
[
name_apid
,
name_psc
])
if
name_apid
in
para
and
name_psc
in
para
:
# extract the SSC from the PSC
ssc
=
extract_ssc_from_psc
(
psc
=
para
[
name_psc
])
apid
=
extract_apid_from_packetid
(
packet_id
=
para
[
name_apid
])
if
pac_apid
==
961
:
# acknowledgement packets from SEM have the PID in the field 'PAR_CMD_APID'
tc_pid
=
tools
.
extract_pid_from_apid
(
tc_apid
)
if
apid
==
tc_pid
and
ssc
==
tc_ssc
:
ack_tms
.
append
(
packets
[
i
])
else
:
if
apid
==
tc_apid
and
ssc
==
tc_ssc
:
ack_tms
.
append
(
packets
[
i
])
else
:
logger
.
debug
(
'
get_tc_acknow: could not read the data from the TM packet
'
)
# treat with the result from the database query
if
len
(
ack_tms
)
>
0
:
# get the ST and SST of the TC for logging purposes
tc_st
,
tc_sst
=
get_st_and_sst
(
pool_name
=
pool_name
,
apid
=
tc_apid
,
ssc
=
tc_ssc
,
is_tm
=
False
,
t_from
=
t_tc_sent
)
logger
.
info
(
'
Received acknowledgement TM packets for TC({},{}) apid={} ssc={}:
'
.
format
(
tc_st
,
tc_sst
,
tc_apid
,
tc_ssc
))
# check if there was a failure, the result becomes False if a failure occurred
for
i
in
range
(
len
(
ack_tms
)):
head
=
ack_tms
[
i
][
0
][
0
]
data
=
ack_tms
[
i
][
1
]
if
result
is
not
False
:
if
head
.
SERV_SUB_TYPE
in
[
1
,
3
,
7
]:
logger
.
info
(
'
TM({},{}) @ {}
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
result
=
True
elif
head
.
SERV_SUB_TYPE
in
[
2
,
4
,
8
]:
if
head
.
SERV_SUB_TYPE
==
2
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
if
head
.
SERV_SUB_TYPE
==
4
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
if
head
.
SERV_SUB_TYPE
==
8
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
result
=
False
return
result
,
ack_tms
pass
def
get_sid_of_tm
(
packet
):
"""
Gets SID from raw packet data.
:param packet: bytes
Packet whose SID should be extracted
:param decoded_packet: str
Decoded information of packet
:param header: str
extract header from raw data
"""
decoded_packet
=
packet
.
hex
()
header
=
decoded_packet
[
36
:
124
]
return
int
(
header
[
3
])
def
get_frequency_of_hk
(
duration
=
60.
,
frequency
=
20
,
SID
=
1
,
pool_name
=
"
LIVE
"
):
"""
Check the time passing between TM packets during a given duration.
This function makes a multiple database queries.
:param pool_name: str
Name of the TM pool in the database
:param duration: float
timeslot in which the incoming packages are checked for their cuc-timestamp
"""
assert
isinstance
(
duration
,
(
float
,
int
))
assert
isinstance
(
frequency
,
int
)
assert
isinstance
(
SID
,
int
)
list_of_timestamps
=
[]
last_telemetry_packet
=
cfl
.
get_pool_rows
(
pool_name
)[
-
1
].
raw
timestamp
=
cfl
.
get_cuctime
(
last_telemetry_packet
)
# last_packet_sid = get_sid_of_tm(last_telemetry_packet)
current_timestamp
=
0.0
end_time
=
timestamp
+
duration
while
current_timestamp
<=
end_time
:
current_telemetry_packet
=
cfl
.
get_pool_rows
(
pool_name
)[
-
1
].
raw
current_timestamp
=
cfl
.
get_cuctime
(
current_telemetry_packet
)
current_packet_sid
=
get_sid_of_tm
((
current_telemetry_packet
))
if
current_timestamp
not
in
list_of_timestamps
and
current_packet_sid
==
SID
:
list_of_timestamps
.
append
(
current_timestamp
)
else
:
continue
list_of_differences
=
[
j
-
i
for
i
,
j
in
zip
(
list_of_timestamps
[:
-
1
],
list_of_timestamps
[
1
:])]
first_element
=
round
(
list_of_differences
[
0
])
chk
=
True
for
item
in
list_of_differences
:
if
first_element
!=
round
(
item
):
chk
=
False
break
if
chk
==
True
:
# print(round(list_of_differences[0]))
print
(
first_element
)
if
first_element
==
frequency
:
print
(
True
)
return
True
else
:
print
(
"
Didn
'
t work
"
)
return
False
# get_frequency_of_hk()
def
check_acknowledgement
(
pool_name
,
tc_identifier
,
duration
=
10
):
def
check_acknowledgement
(
pool_name
,
tc_identifier
,
duration
=
10
):
"""
"""
Check that for a sent TC the acknowledgement packets were received (assuming the acknowledgement was enabled)
Check that for a sent TC the acknowledgement packets were received (assuming the acknowledgement was enabled)
...
@@ -1538,6 +1811,63 @@ def get_acquisition(pool_name, tm_21_3):
...
@@ -1538,6 +1811,63 @@ def get_acquisition(pool_name, tm_21_3):
logger
.
info
(
meta_data
[
i
])
logger
.
info
(
meta_data
[
i
])
return
result
return
result
def
get_dpu_mode
(
pool_name
=
"
LIVE
"
):
"""
Get the data from the last entry in the database and check its DPU Mode.
:param pool_name: str
Name of the pool for TM/TC packets in the database
:param dpu_mode: dict
DPU Mode of the packet
"""
data_packet
=
cfl
.
get_pool_rows
(
pool_name
)[
-
1
].
raw
dpu_mode
=
get_tm_data_entries
(
data_packet
,
"
DpuMode
"
)
print
(
dpu_mode
)
def
get_packet_length
(
pool_name
=
"
LIVE
"
):
"""
Get the data from the last entry in the database and check its length.
:param pool_name: str
Name of the pool for TM/TC packets in the database
:param data_packet: bit
data in the last entry in database
:param packet_length: int
Length of data_packet
"""
data_packet
=
cfl
.
get_pool_rows
(
pool_name
)[
-
1
].
raw
packet_length
=
len
(
data_packet
)
print
(
packet_length
)
def
get_version_number
(
version_number
=
"
0x0112
"
,
pool_name
=
"
LIVE
"
):
"""
Get the version number of the newest TM packet.
:param version_number: str
version number which should be verified
:param pool_name: str
Name of the pool for TM/TC packets in the database
:param data_packet: bytes
Data in Packet
:param packet_version_number_dic: dic
dictionary containing the version number
:param packet_version_number: str
version number converted into hex
"""
assert
isinstance
(
version_number
,
str
)
data_packet
=
cfl
.
get_pool_rows
(
pool_name
)[
-
1
].
raw
packet_version_number_dic
=
get_tm_data_entries
(
data_packet
,
"
VersionNumber
"
)
# print(packet_version_number)
packet_version_number
=
hex
(
packet_version_number_dic
[
"
VersionNumber
"
])
if
packet_version_number
==
version_number
:
print
(
True
)
return
True
else
:
print
(
False
)
return
False
def
await_tc_identifier
(
pool_name
,
tc_apid
,
tc_ssc
):
def
await_tc_identifier
(
pool_name
,
tc_apid
,
tc_ssc
):
...
@@ -1570,7 +1900,7 @@ def await_tc_identifier(pool_name, tc_apid, tc_ssc):
...
@@ -1570,7 +1900,7 @@ def await_tc_identifier(pool_name, tc_apid, tc_ssc):
return
tc_identifier
return
tc_identifier
def
get_tc_identifier
(
pool_name
,
tc_apid
,
tc_ssc
,
tc_time
):
def
get_tc_identifier
(
pool_name
=
"
LIVE
"
,
tc_apid
=
321
,
tc_ssc
=
1
,
tc_time
=
0.
):
"""
"""
Gather the data necessary to identify the newest telecommand and put it in tuple.
Gather the data necessary to identify the newest telecommand and put it in tuple.
:param pool_name: str
:param pool_name: str
...
...
This diff is collapsed.
Click to expand it.
Tst/testing_library/testlib/tm_test.py
0 → 100644
+
93
−
0
View file @
9d4fc5c3
import
tm
import
ccs_function_lib
as
cfl
import
confignator
import
matplotlib
matplotlib
.
use
(
'
Gtk3Cairo
'
)
from
confignator
import
config
check_cfg
=
config
.
get_config
(
file_path
=
confignator
.
get_option
(
'
config-files
'
,
'
ccs
'
))
telemetry
=
cfl
.
get_pool_rows
(
"
PLM
"
)
telemetry_packet_1
=
cfl
.
get_pool_rows
(
"
PLM
"
)[
0
].
raw
telemetry_packet_2
=
cfl
.
get_pool_rows
(
"
PLM
"
)[
1
].
raw
list_of_tm_packets
=
[
telemetry_packet_1
,
telemetry_packet_2
]
telemetry_raw
=
telemetry
.
all
()[
-
1
].
raw
timestamp
=
cfl
.
get_cuctime
(
telemetry_packet_1
)
# test = tm.get_tm_data_entries(telemetry_packet_1, "EvtId")
# print(test)
timestamp_test
=
tm
.
highest_cuc_timestamp
(
list_of_tm_packets
)
# baba = telemetry[-1].timestamp
# print(baba)
# print("Telemetry Packet: ", telemetry_packet_1)
# print("Telemetry Packet: ", telemetry_packet_2)
# print(tm.get_tm_data_entries(telemetry_packet_1, "DpuMode"))
# get_list_from_db = tm.fetch_packets(pool_name="PLM", is_tm=True, st=None, sst=None, apid=None, ssc=None, t_from=0., t_to=1401.,
# dest_id=None, not_apid=None, decode=False, silent=False)
identified_tc
=
tm
.
get_tc_identifier
(
pool_name
=
"
PLM
"
,
tc_apid
=
321
,
tc_ssc
=
1
,
tc_time
=
100
)
acknowledgement
=
tm
.
await_tc_acknow
(
pool_name
=
"
PLM
"
,
tc_identifier
=
identified_tc
,
duration
=
10
,
tm_st
=
1
,
tm_sst
=
7
)
print
(
acknowledgement
[
1
][
0
][
0
][
0
].
CTIME
+
(
acknowledgement
[
1
][
0
][
0
][
0
].
FTIME
/
1000000
))
"""
# fetch funktion, demonstration von .CTIME und FTIME
test = tm.fetch_packets(pool_name=
"
PLM
"
, is_tm=True, st=3, sst=25, apid=321, ssc=None, t_from=0.00, t_to=21.0,
dest_id=None, not_apid=None, decode=True, silent=False)
# print(test)
# print(test[0][0][1])
# print(
"
Test:
"
, test[0][0][1])
header = test[0][0][0]
# print(header.CTIME)
# print(header.FTIME)
print(header.CTIME + (header.FTIME/1000000))
test_liste = []
for i in test:
test_liste.append(i[0][0])
print(len(test_liste))
print(test_liste)
"""
This diff is collapsed.
Click to expand it.
Tst/tst/verification_tab.py
+
44
−
111
View file @
9d4fc5c3
# !/usr/bin/env python3
# !/usr/bin/env python3
import
gi
import
gi
import
os
gi
.
require_version
(
"
Gtk
"
,
"
3.0
"
)
gi
.
require_version
(
"
Gtk
"
,
"
3.0
"
)
gi
.
require_version
(
"
GtkSource
"
,
"
3.0
"
)
gi
.
require_version
(
"
GtkSource
"
,
"
3.0
"
)
...
@@ -9,6 +9,7 @@ from gi.repository.GdkPixbuf import Pixbuf
...
@@ -9,6 +9,7 @@ from gi.repository.GdkPixbuf import Pixbuf
import
confignator
import
confignator
import
sys
import
sys
sys
.
path
.
append
(
confignator
.
get_option
(
'
paths
'
,
'
ccs
'
))
sys
.
path
.
append
(
confignator
.
get_option
(
'
paths
'
,
'
ccs
'
))
sys
.
path
.
append
(
os
.
path
.
join
(
confignator
.
get_option
(
"
paths
"
,
"
Tst
"
),
"
testing_library/testlib
"
))
import
ccs_function_lib
as
cfl
import
ccs_function_lib
as
cfl
import
s2k_partypes
as
s2k
import
s2k_partypes
as
s2k
...
@@ -23,14 +24,32 @@ import DBus_Basic
...
@@ -23,14 +24,32 @@ import DBus_Basic
from
typing
import
NamedTuple
from
typing
import
NamedTuple
import
confignator
import
confignator
import
gi
import
gi
# print(sys.path)
import
inspect
import
tm
Verification_1
=
"
tm.
"
+
tm
.
get_tc_acknow
.
__name__
+
str
(
inspect
.
signature
((
tm
.
get_tc_acknow
)))
Verification_2
=
"
tm.
"
+
tm
.
await_tc_acknow
.
__name__
+
str
(
inspect
.
signature
((
tm
.
await_tc_acknow
)))
Verification_3
=
"
tm.
"
+
tm
.
get_5_1_tc_acknow
.
__name__
+
str
(
inspect
.
signature
((
tm
.
get_5_1_tc_acknow
)))
tc_identifier
=
"
identified_tc = tm.
"
+
tm
.
get_tc_identifier
.
__name__
+
str
(
inspect
.
signature
((
tm
.
get_tc_identifier
)))
Verification_4
=
"
tm.
"
+
tm
.
get_frequency_of_hk
.
__name__
+
str
(
inspect
.
signature
((
tm
.
get_frequency_of_hk
)))
Verification_5
=
"
tm.
"
+
tm
.
get_dpu_mode
.
__name__
+
str
(
inspect
.
signature
((
tm
.
get_dpu_mode
)))
Verification_6
=
"
tm.
"
+
tm
.
get_packet_length
.
__name__
+
str
(
inspect
.
signature
((
tm
.
get_packet_length
)))
Verification_7
=
"
tm.
"
+
tm
.
get_version_number
.
__name__
+
str
(
inspect
.
signature
((
tm
.
get_version_number
)))
import
verification
as
ver
verification_list
=
[
verification_list
=
[
(
"
Get TC Verification
"
,
1
,
7
),
(
"
Get TC Verification
"
,
1
,
7
,
"
descr
"
,
Verification_1
),
(
"
Await TC Verification
"
,
1
,
7
)
(
"
Await TC Verification
"
,
1
,
7
,
"
descr
"
,
tc_identifier
+
"
\n
"
+
Verification_2
),
(
"
Get TC Verification
"
,
5
,
1
,
"
descr
"
,
Verification_3
),
(
"
Get HK frequency
"
,
None
,
None
,
"
descr
"
,
Verification_4
),
(
"
Get DPU Mode
"
,
None
,
None
,
"
descr
"
,
Verification_5
),
(
"
Get Packet Length
"
,
None
,
None
,
"
descr
"
,
Verification_6
),
(
"
Get Version Number
"
,
None
,
None
,
"
descr
"
,
Verification_7
)
]
]
...
@@ -38,6 +57,14 @@ verification_list = [
...
@@ -38,6 +57,14 @@ verification_list = [
def
translate_drag_data
(
data
):
translated
=
"
kla
"
return
translated
class
VerificationTable
(
Gtk
.
Grid
):
class
VerificationTable
(
Gtk
.
Grid
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
().
__init__
(
*
args
,
**
kwargs
)
super
().
__init__
(
*
args
,
**
kwargs
)
...
@@ -51,7 +78,7 @@ class VerificationTable(Gtk.Grid):
...
@@ -51,7 +78,7 @@ class VerificationTable(Gtk.Grid):
self
.
add
(
self
.
grid
)
self
.
add
(
self
.
grid
)
# Creating ListStore model
# Creating ListStore model
self
.
verification_liststore
=
Gtk
.
ListStore
(
str
,
int
,
int
)
self
.
verification_liststore
=
Gtk
.
ListStore
(
str
,
int
,
int
,
str
,
str
)
for
verification_ref
in
verification_list
:
for
verification_ref
in
verification_list
:
self
.
verification_liststore
.
append
(
list
(
verification_ref
))
self
.
verification_liststore
.
append
(
list
(
verification_ref
))
self
.
current_filter_verification
=
None
self
.
current_filter_verification
=
None
...
@@ -64,7 +91,7 @@ class VerificationTable(Gtk.Grid):
...
@@ -64,7 +91,7 @@ class VerificationTable(Gtk.Grid):
# Creating treeview
# Creating treeview
self
.
treeview
=
Gtk
.
TreeView
(
model
=
self
.
verification_filter
)
self
.
treeview
=
Gtk
.
TreeView
(
model
=
self
.
verification_filter
)
for
i
,
column_title
in
enumerate
(
for
i
,
column_title
in
enumerate
(
[
"
Verification
"
,
"
ST
"
,
"
SST
"
]
[
"
Verification
"
,
"
ST
"
,
"
SST
"
,
"
Description
"
]
):
):
renderer
=
Gtk
.
CellRendererText
()
renderer
=
Gtk
.
CellRendererText
()
column
=
Gtk
.
TreeViewColumn
(
column_title
,
renderer
,
text
=
i
)
column
=
Gtk
.
TreeViewColumn
(
column_title
,
renderer
,
text
=
i
)
...
@@ -116,14 +143,21 @@ class VerificationTable(Gtk.Grid):
...
@@ -116,14 +143,21 @@ class VerificationTable(Gtk.Grid):
global
verification_name
global
verification_name
global
ST
global
ST
global
SST
global
SST
global
descr
global
name_string
verification_name
=
model
[
row
][
0
]
verification_name
=
model
[
row
][
0
]
ST
=
model
[
row
][
1
]
ST
=
model
[
row
][
1
]
SST
=
model
[
row
][
2
]
SST
=
model
[
row
][
2
]
global
selected_data_for_drag_drop
descr
=
model
[
row
][
3
]
name_string
=
model
[
row
][
4
]
global
selected_data_for_drag_drop
# print(verification_name)
# print(name_string)
selected_data_for_drag_drop
=
ver
.
verification_template
(
"
Verification_1
"
)
selected_data_for_drag_drop
=
name_string
# selected_data_for_drag_drop = cfl.verification_template(name_string)
# str(verification_name) + "\n ST = " + str(ST) + "\n SST = " + str(SST) + "\n Time = 2"
# str(verification_name) + "\n ST = " + str(ST) + "\n SST = " + str(SST) + "\n Time = 2"
# selected_data_for_drag_drop = "{} ({}, {})".format((name, ST, SST))
# selected_data_for_drag_drop = "{} ({}, {})".format((name, ST, SST))
...
@@ -132,6 +166,7 @@ class VerificationTable(Gtk.Grid):
...
@@ -132,6 +166,7 @@ class VerificationTable(Gtk.Grid):
def
on_drag_data_get
(
self
,
treeview
,
drag_context
,
selection_data
,
info
,
time
,
*
args
):
def
on_drag_data_get
(
self
,
treeview
,
drag_context
,
selection_data
,
info
,
time
,
*
args
):
treeselection
=
treeview
.
get_selection
()
treeselection
=
treeview
.
get_selection
()
model
,
my_iter
=
treeselection
.
get_selected
()
model
,
my_iter
=
treeselection
.
get_selected
()
...
@@ -140,6 +175,7 @@ class VerificationTable(Gtk.Grid):
...
@@ -140,6 +175,7 @@ class VerificationTable(Gtk.Grid):
def
on_drag_begin
(
self
,
*
args
):
def
on_drag_begin
(
self
,
*
args
):
pass
pass
...
@@ -152,109 +188,6 @@ class VerificationTable(Gtk.Grid):
...
@@ -152,109 +188,6 @@ class VerificationTable(Gtk.Grid):
"""
def make_verification_template(pool_name=
"
LIVE
"
, preamble=
"
verification.Verification
"
, options=
""
, comment=False):
prcfg =
""
return
def verification(self):
# if st_variable == 1 and sst_variable == 7:
pool_rows = cfl.get_pool_rows(
"
PLM
"
)
system_time = time.clock_gettime(0)
entry_1_data = pool_rows.all()[-1]
time_1 = entry_1_data.timestamp
if time_1 ==
""
:
first_raw_digits =
""
# this string will contain the first bytes of raw data
telecommand = entry_1_data
telecommand_time = telecommand.timestamp
telecommand_raw = telecommand.raw.hex()
# Variable to generate new telecommand timestamp, other than telecommand_time
telecommand_verification_timestamp = time.clock_gettime(0)
verification_time = telecommand_verification_timestamp + 2
for i in telecommand_raw:
first_raw_digits += str(i)
if len(first_raw_digits) > 7:
break
# print(
"
After Loop telecommand_first_digits:
"
, first_raw_digits)
while system_time < verification_time and system_time != verification_time:
system_time = time.clock_gettime(0)
if system_time >= verification_time:
verification_running = self.type_comparison(first_raw_digits)
def type_comparison(comparison_data):
pool_rows = cfl.get_pool_rows(
"
PLM
"
)
st_list = []
sst_list = []
x = 0
header_counter = 0
while header_counter < 2:
x += 1
entry = pool_rows.all()[-x]
if entry.data.hex() == comparison_data:
st_list.append(entry.stc)
sst_list.append(entry.sst)
# print(
"
ST Entry_
"
+ str(x) +
"
:
"
, entry.stc)
# print(
"
SST Entry_
"
+ str(x) +
"
:
"
, entry.sst)
# print(
"
Timestamp entry_
"
+ str(x) +
"
:
"
, entry.timestamp)
header_counter += 1
st_list_reverse = [st_list[1], st_list[0]]
sst_list_reverse = [sst_list[1], sst_list[0]]
if sst_list_reverse == [1, 7]:
print(
"
Verification successful
"
)
else:
print(
"
Verification unsuccessful
"
)
return False
def get_drop_verification(self):
global verification_name
global ST
global SST
return
def verification_table(self, name, subtype, subsubtype):
"""
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment