diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index a8164303e7f2dc1581e5450ce334dc4dc95c42db..96f4f57594d4308a89ebf053f51cb86a06215ee2 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -69,6 +69,8 @@ SEG_HEADER_LEN = 12 SEG_SPARE_LEN = 2 SEG_CRC_LEN = 2 +SDU_PAR_LENGTH = 1 + pid_offset = int(cfg.get('ccs-misc', 'pid_offset')) fmtlist = {'INT8': 'b', 'UINT8': 'B', 'INT16': 'h', 'UINT16': 'H', 'INT32': 'i', 'UINT32': 'I', 'INT64': 'q', @@ -107,12 +109,6 @@ else: # Notify.init('cfl') -# for CE and S13 collect -ldt_minimum_ce_gap = 0.001 -ce_collect_timeout = 1 -last_ce_time = 0 -ce_decompression_on = False - def _add_log_socket_handler(): global logger @@ -248,7 +244,6 @@ def start_plotter(pool_name=None, console=False, **kwargs): :param console: If False will be run in Console, otherwise will be run in seperate Environment :return: """ - directory = cfg.get('paths', 'ccs') file_path = os.path.join(directory, 'plotter.py') @@ -257,7 +252,6 @@ def start_plotter(pool_name=None, console=False, **kwargs): else: start_app(file_path, directory, console=console, **kwargs) - def start_tst(console=False, **kwargs): directory = cfg.get('paths', 'tst') file_path = os.path.join(directory, 'tst/main.py') @@ -311,9 +305,11 @@ def dbus_connection(name, instance=1): dbus_type = dbus.SessionBus() try: Bus_Name = cfg.get('ccs-dbus_names', name) - except: + except (ValueError, confignator.config.configparser.NoOptionError): logger.warning(str(name) + ' is not a valid DBUS name.') logger.warning(str(name) + ' not found in config file.') + raise NameError('"{}" is not a valid module name'.format(name)) + Bus_Name += str(instance) try: @@ -1630,6 +1626,8 @@ def get_module_handle(module_name, instance=1, timeout=5): if instance is None: instance = communication[module_name] + module = None + t1 = time.time() while (time.time() - t1) < timeout: try: @@ -3923,7 +3921,7 @@ def savepool(filename, pool_name, mode='binary', st_filter=None): logger.info('Pool {} saved as {} in {} mode'.format(pool_name, filename, mode.upper())) -def get_packets_from_pool(pool_name, indices=[], st=None, sst=None, apid=None, **kwargs): +def get_packets_from_pool(pool_name, indices=None, st=None, sst=None, apid=None, **kwargs): """ @param pool_name: @param indices: @@ -3943,7 +3941,7 @@ def get_packets_from_pool(pool_name, indices=[], st=None, sst=None, apid=None, * DbTelemetryPool.pool_name == pool_name ) - if len(indices) != 0: + if indices is not None and len(indices) > 0: rows = rows.filter( DbTelemetry.idx.in_(indices) ) @@ -4000,49 +3998,69 @@ def create_format_model(): ################### quick copy of S13 get data and ce_decompress functionality from old CCS #################### -import astropy.io.fits as pyfits -import threading -CEthread = None -IFSWDIR = cfg.get('paths', 'obsw') -if not IFSWDIR.endswith('/'): - IFSWDIR += '/' ## # Collect TM13 packets -def collect_13(pool_name, starttime=0, endtime=None, start=None, end=None, join=True, collect_all=False, - sdu=None): +def collect_13(pool_name, starttime=None, endtime=None, startidx=None, endidx=None, join=True, collect_all=False, sdu=None): + + # try fetching pool info from pools opened in viewer + + rows = get_pool_rows(pool_name) - if start is not None: - starttime = float(rows.filter(DbTelemetry.idx == start).first().timestamp[:-1]) - if endtime is None: - endtime = get_last_pckt_time(pool_name, string=False) + # if starttime is None: + # starttime = 0 + + # if start is not None: + # starttime = float(rows.filter(DbTelemetry.idx == start).first().timestamp[:-1]) - if end is not None: - endtime = float(rows.filter(DbTelemetry.idx == end).first().timestamp[:-1]) + # if endtime is None: + # endtime = get_last_pckt_time(pool_name, string=False) + # if end is not None: + # endtime = float(rows.filter(DbTelemetry.idx == end).first().timestamp[:-1]) - if starttime is None or endtime is None: - raise ValueError('Specify start(time) and end(time)!') + # if starttime is None or endtime is None: + # raise ValueError('Specify start(time) and end(time)!') ces = {} - # faster method to collect TM13 transfers already completed - tm_bounds = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst.in_([1, 3]), - func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) >= starttime, - func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) <= endtime - ).order_by(DbTelemetry.idx) + # faster method to collect already completed TM13 transfers + tm_bounds = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst.in_([1, 3])).order_by(DbTelemetry.idx) + + if starttime is not None: + tm_bounds = tm_bounds.filter(func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) >= starttime) + + if endtime is not None: + tm_bounds = tm_bounds.filter(func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) <= endtime) + + if startidx is not None: + tm_bounds = tm_bounds.filter(DbTelemetry.idx >= startidx) + + if endidx is not None: + tm_bounds = tm_bounds.filter(DbTelemetry.idx <= endidx) + if sdu: - tm_bounds = tm_bounds.filter(func.left(DbTelemetry.data, 1) == struct.pack('>B', sdu)) + tm_bounds = tm_bounds.filter(func.left(DbTelemetry.data, 1) == sdu.to_bytes(SDU_PAR_LENGTH, 'big')) # quit if no start and end packet are found if tm_bounds.count() < 2: return {None: None} - tm_132 = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst == 2, - func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) > starttime, - func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) < endtime - ).order_by(DbTelemetry.idx) + tm_132 = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst == 2).order_by(DbTelemetry.idx) + + if starttime is not None: + tm_132 = tm_132.filter(func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) >= starttime) + + if endtime is not None: + tm_132 = tm_132.filter(func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) <= endtime) + + if startidx is not None: + tm_132 = tm_132.filter(DbTelemetry.idx >= startidx) + + if endidx is not None: + tm_132 = tm_132.filter(DbTelemetry.idx <= endidx) + if sdu: - tm_132 = tm_132.filter(func.left(DbTelemetry.data, 1) == struct.pack('>B', sdu)) + tm_132 = tm_132.filter(func.left(DbTelemetry.data, 1) == sdu.to_bytes(SDU_PAR_LENGTH, 'big')) # make sure to start with a 13,1 while tm_bounds[0].sst != 1: @@ -4102,58 +4120,12 @@ def collect_13(pool_name, starttime=0, endtime=None, start=None, end=None, join= return ces - # def collect(pool_name, rows=rows, starttime=0., endtime=-1, join=False, dbcon=None): - # tm13_1 = \ - # self.wait_for_tm(pool_name=pool_name, st=13, sst=1, reftime=starttime, max_wait=endtime - starttime, - # sid=sdu, sqlquery=rows, dbcon=dbcon)[1] - # if tm13_1 is not None: - # starttime2 = self.get_cuctime(tm13_1) - # else: - # return None, None - # tm13_3 = \ - # self.wait_for_tm(pool_name=pool_name, st=13, sst=3, reftime=starttime2, max_wait=endtime - starttime2, - # sid=sdu, sqlquery=rows, dbcon=dbcon)[1] - # - # if tm13_3 is None: - # return None, None - # dl_start, dl_end = self.get_cuctime(tm13_1), self.get_cuctime(tm13_3) - # # pckts = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst == 2).order_by(DbTelemetry.seq).all() - # pckts = rows.filter(DbTelemetry.stc == 13, DbTelemetry.sst == 2, - # func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) <= dl_end, - # func.left(DbTelemetry.timestamp, func.length(DbTelemetry.timestamp) - 1) >= dl_start - # ).order_by(DbTelemetry.seq) - # if sdu: - # pckts = pckts.filter(func.left(DbTelemetry.data, 1) == struct.pack('>B', sdu)) - # # pckts = [pckt for pckt in pckts if pckt.raw[16]==sdu] - # # pckts = self.Tm_filter_st(pckts, st=13, sst=2, sid=sdu) - # if join: - # buf = tm13_1[21:-2] + b''.join([pckt.raw[21:-2] for pckt in pckts]) + tm13_3[21:-2] - # # buf = tm13_1[21:-2] + b''.join( - # # [pckt.raw[21:-2] for pckt in pckts if (dl_start <= float(pckt.timestamp[:-1]) - # # <= dl_end)]) + tm13_3[21:-2] - # else: - # buf = [tm13_1[21:-2]] + [pckt.raw[21:-2] for pckt in pckts] + [tm13_3[21:-2]] - # # buf = [tm13_1[21:-2]] + [pckt.raw[21:-2] for pckt in pckts if - # # (dl_start <= float(pckt.timestamp[:-1]) <= dl_end)] \ - # # + [tm13_3[21:-2]] - # # arr = np.frombuffer(buf, dtype='>u2') - # return buf, self.get_cuctime(tm13_1) - # - # ces = {} - # buf, time = collect(pool_name, starttime=starttime, endtime=endtime, join=join, dbcon=dbcon) - # ces[time] = buf - # if collect_all: - # while buf is not None: - # starttime = time + self.ldt_minimum_ce_gap - # buf, time = collect(pool_name, starttime=starttime, endtime=endtime, join=join, dbcon=dbcon) - # if buf is not None: - # ces[time] = buf - # rows.session.close() - # return ces - -def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=False, sdu=None): + +def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=False, sdu=None, startidx=None, + endidx=None): """ Dump 13,2 data to disk + @param endidx: @param pool_name: @param starttime: @param endtime: @@ -4162,139 +4134,19 @@ def dump_large_data(pool_name, starttime=0, endtime=None, outdir="", dump_all=Fa """ filedict = {} ldt_dict = collect_13(pool_name, starttime=starttime, endtime=endtime, join=True, collect_all=dump_all, - sdu=sdu) + startidx=startidx, endidx=endidx, sdu=sdu) for buf in ldt_dict: if ldt_dict[buf] is None: continue obsid, time, ftime, ctr = struct.unpack('>IIHH', ldt_dict[buf][:12]) - # if (outdir != "") and (not outdir.endswith("/")): - # outdir += "/" - with open("{}LDT_{:03d}_{:010d}_{:06d}.ce".format(outdir, obsid, time, ctr), "wb") as fdesc: + fname = os.path.join(outdir, "LDT_{:03d}_{:010d}_{:06d}.ce".format(obsid, time, ctr)) + + with open(fname, "wb") as fdesc: fdesc.write(ldt_dict[buf]) filedict[buf] = fdesc.name # return list(ldt_dict.keys()) return filedict -def create_fits(data=None, header=None, filename=None): - hdulist = pyfits.HDUList() - hdu = pyfits.PrimaryHDU() - hdu.header = header - hdulist.append(hdu) - - imagette_hdu = pyfits.ImageHDU() - stack_hdu = pyfits.ImageHDU() - margins = pyfits.ImageHDU() - - hdulist.append(imagette_hdu) - hdulist.append(stack_hdu) - hdulist.append(margins) - - if filename: - with open(filename, "wb") as fd: - hdulist.writeto(fd) - - return hdulist - - -def ce_decompress(pool_name='LIVE', outdir="", sdu=None, starttime=None, endtime=None): - global ce_decompression_on - global CEthread - global last_ce_time - # if outdir != "" and (not outdir.endswith("/")): - # outdir += "/" - checkdir = '/'.join(outdir.split('/')[:-1]) - if not os.path.exists(checkdir) and checkdir != "": - os.mkdir(checkdir) - - thread = threading.Thread(target=_ce_decompress_worker, - kwargs={'pool_name': pool_name, 'outdir': outdir, 'sdu': sdu, 'endtime': endtime}, - name="CeDecompression") - thread.daemon = True - CEthread = thread - if starttime is not None: - last_ce_time = starttime - ce_decompression_on = True - thread.start() - return thread - -def _ce_decompress_worker(pool_name, outdir="", sdu=None, endtime=None): - global ce_collect_timeout - global ldt_minimum_ce_gap - global last_ce_time - global ce_decompression_on - global IFSWDIR - - def decompress(cefile): - logger.info("Decompressing {}".format(cefile)) - fitspath = cefile[:-2] + 'fits' - if os.path.isfile(fitspath): - subprocess.run(["rm", fitspath]) - subprocess.run([IFSWDIR + "CompressionEntity/build/DecompressCe", cefile, fitspath], - stdout=open(cefile[:-2] + 'log', 'w')) - - # first, get all TM13s already complete in pool - filedict = dump_large_data(pool_name=pool_name, starttime=last_ce_time, endtime=endtime, outdir=outdir, - dump_all=True, sdu=sdu) - for ce in filedict: - last_ce_time = ce - decompress(filedict[ce]) - - while ce_decompression_on: - filedict = dump_large_data(pool_name=pool_name, starttime=last_ce_time, endtime=None, outdir=outdir, - dump_all=False, sdu=sdu) - if len(filedict) == 0: - time.sleep(ce_collect_timeout) - continue - last_ce_time, cefile = list(filedict.items())[0] - decompress(cefile) - last_ce_time += ldt_minimum_ce_gap - time.sleep(ce_collect_timeout) - -def stop_ce_decompress(): - global ce_decompression_on - ce_decompression_on = False - -def reset_ce_decompress(timestamp=0.0): - global last_ce_time - last_ce_time = timestamp - -def build_fits(basefits, newfits): - base = pyfits.open(basefits) - new = pyfits.open(newfits) - for hdu in range(len(base)): - base[hdu].data = np.concatenate([base[hdu].data, new[hdu].data]) - base.writeto(basefits, overwrite=True) - - -def convert_fullframe_to_cheopssim(fname): - """ - Convert a fullframe (1076x1033) FITS to CHEOPS-SIM format - @param fname: Input FITS file - """ - d = pyfits.open(fname) - full = np.array(np.round(d[0].data), dtype=np.uint16) - win_dict = {"SubArray": full[:, :1024, 28:28+1024], - "OverscanLeftImage": full[:, :1024, :4], - "BlankLeftImage": full[:, :1024, 4:4+8], - "DarkLeftImage": full[:, :1024, 12:28], - "DarkRightImage": full[:, :1024, 1052:1052+16], - "BlankRightImage": full[:, :1024, 1068:], - "DarkTopImage": full[:, 1024:-6, 28:-24], - "OverscanTopImage": full[:, -6:, 28:-24]} - - hdulist = pyfits.HDUList() - hdulist.append(pyfits.PrimaryHDU()) - - for win in win_dict: - hdu = pyfits.ImageHDU(data=win_dict[win], name=win) - hdulist.append(hdu) - - hdulist.append(pyfits.BinTableHDU(name="ImageMetaData")) - - hdulist.writeto(fname[:-5] + '_CHEOPSSIM.fits') - -############################################# - class TestReport: diff --git a/Ccs/ccs_main_config.cfg b/Ccs/ccs_main_config.cfg index 71048821335a83505e70f8b241c66af4b6cf0436..6359876164ceee31513989b1c3ce8717d3a53b5c 100644 --- a/Ccs/ccs_main_config.cfg +++ b/Ccs/ccs_main_config.cfg @@ -32,6 +32,7 @@ editor_host = editor_ul_port = 4242 editor_dl_port = 4343 ifsw_path = ../../../IFSW/ +ce_exec = [ccs-pus_connection] target_ip = 10.0.0.1 diff --git a/Ccs/decompression.py b/Ccs/decompression.py new file mode 100644 index 0000000000000000000000000000000000000000..de58d6d481839cbbce554ff5ac50b58038eed16b --- /dev/null +++ b/Ccs/decompression.py @@ -0,0 +1,182 @@ +import logging +import numpy as np +import os +import subprocess +import threading +import time +import astropy.io.fits as pyfits + +import confignator +import ccs_function_lib as cfl + +cfg = confignator.get_config(check_interpolation=False) +logger = logging.getLogger(__name__) +logger.setLevel(getattr(logging, cfg.get('ccs-logging', 'level').upper())) + +ce_decompressors = {} + + +def create_fits(data=None, header=None, filename=None): + hdulist = pyfits.HDUList() + hdu = pyfits.PrimaryHDU() + hdu.header = header + hdulist.append(hdu) + + imagette_hdu = pyfits.ImageHDU() + stack_hdu = pyfits.ImageHDU() + margins = pyfits.ImageHDU() + + hdulist.append(imagette_hdu) + hdulist.append(stack_hdu) + hdulist.append(margins) + + if filename: + with open(filename, "wb") as fd: + hdulist.writeto(fd) + + return hdulist + + +def build_fits(basefits, newfits): + base = pyfits.open(basefits) + new = pyfits.open(newfits) + for hdu in range(len(base)): + base[hdu].data = np.concatenate([base[hdu].data, new[hdu].data]) + base.writeto(basefits, overwrite=True) + + +def convert_fullframe_to_cheopssim(fname): + """ + Convert a fullframe (1076x1033) FITS to CHEOPS-SIM format + @param fname: Input FITS file + """ + d = pyfits.open(fname) + full = np.array(np.round(d[0].data), dtype=np.uint16) + win_dict = {"SubArray": full[:, :1024, 28:28+1024], + "OverscanLeftImage": full[:, :1024, :4], + "BlankLeftImage": full[:, :1024, 4:4+8], + "DarkLeftImage": full[:, :1024, 12:28], + "DarkRightImage": full[:, :1024, 1052:1052+16], + "BlankRightImage": full[:, :1024, 1068:], + "DarkTopImage": full[:, 1024:-6, 28:-24], + "OverscanTopImage": full[:, -6:, 28:-24]} + + hdulist = pyfits.HDUList() + hdulist.append(pyfits.PrimaryHDU()) + + for win in win_dict: + hdu = pyfits.ImageHDU(data=win_dict[win], name=win) + hdulist.append(hdu) + + hdulist.append(pyfits.BinTableHDU(name="ImageMetaData")) + + hdulist.writeto(fname[:-5] + '_CHEOPSSIM.fits') + + +def ce_decompress(outdir, pool_name=None, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None, + ce_exec=None): + decomp = CeDecompress(outdir, pool_name=pool_name, sdu=sdu, starttime=starttime, endtime=endtime, startidx=startidx, + endidx=endidx, ce_exec=ce_exec) + decomp.start() + ce_decompressors[int(time.time())] = decomp + + +def ce_decompress_stop(name=None): + + if name is not None: + ce_decompressors[name].stop() + else: + for p in ce_decompressors: + ce_decompressors[p].stop() + + +class CeDecompress: + + def __init__(self, outdir, pool_name=None, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None, + ce_exec=None): + self.outdir = outdir + self.pool_name = pool_name + self.sdu = sdu + self.starttime = starttime + self.endtime = endtime + self.startidx = startidx + self.endidx = endidx + + if ce_exec is None: + try: + self.ce_exec = cfg.get('ccs-misc', 'ce_exec') + except (ValueError, confignator.config.configparser.NoOptionError) as err: + raise err + else: + self.ce_exec = ce_exec + + # check if decompression is executable + if not os.access(self.ce_exec, os.X_OK): + raise PermissionError('"{}" is not executable.'.format(self.ce_exec)) + + self.ce_decompression_on = False + self.ce_thread = None + self.last_ce_time = 0 + self.ce_collect_timeout = 1 + self.ldt_minimum_ce_gap = 0.001 + + def _ce_decompress(self): + checkdir = os.path.dirname(self.outdir) + if not os.path.exists(checkdir) and checkdir != "": + os.mkdir(checkdir) + + thread = threading.Thread(target=self._ce_decompress_worker, name="CeDecompression") + thread.daemon = True + self.ce_thread = thread + if self.starttime is not None: + self.last_ce_time = self.starttime + self.ce_decompression_on = True + + try: + thread.start() + logger.info('Started CeDecompress...') + except Exception as err: + logger.error(err) + self.ce_decompression_on = False + raise err + + return thread + + def _ce_decompress_worker(self): + + def decompress(cefile): + logger.info("Decompressing {}".format(cefile)) + fitspath = cefile[:-2] + 'fits' + if os.path.isfile(fitspath): + subprocess.run(["rm", fitspath]) + subprocess.run([self.ce_exec, cefile, fitspath], stdout=open(cefile[:-2] + 'log', 'w')) + + # first, get all TM13s already complete in pool + filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, + outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx, + endidx=self.endidx) + for ce in filedict: + self.last_ce_time = ce + decompress(filedict[ce]) + + while self.ce_decompression_on: + filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, + outdir=self.outdir, dump_all=False, sdu=self.sdu, startidx=self.startidx, + endidx=self.endidx) + if len(filedict) == 0: + time.sleep(self.ce_collect_timeout) + continue + self.last_ce_time, cefile = list(filedict.items())[0] + decompress(cefile) + self.last_ce_time += self.ldt_minimum_ce_gap + time.sleep(self.ce_collect_timeout) + logger.info('CeDecompress stopped.') + + def start(self): + self._ce_decompress() + + def stop(self): + self.ce_decompression_on = False + + def reset(self, timestamp=0): + self.last_ce_time = timestamp