From 73ead86d67fd21e42bebf766024e2896f6d1fd1e Mon Sep 17 00:00:00 2001
From: Marko Mecina <marko.mecina@univie.ac.at>
Date: Tue, 9 Jan 2024 18:23:57 +0100
Subject: [PATCH] S13 collector and decompression bugfixes

---
 Ccs/ccs_function_lib.py | 11 +++++++----
 Ccs/decompression.py    | 36 +++++++++++++++++++++---------------
 2 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py
index e2f1c0b..fe63b9a 100644
--- a/Ccs/ccs_function_lib.py
+++ b/Ccs/ccs_function_lib.py
@@ -5743,8 +5743,11 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No
     # remove incomplete transfers
     clean_bounds = _check_s13_downlinks(tm_bounds, tm_132)
 
+    if len(clean_bounds) == 0:
+        return {None: None}
+
     if not collect_all:
-        clean_bounds = clean_bounds[0]
+        clean_bounds = [clean_bounds[0]]
 
     ces = _assemble_s13(clean_bounds, tm_132, join=join, consistency_check=consistency_check, verbose=verbose)
 
@@ -5780,7 +5783,7 @@ def _check_s13_downlinks(s13_bounds, s13_intermediate):
                 sidx = pkt.idx
             else:
                 clean_transfers.append((valid_start, None))
-                logger.info('single packet downlink at {}'.format(sidx))
+                logger.debug('single packet downlink at {}'.format(valid_start.idx))
             valid_start = pkt
             tx = True
 
@@ -5791,7 +5794,7 @@ def _check_s13_downlinks(s13_bounds, s13_intermediate):
 
         elif not tx and pkt.sst == 3:
             tx = False
-            logger.warning('unexpected end-of-transmission packet at {}'.format(pkt.idx))
+            logger.debug('unexpected end-of-transmission packet at {}'.format(pkt.idx))
 
         elif tx and pkt.sst == 4:
             tx = False
@@ -5892,7 +5895,7 @@ def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=Fa
                           startidx=startidx, endidx=endidx, sdu=sdu, verbose=verbose, consistency_check=consistency_check)
 
     ldt_cnt = 0
-    for i, buf in enumerate(ldt_dict, 1):
+    for buf in ldt_dict:
         if ldt_dict[buf] is None:
             continue
 
diff --git a/Ccs/decompression.py b/Ccs/decompression.py
index 2696d69..9bcd72f 100644
--- a/Ccs/decompression.py
+++ b/Ccs/decompression.py
@@ -95,7 +95,7 @@ def ce_decompress_stop(name=None):
 class CeDecompress:
 
     def __init__(self, pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None,
-                 ce_exec=None, check_s13_consistency=True):
+                 ce_exec=None, check_s13_consistency=True, verbose=True):
         self.outdir = outdir
         self.pool_name = pool_name
         self.sdu = sdu
@@ -104,6 +104,7 @@ class CeDecompress:
         self.startidx = startidx
         self.endidx = endidx
         self.check_s13_consistency = check_s13_consistency
+        self.verbose = verbose
 
         self.init_time = int(time.time())
 
@@ -126,27 +127,25 @@ class CeDecompress:
         self.ldt_minimum_ce_gap = LDT_MINIMUM_CE_GAP
 
     def _ce_decompress(self):
-        checkdir = os.path.dirname(self.outdir)
-        if not os.path.exists(checkdir) and checkdir != "":
-            os.mkdir(checkdir)
+        checkdir = os.path.abspath(self.outdir)
+        if not os.path.isdir(checkdir):  # and checkdir != "":
+            # os.mkdir(checkdir)
+            raise NotADirectoryError('{} is not a directory/does not exist.'.format(checkdir))
 
-        thread = threading.Thread(target=self._ce_decompress_worker, name="CeDecompression")
-        thread.daemon = True
-        self.ce_thread = thread
+        self.ce_thread = threading.Thread(target=self._ce_decompress_worker, name="CeDecompression_{}".format(self.init_time))
+        self.ce_thread.daemon = True
         if self.starttime is not None:
             self.last_ce_time = self.starttime
         self.ce_decompression_on = True
 
         try:
-            thread.start()
+            self.ce_thread.start()
             logger.info('Started CeDecompress [{}]...'.format(self.init_time))
         except Exception as err:
             logger.error(err)
             self.ce_decompression_on = False
             raise err
 
-        return thread
-
     def _ce_decompress_worker(self):
 
         def decompress(cefile):
@@ -161,7 +160,8 @@ class CeDecompress:
         try:
             filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
                                            outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx,
-                                           endidx=self.endidx, consistency_check=self.check_s13_consistency)
+                                           endidx=self.endidx, consistency_check=self.check_s13_consistency,
+                                           verbose=self.verbose)
             for ce in filedict:
                 decompress(filedict[ce])
                 self.last_ce_time = ce
@@ -174,13 +174,19 @@ class CeDecompress:
 
         while self.ce_decompression_on:
             filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
-                                           outdir=self.outdir, dump_all=False, sdu=self.sdu, startidx=self.startidx,
-                                           endidx=self.endidx, consistency_check=self.check_s13_consistency)
+                                           outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx,
+                                           endidx=self.endidx, consistency_check=self.check_s13_consistency,
+                                           verbose=self.verbose)
             if len(filedict) == 0:
                 time.sleep(self.ce_collect_timeout)
                 continue
-            self.last_ce_time, cefile = list(filedict.items())[0]
-            decompress(cefile)
+
+            for ce in filedict:
+                self.last_ce_time, cefile = ce, filedict[ce]
+                decompress(cefile)
+
+            # self.last_ce_time, cefile = list(filedict.items())[0]
+            # decompress(cefile)
             self.last_ce_time += self.ldt_minimum_ce_gap
             time.sleep(self.ce_collect_timeout)
         logger.info('CeDecompress stopped [{}].'.format(self.init_time))
-- 
GitLab