Skip to content
Snippets Groups Projects
Commit f53ca014 authored by Dominik Loidolt's avatar Dominik Loidolt
Browse files

add cmp_tool test for --rdcu_par --rdcu_pkt --last_info options

parent 6932c1df
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@ import shlex
import sys
import os
import math
import shutil
from datetime import datetime
from datetime import timedelta
......@@ -15,6 +16,7 @@ EXIT_FAILURE = 1
EXIT_SUCCESS = 0
DATA_TYPE_IMAGETTE = 1
DATA_TYPE_IMAGETTE_ADAPTIVE = 2
GENERIC_HEADER_SIZE = 32
IMAGETTE_HEADER_SIZE = GENERIC_HEADER_SIZE+4
......@@ -53,6 +55,14 @@ def del_file(filePath):
print("The file %s could not be deleted because it does not exist!" % (filePath))
def del_directory(directoryPath):
# Try to remove the tree; if it fails, throw an error using try...except.
try:
shutil.rmtree(directoryPath)
except OSError as e:
print("Error: %s - %s." % (e.filename, e.strerror))
def cuc_timestamp(now):
epoch = datetime(2020, 1, 1)
timestamp = (now - epoch).total_seconds()
......@@ -85,7 +95,7 @@ def read_in_cmp_header(compressed_string):
'ap1_golomb_par' : { 'value': -1, 'bits': 8 },
'ap2_spill_used' : { 'value': -1, 'bits': 16 },
'ap2_golomb_par' : { 'value': -1, 'bits': 8 },
'spare_ap_ima' : { 'value': -1, 'bits': 8 },
'spare_ap_ima' : { 'value': -1, 'bits': 24 },
'spill_1_used' : { 'value': -1, 'bits': 24 },
'cmp_par_1_used' : { 'value': -1, 'bits': 16 },
'spill_2_used' : { 'value': -1, 'bits': 24 },
......@@ -130,7 +140,7 @@ def read_in_cmp_header(compressed_string):
assert(l/2==IMAGETTE_HEADER_SIZE)
break
if data_type == 2:
assert(l/2== NON_IMAGETTE_HEADER_SIZE)
assert(l/2==IMAGETTE_ADAPTIVE_HEADER_SIZE)
# Non-Imagette Headers
elif data_type < 24:
......@@ -191,8 +201,7 @@ General Options:
-o <prefix> Use the <prefix> for output files
-n, --model_cfg Print a default model configuration and exit
--diff_cfg Print a default 1d-differencing configuration and exit
--binary Read and write files in binary format
--no_header Do not add a compression entity header in front of the compressed data
-b, --binary Read and write files in binary format
-a, --rdcu_par Add additional RDCU control parameters
-V, --version Print program version and exit
-v, --verbose Print various debugging information
......@@ -200,6 +209,7 @@ Compression Options:
-c <file> File containing the compressing configuration
-d <file> File containing the data to be compressed
-m <file> File containing the model of the data to be compressed
--no_header Do not add a compression entity header in front of the compressed data
--rdcu_pkt Generate RMAP packets for an RDCU compression
--last_info <.info file> Generate RMAP packets for an RDCU compression with parallel read of the last results
Decompression Options:
......@@ -213,12 +223,12 @@ Guessing Options:
--guess_level <level> Set guess level to <level> (optional)
""" % (PATH_CMP_TOOL)
welcome_str = "### PLATO Compression/Decompression Tool Version %s ###"% (VERSION)
CMP_START_STR = \
"""#########################################################
### PLATO Compression/Decompression Tool Version %s ###
#########################################################
""" % (VERSION)
'#'*len(welcome_str)+'\n'+ \
welcome_str+'\n'+ \
'#'*len(welcome_str)+'\n'
CMP_START_STR_CMP = CMP_START_STR + "## Starting the compression ##\n"
CMP_START_STR_DECMP = CMP_START_STR + "## Starting the decompression ##\n"
......@@ -480,7 +490,7 @@ def test_model_compression():
# generate test configuration
with open(cfg_file_name, 'w', encoding='utf-8') as f:
returncode, stdout, stderr = call_cmp_tool("--model_cfg")
returncode, stdout, stderr = call_cmp_tool("--model_cfg --rdcu_par")
assert(returncode == EXIT_SUCCESS)
assert(stderr == "")
cfg = parse_key_value(stdout)
......@@ -488,11 +498,14 @@ def test_model_compression():
cfg['model_value'] = '0'
cfg["samples"] = '5'
cfg["buffer_length"] = '2'
cfg["ap1_golomb_par"] = '20'
cfg["ap1_spill"] = '70'
cfg["ap2_golomb_par"] = '63'
cfg["ap1_spill"] = '6'
for key, value in cfg.items():
f.write(key + ' = ' + str(value) + '\n')
add_args = [" --no_header", "", " --binary"]
add_args = [" --no_header", "", " --binary"]
add_args = [" --no_header", " --no_header --rdcu_par", "", " --binary"]
for add_arg in add_args:
if "--binary" in add_arg:
with open(data_file_name, 'wb') as f:
......@@ -512,7 +525,7 @@ def test_model_compression():
"Compress data ... DONE\n" +
"Write compressed data to file %s.cmp ... DONE\n" % (output_prefix1) +
"%s" % ((lambda arg : "Write decompression information to file %s.info ... DONE\n" % (output_prefix1)
if arg == " --no_header" else "")(add_arg)) +
if " --no_header" in add_arg else "")(add_arg)) +
"Write updated model to file %s_upmodel.dat ... DONE\n" % (output_prefix1)
)
assert(returncode == EXIT_SUCCESS)
......@@ -532,6 +545,11 @@ def test_model_compression():
assert(info['samples_used'] == cfg['samples'])
assert(info['cmp_size'] == '15')
assert(info['cmp_err'] == '0')
if "--rdcu_par" in add_arg:
assert(info['ap1_cmp_size'] == '25')
assert(info['ap2_cmp_size'] == '35')
assert(info['rdcu_new_model_adr_used'] == cfg['rdcu_new_model_adr'])
assert(info['rdcu_cmp_adr_used'] == cfg['rdcu_buffer_adr'])
else:
if not "--binary" in add_arg:
with open(output_prefix1+".cmp", encoding='utf-8') as f:
......@@ -541,13 +559,13 @@ def test_model_compression():
header = read_in_cmp_header(bytearray(f.read()).hex())
assert(header['asw_version_id']['value'] == VERSION.split('-')[0])
assert(header['cmp_ent_size']['value'] == IMAGETTE_HEADER_SIZE+4)
assert(header['cmp_ent_size']['value'] == IMAGETTE_ADAPTIVE_HEADER_SIZE+4)
assert(header['original_size']['value'] == 10)
# todo
assert(header['start_time']['value'] < cuc_timestamp(datetime.utcnow()))
#todo
assert(header['end_timestamp']['value'] < cuc_timestamp(datetime.utcnow()))
assert(header['data_type']['value'] == DATA_TYPE_IMAGETTE)
assert(header['data_type']['value'] == DATA_TYPE_IMAGETTE_ADAPTIVE)
assert(header['cmp_mode_used']['value'] == 3)
assert(header['model_value_used']['value'] == int(cfg['model_value']))
assert(header['model_id']['value'] == 53264)
......@@ -1293,7 +1311,102 @@ def test_model_fiel_erros():
del_file(output_prefix+"_upmodel.dat")
# TODO: random test
# TODO: random test
# TODO: random test
def test_rdcu_pkt():
# generate test data
data = '00 01 00 02 00 03 00 04 00 05 \n'
model = '00 00 00 01 00 02 00 03 00 04 \n'
data_file_name = 'data_pkt.dat'
model_file_name = 'model_pkt.dat'
cfg_file_name = 'pkt.cfg'
output_prefix1 = 'pkt_cmp'
output_prefix2 = 'pkt_decmp'
try:
# generate test configuration
with open(cfg_file_name, 'w', encoding='utf-8') as f:
returncode, stdout, stderr = call_cmp_tool("--model_cfg --rdcu_par")
assert(stderr == "")
assert(returncode == EXIT_SUCCESS)
f.write(stdout)
cfg = parse_key_value(stdout)
with open(data_file_name, 'w', encoding='utf-8') as f:
f.write(data)
with open(model_file_name, 'w', encoding='utf-8') as f:
f.write(model)
add_args = [" --rdcu_pkt", " --last_info "+output_prefix1+".info"]
for add_arg in add_args:
# compression
returncode, stdout, stderr = call_cmp_tool(
" -c "+cfg_file_name+" -d "+data_file_name+" -m "+model_file_name+" -o " +output_prefix1+add_arg)
assert(stderr == "")
assert(stdout == CMP_START_STR_CMP +
"Importing configuration file %s ... DONE\n" % (cfg_file_name) +
"Importing data file %s ... \n" % (data_file_name) +
"No samples parameter set. Use samples = 5.\n" +
"... DONE\n" +
"Importing model file %s ... DONE\n" % (model_file_name) +
"No buffer_length parameter set. Use buffer_length = 12 as compression buffer size.\n" +
"Generate compression setup packets ...\n" +
"Use ICU_ADDR = 0XA7, RDCU_ADDR = 0XFE and MTU = 4224 for the RAMP packets.\n"
"... DONE\n" +
"Compress data ... DONE\n" +
"Generate the read results packets ... DONE\n"
"Write compressed data to file %s.cmp ... DONE\n" % (output_prefix1) +
"Write decompression information to file %s.info ... DONE\n" % (output_prefix1) +
"Write updated model to file %s_upmodel.dat ... DONE\n" % (output_prefix1)
)
assert(returncode == EXIT_SUCCESS)
# check compressed data
with open(output_prefix1+".cmp", encoding='utf-8') as f:
assert(f.read() == "49 24 00 00 \n")
# check info file
with open(output_prefix1+".info", encoding='utf-8') as f:
info = parse_key_value(f.read())
assert(info['cmp_mode_used'] == '3')
assert(info['model_value_used'] == cfg['model_value'])
assert(info['round_used'] == cfg['round'])
assert(info['spill_used'] == cfg['spill'])
assert(info['golomb_par_used'] == cfg['golomb_par'])
assert(info['samples_used'] == '5')
assert(info['cmp_size'] == '15')
assert(info['cmp_err'] == '0')
assert(info['ap1_cmp_size'] == '15')
assert(info['ap2_cmp_size'] == '15')
assert(info['rdcu_new_model_adr_used'] == cfg['rdcu_new_model_adr'])
assert(info['rdcu_cmp_adr_used'] == cfg['rdcu_buffer_adr'])
# decompression
returncode, stdout, stderr = call_cmp_tool(
" -i "+output_prefix1+".info -d "+output_prefix1+".cmp -m "+model_file_name+" -o "+output_prefix2)
assert(stderr == "")
assert(stdout == CMP_START_STR_DECMP +
"Importing decompression information file %s.info ... DONE\n" % (output_prefix1) +
"Importing compressed data file %s.cmp ... DONE\n" % (output_prefix1) +
"Importing model file %s ... DONE\n" % (model_file_name) +
"Decompress data ... DONE\n" +
"Write decompressed data to file %s.dat ... DONE\n" % (output_prefix2) +
"Write updated model to file %s_upmodel.dat ... DONE\n" % (output_prefix2))
assert(returncode == EXIT_SUCCESS)
with open(output_prefix2+".dat", encoding='utf-8') as f:
assert(f.read() == data)
with open(output_prefix1+"_upmodel.dat", encoding='utf-8') as f1:
with open(output_prefix2+"_upmodel.dat", encoding='utf-8') as f2:
assert(f1.read() == f2.read())
finally:
pass
del_directory('TC_FILES')
del_file(data_file_name)
del_file(cfg_file_name)
del_file(model_file_name)
del_file(output_prefix1+'.cmp')
del_file(output_prefix1+'.info')
del_file(output_prefix1+'_upmodel.dat')
del_file(output_prefix2+'.dat')
del_file(output_prefix2+'_upmodel.dat')
# TODO: random test
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment