Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
CCS
Manage
Activity
Members
Plan
Wiki
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Package registry
Model registry
Operate
Terraform modules
Analyze
Contributor analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Marko Mecina
CCS
Commits
e1391dbd
Commit
e1391dbd
authored
1 year ago
by
Marko Mecina
Browse files
Options
Downloads
Patches
Plain Diff
add new TC ack verification function
parent
666c2952
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
Tst/testing_library/testlib/tm.py
+84
-17
84 additions, 17 deletions
Tst/testing_library/testlib/tm.py
Tst/tst/generator_templates/ver_header.py
+1
-0
1 addition, 0 deletions
Tst/tst/generator_templates/ver_header.py
with
85 additions
and
17 deletions
Tst/testing_library/testlib/tm.py
+
84
−
17
View file @
e1391dbd
...
...
@@ -979,15 +979,78 @@ def extract_apid_from_packetid(packet_id):
return
apid
def
get_tc_ack
(
pool_name
=
"
LIVE
"
,
ack_types
=
None
,
tc_apid
=
321
):
def
get_tc_acknow
(
pool_name
=
"
LIVE
"
,
tc_apid
=
321
,
tc_ssc
=
1
,
tm_st
=
1
,
tm_sst
=
None
):
if
ack_types
is
None
:
remaining
=
[
1
,
7
]
else
:
remaining
=
list
(
ack_types
)
tc_data
=
get_data_of_last_tc
(
pool_name
,
apid
=
tc_apid
)
if
tc_data
is
None
:
return
False
,
[]
t_tc_sent
=
tc_data
[
0
]
tc_idx
=
tc_data
[
1
]
tc_head_bytes
=
tc_data
[
2
]
assert
isinstance
(
pool_name
,
str
)
assert
isinstance
(
tc_apid
,
(
int
,
str
))
assert
isinstance
(
t_tc_sent
,
(
float
,
int
))
assert
isinstance
(
tc_idx
,
int
)
assert
isinstance
(
tc_head_bytes
,
(
int
,
str
,
bytes
))
acks
=
cfl
.
get_pool_rows
(
pool_name
)
acks
=
acks
.
filter
(
cfl
.
DbTelemetry
.
stc
==
1
,
cfl
.
DbTelemetry
.
idx
>
tc_idx
).
order_by
(
cfl
.
DbTelemetry
.
idx
)
ack_pkts
=
[]
result
=
False
for
ack
in
acks
:
if
ack
.
data
[:
4
]
==
tc_head_bytes
:
if
ack
.
sst
in
[
1
,
3
,
7
]:
logger
.
info
(
'
TM({},{}) @ {}
'
.
format
(
ack
.
stc
,
ack
.
sst
,
ack
.
timestamp
))
elif
ack
.
sst
in
[
2
,
4
,
8
]:
if
ack
.
sst
==
2
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.
'
.
format
(
ack
.
stc
,
ack
.
sst
,
ack
.
timestamp
))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
ack
.
raw
.
hex
()))
if
ack
.
sst
==
4
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.
'
.
format
(
ack
.
stc
,
ack
.
sst
,
ack
.
timestamp
))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
ack
.
raw
.
hex
()))
if
ack
.
sst
==
8
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.
'
.
format
(
ack
.
stc
,
ack
.
sst
,
ack
.
timestamp
))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
ack
.
raw
.
hex
()))
else
:
logger
.
info
(
'
No TM found with matching pattern {}
'
.
format
(
tc_head_bytes
))
try
:
remaining
.
remove
(
ack
.
sst
)
except
ValueError
:
pass
ack_pkts
.
append
(
ack
.
raw
)
if
not
remaining
:
result
=
True
if
not
ack_pkts
:
logger
.
info
(
'
No ACKs found with matching pattern {}
'
.
format
(
tc_head_bytes
.
hex
().
upper
()))
return
result
,
ack_pkts
def
get_tc_acknow
(
pool_name
=
"
LIVE
"
,
tc_apid
=
321
,
tc_ssc
=
None
,
tm_st
=
1
,
tm_sst
=
None
):
"""
Check if for the TC acknowledgement packets can be found in the database.
This function makes a single database query.
:param pool_name: str
Name of the TM pool in the database
:param t_tc_sent: float
CUC timestamp from which search for telecommand should start
:param tc_apid: int or str
Application process ID of the sent TC. Can be provided as integer or hexadecimal string
:param tc_ssc: int
...
...
@@ -1017,7 +1080,7 @@ def get_tc_acknow(pool_name="LIVE", tc_apid=321, tc_ssc=1, tm_st=1, tm_sst=None)
tc_apid
=
tools
.
convert_apid_to_int
(
apid
=
tc_apid
)
# make database query
packets
=
fetch_packets
(
pool_name
=
pool_name
,
st
=
tm_st
,
sst
=
tm_sst
,
t_from
=
t_tc_sent
-
1
)
packets
=
fetch_packets
(
pool_name
=
pool_name
,
st
=
tm_st
,
sst
=
tm_sst
,
t_from
=
t_tc_sent
)
# print(packets[1][0][0].CTIME + (packets[1][0][0].FTIME/1000000))
# filter for TM packets with the correct APID and source sequence counter (SSC) in the data field
ack_tms
=
[]
...
...
@@ -1848,7 +1911,7 @@ def get_acquisition(pool_name, tm_21_3):
return
result
def
get_data_of_last_tc
(
pool_name
=
"
LIVE
"
):
def
get_data_of_last_tc
(
pool_name
=
"
LIVE
"
,
apid
=
None
):
"""
Finds the newest Telecommand in database with the Telemetry just before.
:param pool_name: str
...
...
@@ -1857,21 +1920,25 @@ def get_data_of_last_tc(pool_name="LIVE"):
timestamp of packet, IID of packet, First four bytes of packet data
"""
tm_pool
=
cfl
.
get_pool_rows
(
pool_name
)
# last_telecommand = None
tm_before_tc
=
None
for
i
in
range
(
-
1
,
-
100
,
-
1
):
if
tm_pool
[
i
].
timestamp
==
""
:
# last_telecommand = tm_pool[i]
tm_before_tc
=
tm_pool
[
i
-
1
]
tc_id
=
tm_pool
[
i
].
iid
tc_bytes
=
tm_pool
[
i
].
raw
.
hex
()[:
4
]
break
tcpkt
=
tm_pool
.
filter
(
cfl
.
DbTelemetry
.
is_tm
==
1
).
order_by
(
cfl
.
DbTelemetry
.
idx
.
desc
())
if
apid
is
not
None
:
tcpkt
=
tcpkt
.
filter
(
cfl
.
DbTelemetry
.
apid
==
apid
)
tcpkt
=
tcpkt
.
first
()
if
tcpkt
is
None
:
return
tc_id
=
tcpkt
.
idx
tc_bytes
=
tcpkt
.
raw
[:
4
]
# fist 4 bytes of TC used for identification in ACK service
tm_before_tc
=
tm_pool
.
filter
(
cfl
.
DbTelemetry
.
is_tm
==
0
,
cfl
.
DbTelemetry
.
idx
<
tc_id
).
order_by
(
cfl
.
DbTelemetry
.
idx
.
desc
()).
first
()
timestamp
=
tm_before_tc
.
timestamp
timestamp_length
=
len
(
timestamp
)
timestamp
=
float
(
timestamp
[:
timestamp_length
-
1
])
timestamp
=
float
(
timestamp
[:
-
1
])
return
timestamp
,
tc_id
,
tc_bytes
...
...
This diff is collapsed.
Click to expand it.
Tst/tst/generator_templates/ver_header.py
+
1
−
0
View file @
e1391dbd
...
...
@@ -11,6 +11,7 @@ sys.path.append(ccs_path)
sys
.
path
.
append
(
confignator
.
get_option
(
'
tst-paths
'
,
'
testing_library
'
))
import
ccs_function_lib
as
cfl
from
Tst.testing_library.testlib
import
tm
from
datetime
import
datetime
from
testlib
import
report
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment