From 3cf6e4801fbc3d147fde040be99ad4eb4a742d26 Mon Sep 17 00:00:00 2001 From: Marko Mecina <marko.mecina@univie.ac.at> Date: Thu, 13 Oct 2022 17:50:08 +0200 Subject: [PATCH] add dedicated SREC upload function that is somewhat project agnostic --- Ccs/ccs_function_lib.py | 148 +++++++++++++++++++++++++++++++++++----- 1 file changed, 130 insertions(+), 18 deletions(-) diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index e8d356b..a5fde8a 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -65,7 +65,9 @@ PUS_VERSION, TMHeader, TCHeader, PHeader, TM_HEADER_LEN, TC_HEADER_LEN, P_HEADER pc.CUC_OFFSET, pc.CUC_EPOCH, pc.puscrc, pc.PLM_PKT_PREFIX_TC_SEND, pc.PLM_PKT_SUFFIX, pc.FMT_TYPE_PARAM] SREC_MAX_BYTES_PER_LINE = 250 - +SEG_HEADER_LEN = 12 +SEG_SPARE_LEN = 2 +SEG_CRC_LEN = 2 pid_offset = int(cfg.get('ccs-misc', 'pid_offset')) @@ -1834,11 +1836,19 @@ def Tcbuild(cmd, *args, sdid=0, ack=None, no_check=False, hack_value=None, sourc return Tcpack(st=st, sst=sst, apid=int(apid), data=pdata, sdid=sdid, ack=ack, **kwargs), (st, sst, apid) -def _get_tc_params(cmd): - que = 'SELECT ccf_type,ccf_stype,ccf_apid,ccf_npars,cdf.cdf_grpsize,cdf.cdf_eltype,cdf.cdf_ellen,' \ - 'cdf.cdf_value,cpc.cpc_ptc,cpc.cpc_pfc,cpc.cpc_descr,cpc.cpc_pname FROM ccf LEFT JOIN cdf ON ' \ - 'cdf.cdf_cname=ccf.ccf_cname LEFT JOIN cpc ON cpc.cpc_pname=cdf.cdf_pname ' \ - 'WHERE BINARY ccf_descr="%s"' % cmd +def _get_tc_params(cmd, paf_cal=False): + + if paf_cal: + que = 'SELECT ccf_type,ccf_stype,ccf_apid,ccf_npars,cdf.cdf_grpsize,cdf.cdf_eltype,cdf.cdf_ellen,' \ + 'cdf.cdf_value,cpc.cpc_ptc,cpc.cpc_pfc,cpc.cpc_descr,cpc.cpc_pname,cpc.cpc_pafref FROM ccf LEFT JOIN cdf ON ' \ + 'cdf.cdf_cname=ccf.ccf_cname LEFT JOIN cpc ON cpc.cpc_pname=cdf.cdf_pname ' \ + 'WHERE BINARY ccf_descr="%s"' % cmd + else: + que = 'SELECT ccf_type,ccf_stype,ccf_apid,ccf_npars,cdf.cdf_grpsize,cdf.cdf_eltype,cdf.cdf_ellen,' \ + 'cdf.cdf_value,cpc.cpc_ptc,cpc.cpc_pfc,cpc.cpc_descr,cpc.cpc_pname FROM ccf LEFT JOIN cdf ON ' \ + 'cdf.cdf_cname=ccf.ccf_cname LEFT JOIN cpc ON cpc.cpc_pname=cdf.cdf_pname ' \ + 'WHERE BINARY ccf_descr="%s"' % cmd + dbcon = scoped_session_idb params = dbcon.execute(que).fetchall() dbcon.close() @@ -2623,7 +2633,7 @@ def tc_load_to_memory(data, memid, mempos, slicesize=1000, sleep=0., ack=None, p def get_tc_descr_from_stsst(st, sst): - res = scoped_session_idb.execute('SELECT ccf_descr FROM {}.ccf where ccf_type={} and ccf_stype={}'.format(cfg.get('ccs-database', 'idb_schema'), st, sst)).fetchall() + res = scoped_session_idb.execute('SELECT ccf_descr FROM ccf where ccf_type={} and ccf_stype={}'.format(st, sst)).fetchall() return [x[0] for x in res] @@ -2649,15 +2659,19 @@ def bin_to_hex(fname, outfile): # @param linesperpack Number of lines in srec file to concatenate in one PUS packet # @param pcount Initial sequence counter for packets # @param sleep Timeout after each packet if packets are sent directly to socket -def srectohex(fname, outname=None, memid=0x0002, memaddr=0x40180000, segid=0x200B0101, tcsend=False, - linesperpack=61, pcount=0, sleep=0.2, source_only=False, add_memaddr_to_source=False): +def srectohex(fname, memid, memaddr, segid, tcsend=False, outname=None, linesperpack=61, pcount=0, sleep=0., + source_only=False, add_memaddr_to_source=False): source_list = [] if outname is None: outname = fname.replace('.srec', '') + + # get service 6,2 info from MIB + apid, memid_ref, fmt, endspares = _get_upload_service_info() + if not isinstance(memid, int): dbcon = scoped_session_idb - dbres = dbcon.execute('SELECT pas_alval from pas where pas_numbr="DPKT9007" and pas_altxt="%s"' % memid) + dbres = dbcon.execute('SELECT pas_alval from pas where pas_numbr="{}" and pas_altxt="{}"'.format(memid_ref, memid)) try: memid, = dbres.fetchall()[0] except IndexError: @@ -2665,6 +2679,7 @@ def srectohex(fname, outname=None, memid=0x0002, memaddr=0x40180000, segid=0x200 finally: dbcon.close() memid = int(memid) + f = open(fname, 'r').readlines()[1:] lines = [p[12:-3] for p in f] startaddr = int(f[0][4:12], 16) @@ -2749,7 +2764,68 @@ def srectosrecmod(input_srec, output_srec, imageaddr=0x40180000, linesperpack=61 # write source data to new srec source_to_srec('srec_binary_source.TC', output_srec, memaddr=imageaddr) -####################################################### + +def upload_srec(fname, memid, memaddr, segid, pool_name='LIVE', tcname=None, linesperpack=61, sleep=0.1): + + # get service 6,2 info from MIB + apid, memid_ref, fmt, endspares = _get_upload_service_info(tcname) + + if not isinstance(memid, int): + dbcon = scoped_session_idb + dbres = dbcon.execute('SELECT pas_alval from pas where pas_numbr="{}" and pas_altxt="{}"'.format(memid_ref, memid)) + try: + memid, = dbres.fetchall()[0] + except IndexError: + raise ValueError('MemID "{}" does not exist. Aborting.'.format(memid)) + finally: + dbcon.close() + memid = int(memid) + + f = open(fname, 'r').readlines()[1:] + lines = [p[12:-3] for p in f] + startaddr = int(f[0][4:12], 16) + + linecount = 0 + while linecount < len(f) - 1: + linepacklist = [] + for n in range(linesperpack): + if linecount >= (len(lines) - 1): + break + linepacklist.append(lines[linecount]) + linelength = len(lines[linecount]) // 2 + if int(f[linecount + 1][4:12], 16) != (int(f[linecount][4:12], 16) + linelength): + linecount += 1 + newstartaddr = int(f[linecount][4:12], 16) + break + else: + linecount += 1 + newstartaddr = int(f[linecount][4:12], 16) + + linepack = bytes.fromhex(''.join(linepacklist)) + dlen = len(linepack) + # segment header, see IWF DBS HW SW ICD + data = struct.pack('>III', segid, startaddr, dlen // 4) + linepack + b'\x00\x00' + data = data + crc(data).to_bytes(SEG_CRC_LEN, 'big') + + # create PUS packet + packetdata = struct.pack(fmt, memid, memaddr, len(data)) + data + endspares + puspckt = Tcpack(data=packetdata, st=6, sst=2, apid=apid, sc=counters[apid], ack=0b1001) + + if len(puspckt) > MAX_PKT_LEN: + logger.warning('Packet length ({}) exceeding MAX_PKT_LEN of {} bytes!'.format(len(puspckt), MAX_PKT_LEN)) + + Tcsend_bytes(puspckt, pool_name=pool_name) + time.sleep(sleep) + + startaddr = newstartaddr + memaddr += len(data) + counters[apid] += 1 + + # send all-zero termination segment of length 12 + packetdata = struct.pack(fmt, memid, memaddr, 12) + bytes(12) + endspares + puspckt = Tcpack(data=packetdata, st=6, sst=2, apid=apid, sc=counters[apid], ack=0b1001) + Tcsend_bytes(puspckt, pool_name=pool_name) + counters[apid] += 1 def segment_data(data, segid, addr, seglen=480): @@ -2758,10 +2834,7 @@ def segment_data(data, segid, addr, seglen=480): Segment data has to be two-word aligned. Return list of segments. """ - SEG_HEADER_LEN = 12 - SEG_SPARE_LEN = 2 - SEG_CRC_LEN = 2 - + if isinstance(data, str): data = open(data, 'rb').read() @@ -2770,7 +2843,7 @@ def segment_data(data, segid, addr, seglen=480): datalen = len(data) if datalen % 4: - raise ValueError('Data length is not two-word aligned') + raise ValueError('Data length is not two-word aligned') data = io.BytesIO(data) segments = [] @@ -2783,8 +2856,8 @@ def segment_data(data, segid, addr, seglen=480): if chunklen % 4: raise ValueError('Segment data length is not two-word aligned') - sdata = struct.pack('>III', segid, segaddr, chunklen // 4) + chunk - sdata += bytes(SEG_SPARE_LEN) + crc(sdata).to_bytes(SEG_CRC_LEN, 'big') + sdata = struct.pack('>III', segid, segaddr, chunklen // 4) + chunk + bytes(SEG_SPARE_LEN) + sdata += crc(sdata).to_bytes(SEG_CRC_LEN, 'big') segments.append(sdata) segaddr += chunklen @@ -2847,6 +2920,45 @@ def source_to_srec(data, outfile, memaddr, header=None, bytes_per_line=32): logger.info('Data written to file: "{}"'.format(outfile)) +def _get_upload_service_info(tcname=None): + """ + Get info about service 6,2 from MIB + @param tcname: + @return: + """ + if tcname is None: + cmd = get_tc_descr_from_stsst(6, 2)[0] + else: + cmd = tcname + + params = _get_tc_params(cmd, paf_cal=True) + + apid = params[0][2] + + # try to find paf_ref for MEMID + try: + memid_ref = [p[-1] for p in params if p[-1] is not None][0] + except KeyError: + memid_ref = None + + # get format info for fixed block + fmt = '>' + idx = 0 + for par in params: + fmt += ptt(*par[8:10]) + if par[4] != 0: + idx = params.index(par) + break + + # check for spares after variable part + endspares = b'' + for par in params[idx:]: + if par[5] == 'A': + endspares += bytes(par[6] // 8) + + return apid, memid_ref, fmt, endspares + + def get_tc_list(ccf_descr=None): if ccf_descr is None: -- GitLab