Skip to content
Snippets Groups Projects
Commit 3db721d6 authored by Marko Mecina's avatar Marko Mecina
Browse files

minor fixes in decompression error handling

parent c474996e
No related branches found
No related tags found
No related merge requests found
...@@ -81,7 +81,6 @@ def ce_decompress(outdir, pool_name=None, sdu=None, starttime=None, endtime=None ...@@ -81,7 +81,6 @@ def ce_decompress(outdir, pool_name=None, sdu=None, starttime=None, endtime=None
decomp = CeDecompress(outdir, pool_name=pool_name, sdu=sdu, starttime=starttime, endtime=endtime, startidx=startidx, decomp = CeDecompress(outdir, pool_name=pool_name, sdu=sdu, starttime=starttime, endtime=endtime, startidx=startidx,
endidx=endidx, ce_exec=ce_exec) endidx=endidx, ce_exec=ce_exec)
decomp.start() decomp.start()
ce_decompressors[decomp.init_time] = decomp
def ce_decompress_stop(name=None): def ce_decompress_stop(name=None):
...@@ -125,6 +124,9 @@ class CeDecompress: ...@@ -125,6 +124,9 @@ class CeDecompress:
self.ce_collect_timeout = CE_COLLECT_TIMEOUT self.ce_collect_timeout = CE_COLLECT_TIMEOUT
self.ldt_minimum_ce_gap = LDT_MINIMUM_CE_GAP self.ldt_minimum_ce_gap = LDT_MINIMUM_CE_GAP
global ce_decompressors
ce_decompressors[self.init_time] = self
def _ce_decompress(self): def _ce_decompress(self):
checkdir = os.path.dirname(self.outdir) checkdir = os.path.dirname(self.outdir)
if not os.path.exists(checkdir) and checkdir != "": if not os.path.exists(checkdir) and checkdir != "":
...@@ -161,14 +163,13 @@ class CeDecompress: ...@@ -161,14 +163,13 @@ class CeDecompress:
filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx, outdir=self.outdir, dump_all=True, sdu=self.sdu, startidx=self.startidx,
endidx=self.endidx) endidx=self.endidx)
except ValueError as err: for ce in filedict:
self.last_ce_time = ce
decompress(filedict[ce])
except (ValueError, TypeError, AttributeError) as err:
ce_decompressors.pop(self.init_time) ce_decompressors.pop(self.init_time)
raise err raise err
for ce in filedict:
self.last_ce_time = ce
decompress(filedict[ce])
while self.ce_decompression_on: while self.ce_decompression_on:
filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime, filedict = cfl.dump_large_data(pool_name=self.pool_name, starttime=self.last_ce_time, endtime=self.endtime,
outdir=self.outdir, dump_all=False, sdu=self.sdu, startidx=self.startidx, outdir=self.outdir, dump_all=False, sdu=self.sdu, startidx=self.startidx,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment