Select Git revision
decompression.py
decompression.py 7.15 KiB
import logging
import numpy as np
import os
import subprocess
import threading
import time
import astropy.io.fits as pyfits
import confignator
import ccs_function_lib as cfl
cfg = confignator.get_config(check_interpolation=False)
logger = cfl.start_logging('Decompression')
# logger.setLevel(getattr(logging, cfg.get('ccs-logging', 'level').upper()))
CE_COLLECT_TIMEOUT = 1
LDT_MINIMUM_CE_GAP = 0.001
ce_decompressors = {}
def create_fits(header=None, filename=None):
hdulist = pyfits.HDUList()
hdu = pyfits.PrimaryHDU()
hdu.header = header
hdulist.append(hdu)
imagette_hdu = pyfits.ImageHDU()
stack_hdu = pyfits.ImageHDU()
margins = pyfits.ImageHDU()
hdulist.append(imagette_hdu)
hdulist.append(stack_hdu)
hdulist.append(margins)
if filename:
with open(filename, "wb") as fd:
hdulist.writeto(fd)
return hdulist
def build_fits(basefits, newfits):
base = pyfits.open(basefits)
new = pyfits.open(newfits)
for hdu in range(len(base)):
base[hdu].data = np.concatenate([base[hdu].data, new[hdu].data])
base.writeto(basefits, overwrite=True)
def convert_fullframe_to_cheopssim(fname):
"""
Convert a fullframe (1076x1033) FITS to CHEOPS-SIM format
@param fname: Input FITS file
"""
d = pyfits.open(fname)
full = np.array(np.round(d[0].data), dtype=np.uint16)
win_dict = {"SubArray": full[:, :1024, 28:28+1024],
"OverscanLeftImage": full[:, :1024, :4],
"BlankLeftImage": full[:, :1024, 4:4+8],
"DarkLeftImage": full[:, :1024, 12:28],
"DarkRightImage": full[:, :1024, 1052:1052+16],
"BlankRightImage": full[:, :1024, 1068:],
"DarkTopImage": full[:, 1024:-6, 28:-24],
"OverscanTopImage": full[:, -6:, 28:-24]}
hdulist = pyfits.HDUList()
hdulist.append(pyfits.PrimaryHDU())
for win in win_dict:
hdu = pyfits.ImageHDU(data=win_dict[win], name=win)
hdulist.append(hdu)
hdulist.append(pyfits.BinTableHDU(name="ImageMetaData"))
hdulist.writeto(fname[:-5] + '_CHEOPSSIM.fits')
def ce_decompress(pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None,
ce_exec=None, check_s13_consistency=True):
decomp = CeDecompress(pool_name, outdir, sdu=sdu, starttime=starttime, endtime=endtime, startidx=startidx,
endidx=endidx, ce_exec=ce_exec, check_s13_consistency=check_s13_consistency)
decomp.start()
def ce_decompress_stop(name=None):
if name is not None:
ce_decompressors[name].stop()
else:
for p in ce_decompressors:
ce_decompressors[p].stop()
class CeDecompress:
def __init__(self, pool_name, outdir, sdu=None, starttime=None, endtime=None, startidx=None, endidx=None,
ce_exec=None, check_s13_consistency=True, verbose=True):
self.outdir = outdir
self.pool_name = pool_name
self.sdu = sdu
self.starttime = starttime
self.endtime = endtime
self.startidx = startidx
self.endidx = endidx
self.check_s13_consistency = check_s13_consistency
self.verbose = verbose
self.init_time = int(time.time())
if ce_exec is None:
try:
self.ce_exec = cfg.get('ccs-misc', 'ce_exec')
except (ValueError, confignator.config.configparser.NoOptionError) as err:
raise err
else:
self.ce_exec = ce_exec
# check if decompression is executable
if not os.access(self.ce_exec, os.X_OK):
raise PermissionError('"{}" is not executable.'.format(self.ce_exec))
self.ce_decompression_on = False
self.ce_thread = None
self.last_ce_time = 0
self.ce_collect_timeout = CE_COLLECT_TIMEOUT
self.ldt_minimum_ce_gap = LDT_MINIMUM_CE_GAP
def _ce_decompress(self):
checkdir = os.path.abspath(self.outdir)
if not os.path.isdir(checkdir): # and checkdir != "":
# os.mkdir(checkdir)
raise NotADirectoryError('{} is not a directory/does not exist.'.format(checkdir))
self.ce_thread = threading.Thread(target=self._ce_decompress_worker, name="CeDecompression_{}".format(self.init_time))
self.ce_thread.daemon = True
if self.starttime is not None:
self.last_ce_time = self.starttime
self.ce_decompression_on = True
try:
self.ce_thread.start()
logger.info('Started CeDecompress [{}]...'.format(self.init_time))
except Exception as err:
logger.error(err)
self.ce_decompression_on = False
raise err
def _ce_decompress_worker(self):
def decompress(cefile):
logger.info("Decompressing {}".format(cefile))
fitspath = cefile[:-2] + 'fits'
if os.path.isfile(fitspath):
subprocess.run(["rm", fitspath])
with open(cefile[:-2] + 'log', 'w') as logfd:
subprocess.run([self.ce_exec, cefile, fitspath], stdout=logfd, stderr=logfd)
# first, get all TM13s already complete in pool
try:
filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx,
endidx=self.endidx, consistency_check=self.check_s13_consistency,
verbose=self.verbose)
for ce in filedict:
decompress(filedict[ce])
self.last_ce_time = ce
self.last_ce_time += self.ldt_minimum_ce_gap
except (ValueError, TypeError, AttributeError) as err:
ce_decompressors.pop(self.init_time)
raise err
while self.ce_decompression_on:
filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx,
endidx=self.endidx, consistency_check=self.check_s13_consistency,
verbose=self.verbose)
if len(filedict) == 0:
time.sleep(self.ce_collect_timeout)
continue
for ce in filedict:
self.last_ce_time, cefile = ce, filedict[ce]
decompress(cefile)
if not self.ce_decompression_on:
break
# self.last_ce_time, cefile = list(filedict.items())[0]
# decompress(cefile)
self.last_ce_time += self.ldt_minimum_ce_gap
time.sleep(self.ce_collect_timeout)
logger.info('CeDecompress stopped [{}].'.format(self.init_time))
ce_decompressors.pop(self.init_time)
def start(self):
self._ce_decompress()
global ce_decompressors
ce_decompressors[self.init_time] = self
def stop(self):
self.ce_decompression_on = False
def reset(self, timestamp=0):
self.last_ce_time = timestamp