Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
CCS
Manage
Activity
Members
Plan
Wiki
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Package registry
Model registry
Operate
Terraform modules
Analyze
Contributor analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Marko Mecina
CCS
Commits
9e45b685
Commit
9e45b685
authored
Feb 15, 2022
by
Marko Mecina
Browse files
Options
Downloads
Patches
Plain Diff
testlib adaptations
parent
017b2988
No related branches found
No related tags found
No related merge requests found
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
Tst/testing_library/testlib/tm.py
+27
-56
27 additions, 56 deletions
Tst/testing_library/testlib/tm.py
Tst/testing_library/testlib/tools.py
+16
-20
16 additions, 20 deletions
Tst/testing_library/testlib/tools.py
with
43 additions
and
76 deletions
Tst/testing_library/testlib/tm.py
+
27
−
56
View file @
9e45b685
...
...
@@ -67,42 +67,6 @@ from . import tools
# create a logger
logger
=
logging
.
getLogger
(
__name__
)
'''
def sessionfactory(ccs):
"""
Creates a sessionmaker
:param ccs: Instance of the class packets.CCScom
:type ccs: packets.CCScom
:return: Sessionmaker object
:rtype: sqlalchemy.orm.session.sessionmaker
"""
if ccs.session is None:
engine = None
s_factory = None
if engine is None:
engine = create_engine(
config_db.mysql_connection_string,
echo=
"
-v
"
in sys.argv)
s_factory = sessionmaker(bind=engine)
else:
s_factory = ccs.session
return s_factory
def new_database_session(ccs):
"""
Creates a new session.
:param ccs: packets.CCScom
Instance of the class packets.CCScom
:return: session
"""
session_maker = sessionfactory(ccs=ccs)
session = session_maker()
return session
'''
def
filter_chain
(
query
,
pool_name
,
is_tm
=
True
,
st
=
None
,
sst
=
None
,
apid
=
None
,
seq
=
None
,
t_from
=
None
,
t_to
=
None
,
dest_id
=
None
,
not_apid
=
None
):
"""
...
...
@@ -338,10 +302,10 @@ def decode_single_tm_packet(packet):
header
=
cfl
.
Tmread
(
packet
)
if
header
is
not
None
:
# the packet is a TC
if
header
[
1
]
==
1
:
if
header
[
0
].
PKT_TYPE
==
1
:
result
=
header
,
None
# the packet is a TM
elif
header
[
1
]
==
0
:
elif
header
[
0
].
PKT_TYPE
==
0
:
data
=
cfl
.
Tmdata
(
packet
)
if
data
!=
(
None
,
None
):
# data field could be decoded
result
=
header
,
data
...
...
@@ -951,9 +915,11 @@ def get_st_and_sst(pool_name, apid, ssc, is_tm=False, t_from=None):
# if the packet was found read the header and extract the ST and SST
if
len
(
tc_list
)
==
1
:
if
tc_list
[
0
][
0
]
is
not
None
:
header
=
tc_list
[
0
][
0
]
tc_st
=
header
[
10
]
tc_sst
=
header
[
11
]
header
=
tc_list
[
0
][
0
][
0
]
#tc_st = header[10]
#tc_sst = header[11]
tc_st
=
header
.
SERV_TYPE
tc_sst
=
header
.
SERV_SUB_TYPE
elif
len
(
tc_list
)
<
1
:
logger
.
warning
(
'
get_st_and_sst: TC packet with apid {} and source sequence counter {} could not be found in the
'
'
database
'
.
format
(
apid
,
ssc
))
...
...
@@ -1028,8 +994,8 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None):
"""
result
=
None
assert
isinstance
(
pool_name
,
str
)
assert
isinstance
(
tc_apid
,
int
)
or
isinstance
(
tc_apid
,
str
)
assert
isinstance
(
t_tc_sent
,
float
)
assert
isinstance
(
tc_apid
,
(
int
,
str
)
)
assert
isinstance
(
t_tc_sent
,
(
float
,
int
)
)
# if the tc_apid is provided as hexadecimal number, convert it to and integer
tc_apid
=
tools
.
convert_apid_to_int
(
apid
=
tc_apid
)
...
...
@@ -1042,13 +1008,14 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None):
for
i
in
range
(
len
(
packets
)):
if
packets
[
i
][
1
]
is
not
None
and
packets
[
i
][
1
][
0
]
is
not
None
:
# get the data entries for APID and SSC
pac_apid
=
packets
[
i
][
0
][
3
]
#pac_apid = packets[i][0][3] # TODO: not compatible anymore
pac_apid
=
packets
[
i
][
0
][
0
].
APID
if
pac_apid
==
961
:
# for acknowledgements from SEM
name_apid
=
'
PAR_CMD_APID
'
name_psc
=
'
PAR_CMD_SEQUENCE_COUNT
'
else
:
name_apid
=
'
TcP
a
ck
e
tId
'
name_psc
=
'
TcP
a
ck
e
tSeqCtrl
'
name_apid
=
'
TcPcktId
'
name_psc
=
'
TcPcktSeqCtrl
'
para
=
get_tm_data_entries
(
tm_packet
=
packets
[
i
],
data_entry_names
=
[
name_apid
,
name_psc
])
if
name_apid
in
para
and
name_psc
in
para
:
# extract the SSC from the PSC
...
...
@@ -1077,25 +1044,25 @@ def get_tc_acknow(pool_name, t_tc_sent, tc_apid, tc_ssc, tm_st=1, tm_sst=None):
# check if there was a failure, the result becomes False if a failure occurred
for
i
in
range
(
len
(
ack_tms
)):
head
=
ack_tms
[
i
][
0
]
head
=
ack_tms
[
i
][
0
]
[
0
]
data
=
ack_tms
[
i
][
1
]
if
result
is
not
False
:
if
head
[
11
]
==
1
or
head
[
11
]
==
3
or
head
[
11
]
==
7
:
logger
.
info
(
'
TM({},{}) @ {}
'
.
format
(
head
[
10
],
head
[
11
]
,
cfl
.
get_cuctime
(
head
)))
if
head
.
SERV_SUB_TYPE
in
[
1
,
3
,
7
]
:
logger
.
info
(
'
TM({},{}) @ {}
'
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
result
=
True
if
head
[
11
]
==
2
or
head
[
11
]
==
4
or
head
[
11
]
==
8
:
if
head
[
11
]
==
2
:
el
if
head
.
SERV_SUB_TYPE
in
[
2
,
4
,
8
]
:
if
head
.
SERV_SUB_TYPE
==
2
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of acceptance check for a command.
'
.
format
(
head
[
10
],
head
[
11
]
,
cfl
.
get_cuctime
(
head
)))
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
if
head
[
11
]
==
4
:
if
head
.
SERV_SUB_TYPE
==
4
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of start check for a command.
'
.
format
(
head
[
10
],
head
[
11
]
,
cfl
.
get_cuctime
(
head
)))
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
if
head
[
11
]
==
8
:
if
head
.
SERV_SUB_TYPE
==
8
:
logger
.
info
(
'
TM({},{}) @ {} FAILURE: Acknowledge failure of termination check for a command.
'
.
format
(
head
[
10
],
head
[
11
]
,
cfl
.
get_cuctime
(
head
)))
.
format
(
head
.
SERV_TYPE
,
head
.
SERV_SUB_TYPE
,
cfl
.
get_cuctime
(
head
)))
logger
.
debug
(
'
Data of the TM packet: {}
'
.
format
(
data
))
result
=
False
...
...
@@ -1564,3 +1531,7 @@ def get_acquisition(pool_name, tm_21_3):
logger
.
info
(
meta_data
[
i
])
return
result
if
__name__
==
'
__main__
'
:
sys
.
path
.
append
(
'
.
'
)
r
=
get_tc_acknow
(
'
PLM
'
,
0.
,
321
,
5
)
print
(
r
)
\ No newline at end of file
This diff is collapsed.
Click to expand it.
Tst/testing_library/testlib/tools.py
+
16
−
20
View file @
9e45b685
...
...
@@ -146,19 +146,15 @@ def convert_apid_to_int(apid):
"""
assert
isinstance
(
apid
,
int
)
or
isinstance
(
apid
,
str
)
res
=
None
if
isinstance
(
apid
,
str
):
try
:
if
'
x
'
in
apid
:
res
=
int
(
apid
,
16
)
else
:
res
=
int
(
apid
)
res
=
int
(
apid
,
0
)
except
ValueError
as
err
:
logger
.
exception
(
err
)
if
isinstance
(
apid
,
int
):
res
=
None
elif
isinstance
(
apid
,
int
):
res
=
apid
if
res
is
None
:
logger
.
error
(
'
convert_apid_to_int: failed to convert the APID {} into an integer
'
.
format
(
apid
))
return
res
...
...
@@ -167,7 +163,6 @@ def convert_apid_to_int(apid):
# @param apid: <int> application process identifier
# @result: <int> PID, None if no valid apid is provided
def
extract_pid_from_apid
(
apid
):
result
=
None
"""
Since commands (
'
0x14C
'
= 332 and
'
0x3CC
'
= 972) are not in the pid table, this check is not used
# query for the existing apids
self.c.execute(
'
SELECT PID_APID FROM dabys_mib_cheops.pid
'
)
...
...
@@ -183,21 +178,22 @@ def extract_pid_from_apid(apid):
# if the apid is provided as a string try to convert it to a integer
if
isinstance
(
apid
,
str
):
try
:
apid
=
int
(
apid
)
except
ValueError
as
error
:
try
:
apid
=
int
(
apid
,
16
)
apid
=
int
(
apid
,
0
)
except
ValueError
as
error
:
logger
.
exception
(
error
)
raise
error
if
isinstance
(
apid
,
int
):
#
if isinstance(apid, int):
# convert the apid into hexadecimal system and slice the last character (=PID), then convert it back to an int
apid_as_hex
=
hex
(
apid
)
pid_as_hex
=
apid_as_hex
[:
4
]
pid_as_dez
=
int
(
pid_as_hex
,
16
)
result
=
pid_as_dez
#
apid_as_hex = hex(apid)
#
pid_as_hex = apid_as_hex[:4]
#
pid_as_dez = int(pid_as_hex, 16)
#
result = pid_as_dez
# TODO: this is the PCAT, not the PID!?
return
result
pid
=
apid
&
0xf
pcat
=
(
apid
>>
4
)
&
0x7f
return
pcat
,
pid
def
print_apids
():
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment