Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • release
  • workshop
2 results

Target

Select target project
  • mecinam2/ccs
1 result
Select Git revision
  • release
  • workshop
2 results
Show changes
Commits on Source (12)
......@@ -20,6 +20,9 @@
/Tst/test_specs/
/Tst/logs_test_runs/
# FITS files
*.fits
### Example user template
!.gitkeep
......
......@@ -4,3 +4,6 @@ gi.require_version('Gtk', '3.0')
# disable autosuggest in IPython >= 8
# c = get_config()
# c.TerminalInteractiveShell.autosuggestions_provider = None
# c.TerminalInteractiveShell.highlighting_style = 'friendly'
# c.TerminalInteractiveShell.colors = 'neutral'
......@@ -3,7 +3,6 @@
import sys
import io
import os
# import gi
import dbus
import dbus.service
import logging
......@@ -11,8 +10,6 @@ import ccs_function_lib as cfl
sys.path.append(cfl.cfg.get('paths', 'ccs'))
# gi.require_version('Gtk', '3.0')
# import confignator
# cfg = confignator.get_config(check_interpolation=False)
......
......@@ -3907,7 +3907,7 @@ def srectosrecmod(input_srec, output_srec, imageaddr=0x40180000, linesperpack=61
source_to_srec('srec_binary_source.TC', output_srec, memaddr=imageaddr)
def srec_to_s6(fname, memid, memaddr, segid, tcname=None, linesperpack=50, max_pkt_size=MAX_PKT_LEN, image_crc=True):
def srec_to_s6(fname, memid, memaddr, segid, tcname=None, linesperpack=50, max_pkt_size=MAX_PKT_LEN, image_crc=True, return_upload=False):
# get service 6,2 info from MIB
apid, memid_ref, fmt, endspares = _get_upload_service_info(tcname)
pkt_overhead = TC_HEADER_LEN + struct.calcsize(fmt) + SEG_HEADER_LEN + SEG_SPARE_LEN + SEG_CRC_LEN + len(
......@@ -3983,9 +3983,15 @@ def srec_to_s6(fname, memid, memaddr, segid, tcname=None, linesperpack=50, max_p
if image_crc:
# return total length of uploaded data (without termination segment) and CRC over entire image, including segment headers
return pckts, len(upload_bytes), crc(upload_bytes)
if return_upload:
return pckts, upload_bytes, crc(upload_bytes)
else:
return pckts, len(upload_bytes), crc(upload_bytes)
return pckts
if return_upload:
return pckts, upload_bytes
else:
return pckts
def upload_srec(fname, memid, memaddr, segid, pool_name='LIVE', tcname=None, linesperpack=50, sleep=0.125,
......@@ -4242,19 +4248,23 @@ def srec_direct(fname, memid, pool_name='LIVE', max_pkt_size=MAX_PKT_LEN, tcname
linecount = 0
nextlinelength = len(lines[linecount]) // 2
while linecount < len(f) - 1:
while linecount < len(f):
t1 = time.time()
linepacklist = []
packlen = 0
while (packlen + nextlinelength) <= payload_len:
if linecount >= (len(lines) - 1):
if linecount >= (len(lines)):
break
linepacklist.append(lines[linecount])
linelength = len(lines[linecount]) // 2
packlen += linelength
if int(f[linecount + 1][4:12], 16) != (int(f[linecount][4:12], 16) + linelength):
if (linecount + 1) >= len(f):
linecount += 1
break
elif int(f[linecount + 1][4:12], 16) != (int(f[linecount][4:12], 16) + linelength):
linecount += 1
newstartaddr = int(f[linecount][4:12], 16)
break
......@@ -4294,6 +4304,7 @@ def srec_direct(fname, memid, pool_name='LIVE', max_pkt_size=MAX_PKT_LEN, tcname
if data == b'':
print('No data left, exit upload.')
print('Upload finished, {} bytes sent in {} packets.'.format(bcnt, pcnt))
return
memaddr = newstartaddr
......@@ -5571,7 +5582,7 @@ def get_packets_from_pool(pool_name, indices=None, st=None, sst=None, apid=None,
:param apid:
:return:
"""
new_session = scoped_session_storage
new_session = scoped_session_storage()
rows = new_session.query(
DbTelemetry
......@@ -5950,6 +5961,25 @@ def get_hk_def_tcs(filename, sid=None, sidoff=TC_HEADER_LEN, sidbs=2):
return pkts
def pktinfo_report(pkt):
"""
Format raw packet data for insertion into test report
:param pkt:
:return:
"""
TEXT = "& \\multicolumn{{3}}{{|p{{18cm}}|}}{{\\vspace{{-2mm}} \\footnotesize \nTM({},{}) {} @ {} \\newline\n\\begin{{tabular}}{{l}} \\end{{tabular}}\\vspace{{1mm}} }}\\\\ \\hline\n"
name = Tmdata(pkt)[-1][0]
head = Tmread(pkt)[0]
t = get_cuctime(pkt)
msg = TEXT.format(head.SERV_TYPE, head.SERV_SUB_TYPE, name, t).replace('_', '\\_')
return msg
class DbTools:
"""
SQL database management tools
......@@ -6509,11 +6539,11 @@ class TestReport:
response = dialog.run()
if response == Gtk.ResponseType.YES:
result = 'VERIFIED'
result = 'OK'
comment = dialog.comment.get_text()
dialog.destroy()
elif response == Gtk.ResponseType.NO:
result = 'FAILED'
result = 'NOT_OK'
comment = dialog.comment.get_text()
dialog.destroy()
else:
......
......@@ -22,6 +22,9 @@ log-dir = ${paths:base}/logs
level = WARNING
max_logs = 30
[ccs-viewer]
drag_report_fmt = False
[ccs-monitor]
interval = 1
max_age = 20
......
......@@ -227,14 +227,13 @@ class Receiver:
SEL_TIMEOUT = 2
RECV_BUF_SIZE = 1024**3
def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w', pkt_parser_func=None, extend_processed=True):
def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w', pkt_parser_func=None, extend_processed=True, procdata=None):
self.sockfds = sockfds
self.recvd_data_buf = queue.Queue(recv_buf_size)
self._procfunc = procfunc
self._recv_thread = None
self._proc_thread = None
self.proc_data = []
self.extend_processed = extend_processed
self._pkt_parser_func = pkt_parser_func
......@@ -243,6 +242,11 @@ class Receiver:
else:
self.proc_data_fd = None
if procdata is not None:
self.proc_data = procdata
else:
self.proc_data = []
self._isrunning = False
def start(self):
......
......@@ -27,6 +27,17 @@ protocols = {'PUS': ('', DB_BASE),
'RMAP': ('rmap_', DB_BASE),
'FEEDATA': ('feedata_', DB_BASE)}
# get max packet size from project
try:
sys.path.insert(0, '..')
packet_config = 'packet_config_' + config_db.cfg.get('project', 'name').strip()
pcmodule = __import__(packet_config)
MAX_PKT_LEN = pcmodule.MAX_PKT_LEN
except Exception as err:
print(err)
print('Setting max. packet length to 1024')
MAX_PKT_LEN = 1024 # default max packet length
class DbTelemetryPool(DB_BASE): # type: ignore
"""
......@@ -86,8 +97,8 @@ class DbTelemetry(DB_BASE): # type: ignore
destID = Column(Integer, nullable=False)
timestamp = Column(Unicode(250, collation='utf8_general_ci'), nullable=True,
index=True) # Should this be TIMESTAMP?
data = Column(VARBINARY(1024), nullable=False) # Much faster than BLOB
raw = Column(VARBINARY(1024), nullable=False) # Much faster than BLOB
data = Column(VARBINARY(MAX_PKT_LEN), nullable=False) # Much faster than BLOB
raw = Column(VARBINARY(MAX_PKT_LEN), nullable=False) # Much faster than BLOB
# Helper attribute to access the associated pool.
pool = relationship("DbTelemetryPool")
......@@ -279,14 +290,14 @@ def scoped_session_maker(db_schema, idb_version=None):
return
_engine = create_engine(gen_mysql_conn_str(schema=schema), echo="-v" in sys.argv, pool_size=15)
session_factory = sessionmaker(bind=_engine)
scoped_session_factory = scoped_session(session_factory)
#scoped_session_factory = scoped_session_v2(session_factory)
# scoped_session_factory = scoped_session(session_factory)
scoped_session_factory = scoped_session_v2(session_factory)
return scoped_session_factory
class scoped_session_v2(scoped_session):
"""
Wrapper class to cast SQL query statement string to TextClause before execution, as this is required since SQLAlchemy 2.0.
Wrapper class to cast SQL query statement string to TextClause before execution, as this is required from SQLAlchemy 2.0.
"""
def execute(self, x, *args, **kwargs):
......
......@@ -471,12 +471,17 @@ class FpmPktParser:
Parses telemetry received from FPM
"""
def __init__(self, scibytes, echobytes=None, defaultbytes=1024):
def __init__(self, scibytes, echobytes=None, defaultbytes=1024, scifile=None):
self.scibytes = scibytes
self.echobytes = echobytes
self.defaultbytes = defaultbytes
if scifile is not None:
self.scifile = open(scifile, 'wb')
else:
self.scifile = scifile
self.nbytes_tot = 0
self.lastpkt = None
......@@ -485,6 +490,7 @@ class FpmPktParser:
strict = True
msg = sock.recv(IF_LEN)
tofile = False
if not msg:
return b''
......@@ -492,6 +498,7 @@ class FpmPktParser:
if msg[0] == IfAddr.CMD:
mlen = ACK_LEN - IF_LEN
elif msg[0] == IfAddr.SCI:
tofile = True
mlen = self.scibytes
elif msg[0] == IfAddr.HK:
mlen = ACK_LEN - IF_LEN
......@@ -517,6 +524,9 @@ class FpmPktParser:
self.lastpkt = msg
self.nbytes_tot += len(msg)
if tofile and self.scifile is not None:
self.scifile.write(msg[1:])
return msg
def set_scibytes(self, nbytes):
......@@ -568,7 +578,7 @@ class FpmProcessor:
def __call__(self, buf, ts=None):
assert isinstance(buf, queue.Queue)
# assert isinstance(buf, queue.Queue)
try:
pkt = buf.get(timeout=self.queue_to)
......@@ -608,7 +618,10 @@ class FpmProcessor:
else:
if self.curfrm != -1:
self.frames.append(self.mk_evt_frame(framesize=self.framesize, verbose=verbose))
if not self.process:
self.frames.append(self.cfdata)
else:
self.frames.append(self.mk_evt_frame(framesize=self.framesize, verbose=verbose))
self.cfdata = ed
self.framecnt += 1
......@@ -662,6 +675,91 @@ class FpmProcessor:
self.frames.clear()
class FpmProcessorMod:
"""
Processes FPM packets and assembles event frames.
"""
def __init__(self, fromfile, framesize=NPIX_LD, ashex=True, process=True, notime=False, queue_to=10):
self.framesize = framesize
self.ashex = ashex
self.process = process
self.notime = notime
self.cfdata = b''
self.curfrm = -1
self.framecnt = 0
self.frames = []
self.queue_to = queue_to
self.fromfile = open(fromfile, 'rb')
def __call__(self, ts=None):
# assert isinstance(buf, queue.Queue)
try:
pkt = self.fromfile.read(EVT_PKT_ELEMENT_LEN)
if not pkt:
return
except:
return
self.frames.clear()
try:
# process data from SCI interface
self.proc(pkt)
if self.frames:
return self.frames
except Exception as err:
print(err)
def proc(self, ed, verbose=False):
if ed[0] == self.curfrm:
self.cfdata += ed
else:
if self.curfrm != -1:
if not self.process:
self.frames.append(self.cfdata)
else:
self.frames.append(self.mk_evt_frame(framesize=self.framesize, verbose=verbose))
self.cfdata = ed
self.framecnt += 1
self.curfrm = ed[0]
def mk_evt_frame(self, framesize=NPIX_LD, verbose=False):
try:
frame = EventFrame(self.cfdata, framesize=framesize)
if verbose:
print(frame)
except Exception as err:
print(err)
return self.cfdata
return frame
def close_file(self):
self.fromfile.close()
return self.frames
def reset(self):
self.cfdata = b''
self.curfrm = -1
self.framecnt = 0
self.frames.clear()
class FrameList(list):
def get_frame_ids(self):
......
......@@ -46,6 +46,8 @@ TM_HEADER_LEN, TC_HEADER_LEN, PEC_LEN = [packet_config.TM_HEADER_LEN, packet_con
Telemetry = {'PUS': DbTelemetry, 'RMAP': RMapTelemetry, 'FEE': FEEDataTelemetry}
REPORTFORMAT = cfg.get('ccs-viewer', 'drag_report_fmt').lower() == 'true' # format drag-action data for test report
class TMPoolView(Gtk.Window):
# (label, data column alignment)
......@@ -699,7 +701,15 @@ class TMPoolView(Gtk.Window):
).filter(
Telemetry[self.decoding_type].idx == model[my_iter][0]
)
selection_data.set_text(str(row.first().raw), -1)
rawpkt = row.first().raw
if REPORTFORMAT:
data = cfl.pktinfo_report(rawpkt)
else:
data = str(rawpkt)
selection_data.set_text(data, -1)
new_session.close()
def fetch_lines_from_db(self, offset=0, limit=None, sort=None, order='asc', buffer=10, rows=None, scrolled=False,
......@@ -1856,7 +1866,12 @@ class TMPoolView(Gtk.Window):
def on_drag_tmdata_get(self, treeview, drag_context, selection_data, info, time, *args):
treeselection = treeview.get_selection()
model, my_iter = treeselection.get_selected()
selection_data.set_text('{} = {}'.format(*model[my_iter][:2]), -1)
txt = '{} = {}'.format(*model[my_iter][:2])
if REPORTFORMAT:
txt = txt.replace('_', '\\_')
selection_data.set_text(txt, -1)
def create_decoder_bar(self):
box = Gtk.VBox()
......
......@@ -53,3 +53,26 @@ Use CCS/Ccs/tools/import_mib.py to import a set of SCOS2000 MIB files into the M
# CCS & TST
./start_ccs: starts the CCS
./start_tst: starts the TST
# INSTALLATION ON MAC
The CCS tools should generally also work on a Mac, the installation process will require some modifications and/or additional steps, however.
- it is recommended to use a Python venv (with system-site packages enabled):
python3 -m venv ve_ccs --system-site-packages
- some additional (Python) packages may need to be explicitly installed:
pip install setuptools
brew install libnotify
brew install pkg-config
brew install gtk+3
pip install dbus-python==1.2.18
brew install python-argcomplete
brew install openssl
export LDFLAGS="-L$(brew --prefix openssl)/lib"
export CPPFLAGS="-I$(brew --prefix openssl)/include"
export PKG_CONFIG_PATH="$(brew --prefix openssl)/lib/pkgconfig"
pip install mysqlclient
- potentially, versions have to be specified for some packages in requirements.txt (sqlalchemy==1.4, ipython==7.12.0)
......@@ -46,13 +46,13 @@ def run(jfile, outfile, reportfunc=False, specfile=None):
script += 'report = cfl.TestReport(specfile, rep_version, mib_version, gui=True)\n\n'
# init code
script += '# INIT CODE\n{}\n#! CCS.BREAKPOINT\n\n'.format(data['_custom_imports'])
script += '# INIT CODE\n{}\n#! CCS.BREAKPOINT\n\n'.format(data.get('_custom_imports'))
script += '# PRECONDITIONS\n# {}\n#! CCS.BREAKPOINT\n\n'.format(replace_newline(data['_precon_descr']))
# script += '{}\n\n\n'.format(data['_precon_code'].strip()) # Add the precondition code
for step in data['sequences'][0]['steps']:
comment = '# COMMENT: {}\n'.format(step['_step_comment'].strip()) if step['_step_comment'] != '' else ''
comment = '# COMMENT: {}\n'.format(replace_newline(step['_step_comment'])) if step['_step_comment'] != '' else ''
cmd_code = step['_command_code'].strip()
cmd_code += '\n' if cmd_code else ''
......@@ -68,9 +68,9 @@ def run(jfile, outfile, reportfunc=False, specfile=None):
'# {}\n' \
'{}' \
'{}' \
'# VERIFICATION: {}\n{}{}\n#! CCS.BREAKPOINT\n\n'.format(step['_step_number'], step['_description'].strip(), exec_step,
'# VERIFICATION: {}\n{}{}\n#! CCS.BREAKPOINT\n\n'.format(step['_step_number'], replace_newline(step['_description']), exec_step,
cmd_code,
step['_verification_description'].strip(),
replace_newline(step['_verification_description']),
verif_step,
# step['_verification_code'].strip(), # Add verification code
comment)
......
......@@ -13,35 +13,37 @@ def run(jfile, outfile):
else:
data = json.loads(jfile)
header = 'Item|Description|Verification|TestResult'
name = '{}|{}|Test spec. version: {}| IASW-{}'.format(data['_name'], replace_newline(data['_description']),
header = 'Item|Description|TMTC|Verification|TestResult'
name = '{}|{}|Test spec. version: {}| IASW-{}|'.format(data['_name'], replace_newline(data['_description']),
data['_spec_version'], data['_iasw_version'])
# Date from last time the json file was changed + current date
date = 'Date||{}|'.format(datetime.datetime.now().strftime('%Y-%m-%d'))
testcomment = 'Comment|{}||'.format(replace_newline(data['_comment']))
precond = 'Precond.|{}||'.format(replace_newline(data['_precon_descr']))
postcond = 'Postcond.|{}||'.format(replace_newline(data['_postcon_descr']))
date = 'Date||{}||'.format(datetime.datetime.now().strftime('%Y-%m-%d'))
testcomment = 'Comment|{}|||'.format(replace_newline(data['_comment']))
precond = 'Precond.|{}|||'.format(replace_newline(data['_precon_descr']))
postcond = 'Postcond.|{}|||'.format(replace_newline(data['_postcon_descr']))
requirements = 'Requ.|{}|||'.format(replace_newline(data['_requirements']))
steps = []
for step in data['sequences'][0]['steps']:
line = 'Step {}|{}|{}|'.format(step['_step_number'], replace_newline(step['_description']),
replace_newline(step['_verification_description']))
line = 'Step {}|{}|{}|{}|'.format(step['_step_number'], replace_newline(step['_description']),
replace_newline(step['_tmtc_comment']),
replace_newline(step['_verification_description']))
steps.append(line)
if step['_step_comment'] != '':
comment = 'Comment|{}||'.format(replace_newline(step['_step_comment']))
comment = 'Comment|{}|||'.format(replace_newline(step['_step_comment']))
steps.append(comment)
if outfile[-1] == '/': # If only path is given but no filename
outfile = outfile + data['_name'] + '-' + '-'.join(data['_spec_version'].split('-')[-2:]) + '.csv_PIPE'
outfile = outfile + data['_name'] + '-TS-' + '-'.join(data['_spec_version'].split('-')[-2:]) + '.csv_PIPE'
with open(outfile, 'w') as fd:
if len(data['_comment']) != 0:
buf = '\n'.join([header, name, date, testcomment, precond] + steps + [postcond])
buf = '\n'.join([header, name, date, testcomment, requirements, precond] + steps + [postcond])
else:
buf = '\n'.join([header, name, date, precond] + steps + [postcond])
buf = buf.replace('_', '\\_')
buf = '\n'.join([header, name, date, requirements, precond] + steps + [postcond])
buf = buf.replace('_', '\\_').replace('&', '\\&')
fd.write(buf)
......@@ -52,7 +54,7 @@ def replace_newline(txt):
if __name__ == '__main__':
json_file_path = sys.argv[1]
if len(sys.argv) > 1: # If filename is given
if len(sys.argv) > 2: # If filename is given
outputfile = sys.argv[2]
else: # If no filename is given take the working directory path, filename is used from the json file
outputfile = os.getcwd() + '/'
......
......@@ -113,6 +113,7 @@ class Step:
self._description = ''
self._command_code = ''
self._step_comment = ''
self._tmtc_comment = ''
self._verification_code = ''
self._verification_description = ''
self._is_active = True
......@@ -144,6 +145,7 @@ class Step:
new_step.description = copy.copy(self.description)
new_step.command_code = copy.copy(self.command_code)
new_step.step_comment = copy.copy(self.step_comment)
new_step.tmtc_comment = copy.copy(self.tmtc_comment)
new_step.verification_code = copy.copy(self.verification_code)
new_step.verification_description = copy.copy(self.verification_description)
new_step.is_active = copy.copy(self.is_active)
......@@ -224,6 +226,15 @@ class Step:
assert isinstance(value, str)
self._step_comment = value
@property
def tmtc_comment(self):
return self._tmtc_comment
@tmtc_comment.setter
def tmtc_comment(self, value: str):
assert isinstance(value, str)
self._tmtc_comment = value
@property
def verification_code(self):
return self._verification_code
......@@ -288,6 +299,7 @@ class Step:
self.description = step['_description']
self.command_code = step['_command_code']
self.step_comment = step['_step_comment']
self.tmtc_comment = step['_tmtc_comment']
self.verification_code = step['_verification_code']
self.verification_description = step['_verification_description']
self.is_active = step['_is_active']
......@@ -902,6 +914,7 @@ class TestSpecification:
self._description = ''
self._spec_version = ''
self._iasw_version = ''
self._requirements = ''
self._primary_counter_locked = False
self._precon_name = ''
self._precon_code = ''
......@@ -924,6 +937,7 @@ class TestSpecification:
new_testspec.description = copy.copy(self.description)
new_testspec.spec_version = copy.copy(self.spec_version)
new_testspec.iasw_version = copy.copy(self.iasw_version)
new_testspec.requirements = copy.copy(self.requirements)
new_testspec.primary_counter_locked = copy.copy(self.primary_counter_locked)
new_testspec.precon_name = copy.copy(self.precon_name)
new_testspec.precon_code = copy.copy(self.precon_code)
......@@ -985,6 +999,15 @@ class TestSpecification:
assert isinstance(value, str)
self._iasw_version = value
@property
def requirements(self):
return self._requirements
@requirements.setter
def requirements(self, value: str):
assert isinstance(value, str)
self._requirements = value
@property
def precon_name(self):
return self._precon_name
......@@ -1082,6 +1105,7 @@ class TestSpecification:
self.description = json_data['_description']
self.spec_version = json_data['_spec_version']
self.iasw_version = json_data['_iasw_version']
self.requirements = json_data['_requirements']
self.primary_counter_locked = json_data['_primary_counter_locked']
self.precon_name = json_data['_precon_name']
self.precon_code = json_data['_precon_code']
......
......@@ -17,7 +17,7 @@ data_type_snippet = 'snippet'
data_type_step = 'step'
def create_datastring(data_type, sequence='', step_number='', description='', comment='', command_code='', verification_code='', verification_descr='', logger=logger):
def create_datastring(data_type, sequence='', step_number='', description='', comment='', tmtc='', command_code='', verification_code='', verification_descr='', logger=logger):
if data_type == data_type_snippet:
step_number = ''
# build the data string
......@@ -27,6 +27,7 @@ def create_datastring(data_type, sequence='', step_number='', description='', co
data_string += separator + step_number
data_string += separator + description
data_string += separator + comment
data_string += separator + tmtc
data_string += separator + command_code
data_string += separator + verification_code
data_string += separator + verification_descr
......@@ -49,15 +50,17 @@ def read_datastring(data_string: str, logger=logger) -> dict:
step_number = data[2]
description = data[3]
comment = data[4]
command_code = data[5]
verification_code = data[6]
verification_descr = data[7]
tmtc = data[5]
command_code = data[6]
verification_code = data[7]
verification_descr = data[8]
data_dict = {
'data_type': data_type,
'sequence': sequence,
'step_number': step_number,
'description': description,
'comment': comment,
'tmtc': tmtc,
'command_code': command_code,
'verification_code': verification_code,
'verification_descr' : verification_descr
......
......@@ -135,6 +135,15 @@ class Board(Gtk.Box):
self.test_meta_data_iasw_box.pack_start(self.test_meta_data_iasw_version_label, False, False, 4)
self.test_meta_data_iasw_box.pack_end(self.test_meta_data_iasw_version, False, False, 0)
self.test_meta_data_info.pack_start(self.test_meta_data_iasw_box, True, True, 0)
# IASW Requirements
self.test_meta_data_req_label = Gtk.Label()
self.test_meta_data_req_label.set_text('Requirements:')
self.test_meta_data_req = Gtk.Entry(width_chars=25)
self.test_meta_data_req.set_placeholder_text('< Requirements >')
self.test_meta_data_req_box = Gtk.Box(spacing=5, orientation=Gtk.Orientation.HORIZONTAL)
self.test_meta_data_req_box.pack_start(self.test_meta_data_req_label, False, False, 4)
self.test_meta_data_req_box.pack_end(self.test_meta_data_req, False, False, 0)
self.test_meta_data_info.pack_start(self.test_meta_data_req_box, True, True, 0)
# checkbox for locking the step numbers
self.test_is_locked_label = Gtk.Label()
self.test_is_locked_label.set_text(_('Lock step enumeration:'))
......@@ -291,6 +300,7 @@ class Board(Gtk.Box):
self.test_meta_data_desc.connect('changed', self.on_test_desc_change)
self.test_meta_data_spec_version.connect('changed', self.on_test_spec_version_change)
self.test_meta_data_iasw_version.connect('changed', self.on_test_iasw_version_change)
self.test_meta_data_req.connect('changed', self.on_test_requirements_change)
self.text_meta_data_test_is_locked.connect('toggled', self.on_test_locked_toggled)
self.test_meta_data_comment.get_buffer().connect('changed', self.on_comment_change)
self.custom_import_buffer.connect('changed', self.on_custom_import_change)
......@@ -344,6 +354,8 @@ class Board(Gtk.Box):
self.test_meta_data_spec_version.set_text(self.model.spec_version)
# set the Software version of the test specification from the data model
self.test_meta_data_iasw_version.set_text(self.model.iasw_version)
# set the Requirements of the test specification from the data model
self.test_meta_data_req.set_text(self.model.requirements)
# set the pre-condition name
if self.model.precon_name:
found = False
......@@ -544,6 +556,14 @@ class Board(Gtk.Box):
# update the data model viewer
self.app.update_model_viewer()
def on_test_requirements_change(self, widget):
# get the IASW Version out of the text buffer of the widget
requirements = widget.get_text()
# update the model
self.model.requirements = requirements
# update the data model viewer
self.app.update_model_viewer()
def on_test_locked_toggled(self, *args):
# toggle the value in the widget
self.test_is_locked = not self.test_is_locked
......@@ -785,9 +805,13 @@ class StepWidget(Gtk.EventBox):
# fields for commands and verification
# lm = GtkSource.LanguageManager()
# Area for Commands and TM/TC
self.commands_and_tmtc_box = Gtk.Grid()
self.commands_and_tmtc_box.set_column_homogeneous(False)
# Area for the commands
self.whole_commands_box = Gtk.Box()
self.whole_commands_box.set_orientation(Gtk.Orientation.VERTICAL)
#self.whole_commands_box = Gtk.Box()
#self.whole_commands_box.set_orientation(Gtk.Orientation.VERTICAL)
# Make the label, inside a own Box to show it on the left side
self.lbl_box_commands = Gtk.Box()
self.lbl_box_commands.set_orientation(Gtk.Orientation.HORIZONTAL)
......@@ -800,7 +824,7 @@ class StepWidget(Gtk.EventBox):
# Make the area where the real command is entered
# self.detail_box.pack_start(self.lbl_box_commands, True, True, 0)
self.commands_scrolled_window = Gtk.ScrolledWindow()
self.commands_scrolled_window.set_size_request(-1, 200)
self.commands_scrolled_window.set_size_request(600, 200)
self.commands_view = GtkSource.View()
self.commands_view.set_auto_indent(True)
self.commands_view.set_wrap_mode(Gtk.WrapMode.WORD)
......@@ -827,9 +851,65 @@ class StepWidget(Gtk.EventBox):
# self.commands_buffer.set_style_scheme(self.board.current_scheme)
self.commands_scrolled_window.add(self.commands_view)
self.whole_commands_box.pack_start(self.lbl_box_commands, False, False, 0)
self.whole_commands_box.pack_start(self.commands_scrolled_window, True, True, 0)
self.detail_box.pack_start(self.whole_commands_box, True, True, 0)
#self.whole_commands_box.pack_start(self.lbl_box_commands, False, False, 0)
#self.whole_commands_box.pack_start(self.commands_scrolled_window, True, True, 0)
#self.commands_and_tmtc_box.pack_start(self.whole_commands_box, True, True, 0)
#self.detail_box.pack_start(self.commands_and_tmtc_box, True, True, 0)
# Area for TM/TC
#self.whole_tmtc_box = Gtk.Box()
#self.whole_tmtc_box.set_orientation(Gtk.Orientation.VERTICAL)
# box for TM/TC
self.tmtc_box = Gtk.Box()
self.tmtc_box.set_orientation(Gtk.Orientation.HORIZONTAL)
self.tmtc_label = Gtk.Label.new()
self.tmtc_label.set_text(_('TM/TC'))
self.tmtc_box.pack_start(self.tmtc_label, False, False, 0)
self.tmtc_scrolled_window = Gtk.ScrolledWindow()
self.tmtc_scrolled_window.set_size_request(195, 200)
self.tmtc_view = GtkSource.View()
self.tmtc_view.set_auto_indent(True)
self.tmtc_view.set_wrap_mode(Gtk.WrapMode.WORD)
#self.tmtc_view.set_show_line_numbers(True)
# self.tmtc_view.set_show_right_margin(True)
self.tmtc_view.set_monospace(True)
#self.tmtc_view.set_highlight_current_line(True)
self.tmtc_view.set_indent_on_tab(True)
self.tmtc_view.set_insert_spaces_instead_of_tabs(True)
self.tmtc_view.set_indent_width(4)
self.tmtc_view.set_auto_indent(True)
self.tmtc_comment_buffer = self.tmtc_view.get_buffer()
# draganddrop here
"""
self.tmtc_view.drag_dest_set(Gtk.DestDefaults.ALL, [], Gdk.DragAction.COPY)
self.tmtc_view.drag_dest_set_target_list(None)
self.tmtc_view.drag_dest_add_text_targets()
self.tmtc_view.connect("drag-motion", self.on_drag_motion_2)
self.tmtc_view.connect("drag-leave", self.on_drag_leave)
"""
self.tmtc_comment_buffer.set_language(lngg)
# self.tmtc_buffer.set_style_scheme(self.board.current_scheme)
self.tmtc_scrolled_window.add(self.tmtc_view)
#self.whole_tmtc_box.pack_start(self.tmtc_box, False, False, 0)
#self.whole_tmtc_box.pack_start(self.tmtc_scrolled_window, True, True, 0)
# self.commands_and_tmtc_box.pack_start(self.whole_tmtc_box, True, True, 0)
# self.detail_box.pack_start(self.commands_and_tmtc_box, True, True, 0)
#ADD everything to the whole grid
self.commands_and_tmtc_box.set_column_spacing(10)
self.commands_and_tmtc_box.attach(self.lbl_box_commands, 0, 0, 3, 1)
self.commands_and_tmtc_box.attach(self.commands_scrolled_window, 0, 1, 3, 5)
self.commands_and_tmtc_box.attach_next_to(self.tmtc_box, self.lbl_box_commands, Gtk.PositionType.RIGHT, 3, 1)
self.commands_and_tmtc_box.attach_next_to(self.tmtc_scrolled_window, self.commands_scrolled_window, Gtk.PositionType.RIGHT, 3, 5)
self.detail_box.pack_start(self.commands_and_tmtc_box, True, True, 0)
# area for the verification
self.whole_verification_box = Gtk.Grid()
self.whole_verification_box.set_column_homogeneous(True)
......@@ -922,6 +1002,7 @@ class StepWidget(Gtk.EventBox):
self.desc_text_buffer.connect('changed', self.on_description_buffer_changed)
self.commands_buffer.connect('changed', self.on_commands_buffer_changed)
self.step_comment_buffer.connect('changed', self.on_step_comment_buffer_changed)
self.tmtc_comment_buffer.connect('changed', self.on_tmtc_buffer_changed)
self.verification_buffer.connect('changed', self.on_verification_buffer_changed)
self.verification_description_buffer.connect('changed', self.on_verification_description_buffer_changed)
......@@ -1020,6 +1101,7 @@ class StepWidget(Gtk.EventBox):
self.set_description_in_widget()
self.set_commands_in_widget()
self.set_step_comment_in_widget()
self.set_tmtc_comment_in_widget()
self.set_verification_in_widget()
self.set_verification_description_in_widget()
self.set_start_sequence_in_widget()
......@@ -1058,6 +1140,13 @@ class StepWidget(Gtk.EventBox):
step_comment = self.model.get_sequence(self.sequence).steps[stp_ndx].step_comment
self.step_comment_buffer.set_text(step_comment)
return
def set_tmtc_comment_in_widget(self):
""" gets the commands comment from the model and sets it in the commands comment buffer in order to display it """
stp_ndx = self.model.get_sequence(self.sequence).get_step_index(self.step_number)
tmtc_comment = self.model.get_sequence(self.sequence).steps[stp_ndx].tmtc_comment
self.tmtc_comment_buffer.set_text(tmtc_comment)
return
def set_verification_in_widget(self):
""" gets the commands from the model and sets it in the commands buffer in order to display it """
......@@ -1144,6 +1233,7 @@ class StepWidget(Gtk.EventBox):
step_number = step.step_number
description = step.description
comment = step.step_comment
tmtc = step.tmtc_comment
command_code = step.command_code
verification_code = step.verification_code
verification_descr = step.verification_description
......@@ -1153,6 +1243,7 @@ class StepWidget(Gtk.EventBox):
step_number,
description,
comment,
tmtc,
command_code,
verification_code,
verification_descr,
......@@ -1231,6 +1322,7 @@ class StepWidget(Gtk.EventBox):
step.description = data['description']
step.command_code = data['command_code']
step.step_comment = data['comment']
step.tmtc_comment = data['tmtc']
step.verification_code = data['verification_code']
step.verification_description = data['verification_descr']
if drag_source_type == dnd_data_parser.data_type_step: # a step is moved
......@@ -1332,6 +1424,24 @@ class StepWidget(Gtk.EventBox):
# update the data model viewer
self.app.update_model_viewer()
def on_tmtc_buffer_changed(self, text_buffer):
"""
Signal 'changed' for the tmtc comment buffer
"""
# get the text of the commands comment out of the buffer of the widget
tmtc_comment = self.read_out_text_buffer(text_buffer)
# Setting the commands string for a step in the data model
# find the correct step within the data model
stp_ndx = self.model.get_sequence(self.sequence).get_step_index(self.step_number)
step_in_data_model = self.model.get_sequence(self.sequence).steps[stp_ndx]
# use the setter of the data model
if isinstance(step_in_data_model, data_model.Step):
step_in_data_model.tmtc_comment = tmtc_comment
else:
self.logger('step with the step number {} could not be found'.format(self.step_number))
# update the data model viewer
self.app.update_model_viewer()
def on_verification_buffer_changed(self, text_buffer):
"""
Signal 'changed' for the verification source buffer
......@@ -1648,6 +1758,7 @@ class InterStepWidget(Gtk.Box):
# set the data into the test script data model
new_step.description = data['description']
new_step.step_comment = data['comment']
new_step.tmtc_comment = data['tmtc']
new_step.command_code = data['command_code']
new_step.verification_code = data['verification_code']
new_step.verification_description = data['verification_descr']
......@@ -1672,6 +1783,7 @@ class InterStepWidget(Gtk.Box):
# set the data into the test script data model
new_step.description = data['description']
new_step.step_comment = data['comment']
new_step.tmtc_comment = data['tmtc']
new_step.command_code = data['command_code']
new_step.verification_code = data['verification_code']
new_step.verification_description = data['verification_descr']
......