diff --git a/test/cmp_tool/cmp_tool_integration_test.py b/test/cmp_tool/cmp_tool_integration_test.py index 4abae437b63d61e13297618efd209b59bd709ed8..5b06fb3b693458580780e1026c0afebb9b6b1fbf 100755 --- a/test/cmp_tool/cmp_tool_integration_test.py +++ b/test/cmp_tool/cmp_tool_integration_test.py @@ -5,6 +5,7 @@ import shlex import sys import os import math +import shutil from datetime import datetime from datetime import timedelta @@ -15,6 +16,7 @@ EXIT_FAILURE = 1 EXIT_SUCCESS = 0 DATA_TYPE_IMAGETTE = 1 +DATA_TYPE_IMAGETTE_ADAPTIVE = 2 GENERIC_HEADER_SIZE = 32 IMAGETTE_HEADER_SIZE = GENERIC_HEADER_SIZE+4 @@ -53,6 +55,14 @@ def del_file(filePath): print("The file %s could not be deleted because it does not exist!" % (filePath)) +def del_directory(directoryPath): + # Try to remove the tree; if it fails, throw an error using try...except. + try: + shutil.rmtree(directoryPath) + except OSError as e: + print("Error: %s - %s." % (e.filename, e.strerror)) + + def cuc_timestamp(now): epoch = datetime(2020, 1, 1) timestamp = (now - epoch).total_seconds() @@ -85,7 +95,7 @@ def read_in_cmp_header(compressed_string): 'ap1_golomb_par' : { 'value': -1, 'bits': 8 }, 'ap2_spill_used' : { 'value': -1, 'bits': 16 }, 'ap2_golomb_par' : { 'value': -1, 'bits': 8 }, - 'spare_ap_ima' : { 'value': -1, 'bits': 8 }, + 'spare_ap_ima' : { 'value': -1, 'bits': 24 }, 'spill_1_used' : { 'value': -1, 'bits': 24 }, 'cmp_par_1_used' : { 'value': -1, 'bits': 16 }, 'spill_2_used' : { 'value': -1, 'bits': 24 }, @@ -130,7 +140,7 @@ def read_in_cmp_header(compressed_string): assert(l/2==IMAGETTE_HEADER_SIZE) break if data_type == 2: - assert(l/2== NON_IMAGETTE_HEADER_SIZE) + assert(l/2==IMAGETTE_ADAPTIVE_HEADER_SIZE) # Non-Imagette Headers elif data_type < 24: @@ -191,8 +201,7 @@ General Options: -o <prefix> Use the <prefix> for output files -n, --model_cfg Print a default model configuration and exit --diff_cfg Print a default 1d-differencing configuration and exit - --binary Read and write files in binary format - --no_header Do not add a compression entity header in front of the compressed data + -b, --binary Read and write files in binary format -a, --rdcu_par Add additional RDCU control parameters -V, --version Print program version and exit -v, --verbose Print various debugging information @@ -200,6 +209,7 @@ Compression Options: -c <file> File containing the compressing configuration -d <file> File containing the data to be compressed -m <file> File containing the model of the data to be compressed + --no_header Do not add a compression entity header in front of the compressed data --rdcu_pkt Generate RMAP packets for an RDCU compression --last_info <.info file> Generate RMAP packets for an RDCU compression with parallel read of the last results Decompression Options: @@ -213,12 +223,12 @@ Guessing Options: --guess_level <level> Set guess level to <level> (optional) """ % (PATH_CMP_TOOL) +welcome_str = "### PLATO Compression/Decompression Tool Version %s ###"% (VERSION) CMP_START_STR = \ -"""######################################################### -### PLATO Compression/Decompression Tool Version %s ### -######################################################### -""" % (VERSION) + '#'*len(welcome_str)+'\n'+ \ + welcome_str+'\n'+ \ + '#'*len(welcome_str)+'\n' CMP_START_STR_CMP = CMP_START_STR + "## Starting the compression ##\n" CMP_START_STR_DECMP = CMP_START_STR + "## Starting the decompression ##\n" @@ -480,7 +490,7 @@ def test_model_compression(): # generate test configuration with open(cfg_file_name, 'w', encoding='utf-8') as f: - returncode, stdout, stderr = call_cmp_tool("--model_cfg") + returncode, stdout, stderr = call_cmp_tool("--model_cfg --rdcu_par") assert(returncode == EXIT_SUCCESS) assert(stderr == "") cfg = parse_key_value(stdout) @@ -488,11 +498,14 @@ def test_model_compression(): cfg['model_value'] = '0' cfg["samples"] = '5' cfg["buffer_length"] = '2' + cfg["ap1_golomb_par"] = '20' + cfg["ap1_spill"] = '70' + cfg["ap2_golomb_par"] = '63' + cfg["ap1_spill"] = '6' for key, value in cfg.items(): f.write(key + ' = ' + str(value) + '\n') - add_args = [" --no_header", "", " --binary"] - add_args = [" --no_header", "", " --binary"] + add_args = [" --no_header", " --no_header --rdcu_par", "", " --binary"] for add_arg in add_args: if "--binary" in add_arg: with open(data_file_name, 'wb') as f: @@ -512,26 +525,31 @@ def test_model_compression(): "Compress data ... DONE\n" + "Write compressed data to file %s.cmp ... DONE\n" % (output_prefix1) + "%s" % ((lambda arg : "Write decompression information to file %s.info ... DONE\n" % (output_prefix1) - if arg == " --no_header" else "")(add_arg)) + + if " --no_header" in add_arg else "")(add_arg)) + "Write updated model to file %s_upmodel.dat ... DONE\n" % (output_prefix1) ) assert(returncode == EXIT_SUCCESS) - if "--no_header" in add_arg: - # check compressed data - with open(output_prefix1+".cmp", encoding='utf-8') as f: - assert(f.read() == "49 24 00 00 \n") - # check info file - with open(output_prefix1+".info", encoding='utf-8') as f: - info = parse_key_value(f.read()) - assert(info['cmp_mode_used'] == '3') - assert(info['model_value_used'] == cfg['model_value']) - assert(info['round_used'] == cfg['round']) - assert(info['spill_used'] == cfg['spill']) - assert(info['golomb_par_used'] == cfg['golomb_par']) - assert(info['samples_used'] == cfg['samples']) - assert(info['cmp_size'] == '15') - assert(info['cmp_err'] == '0') + if "--no_header" in add_arg: + # check compressed data + with open(output_prefix1+".cmp", encoding='utf-8') as f: + assert(f.read() == "49 24 00 00 \n") + # check info file + with open(output_prefix1+".info", encoding='utf-8') as f: + info = parse_key_value(f.read()) + assert(info['cmp_mode_used'] == '3') + assert(info['model_value_used'] == cfg['model_value']) + assert(info['round_used'] == cfg['round']) + assert(info['spill_used'] == cfg['spill']) + assert(info['golomb_par_used'] == cfg['golomb_par']) + assert(info['samples_used'] == cfg['samples']) + assert(info['cmp_size'] == '15') + assert(info['cmp_err'] == '0') + if "--rdcu_par" in add_arg: + assert(info['ap1_cmp_size'] == '25') + assert(info['ap2_cmp_size'] == '35') + assert(info['rdcu_new_model_adr_used'] == cfg['rdcu_new_model_adr']) + assert(info['rdcu_cmp_adr_used'] == cfg['rdcu_buffer_adr']) else: if not "--binary" in add_arg: with open(output_prefix1+".cmp", encoding='utf-8') as f: @@ -541,13 +559,13 @@ def test_model_compression(): header = read_in_cmp_header(bytearray(f.read()).hex()) assert(header['asw_version_id']['value'] == VERSION.split('-')[0]) - assert(header['cmp_ent_size']['value'] == IMAGETTE_HEADER_SIZE+4) + assert(header['cmp_ent_size']['value'] == IMAGETTE_ADAPTIVE_HEADER_SIZE+4) assert(header['original_size']['value'] == 10) # todo assert(header['start_time']['value'] < cuc_timestamp(datetime.utcnow())) #todo assert(header['end_timestamp']['value'] < cuc_timestamp(datetime.utcnow())) - assert(header['data_type']['value'] == DATA_TYPE_IMAGETTE) + assert(header['data_type']['value'] == DATA_TYPE_IMAGETTE_ADAPTIVE) assert(header['cmp_mode_used']['value'] == 3) assert(header['model_value_used']['value'] == int(cfg['model_value'])) assert(header['model_id']['value'] == 53264) @@ -1293,7 +1311,102 @@ def test_model_fiel_erros(): del_file(output_prefix+"_upmodel.dat") -# TODO: random test -# TODO: random test -# TODO: random test +def test_rdcu_pkt(): + # generate test data + data = '00 01 00 02 00 03 00 04 00 05 \n' + model = '00 00 00 01 00 02 00 03 00 04 \n' + data_file_name = 'data_pkt.dat' + model_file_name = 'model_pkt.dat' + cfg_file_name = 'pkt.cfg' + output_prefix1 = 'pkt_cmp' + output_prefix2 = 'pkt_decmp' + + try: + # generate test configuration + with open(cfg_file_name, 'w', encoding='utf-8') as f: + returncode, stdout, stderr = call_cmp_tool("--model_cfg --rdcu_par") + assert(stderr == "") + assert(returncode == EXIT_SUCCESS) + f.write(stdout) + cfg = parse_key_value(stdout) + with open(data_file_name, 'w', encoding='utf-8') as f: + f.write(data) + with open(model_file_name, 'w', encoding='utf-8') as f: + f.write(model) + + add_args = [" --rdcu_pkt", " --last_info "+output_prefix1+".info"] + for add_arg in add_args: + # compression + returncode, stdout, stderr = call_cmp_tool( + " -c "+cfg_file_name+" -d "+data_file_name+" -m "+model_file_name+" -o " +output_prefix1+add_arg) + assert(stderr == "") + assert(stdout == CMP_START_STR_CMP + + "Importing configuration file %s ... DONE\n" % (cfg_file_name) + + "Importing data file %s ... \n" % (data_file_name) + + "No samples parameter set. Use samples = 5.\n" + + "... DONE\n" + + "Importing model file %s ... DONE\n" % (model_file_name) + + "No buffer_length parameter set. Use buffer_length = 12 as compression buffer size.\n" + + "Generate compression setup packets ...\n" + + "Use ICU_ADDR = 0XA7, RDCU_ADDR = 0XFE and MTU = 4224 for the RAMP packets.\n" + "... DONE\n" + + "Compress data ... DONE\n" + + "Generate the read results packets ... DONE\n" + "Write compressed data to file %s.cmp ... DONE\n" % (output_prefix1) + + "Write decompression information to file %s.info ... DONE\n" % (output_prefix1) + + "Write updated model to file %s_upmodel.dat ... DONE\n" % (output_prefix1) + ) + assert(returncode == EXIT_SUCCESS) + # check compressed data + with open(output_prefix1+".cmp", encoding='utf-8') as f: + assert(f.read() == "49 24 00 00 \n") + # check info file + with open(output_prefix1+".info", encoding='utf-8') as f: + info = parse_key_value(f.read()) + assert(info['cmp_mode_used'] == '3') + assert(info['model_value_used'] == cfg['model_value']) + assert(info['round_used'] == cfg['round']) + assert(info['spill_used'] == cfg['spill']) + assert(info['golomb_par_used'] == cfg['golomb_par']) + assert(info['samples_used'] == '5') + assert(info['cmp_size'] == '15') + assert(info['cmp_err'] == '0') + assert(info['ap1_cmp_size'] == '15') + assert(info['ap2_cmp_size'] == '15') + assert(info['rdcu_new_model_adr_used'] == cfg['rdcu_new_model_adr']) + assert(info['rdcu_cmp_adr_used'] == cfg['rdcu_buffer_adr']) + + # decompression + returncode, stdout, stderr = call_cmp_tool( + " -i "+output_prefix1+".info -d "+output_prefix1+".cmp -m "+model_file_name+" -o "+output_prefix2) + assert(stderr == "") + assert(stdout == CMP_START_STR_DECMP + + "Importing decompression information file %s.info ... DONE\n" % (output_prefix1) + + "Importing compressed data file %s.cmp ... DONE\n" % (output_prefix1) + + "Importing model file %s ... DONE\n" % (model_file_name) + + "Decompress data ... DONE\n" + + "Write decompressed data to file %s.dat ... DONE\n" % (output_prefix2) + + "Write updated model to file %s_upmodel.dat ... DONE\n" % (output_prefix2)) + assert(returncode == EXIT_SUCCESS) + with open(output_prefix2+".dat", encoding='utf-8') as f: + assert(f.read() == data) + with open(output_prefix1+"_upmodel.dat", encoding='utf-8') as f1: + with open(output_prefix2+"_upmodel.dat", encoding='utf-8') as f2: + assert(f1.read() == f2.read()) + + finally: + pass + del_directory('TC_FILES') + del_file(data_file_name) + del_file(cfg_file_name) + del_file(model_file_name) + del_file(output_prefix1+'.cmp') + del_file(output_prefix1+'.info') + del_file(output_prefix1+'_upmodel.dat') + del_file(output_prefix2+'.dat') + del_file(output_prefix2+'_upmodel.dat') + + + +# TODO: random test