diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index e2f1c0b735a85468cf4853632cc416ed0d75e60f..fe63b9a8725b3514043214727f75e6f181e0e237 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -5743,8 +5743,11 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No # remove incomplete transfers clean_bounds = _check_s13_downlinks(tm_bounds, tm_132) + if len(clean_bounds) == 0: + return {None: None} + if not collect_all: - clean_bounds = clean_bounds[0] + clean_bounds = [clean_bounds[0]] ces = _assemble_s13(clean_bounds, tm_132, join=join, consistency_check=consistency_check, verbose=verbose) @@ -5780,7 +5783,7 @@ def _check_s13_downlinks(s13_bounds, s13_intermediate): sidx = pkt.idx else: clean_transfers.append((valid_start, None)) - logger.info('single packet downlink at {}'.format(sidx)) + logger.debug('single packet downlink at {}'.format(valid_start.idx)) valid_start = pkt tx = True @@ -5791,7 +5794,7 @@ def _check_s13_downlinks(s13_bounds, s13_intermediate): elif not tx and pkt.sst == 3: tx = False - logger.warning('unexpected end-of-transmission packet at {}'.format(pkt.idx)) + logger.debug('unexpected end-of-transmission packet at {}'.format(pkt.idx)) elif tx and pkt.sst == 4: tx = False @@ -5892,7 +5895,7 @@ def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=Fa startidx=startidx, endidx=endidx, sdu=sdu, verbose=verbose, consistency_check=consistency_check) ldt_cnt = 0 - for i, buf in enumerate(ldt_dict, 1): + for buf in ldt_dict: if ldt_dict[buf] is None: continue diff --git a/Ccs/decompression.py b/Ccs/decompression.py index 2696d696d19efda740a6d6df5d007bdbb54f208f..9bcd72f4cefeeb715feb3ba4694ef08425147a95 100644 --- a/Ccs/decompression.py +++ b/Ccs/decompression.py @@ -95,7 +95,7 @@ def ce_decompress_stop(name=None): class CeDecompress: def __init__(self, pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None, - ce_exec=None, check_s13_consistency=True): + ce_exec=None, check_s13_consistency=True, verbose=True): self.outdir = outdir self.pool_name = pool_name self.sdu = sdu @@ -104,6 +104,7 @@ class CeDecompress: self.startidx = startidx self.endidx = endidx self.check_s13_consistency = check_s13_consistency + self.verbose = verbose self.init_time = int(time.time()) @@ -126,27 +127,25 @@ class CeDecompress: self.ldt_minimum_ce_gap = LDT_MINIMUM_CE_GAP def _ce_decompress(self): - checkdir = os.path.dirname(self.outdir) - if not os.path.exists(checkdir) and checkdir != "": - os.mkdir(checkdir) + checkdir = os.path.abspath(self.outdir) + if not os.path.isdir(checkdir): # and checkdir != "": + # os.mkdir(checkdir) + raise NotADirectoryError('{} is not a directory/does not exist.'.format(checkdir)) - thread = threading.Thread(target=self._ce_decompress_worker, name="CeDecompression") - thread.daemon = True - self.ce_thread = thread + self.ce_thread = threading.Thread(target=self._ce_decompress_worker, name="CeDecompression_{}".format(self.init_time)) + self.ce_thread.daemon = True if self.starttime is not None: self.last_ce_time = self.starttime self.ce_decompression_on = True try: - thread.start() + self.ce_thread.start() logger.info('Started CeDecompress [{}]...'.format(self.init_time)) except Exception as err: logger.error(err) self.ce_decompression_on = False raise err - return thread - def _ce_decompress_worker(self): def decompress(cefile): @@ -161,7 +160,8 @@ class CeDecompress: try: filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx, - endidx=self.endidx, consistency_check=self.check_s13_consistency) + endidx=self.endidx, consistency_check=self.check_s13_consistency, + verbose=self.verbose) for ce in filedict: decompress(filedict[ce]) self.last_ce_time = ce @@ -174,13 +174,19 @@ class CeDecompress: while self.ce_decompression_on: filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, - outdir=self.outdir, dump_all=False, sdu=self.sdu, startidx=self.startidx, - endidx=self.endidx, consistency_check=self.check_s13_consistency) + outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx, + endidx=self.endidx, consistency_check=self.check_s13_consistency, + verbose=self.verbose) if len(filedict) == 0: time.sleep(self.ce_collect_timeout) continue - self.last_ce_time, cefile = list(filedict.items())[0] - decompress(cefile) + + for ce in filedict: + self.last_ce_time, cefile = ce, filedict[ce] + decompress(cefile) + + # self.last_ce_time, cefile = list(filedict.items())[0] + # decompress(cefile) self.last_ce_time += self.ldt_minimum_ce_gap time.sleep(self.ce_collect_timeout) logger.info('CeDecompress stopped [{}].'.format(self.init_time))