From 0e47ad15b63b26fafe7ca9680a9b81a5e6a5ca97 Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Wed, 7 Sep 2022 12:57:26 +0200 Subject: [PATCH] implement new SCOS parameter type look-up function --- Ccs/ccs_function_lib.py | 221 ++++++++++++++---------------------- Ccs/packet_config_CHEOPS.py | 2 +- Ccs/packet_config_SMILE.py | 2 +- Ccs/s2k_partypes.py | 96 ++++++++++------ Tst/tst/data_pool_tab.py | 2 +- Tst/tst/tc_management.py | 6 +- Tst/tst/tm_management.py | 5 +- 7 files changed, 155 insertions(+), 179 deletions(-) diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index 2b02fc9..e4e2525 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -1,8 +1,8 @@ import gi gi.require_version('Gtk', '3.0') -gi.require_version('Notify', '0.7') +# gi.require_version('Notify', '0.7') -from gi.repository import Gtk, GLib, Notify, GdkPixbuf +from gi.repository import Gtk, GLib, GdkPixbuf #, Notify import subprocess import struct import datetime @@ -23,7 +23,7 @@ from database.tm_db import scoped_session_maker, DbTelemetry, DbTelemetryPool, R from sqlalchemy.exc import OperationalError as SQLOperationalError from sqlalchemy.sql.expression import func -from s2k_partypes import ptt, ptype_parameters, ptype_values +from s2k_partypes import ptt, ptt_reverse, ptype_parameters, ptype_values import confignator import importlib @@ -75,7 +75,7 @@ pid_offset = int(cfg.get('ccs-misc', 'pid_offset')) fmtlist = {'INT8': 'b', 'UINT8': 'B', 'INT16': 'h', 'UINT16': 'H', 'INT32': 'i', 'UINT32': 'I', 'INT64': 'q', 'UINT64': 'Q', 'FLOAT': 'f', 'DOUBLE': 'd', 'INT24': 'i24', 'UINT24': 'I24', 'bit*': 'bit'} -personal_fmtlist = ['uint', 'int', 'ascii', 'oct'] +personal_fmtlist = ['uint', 'ascii', 'oct'] fmtlengthlist = {'b': 1, 'B': 1, 'h': 2, 'H': 2, 'i': 4, 'I': 4, 'q': 8, 'Q': 8, 'f': 4, 'd': 8, 'i24': 3, 'I24': 3} @@ -107,7 +107,7 @@ if cfg.has_section('ccs-user_defined_packets'): else: user_tm_decoders = {} -Notify.init('cfl') +# Notify.init('cfl') def _add_log_socket_handler(): @@ -542,25 +542,24 @@ def set_monitor(pool_name=None, param_set=None): return -def ptt_reverse(typ): - - """ - Returns the ptc location (first layer) of a Type stored in s2k_partypes 'ptt' - :param typ: Has to be a type given in s2k_partypes 'ptt' - :return: ptc location - """ - # TODO: adapt to new ptt - if typ.startswith('oct'): - return [7, typ[3:]] - elif typ.startswith('ascii'): - return [8, typ[5:]] - - for i in ptt: # First Section - for j in ptt[i]: # Second Section - if ptt[i][j] == typ: # Check for type - return [i, j] - - return False +# def ptt_reverse(typ): +# +# """ +# Returns the ptc location (first layer) of a Type stored in s2k_partypes 'ptt' +# :param typ: Has to be a type given in s2k_partypes 'ptt' +# :return: ptc location +# """ +# if typ.startswith('oct'): +# return [7, typ[3:]] +# elif typ.startswith('ascii'): +# return [8, typ[5:]] +# +# for i in ptt: # First Section +# for j in ptt[i]: # Second Section +# if ptt[i][j] == typ: # Check for type +# return [i, j] +# +# return False def user_tm_decoders_func(): @@ -755,6 +754,7 @@ def decode_pus(tm_data, parameters, decode_tc=False): else: return [(read_stream(tms, fmt, pos=par[-1]/8, offbi=0 if par[5] % 8 == 0 else 8 - par[5] % 8), par) for fmt, par in zip(fmts, parameters)] + ## # Read_stream # @@ -785,9 +785,9 @@ def read_stream(stream, fmt, pos=None, offbi=0): bitsize = len(data) * 8 x = (int.from_bytes(data, 'big') & (2 ** (bitsize - offbi) - 1)) >> (bitsize - offbi - bitlen) elif fmt.startswith('oct'): - x = struct.unpack('>' + fmt[3:], data)[0] + x = struct.unpack('>{}s'.format(fmt[3:]), data)[0] elif fmt.startswith('ascii'): - x = struct.unpack('>' + fmt[5:], data)[0] + x = struct.unpack('>{}s'.format(fmt[5:]), data)[0] x = x.decode() elif fmt == timepack[0]: x = timecal(data) @@ -796,6 +796,7 @@ def read_stream(stream, fmt, pos=None, offbi=0): return x + ## # csize # @@ -810,41 +811,33 @@ def csize(fmt, offbi=0): elif fmt == timepack[0]: return timepack[1] - timepack[3] elif fmt.startswith('oct'): - return int(fmt[3:-1]) + offbi + return int(fmt[3:]) elif fmt.startswith('ascii'): - return int(fmt[5:-1]) + offbi + return int(fmt[5:]) else: return struct.calcsize(fmt) + ## # parameter_ptt_type # # Returns the format of the input bytes for TM (list has to be formated the correct way) # @param parameters Input List of one parameter - def parameter_ptt_type_tm(par): - if not par[4] in [7,8]: - return ptt[par[4]][par[5]] - elif par[4] == 7: - return 'oct' + str(par[5]) + 's' - elif par[4] == 8: - return 'ascii' + str(par[5]) + 's' + return ptt(par[4], par[5]) + ## # parameter_ptt_type # # Returns the format of the input bytes for TC (list has to be formated the correct way) # @param parameters Input List of one parameter - def parameter_ptt_type_tc_read(par): if par[2] is None: - return ptt['SPARE_visible'][par[5]] - elif par[2] == 7: - return 'oct' + str(par[3]) + 's' - elif par[2] == 8: - return 'ascii' + str(par[3]) + 's' + return ptt('SPARE_visible', par[5]) else: - return ptt[par[2]][par[3]] + return ptt(par[2], par[3]) + ## # Nonetoempty @@ -963,7 +956,7 @@ def Tcdata(tm, *args): finfo = dbcon.execute(que).fetchall() if finfo: cname, offbit, cdfval, ptc, pfc, paf = finfo[0] - fvalue = read_stream(io.BytesIO(data), ptt[ptc][pfc], pos=offbit // 8) + fvalue = read_stream(io.BytesIO(data), ptt(ptc, pfc), pos=offbit // 8) for paf in [info[-1] for info in finfo]: fname = tc_param_alias_reverse(paf, None, fvalue) @@ -1093,16 +1086,17 @@ def Tmread(pckt): finally: return head_pars, data, crc + ## # Generate (space separated) hexstring from byte/bitstring # @param inbytes bytestring or bitstring object to be converted # @param separator string by which the hex doublettes are joined, default=' ' - def prettyhex(inbytes, separator=' '): if not isinstance(inbytes, bytes): inbytes = inbytes.bytes return separator.join(['%02X' % x for x in inbytes]) + ## # Varpack # @@ -1114,11 +1108,12 @@ def prettyhex(inbytes, separator=' '): # @param parlist list of decoded source data parameter properties def read_varpack(data, parameters, paramid, outlist, parlist): while paramid < len(parameters): - fmt = ptt[parameters[paramid][2]][parameters[paramid][3]] - if parameters[paramid][2] == 11: - fmt = fmt[ptype] - if ptype == 7: # ptt fmt string for bool not parseable with .read - fmt = 'uint:8' + fmt = ptt(parameters[paramid][2], parameters[paramid][3]) + if parameters[paramid][2] == 11: # TODO: handle deduced parameter types + raise NotImplementedError('Deduced parameter type PTC=11') + # fmt = fmt[ptype] + # if ptype == 7: # ptt fmt string for bool not parsable with .read + # fmt = 'uint8' outdata = data.read(fmt) grpsize = parameters[paramid][-2] if parameters[paramid][6] == FMT_TYPE_PARAM: @@ -1182,14 +1177,15 @@ def read_stream_recursive(tms, parameters, decoded=None): if grp is None: # None happens for UDFP, would give error using None grp = 0 - fmt = ptt[par[2]][par[3]] + fmt = ptt(par[2], par[3]) if fmt == 'deduced': - if 'ptype' in locals(): - fmt = ptype_values[ptype] - else: - # print('No format deduced for parameter, aborting.') - logger.warning('No format deduced for parameter, aborting.') - return decoded + raise NotImplementedError('Deduced parameter type PTC=11') + # if 'ptype' in locals(): + # fmt = ptype_values[ptype] + # else: + # # print('No format deduced for parameter, aborting.') + # logger.warning('No format deduced for parameter, aborting.') + # return decoded value = read_stream(tms, fmt) if par[0] in ptype_parameters: @@ -1277,8 +1273,8 @@ def get_calibrated(pcf_name, rawval, properties=None, numerical=False, dbcon=Non ptc, pfc, categ, curtx = properties try: - type_par = ptt[ptc][pfc] - except: + type_par = ptt(ptc, pfc) + except NotImplementedError: type_par = None if type_par == timepack[0]: @@ -1508,7 +1504,7 @@ def get_param_values(tmlist=None, hk=None, param=None, last=0, numerical=False): else: tmlist_filt = Tm_filter_st(tmlist, st=st, sst=sst, apid=apid)[-last:] #ufmt = ptt['hk'][ptc][pfc] - ufmt = ptt[ptc][pfc] + ufmt = ptt(ptc, pfc) elif hk != 'User defined' and not hk.startswith('UDEF|'): if not isinstance(param, int): pass # param=self.get_pid(param) @@ -1522,7 +1518,7 @@ def get_param_values(tmlist=None, hk=None, param=None, last=0, numerical=False): sid = None tmlist_filt = Tm_filter_st(tmlist, st=st, sst=sst, apid=apid, sid=sid)[-last:] #ufmt = ptt['hk'][ptc][pfc] - ufmt = ptt[ptc][pfc] + ufmt = ptt(ptc, pfc) elif hk.startswith('UDEF|'): label = hk.strip('UDEF|') @@ -1541,7 +1537,7 @@ def get_param_values(tmlist=None, hk=None, param=None, last=0, numerical=False): sst = 25 tmlist_filt = Hk_filter(tmlist, st, sst, apid, sid)[-last:] #ufmt = ptt['hk'][ptc][pfc] - ufmt = ptt[ptc][pfc] + ufmt = ptt(ptc, pfc) else: userpar = json.loads(cfg['ccs-plot_parameters'][param]) if ('SID' not in userpar.keys()) or (userpar['SID'] is None): @@ -1594,33 +1590,6 @@ def Hk_filter(tmlist, st, sst, apid=None, sid=None): len(tm) > TM_HEADER_LEN and (tm[7], tm[8], struct.unpack('>H', tm[:2])[0] & 0b0000011111111111, tm[TM_HEADER_LEN]) == (st, sst, apid, sid))] -''' -def calcfmtsize(fmt): - try: - return struct.calcsize(fmt) - except struct.error: - if fmt in ('i24', 'I24'): - return 3 - elif fmt.startswith('bit'): - return int(fmt[3:]) // 8 + 1 - else: - return 0 - -def unpack_bytes(data, fmt, offbi=0): - if fmt == 'I24': - x = struct.unpack('>I', b'\x00' + data)[0] - elif fmt == 'i24': - x = struct.unpack('>i', data + b'\x00')[0] >> 8 - # for bit-sized unsigned parameters: - elif fmt.startswith('bit'): - bitlen = int(fmt[3:]) - bitsize = (bitlen // 8 + 1) * 8 - x = (int.from_bytes(data, 'big') & (2 ** (bitsize - offbi) - 1)) >> (bitsize - offbi - bitlen) - else: - x = struct.unpack('>' + fmt, data)[0] - return x -''' - def show_extracted_packet(): """ @@ -1796,7 +1765,7 @@ def Tcbuild(cmd, *args, sdid=0, ack=None, no_check=False, hack_value=None, **kwa raise NameError('Unknown command "{}"'.format(cmd)) if ack is None: - ack = bin(Tcack(cmd)) # TODO: get ack in query above + ack = bin(Tcack(cmd)) if npars == 0: pdata = b'' @@ -1913,16 +1882,16 @@ def pack_bytes(fmt, value, bitbuffer=0, offbit=0): elif fmt.startswith('oct'): if not isinstance(value, (bytes, bytearray)): raise TypeError('Value packed with fmt "{}" is not an octet string: {} {}!'.format(fmt, value, type(value))) - if len(value) != int(fmt[3:-1]): + if len(value) != int(fmt[3:]): logger.warning('Length of octet string ({}) does not match format {}!'.format(len(value), fmt)) - x = struct.pack('>' + fmt[3:], value) + x = struct.pack('>{}s'.format(fmt[3:]), value) elif fmt.startswith('ascii'): if not isinstance(value, str): raise TypeError('Value packed with fmt "{}" is not a string: {} {}!'.format(fmt, value, type(value))) - if len(value) != int(fmt[5:-1]): + if len(value) != int(fmt[5:]): logger.warning('Length of string ({}) does not match format {}!'.format(len(value), fmt)) - x = struct.pack('>' + fmt[5:], value.encode(encoding='ascii')) + x = struct.pack('>{}s'.format(fmt[5:]), value.encode(encoding='ascii')) elif fmt == timepack[0]: x = calc_timestamp(value, sync=None, return_bytes=True) @@ -1962,12 +1931,8 @@ def date_to_cuc_bytes(date, sync=None): # Returns the format of the input bytes for TC (list has to be formated the correct way) # @param parameters Input List of one parameter def parameter_ptt_type_tc(par): - if not par[-4] in [7, 8]: - return ptt[par[-4]][par[-3]] - elif par[-4] == 7: - return 'oct' + str(par[-3]) + 's' - elif par[-4] == 8: - return 'ascii' + str(par[-3]) + 's' + return ptt(par[-4], par[-3]) + ## # Acknowledgement @@ -2110,17 +2075,17 @@ def tc_param_in_range(prf, val, pdesc): limits = ' | '.join(['{:}-{:}'.format(*rng) for rng in ranges]) # print('Parameter %s out of range: %s [valid: %s]' % (pdesc, val, limits)) logger.warning('Parameter %s out of range: %s [valid: %s]' % (pdesc, val, limits)) - Notify.Notification.new('Parameter %s out of range: %s [valid: %s]' % (pdesc, val, limits)).show() + # Notify.Notification.new('Parameter %s out of range: %s [valid: %s]' % (pdesc, val, limits)).show() return False, 'Parameter %s out of range: %s [valid: %s]' % (pdesc, val, limits) elif prfs[0][0] == 'A': if val not in [i[2] for i in prfs]: valid = ' | '.join([i[2] for i in prfs]) # print('Invalid parameter value for %s: %s [valid: %s]' % (pdesc, val, valid)) logger.warning('Invalid parameter value for %s: %s [valid: %s]' % (pdesc, val, valid)) - Notify.Notification.new('Invalid parameter value for %s: %s [valid: %s]' % (pdesc, val, valid)).show() + # Notify.Notification.new('Invalid parameter value for %s: %s [valid: %s]' % (pdesc, val, valid)).show() return False, 'Invalid parameter value for %s: %s [valid: %s]' % (pdesc, val, valid) else: - pass # TODO: no check for time ranges yet + logger.warning('Range check for parameter type "{}" not implemented [{}]'.format(prfs[0][0], pdesc)) # TODO: no check for time ranges yet return True, '' @@ -2270,23 +2235,7 @@ def PUSpack(version=0, typ=0, dhead=0, apid=0, gflags=0b11, sc=0, pktl=0, return bytes(header.bin) + data -''' -# Available in project specified packet config -def get_cuc_elements(time, sync=0): - if isinstance(time, (float, int)): - ctime = int(time) - ftime = int(time % 1 * 2 ** 15) - elif isinstance(time, str): - t = float(time[:-1]) - ctime = int(t) - ftime = int(t % 1 * 2 ** 15) - sync = 1 if time[-1].upper() == 'S' else 0 - elif isinstance(time, bytes): - ctime = int.from_bytes(time[:4], 'big') - ftime = int.from_bytes(time[-2:], 'big') >> 1 - sync = time[-1] & 1 - return ctime, ftime, sync -''' + ## # Build Packstring 11 # @@ -2312,7 +2261,7 @@ def build_packstr_11(st, sst, apid, params, varpos, grpsize, repfac, *args, no_c varlist.append(parameter_ptt_type_tc(par)) else: varlist.append( - ptt[par[-4]][par[-3]][tc_param_alias(FMT_TYPE_PARAM, ptype[ptc], no_check=no_check)]) + ptt(par[-4], par[-3])[tc_param_alias(FMT_TYPE_PARAM, ptype[ptc], no_check=no_check)]) #varlist.append(ptt[par[-4]][par[-3]][tc_param_alias('DPP70044', ptype[ptc], no_check=no_check)]) ptc += 1 return varlist, args2 @@ -3007,6 +2956,7 @@ def change_communication_func(main_instance=None,new_main=None,new_main_nbr=None return + def change_main_communication(new_main, new_main_nbr, main_instance, own_bus_name=None): """ Changes the main_communication for the entire main_instance (project) @@ -3042,6 +2992,7 @@ def add_decode_parameter(label=None, format=None, bytepos=None, parentwin=None): :return: """ + pos = None fmt = None if format and label: @@ -3049,23 +3000,24 @@ def add_decode_parameter(label=None, format=None, bytepos=None, parentwin=None): # print('Please choose a different name for the parameter, can not exist as plot and decode parameter') # return if isinstance(format, str): - if not ptt_reverse(format): - if not format in fmtlist.keys(): + try: + dummy = ptt_reverse(format) + fmt = format + except NotImplementedError: + if format not in fmtlist.keys(): logger.error('Please give a correct Format') return else: fmt = fmtlist[format] - else: - fmt = format - elif isinstance(format, list): + elif isinstance(format, (list, tuple)): if len(format) == 2: try: - fmt = ptt[format[0]][format[1]] - except: + fmt = ptt(format[0], format[1]) + except NotImplementedError: logger.error('Give valid location of format') return else: - logger.error('Please give a correct Format Length') + logger.error('Please give a correct PTC/PFC format tuple') return if bytepos: @@ -3093,7 +3045,6 @@ def add_decode_parameter(label=None, format=None, bytepos=None, parentwin=None): if fmt.upper() in fmtlist: fmt = fmtlist[fmt.upper()] dialog.destroy() - #return fmt else: dialog.destroy() return @@ -3103,10 +3054,11 @@ def add_decode_parameter(label=None, format=None, bytepos=None, parentwin=None): return # If a position was found the parameter will be stored in user_decoder layer in cfg + leng = None if pos: if fmt in fmtlengthlist: leng = fmtlengthlist[fmt] - elif fmt.startswith(('bit', 'int', 'oct')): + elif fmt.startswith(('bit', 'oct')): len_test = int(fmt[3:]) if len_test % 8 == 0: leng = len_test @@ -3163,7 +3115,6 @@ def add_tm_decoder(label=None, st=None, sst=None, apid=None, sid=None, parameter response = dialog.run() if response == Gtk.ResponseType.OK: slots = dialog.get_content_area().get_children()[0].get_children()[1].get_children() - parameters = [] parameters_name = [] model = slots[0].get_children()[1].get_child().get_model() parameters_name.append([par[1] for par in model]) @@ -3215,7 +3166,7 @@ def add_tm_decoder(label=None, st=None, sst=None, apid=None, sid=None, parameter ptt_value[0], ptt_value[1], None, None, None] params.append(tuple(params_value)) - i +=1 + i += 1 else: #Parameters will be decoded in the given order while i < len(parameters_descr): # Check for each parameter if it is User-defined or IDB @@ -3680,7 +3631,11 @@ def add_tst_import_paths(): # insert this to import the tst view.py, not the one in .local folder sys.path.insert(0, cfg.get('paths', 'tst') + '/tst') - return + +def interleave_lists(*args): + if len({len(x) for x in args}) > 1: + logger.warning('Iterables are not of the same length, result will be truncated to the shortest input!') + return [i for j in zip(*args) for i in j] class TestReport: diff --git a/Ccs/packet_config_CHEOPS.py b/Ccs/packet_config_CHEOPS.py index 357c05f..dcf4d27 100644 --- a/Ccs/packet_config_CHEOPS.py +++ b/Ccs/packet_config_CHEOPS.py @@ -62,7 +62,7 @@ TC_SECONDARY_HEADER = [ ("SOURCE_ID", ctypes.c_uint8, 8) ] # [Format of time Packet, Amount of Bytes in Time Packet, Factor for Finetime, length of extra sync flag -timepack = [ptt[9][17], 6, 2 ** 15, 0] +timepack = [ptt(9, 17), 6, 2**15, 0] CUC_EPOCH = datetime.datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc) diff --git a/Ccs/packet_config_SMILE.py b/Ccs/packet_config_SMILE.py index f43de1e..1db09e3 100644 --- a/Ccs/packet_config_SMILE.py +++ b/Ccs/packet_config_SMILE.py @@ -67,7 +67,7 @@ TC_SECONDARY_HEADER = [ ] # [format of time stamp, amount of bytes of time stamp including sync byte(s), fine time resolution, length of extra sync flag in bytes] -timepack = [ptt[9][18], 8, 1e6, 1] +timepack = [ptt(9, 18), 8, 1e6, 1] CUC_EPOCH = datetime.datetime(2018, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc) diff --git a/Ccs/s2k_partypes.py b/Ccs/s2k_partypes.py index 93ae905..dacb9c6 100644 --- a/Ccs/s2k_partypes.py +++ b/Ccs/s2k_partypes.py @@ -1,29 +1,5 @@ # SCOS 2000 PTC/PFC parameter format translation table -ptt = { - 1: {0: 'uint1'}, - 2: {1: 'uint1', 2: 'uint2', 3: 'uint3', 4: 'uint4', 5: 'uint5', 6: 'uint6', 7: 'uint7', 8: 'B', 9: 'uint9', 10: 'uint10', - 11: 'uint11', 12: 'uint12', 13: 'uint13', 14: 'uint14', 15: 'uint15', 16: 'H', 17: 'uint17', 18: 'uint18', - 19: 'uint19', 20: 'uint20', 21: 'uint21', 22: 'uint22', 23: 'uint23', 24: 'I24', 25: 'uint25', 26: 'uint26', - 27: 'uint27', 28: 'uint28', 29: 'uint29', 30: 'uint30', 31: 'uint31', 32: 'I'}, - 3: {0: 'uint4', 1: 'uint5', 2: 'uint6', 3: 'uint7', 4: 'B', 5: 'uint9', 6: 'uint10', 7: 'uint11', 8: 'uint12', - 9: 'uint13', 10: 'uint14', 11: 'uint15', 12: 'H', 13: 'I24', 14: 'I'}, - 4: {0: 'int4', 1: 'int5', 2: 'int6', 3: 'int7', 4: 'b', 5: 'int9', 6: 'int10', 7: 'int11', 8: 'int12', - 9: 'int13', 10: 'int14', 11: 'int15', 12: 'h', 13: 'i24', 14: 'i'}, - 5: {1: 'f', 2: 'd'}, - 6: {1: 'uint1', 2: 'uint2', 3: 'uint3', 4: 'uint4', 5: 'uint5', 6: 'uint6', 7: 'uint7', 8: 'B', 9: 'uint9', 10: 'uint10', - 11: 'uint11', 12: 'uint12', 13: 'uint13', 14: 'uint14', 15: 'uint15', 16: 'H', 17: 'uint17', 18: 'uint18', - 19: 'uint19', 20: 'uint20', 21: 'uint21', 22: 'uint22', 23: 'uint23', 24: 'I24', 25: 'uint25', 26: 'uint26', - 27: 'uint27', 28: 'uint28', 29: 'uint29', 30: 'uint30', 31: 'uint31', 32: 'I'}, - 7: {0: 'vOCT', 1: '1s', 12: '12s', 382: '382s'}, - 8: {0: 'vASCII', 382: '382s'}, - 9: {17: 'CUC917', 18: 'CUC918'}, - 11: {0: 'deduced'}, - 'SPARE': {8: '1x', 16: '2x', 24: '3x', 32: '4x'}, - 'SPARE_visible': {8: 'B', 16: 'H', 24: 'I24', 32: 'I'}, - 'PAD': {8: '1x', 16: '2x', 24: '3x', 32: '4x'} -} - ptype_parameters = () ptype_values = {} @@ -37,8 +13,8 @@ DEFAULT_FORMATS = { 9: 'int13', 10: 'int14', 11: 'int15', 12: 'h', 13: 'i24', 14: 'i'}, 5: {1: 'f', 2: 'd'}, 6: {8: 'B', 16: 'H', 24: 'I24', 32: 'I'}, - 7: {0: 'vOCT'}, - 8: {0: 'vASCII'}, + 7: {}, # 0: 'vOCT'}, + 8: {}, # 0: 'vASCII'}, 9: {17: 'CUC917', 18: 'CUC918'}, 11: {0: 'deduced'}, 'SPARE': {8: '1x', 16: '2x', 24: '3x', 32: '4x'}, @@ -49,19 +25,65 @@ DEFAULT_FORMATS = { class ParameterTypeLookupTable: - def __call__(self, a, b): - if a in DEFAULT_FORMATS: - if b in DEFAULT_FORMATS[a]: - return DEFAULT_FORMATS[a][b] + def __call__(self, ptc, pfc): + if ptc in DEFAULT_FORMATS: + if pfc in DEFAULT_FORMATS[ptc]: + return DEFAULT_FORMATS[ptc][pfc] + elif pfc == 0: + raise NotImplementedError('(PTC, PFC) = ({}, {})'.format(ptc, pfc)) else: - if a in [2, 6]: - return 'uint{}'.format(b) - elif a in [7, 8]: - return '{}s'.format(b) + if ptc in [2, 6]: + if pfc > 32: + raise NotImplementedError('(PTC, PFC) = ({}, {})'.format(ptc, pfc)) + return 'uint{}'.format(pfc) + elif ptc == 7: + return 'oct{}'.format(pfc) + elif ptc == 8: + return 'ascii{}'.format(pfc) else: - raise NotImplementedError('(PTC, PFC) = ({}, {})'.format(a, b)) + raise NotImplementedError('(PTC, PFC) = ({}, {})'.format(ptc, pfc)) else: - raise NotImplementedError('PTC = {}'.format(a)) + raise NotImplementedError('PTC = {}'.format(ptc)) + + +class ParameterTypeLookupTableReverse: + + _special_fmts = {'B': (3, 4), + 'H': (3, 12), + 'I24': (3, 13), + 'I': (3, 14)} + + def __init__(self): + self._reverse_dict = dict() + for ptc in DEFAULT_FORMATS: + if isinstance(ptc, int): + for pfc in DEFAULT_FORMATS[ptc]: + if DEFAULT_FORMATS[ptc][pfc] in self._special_fmts: + self._reverse_dict[DEFAULT_FORMATS[ptc][pfc]] = self._special_fmts[DEFAULT_FORMATS[ptc][pfc]] + else: + self._reverse_dict[DEFAULT_FORMATS[ptc][pfc]] = (ptc, pfc) + + def __call__(self, fmt): + try: + if fmt in self._reverse_dict: + return self._reverse_dict[fmt] + elif fmt.startswith('uint'): + if int(fmt[4:]) > 32: + raise NotImplementedError('Format {} not supported'.format(fmt)) + return tuple((6, int(fmt[4:]))) + elif fmt.startswith('bit'): + if int(fmt[3:]) > 32: + raise NotImplementedError('Format {} not supported'.format(fmt)) + return tuple((6, int(fmt[3:]))) + elif fmt.startswith('oct'): + return tuple((7, int(fmt[3:]))) + elif fmt.startswith('ascii'): + return tuple((8, int(fmt[5:]))) + else: + raise NotImplementedError('Format {} not supported'.format(fmt)) + except ValueError: + raise NotImplementedError('Format {} not supported'.format(fmt)) -# ptt = ParameterTypeLookupTable() +ptt = ParameterTypeLookupTable() +ptt_reverse = ParameterTypeLookupTableReverse() diff --git a/Tst/tst/data_pool_tab.py b/Tst/tst/data_pool_tab.py index 20831c5..bf1f152 100644 --- a/Tst/tst/data_pool_tab.py +++ b/Tst/tst/data_pool_tab.py @@ -48,7 +48,7 @@ def get_data_pool_sublist(): if pcf_ptc is None: data_type = "None" else: - data_type = s2k.ptt[pcf_ptc][pcf_pfc] + data_type = s2k.ptt(pcf_ptc, pcf_pfc) data_pool_sublist.append([pcf_pid, pcf_descr, data_type, '', '', '']) diff --git a/Tst/tst/tc_management.py b/Tst/tst/tc_management.py index d544fac..cfd09a6 100644 --- a/Tst/tst/tc_management.py +++ b/Tst/tst/tc_management.py @@ -91,11 +91,11 @@ def get_cpc_descr(tc_type): if eltype == 'A': cpc_descr.append(['', key[3], '{} bit'.format(key[4]), False, Pango.Style.ITALIC]) elif eltype == 'F': - cpc_descr.append([*key[6:8], s2k.ptt[ptc][pfc], True, Pango.Style.ITALIC]) + cpc_descr.append([*key[6:8], s2k.ptt(ptc, pfc), True, Pango.Style.ITALIC]) elif eltype is None: pass else: - cpc_descr.append([*key[6:8], s2k.ptt[ptc][pfc], True, Pango.Style.NORMAL]) + cpc_descr.append([*key[6:8], s2k.ptt(ptc, pfc), True, Pango.Style.NORMAL]) return cpc_descr @@ -139,7 +139,7 @@ def get_calibrations(tc_type, cpc_descr): if cpc_ptc == "None": data_type = "None" else: - data_type = s2k.ptt[cpc_ptc][cpc_pfc] + data_type = s2k.ptt(cpc_ptc, cpc_pfc) treeview_tuple = tuple([prv_minval, prv_maxval, pas_altxt, pas_alval]) treeview_tuple_list.append(treeview_tuple) diff --git a/Tst/tst/tm_management.py b/Tst/tst/tm_management.py index 26635eb..a937c71 100644 --- a/Tst/tst/tm_management.py +++ b/Tst/tst/tm_management.py @@ -61,9 +61,8 @@ def get_tm_type_sublist(tm_descr): if pcf_ptc is None: data_type = "None" - pass else: - data_type = s2k.ptt[pcf_ptc][pcf_pfc] + data_type = s2k.ptt(pcf_ptc, pcf_pfc) tm_type_sub_list.append([pcf_name, pcf_descr, pcf_curtx, txp_from, txp_altxt, plf_offpy, data_type]) @@ -239,7 +238,7 @@ class TmSecondaryTable(Gtk.Box): self.parameter_liststore.clear() for i, par in enumerate(parlist): par = list(par) - self.parameter_liststore.append([i+1] + par[:3] + [s2k.ptt[par[3]][par[4]]]) + self.parameter_liststore.append([i+1] + par[:3] + [s2k.ptt(par[3], par[4])]) def parameter_selected(self, selection): model, row = selection.get_selected() -- GitLab