From 8aede399afe5927ed2cb6e87126efe894e7a0f7f Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Thu, 27 Oct 2022 15:52:54 +0200 Subject: [PATCH] display progress when collecting S13 data --- Ccs/ccs_function_lib.py | 43 +++++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index e6abdeb..27c3766 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -4084,14 +4084,17 @@ def _get_displayed_pool_path(pool_name=None): # Collect TM13 packets -def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=None, join=True, collect_all=False, sdu=None): +def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=None, join=True, collect_all=False, + sdu=None, verbose=True): - # try fetching pool info from pools opened in viewer - pname = _get_displayed_pool_path(pool_name) - if pname: - pool_name = pname + if not os.path.isfile(pool_name): + logger.info('{} is not a file, looking it up in DB') + # try fetching pool info from pools opened in viewer + pname = _get_displayed_pool_path(pool_name) + if pname: + pool_name = pname - rows = get_pool_rows(pool_name) + rows = get_pool_rows(pool_name, check_existence=True) # if starttime is None: # starttime = 0 @@ -4164,7 +4167,7 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No else: tm_bounds = tm_bounds[:] # cast Query to list if not list already - # check for out of order 1s and 3s + # check for out of order 1s and 3s TODO: there can be short transfers with only TM13,1 and no TM13,3 idcs = [i.sst for i in tm_bounds] outoforder = [] for i in range(len(idcs) - 1): @@ -4184,10 +4187,12 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No k = 0 n = len(tm_bounds) + cnt = 0 while k < n - 1: i, j = tm_bounds[k:k + 2] if i.sst == j.sst: k += 1 + logger.warning('Identical consecutive SSTs found (idx={})'.format(i.idx)) continue else: pkts = [a.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN] for a in tm_132.filter(DbTelemetry.idx > i.idx, DbTelemetry.idx < j.idx)] @@ -4197,6 +4202,10 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No ces[float(i.timestamp[:-1])] = [i.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN]] + [b''.join(pkts)] + [j.raw[S13_HEADER_LEN_TOTAL:-PEC_LEN]] k += 2 + cnt += 1 + if verbose: + print('Collected {} of {} S13 transfers.'.format(cnt, n // 2), end='\r') + # for (i, j) in zip(tm_bounds[::2], tm_bounds[1::2]): # pkts = [a.raw[21:-2] for a in tm_132.filter(DbTelemetry.idx > i.idx, DbTelemetry.idx < j.idx)] # if join: @@ -4208,7 +4217,7 @@ def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=No def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=False, sdu=None, startidx=None, - endidx=None): + endidx=None, verbose=True): """ Dump 13,2 data to disk. For pools loaded from a file, pool_name must be the absolute path of that file. @param pool_name: @@ -4219,25 +4228,35 @@ def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=Fa @param sdu: @param startidx: @param endidx: + @param verbose: """ + + if not os.path.exists(outdir): + raise FileNotFoundError('Directory "{}" does not exist'.format(outdir)) + filedict = {} ldt_dict = collect_13(pool_name, starttime=starttime, endtime=endtime, join=True, collect_all=dump_all, - startidx=startidx, endidx=endidx, sdu=sdu) - for buf in ldt_dict: + startidx=startidx, endidx=endidx, sdu=sdu, verbose=verbose) + n_ldt = len(ldt_dict) + for i, buf in enumerate(ldt_dict, 1): if ldt_dict[buf] is None: continue try: - obsid, time, ftime, ctr = s13_unpack_data_header(ldt_dict[buf]) + obsid, ctime, ftime, ctr = s13_unpack_data_header(ldt_dict[buf]) except ValueError as err: logger.error('Incompatible definition of S13 data header.') raise err - fname = os.path.join(outdir, "LDT_{:03d}_{:010d}_{:06d}.ce".format(obsid, time, ctr)) + fname = os.path.join(outdir, "LDT_{:03d}_{:010d}_{:06d}.ce".format(obsid, ctime, ctr)) with open(fname, "wb") as fdesc: fdesc.write(ldt_dict[buf]) filedict[buf] = fdesc.name + + logger.info('Dumped {} CEs to {}'.format(n_ldt, outdir)) + print('Dumped {} CEs to {}'.format(n_ldt, outdir)) + return filedict -- GitLab