From a10b642e364202610595230ffbbefb7dcb8723b8 Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Thu, 7 Dec 2023 12:11:58 +0100 Subject: [PATCH] make collecting S13 transfers more robust against incomplete transfers --- Ccs/ccs_function_lib.py | 189 ++++++++++++++++++++++++++-------------- Ccs/decompression.py | 17 ++-- Ccs/editor.py | 3 +- 3 files changed, 135 insertions(+), 74 deletions(-) diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index 728e79c..efef284 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -27,6 +27,7 @@ import logging.handlers from database.tm_db import scoped_session_maker, DbTelemetry, DbTelemetryPool, RMapTelemetry, FEEDataTelemetry from sqlalchemy.exc import OperationalError as SQLOperationalError from sqlalchemy.sql.expression import func +import threading from typing import NamedTuple from s2k_partypes import ptt, ptt_reverse, ptype_parameters, ptype_values @@ -5633,7 +5634,7 @@ def _get_displayed_pool_path(pool_name=None): def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=None, join=True, collect_all=False, - sdu=None, verbose=True): + sdu=None, verbose=True, consistency_check=True): """ Collect and group S13 down transfer packet trains @@ -5646,8 +5647,10 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No :param collect_all: :param sdu: :param verbose: + :param consistency_check: :return: """ + if not os.path.isfile(pool_name): logger.debug('{} is not a file, looking it up in DB'.format(pool_name)) # try fetching pool info from pools opened in viewer @@ -5657,9 +5660,8 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No rows = get_pool_rows(pool_name, check_existence=True) - ces = {} # faster method to collect already completed TM13 transfers - tm_bounds = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst.in_([1, 3])).order_by(DbTelemetry.idx) + tm_bounds = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst.in_([1, 3, 4])).order_by(DbTelemetry.idx) if starttime is not None: tm_bounds = tm_bounds.filter(func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) >= starttime) @@ -5676,8 +5678,8 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No if sdu: tm_bounds = tm_bounds.filter(func.left(DbTelemetry.data, 1) == sdu.to_bytes(SDU_PAR_LENGTH, 'big')) - # quit if no start and end packet are found - if tm_bounds.count() < 2: + # quit if no start packet is found + if tm_bounds.filter(DbTelemetry.sst == 1).count() == 0: return {None: None} tm_132 = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst == 2).order_by(DbTelemetry.idx) @@ -5697,78 +5699,135 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No if sdu: tm_132 = tm_132.filter(func.left(DbTelemetry.data, 1) == sdu.to_bytes(SDU_PAR_LENGTH, 'big')) - # make sure to start with a 13,1 - while tm_bounds[0].sst != 1: - tm_bounds = tm_bounds[1:] - if len(tm_bounds) < 2: - return {None: None} - # and end with a 13,3 - while tm_bounds[-1].sst != 3: - tm_bounds = tm_bounds[:-1] - if len(tm_bounds) < 2: - return {None: None} + # remove incomplete transfers + clean_bounds = _check_s13_downlinks(tm_bounds, tm_132) if not collect_all: - tm_bounds = tm_bounds[:2] - else: - tm_bounds = tm_bounds[:] # cast Query to list if not list already - - # check for out of order 1s and 3s TODO: there can be short transfers with only TM13,1 and no TM13,3 - idcs = [i.sst for i in tm_bounds] - outoforder = [] - for i in range(len(idcs) - 1): - if idcs[i + 1] == idcs[i]: - if idcs[i] == 1: - outoforder.append(i) - elif idcs[i] == 3: - outoforder.append(i + 1) - dels = 0 - for i in outoforder: - del tm_bounds[i - dels] - dels += 1 - # check if start/end (1/3) strictly alternating - if not np.diff([i.sst for i in tm_bounds]).all(): - print('Detected inconsistent transfers') - return {None: None} + clean_bounds = clean_bounds[0] + + ces = _assemble_s13(clean_bounds, tm_132, join=join, consistency_check=consistency_check, verbose=verbose) + + return ces + + +def _check_s13_downlinks(s13_bounds, s13_intermediate): + """ + Filter out TM13 1 & 3/4 packets from incomplete transfers + + :param s13_bounds: + :param s13_intermediate: + :return: + """ + + tx = False + sidx = 0 + + valid_start = None + + clean_transfers = [] + + for pkt in s13_bounds: + + if not tx and pkt.sst == 1: + tx = True + sidx = pkt.idx + valid_start = pkt + + elif pkt.sst == 1: + if s13_intermediate.filter(DbTelemetry.idx > sidx, DbTelemetry.idx < pkt.idx).count(): + logger.warning('incomplete downlink at {}'.format(sidx)) + sidx = pkt.idx + else: + clean_transfers.append((valid_start, None)) + logger.info('single packet downlink at {}'.format(sidx)) + valid_start = pkt + tx = True + + elif tx and pkt.sst == 3: + clean_transfers.append((valid_start, pkt)) + valid_start = None + tx = False + + elif not tx and pkt.sst == 3: + tx = False + logger.warning('unexpected end-of-transmission packet at {}'.format(pkt.idx)) + + elif tx and pkt.sst == 4: + tx = False + logger.warning('aborted downlink at {}'.format(pkt.idx)) + + elif not tx and pkt.sst == 4: + tx = False + logger.warning('unexpected abort-of-transmission packet at {}'.format(pkt.idx)) - k = 0 - n = len(tm_bounds) - cnt = 0 - while k < n - 1: - i, j = tm_bounds[k:k + 2] - if i.sst == j.sst: - k += 1 - logger.warning('Identical consecutive SSTs found (idx={})'.format(i.idx)) - continue else: - pkts = [a.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN] for a in tm_132.filter(DbTelemetry.idx > i.idx, DbTelemetry.idx < j.idx)] + logger.error("I shouldn't be here! ({})".format(pkt.idx)) + + return clean_transfers + + +def _assemble_s13(bounds, tm_132, join=True, consistency_check=False, verbose=True): + """ + Assemble payload data from S13 transfers. + + :param bounds: + :param tm_132: + :param join: + :param consistency_check: + :param verbose: + :return: + """ + + ces = {} + errs = [] + + scnt_offset = TM_HEADER_LEN + _s13_info[0][1] + scnt_size = _s13_info[1][1] + + for i, j in bounds: + + try: + # single packet transfer + if j is None: + firstpktdata = b'' + pkts = [] + datalen = int.from_bytes(i.raw[S13_DATALEN_PAR_OFFSET:S13_DATALEN_PAR_OFFSET + S13_DATALEN_PAR_SIZE], 'big') + lastpktdata = i.raw[S13_HEADER_LEN_TOTAL:S13_HEADER_LEN_TOTAL + datalen] + + else: + firstpktdata = i.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN] + pkts = [a.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN] for a in tm_132.filter(DbTelemetry.idx > i.idx, DbTelemetry.idx < j.idx)] + + # check for padding bytes in last packet + datalen = int.from_bytes(j.raw[S13_DATALEN_PAR_OFFSET:S13_DATALEN_PAR_OFFSET + S13_DATALEN_PAR_SIZE], 'big') + lastpktdata = j.raw[S13_HEADER_LEN_TOTAL:S13_HEADER_LEN_TOTAL + datalen] - # check for padding bytes in last packet - datalen = int.from_bytes(j.raw[S13_DATALEN_PAR_OFFSET:S13_DATALEN_PAR_OFFSET + S13_DATALEN_PAR_SIZE], 'big') - lastpktdata = j.raw[S13_HEADER_LEN_TOTAL:S13_HEADER_LEN_TOTAL + datalen] + if consistency_check: + # check if number of collected packets matches the sequence counter of TM13,3 + cnt = int.from_bytes(j.raw[scnt_offset:scnt_offset + scnt_size], 'big') + npkts = len(pkts) + 2 + if cnt != npkts: + logger.warning('Inconsistent number of packets in transfer starting at {}'.format(i.timestamp)) + errs.append(i.timestamp) if join: - ces[float(i.timestamp[:-1])] = i.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN] + b''.join(pkts) + lastpktdata + ces[float(i.timestamp[:-1])] = firstpktdata + b''.join(pkts) + lastpktdata else: - ces[float(i.timestamp[:-1])] = [i.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN]] + pkts + [lastpktdata] - k += 2 + ces[float(i.timestamp[:-1])] = [firstpktdata] + pkts + [lastpktdata] - cnt += 1 - if verbose: - print('Collected {} of {} S13 transfers.'.format(cnt, n // 2), end='\r') + except Exception as err: + logger.error(err) - # for (i, j) in zip(tm_bounds[::2], tm_bounds[1::2]): - # pkts = [a.raw[21:-2] for a in tm_132.filter(DbTelemetry.idx > i.idx, DbTelemetry.idx < j.idx)] - # if join: - # ces[float(i.timestamp[:-1])] = i.raw[21:-2] + b''.join(pkts) + j.raw[21:-2] - # else: - # ces[float(i.timestamp[:-1])] = [i.raw[21:-2]] + [b''.join(pkts)] + [j.raw[21:-2]] + if verbose: + print('Collected {} S13 transfers.'.format(len(ces))) + if len(errs) != 0: + print('There are inconsistencies in {} transfer(s)!\n{}'.format(len(errs), '\n'.join(errs))) return ces def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=False, sdu=None, startidx=None, - endidx=None, verbose=True): + endidx=None, verbose=True, consistency_check=True): """ Dump S13 down transfer data to disk. For pools loaded from a file, pool_name must be the absolute path of that file. @@ -5781,6 +5840,7 @@ def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=Fa :param startidx: :param endidx: :param verbose: + :param consistency_check: """ if not os.path.exists(outdir): @@ -5788,7 +5848,7 @@ def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=Fa filedict = {} ldt_dict = collect_13(pool_name, starttime=starttime, endtime=endtime, join=True, collect_all=dump_all, - startidx=startidx, endidx=endidx, sdu=sdu, verbose=verbose) + startidx=startidx, endidx=endidx, sdu=sdu, verbose=verbose, consistency_check=consistency_check) ldt_cnt = 0 for i, buf in enumerate(ldt_dict, 1): @@ -5821,7 +5881,6 @@ def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=Fa return filedict -import threading class DbTools: """ SQL database management tools @@ -7257,7 +7316,7 @@ try: except (FileNotFoundError, ValueError, confignator.config.configparser.NoOptionError): if 'DP_ITEMS_SRC_FILE' not in locals(): DP_ITEMS_SRC_FILE = None - logger.warning('Could not load data pool from file: {}. Using MIB instead.'.format(DP_ITEMS_SRC_FILE)) + logger.warning('Could not load data pool from file: {} Using MIB instead.'.format(DP_ITEMS_SRC_FILE)) _dp_items = get_data_pool_items(as_dict=True) finally: # DP_IDS_TO_ITEMS = {int(k[0]): k[1] for k in _dp_items} diff --git a/Ccs/decompression.py b/Ccs/decompression.py index c97bcce..2696d69 100644 --- a/Ccs/decompression.py +++ b/Ccs/decompression.py @@ -77,9 +77,9 @@ def convert_fullframe_to_cheopssim(fname): def ce_decompress(pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None, - ce_exec=None): + ce_exec=None, check_s13_consistency=True): decomp = CeDecompress(pool_name, outdir, sdu=sdu, starttime=starttime, endtime=endtime, startidx=startidx, - endidx=endidx, ce_exec=ce_exec) + endidx=endidx, ce_exec=ce_exec, check_s13_consistency=check_s13_consistency) decomp.start() @@ -95,7 +95,7 @@ def ce_decompress_stop(name=None): class CeDecompress: def __init__(self, pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None, - ce_exec=None): + ce_exec=None, check_s13_consistency=True): self.outdir = outdir self.pool_name = pool_name self.sdu = sdu @@ -103,6 +103,7 @@ class CeDecompress: self.endtime = endtime self.startidx = startidx self.endidx = endidx + self.check_s13_consistency = check_s13_consistency self.init_time = int(time.time()) @@ -124,9 +125,6 @@ class CeDecompress: self.ce_collect_timeout = CE_COLLECT_TIMEOUT self.ldt_minimum_ce_gap = LDT_MINIMUM_CE_GAP - global ce_decompressors - ce_decompressors[self.init_time] = self - def _ce_decompress(self): checkdir = os.path.dirname(self.outdir) if not os.path.exists(checkdir) and checkdir != "": @@ -163,7 +161,7 @@ class CeDecompress: try: filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx, - endidx=self.endidx) + endidx=self.endidx, consistency_check=self.check_s13_consistency) for ce in filedict: decompress(filedict[ce]) self.last_ce_time = ce @@ -177,7 +175,7 @@ class CeDecompress: while self.ce_decompression_on: filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, outdir=self.outdir, dump_all=False, sdu=self.sdu, startidx=self.startidx, - endidx=self.endidx) + endidx=self.endidx, consistency_check=self.check_s13_consistency) if len(filedict) == 0: time.sleep(self.ce_collect_timeout) continue @@ -191,6 +189,9 @@ class CeDecompress: def start(self): self._ce_decompress() + global ce_decompressors + ce_decompressors[self.init_time] = self + def stop(self): self.ce_decompression_on = False diff --git a/Ccs/editor.py b/Ccs/editor.py index 5f15165..0d9af6f 100644 --- a/Ccs/editor.py +++ b/Ccs/editor.py @@ -24,7 +24,7 @@ import DBus_Basic import ccs_function_lib as cfl sys.path.append(os.path.join(confignator.get_option("paths", "Tst"), "testing_library/testlib")) -import tm +# import tm cfg = confignator.get_config() @@ -32,6 +32,7 @@ pixmap_folder = cfg.get('ccs-paths', 'pixmap') action_folder = cfg.get('ccs-paths', 'actions') scripts = glob.glob(os.path.join(cfg.get('paths', 'ccs'), "scripts/*.py")) +scripts.sort() script_actions = '\n'.join(["<menuitem action='{}' />".format(os.path.split(script)[-1][:-3]) for script in scripts]) LOG_UPDT_PER = 2000 # ms -- GitLab