Skip to content
Snippets Groups Projects
Select Git revision
  • ed3cbd7c5d34ad55862a11a877e455eb1aa136b7
  • consistent_config default protected
2 results

index.rst

Blame
  • decompression.py 7.15 KiB
    import logging
    import numpy as np
    import os
    import subprocess
    import threading
    import time
    import astropy.io.fits as pyfits
    
    import confignator
    import ccs_function_lib as cfl
    
    cfg = confignator.get_config(check_interpolation=False)
    logger = cfl.start_logging('Decompression')
    # logger.setLevel(getattr(logging, cfg.get('ccs-logging', 'level').upper()))
    
    CE_COLLECT_TIMEOUT = 1
    LDT_MINIMUM_CE_GAP = 0.001
    
    ce_decompressors = {}
    
    
    def create_fits(header=None, filename=None):
        hdulist = pyfits.HDUList()
        hdu = pyfits.PrimaryHDU()
        hdu.header = header
        hdulist.append(hdu)
    
        imagette_hdu = pyfits.ImageHDU()
        stack_hdu = pyfits.ImageHDU()
        margins = pyfits.ImageHDU()
    
        hdulist.append(imagette_hdu)
        hdulist.append(stack_hdu)
        hdulist.append(margins)
    
        if filename:
            with open(filename, "wb") as fd:
                hdulist.writeto(fd)
    
        return hdulist
    
    
    def build_fits(basefits, newfits):
        base = pyfits.open(basefits)
        new = pyfits.open(newfits)
        for hdu in range(len(base)):
            base[hdu].data = np.concatenate([base[hdu].data, new[hdu].data])
        base.writeto(basefits, overwrite=True)
    
    
    def convert_fullframe_to_cheopssim(fname):
        """
        Convert a fullframe (1076x1033) FITS to CHEOPS-SIM format
        @param fname: Input FITS file
        """
        d = pyfits.open(fname)
        full = np.array(np.round(d[0].data), dtype=np.uint16)
        win_dict = {"SubArray": full[:, :1024, 28:28+1024],
                    "OverscanLeftImage": full[:, :1024, :4],
                    "BlankLeftImage": full[:, :1024, 4:4+8],
                    "DarkLeftImage": full[:, :1024, 12:28],
                    "DarkRightImage": full[:, :1024, 1052:1052+16],
                    "BlankRightImage": full[:, :1024, 1068:],
                    "DarkTopImage": full[:, 1024:-6, 28:-24],
                    "OverscanTopImage": full[:, -6:, 28:-24]}
    
        hdulist = pyfits.HDUList()
        hdulist.append(pyfits.PrimaryHDU())
    
        for win in win_dict:
            hdu = pyfits.ImageHDU(data=win_dict[win], name=win)
            hdulist.append(hdu)
    
        hdulist.append(pyfits.BinTableHDU(name="ImageMetaData"))
    
        hdulist.writeto(fname[:-5] + '_CHEOPSSIM.fits')
    
    
    def ce_decompress(pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None,
                      ce_exec=None, check_s13_consistency=True):
        decomp = CeDecompress(pool_name, outdir, sdu=sdu, starttime=starttime, endtime=endtime, startidx=startidx,
                              endidx=endidx, ce_exec=ce_exec, check_s13_consistency=check_s13_consistency)
        decomp.start()
    
    
    def ce_decompress_stop(name=None):
    
        if name is not None:
            ce_decompressors[name].stop()
        else:
            for p in ce_decompressors:
                ce_decompressors[p].stop()
    
    
    class CeDecompress:
    
        def __init__(self, pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None,
                     ce_exec=None, check_s13_consistency=True, verbose=True):
            self.outdir = outdir
            self.pool_name = pool_name
            self.sdu = sdu
            self.starttime = starttime
            self.endtime = endtime
            self.startidx = startidx
            self.endidx = endidx
            self.check_s13_consistency = check_s13_consistency
            self.verbose = verbose
    
            self.init_time = int(time.time())
    
            if ce_exec is None:
                try:
                    self.ce_exec = cfg.get('ccs-misc', 'ce_exec')
                except (ValueError, confignator.config.configparser.NoOptionError) as err:
                    raise err
            else:
                self.ce_exec = ce_exec
    
            # check if decompression is executable
            if not os.access(self.ce_exec, os.X_OK):
                raise PermissionError('"{}" is not executable.'.format(self.ce_exec))
    
            self.ce_decompression_on = False
            self.ce_thread = None
            self.last_ce_time = 0
            self.ce_collect_timeout = CE_COLLECT_TIMEOUT
            self.ldt_minimum_ce_gap = LDT_MINIMUM_CE_GAP
    
        def _ce_decompress(self):
            checkdir = os.path.abspath(self.outdir)
            if not os.path.isdir(checkdir):  # and checkdir != "":
                # os.mkdir(checkdir)
                raise NotADirectoryError('{} is not a directory/does not exist.'.format(checkdir))
    
            self.ce_thread = threading.Thread(target=self._ce_decompress_worker, name="CeDecompression_{}".format(self.init_time))
            self.ce_thread.daemon = True
            if self.starttime is not None:
                self.last_ce_time = self.starttime
            self.ce_decompression_on = True
    
            try:
                self.ce_thread.start()
                logger.info('Started CeDecompress [{}]...'.format(self.init_time))
            except Exception as err:
                logger.error(err)
                self.ce_decompression_on = False
                raise err
    
        def _ce_decompress_worker(self):
    
            def decompress(cefile):
                logger.info("Decompressing {}".format(cefile))
                fitspath = cefile[:-2] + 'fits'
                if os.path.isfile(fitspath):
                    subprocess.run(["rm", fitspath])
                with open(cefile[:-2] + 'log', 'w') as logfd:
                    subprocess.run([self.ce_exec, cefile, fitspath], stdout=logfd, stderr=logfd)
    
            # first, get all TM13s already complete in pool
            try:
                filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
                                               outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx,
                                               endidx=self.endidx, consistency_check=self.check_s13_consistency,
                                               verbose=self.verbose)
                for ce in filedict:
                    decompress(filedict[ce])
                    self.last_ce_time = ce
    
                self.last_ce_time += self.ldt_minimum_ce_gap
    
            except (ValueError, TypeError, AttributeError) as err:
                ce_decompressors.pop(self.init_time)
                raise err
    
            while self.ce_decompression_on:
                filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
                                               outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx,
                                               endidx=self.endidx, consistency_check=self.check_s13_consistency,
                                               verbose=self.verbose)
                if len(filedict) == 0:
                    time.sleep(self.ce_collect_timeout)
                    continue
    
                for ce in filedict:
                    self.last_ce_time, cefile = ce, filedict[ce]
                    decompress(cefile)
    
                    if not self.ce_decompression_on:
                        break
    
                # self.last_ce_time, cefile = list(filedict.items())[0]
                # decompress(cefile)
                self.last_ce_time += self.ldt_minimum_ce_gap
                time.sleep(self.ce_collect_timeout)
            logger.info('CeDecompress stopped [{}].'.format(self.init_time))
            ce_decompressors.pop(self.init_time)
    
        def start(self):
            self._ce_decompress()
    
            global ce_decompressors
            ce_decompressors[self.init_time] = self
    
        def stop(self):
            self.ce_decompression_on = False
    
        def reset(self, timestamp=0):
            self.last_ce_time = timestamp