Skip to content
Snippets Groups Projects
Select Git revision
  • 3e1c6338d36dd3b5d067b12965d09c46ae5a3230
  • release default protected
  • workshop
3 results

tcgui.py

Blame
  • tcgui.py 13.21 KiB
    import gi
    import matplotlib
    
    matplotlib.use('GTK3Cairo')
    gi.require_version('Notify', '0.7')
    import datetime
    from gi.repository import Gtk, Notify
    import numpy as np
    
    from database.tm_db import scoped_session_maker
    
    
    class TcGui(Gtk.Window):
        """Window for composing telecommands (TCs), logging them and sending logged ones to a pool."""

        # Auto-scroll flag for the TC log: read by treeview_update, set to 1 by
        # edge_reached and to 0 by scroll_event / scroll_bar.
        autoscroll = 1
        # Rows of the TC log (timestamp, TC name, parameter string), one string per column.
        # Defined on the class, so every TcGui instance uses the same store.
        tclog = Gtk.ListStore(str, str, str)
        # Column headers of the log view, in the same order as the tclog columns.
        column_names = ['Time', 'TC Name', 'Parameters']
        # Maps a log timestamp to the argument list stored by log_tc; resend_tc
        # passes that list to ccs.Tcbuild.  Also a class-level object shared by all instances.
        cmd_archive = {}
    
        def __init__(self, cfg, ccs, tcpool=None):
            """Create the TC control window, build its widgets and show it.

            :param cfg: configuration object; stored as ``self.cfg``
            :param ccs: object stored as ``self.ccs``; its ``poolmgr`` attribute supplies the
                        logger, the loaded pools, the TC connections and the DB session factory
            :param tcpool: name of the pool to operate on.  ``None`` only triggers a warning;
                           a name that is not in ``poolmgr.loaded_pools`` aborts construction
                           before any widget is created.
            """
            super().__init__(title="TC Control", default_height=500, default_width=800)

            self.set_border_width(5)
            self.set_resizable(True)
            self.set_position(Gtk.WindowPosition.CENTER)

            self.ccs = ccs
            self.cfg = cfg
            self.poolmgr = ccs.poolmgr
            self.session_factory = self.poolmgr.session_factory

            Notify.init("TCGui")

            if tcpool is None:
                self.poolmgr.logger.warning("No TC Pool selected!")
            elif tcpool not in self.poolmgr.loaded_pools:
                # Unknown pool: leave the window without widgets and without pool_name.
                self.poolmgr.logger.warning("Cannot set {}. Not a loaded pool.".format(tcpool))
                return
            self.pool_name = tcpool
            self.set_title("TC Control (Pool: {})".format(tcpool))

            if self.pool_name not in self.poolmgr.tc_connections:
                self.poolmgr.logger.warning("TC socket not connected!")

            # TC selector, filled from the database.
            tc_selector = Gtk.ComboBoxText()
            tc_selector.set_model(self.create_cmd_model())
            tc_selector.set_tooltip_text('Select TC')

            create_button = Gtk.Button(label='Create TC')
            create_button.grab_focus()
            create_button.connect('clicked', self.on_click, tc_selector)

            send_button = Gtk.Button(label='Send TC', tooltip_text='Send TC selected in list')

            # TC log view and its selection, which drives the 'Send TC' button.
            self.treeview = self.create_treeview()
            self.selection = self.treeview.get_selection()
            self.selection.connect('changed', self.tree_selection_changed, send_button)
            send_button.connect('clicked', self.resend_tc, self.selection)

            log_scroller = Gtk.ScrolledWindow()
            log_scroller.add(self.treeview)
            for edge_signal in ('edge-reached', 'edge-overshot'):
                log_scroller.connect(edge_signal, self.edge_reached)
            log_scroller.connect('scroll-event', self.scroll_event)

            # Top row: selector and 'Create TC' on the left, 'Send TC' on the right.
            toolbar = Gtk.HBox()
            toolbar.pack_start(tc_selector, False, False, 0)
            toolbar.pack_start(create_button, False, False, 5)
            toolbar.pack_end(send_button, False, False, 0)

            layout = Gtk.VBox()
            layout.pack_start(toolbar, False, False, 2)
            layout.pack_start(log_scroller, True, True, 2)

            self.add(layout)
            self.show_all()
    
        def set_pool(self, tcpool):
            """Switch the window to *tcpool* if it is one of the pool manager's loaded pools.

            On success the pool name is stored, the window title is updated and the window
            is shown; otherwise a warning is logged and nothing changes.
            """
            if tcpool in self.poolmgr.loaded_pools:
                self.pool_name = tcpool
                self.set_title("TC Control (Pool: {})".format(tcpool))
                self.show_all()
            else:
                self.poolmgr.logger.warning("Cannot set {}. Not a loaded pool.".format(tcpool))
    
        def create_cmd_model(self):
            """Build the two-level tree model shown in the TC selector.

            Top-level rows are the hard-coded categories; below each one come the rows of
            the ``ccf`` table whose description starts with that category, formatted as
            ``'(type,subtype)\\t<description>'``.

            :return: a ``Gtk.TreeStore`` with a single string column
            """
            dbcon = self.session_factory()
            try:
                tcs = dbcon.execute(
                    'SELECT ccf_descr,ccf_type,ccf_stype from ccf order by ccf_type,ccf_stype').fetchall()
            finally:
                # Release the session even if the query fails.
                dbcon.close()

            cmd_model = Gtk.TreeStore(str)
            for category in ('DPU_DBS', 'DPU_IFSW', 'SES'):
                parent = cmd_model.append(None, [category])
                for descr, tc_type, tc_stype in tcs:
                    if descr.startswith(category):
                        cmd_model.append(parent, ['({:d},{:d})\t'.format(tc_type, tc_stype) + descr])
            return cmd_model
    
        def create_treeview(self):
            """Return a tree view over ``self.tclog`` with one resizable text column per entry
            of ``self.column_names``; size changes are routed to ``treeview_update``.
            """
            log_view = Gtk.TreeView(self.tclog)
            log_view.connect('size-allocate', self.treeview_update)

            for col_index, title in enumerate(self.column_names):
                text_renderer = Gtk.CellRendererText(font='monotype')
                log_column = Gtk.TreeViewColumn(title, text_renderer, text=col_index)
                log_column.set_resizable(True)
                log_view.append_column(log_column)

            return log_view
    
        def on_click(self, widget, box):
            """Handle 'Create TC': log a parameterless TC directly or open its parameter window.

            :param widget: the button that emitted 'clicked' (unused)
            :param box: combo box holding the model built by ``create_cmd_model``

            Does nothing when no TC row is selected.  For TCs with parameters a
            ``CommandWindow`` is opened whose 'Create TC' button calls ``tc_getpars``.
            """
            active = box.get_active_iter()
            if active is None:
                # Nothing selected yet; indexing the model with None would raise.
                return
            label = box.get_model()[active][0]
            if '\t' not in label:
                # Rows without the '(type,subtype)\t' prefix (the categories) name no TC.
                return
            tc = label.split('\t')[1]

            dbcon = self.session_factory()
            try:
                # NOTE(review): the statement is built by string interpolation; tc comes
                # from the model filled by create_cmd_model, not from free text input.
                fetch = dbcon.execute(
                    'select CCF_NPARS,CCF_DESCR2,CDF_PNAME,CDF_GRPSIZE from ccf left join cdf on ccf_cname=cdf_cname where ccf_descr="%s"' % tc).fetchall()
            finally:
                # Release the session even if the query fails.
                dbcon.close()

            if not fetch:
                self.poolmgr.logger.warning("No database entry found for TC {}.".format(tc))
                return
            npars, descr2, *_ = fetch[0]

            if npars == 0:
                # No parameters to ask for: archive the bare TC name.
                self.log_tc(tc, pckt=[tc])
                return

            win2 = CommandWindow()
            # Ask for the number of parameter repetitions in case of variable-length TCs
            # (any row with a non-zero CDF_GRPSIZE).
            if any(row[-1] for row in fetch):
                pnum = self.ask_par(win2)
                if pnum == 0:
                    # Cancelled: dispose of the window that only served as dialog parent.
                    win2.destroy()
                    return
            else:
                pnum = 0

            # tc_setup writes into self.command_string, so it must exist before the call.
            self.command_string = Gtk.Label('')
            self.command_string.set_selectable(True)
            self.command_string.set_padding(5, 1)

            pbox = self.tc_setup(tc, pnum)

            cancel_button = Gtk.Button()
            cancel_button.set_label('Cancel')
            cancel_button.connect('clicked', self.wdestroy, win2)

            create_button = Gtk.Button()
            create_button.set_label('Create TC')
            create_button.connect('clicked', self.tc_getpars, tc, pbox)
            create_button.grab_focus()

            description = Gtk.Label(descr2.strip('_'))
            win2.set_title(tc.strip('_'))

            hbox = Gtk.HBox()
            hbox.pack_start(description, True, False, 5)
            hbox.pack_start(cancel_button, True, False, 5)
            hbox.pack_end(create_button, True, False, 5)

            vbox = Gtk.VBox()
            vbox.pack_end(hbox, True, False, 5)
            vbox.pack_start(pbox, False, False, 5)
            vbox.pack_start(self.command_string, True, True, 0)

            win2.add(vbox)
            win2.connect('delete-event', self.wdestroy)
            win2.show_all()
    
        def wdestroy(self, widget, w, *args):
            """Destroy *w*; if that raises ``AttributeError``, destroy *widget* instead.

            Connected in ``on_click`` both to a button's 'clicked' signal, with the window
            passed as extra argument, and to the window's own 'delete-event', where no
            window is passed and the second argument is whatever GTK supplies.
            """
            try:
                w.destroy()
            except AttributeError:
                # w is not something that can be destroyed; fall back to the emitting widget.
                widget.destroy()
    
        def _choice_field(self, dbcon, descr):
            """Return a combo box listing the ``pas_altxt`` values stored for parameter *descr*."""
            rows = dbcon.execute(
                'SELECT pas_altxt,pas_alval FROM cpc left join pas on cpc_pafref=pas_numbr where cpc_descr="%s"' %
                descr).fetchall()
            model = Gtk.ListStore(str)
            for row in rows:
                model.append([row[0]])

            pfield = Gtk.ComboBoxText()
            pfield.set_model(model)
            pfield.set_tooltip_text(descr)
            return pfield

        def tc_setup(self, tcname, reps):
            """Build the input widgets for the parameters of TC *tcname*.

            Each parameter becomes a combo box (when it has a ``cpc_pafref``) or a text
            entry whose tooltip lists the ``prv`` value ranges.

            :param tcname: TC description as stored in ``ccf_descr``
            :param reps: number of repetitions requested for the repeated parameter group
            :return: a ``Gtk.HBox`` with one widget per parameter; or, when a parameter has
                     a non-zero group size and ``reps > 1``, a ``Gtk.VBox`` whose first row
                     holds all parameters and whose following ``reps - 1`` rows hold the
                     parameters after the first grouped one

            Expects ``self.command_string`` to exist (``on_click`` creates it); it is only
            updated in the non-repeated case.
            """
            dbcon = self.session_factory()
            try:
                pars = dbcon.execute(
                    'SELECT cpc_descr,cpc_pafref,cdf_grpsize FROM ccf left join cdf on ccf_cname=cdf_cname left join cpc on cdf_pname=cpc_pname where ccf_descr="%s" and cpc_descr!="None"' % tcname).fetchall()
                pbox = Gtk.HBox()

                for par in pars:
                    if par[1] is not None:
                        pfield = self._choice_field(dbcon, par[0])
                    else:
                        fetch = dbcon.execute(
                            'SELECT prv_minval,prv_maxval from prv left join cpc on cpc_prfref=prv_numbr where cpc_descr="%s"' %
                            par[0]).fetchall()
                        limits = '\n'.join('[{} - {}]'.format(*x) for x in fetch)
                        pfield = Gtk.Entry()
                        pfield.set_placeholder_text(self.ccs.none_to_empty(par[0]))
                        pfield.set_tooltip_text(par[0] + '\n' + limits)

                    pbox.pack_start(pfield, 0, 0, 5)

                grpsize = [i[2] for i in pars]

                if np.any(grpsize) and reps > 1:
                    repbox = Gtk.VBox()
                    repbox.pack_start(pbox, 0, 0, 2)
                    # Index of the first parameter with a non-zero group size.
                    grpos = np.argwhere(grpsize)[0, 0]
                    # Reversed because the widgets are added with pack_end below.
                    repeated = pars[grpos + 1:][::-1]
                    for _ in range(reps - 1):
                        pbox = Gtk.HBox()

                        for par in repeated:
                            if par[1] is not None:
                                pfield = self._choice_field(dbcon, par[0])
                            else:
                                pfield = Gtk.Entry()
                                pfield.set_placeholder_text(par[0])
                                pfield.set_tooltip_text(par[0])

                            pbox.pack_end(pfield, 0, 0, 5)
                        repbox.pack_start(pbox, 0, 0, 2)
                    return repbox
            finally:
                # Release the session on every path, including query failures.
                dbcon.close()

            # Preview of the equivalent scripted call, using each field's placeholder
            # (entries) or tooltip (combo boxes) as argument name.
            self.command_string.set_markup('<span style="italic">Tcsend_DB("{}", {})</span>'.format(tcname, ', '.join(
                [x.get_placeholder_text() if hasattr(x, 'get_placeholder_text') else x.get_tooltip_text()
                 for x in pbox.get_children()])))
            return pbox
    
        def tc_getpars(self, widget, tcname, pbox=None):
            """Collect the values entered in *pbox* and log the resulting TC.

            :param widget: the button that emitted 'clicked' (unused)
            :param tcname: name of the TC being created
            :param pbox: container returned by ``tc_setup``: a ``Gtk.HBox`` of fields or a
                         ``Gtk.VBox`` of such rows.  Anything else (including the default
                         ``None``) is treated as "no parameter fields".

            Nothing is logged when a combo box has no selection or when a hex-encoded
            field does not contain valid hexadecimal text.
            """
            if isinstance(pbox, Gtk.HBox):
                pfields = pbox.get_children()
            elif isinstance(pbox, Gtk.VBox):
                pfields = [field for hbox in pbox.get_children() for field in hbox.get_children()]
            else:
                pfields = []

            pars = [tcname]
            shown = []  # texts exactly as entered, for the log's parameter column
            for par in pfields:
                if hasattr(par, 'get_active'):
                    # Combo box: the selected text, or None if nothing is selected.
                    text = par.get_active_text()
                    pars.append(text)
                else:
                    text = par.get_text()
                    if par.get_placeholder_text() in ['DPU_IFSW_SkyPattern', 'DPU_DBS_FREE_BOOT_PARAM']:
                        # These two parameters are entered as hex strings and passed on as bytes.
                        try:
                            pars.append(bytes.fromhex(text))
                        except ValueError:
                            self.poolmgr.logger.warning(
                                "Invalid hex string for {}: {!r}".format(par.get_placeholder_text(), text))
                            return
                    else:
                        pars.append(text)
                shown.append(text)

            if None in pars:
                return

            pars = list(map(self.ccs.str_convert, pars))
            self.log_tc(tcname, pckt=pars, params=', '.join(shown))
    
        def ask_par(self, widget, minval=0):
            """Ask for a number of parameters in a modal dialog with a spin button.

            :param widget: parent window of the dialog
            :param minval: lower bound of the spin button (upper bound is 20, start value 1)
            :return: the chosen integer when the dialog is closed with OK, otherwise 0
            """
            dialog = Gtk.Dialog('# of Parameters', widget, 0,
                                ('Cancel', Gtk.ResponseType.CANCEL, 'OK', Gtk.ResponseType.OK))

            adjustment = Gtk.Adjustment(1, minval, 20, 1, 5, 0)
            spin = Gtk.SpinButton.new(adjustment, adjustment.get_step_increment(), 0)
            spin.set_numeric(1)
            dialog.get_content_area().add(spin)
            dialog.show_all()

            accepted = dialog.run() == Gtk.ResponseType.OK
            parnum = spin.get_value_as_int() if accepted else 0
            dialog.destroy()
            return parnum
    
        def log_tc(self, tc, params='', pckt=None):
            """Append a TC to the log view and remember its build arguments.

            :param tc: TC name shown in the log
            :param params: parameter string shown in the log
            :param pckt: argument list stored for ``resend_tc``

            The archive key is the UTC time of day with millisecond resolution
            (``HH:MM:SS.mmm``), so an entry logged with an identical stamp replaces the
            earlier one in ``cmd_archive``.
            """
            # Explicit %H:%M:%S instead of the platform-specific %T; an aware "now" in UTC
            # instead of the deprecated utcnow().  The formatted text is the same.
            now = datetime.datetime.now(datetime.timezone.utc)
            timestamp = now.strftime('%H:%M:%S.%f')[:-3]
            self.tclog.append([timestamp, tc, params])
            self.cmd_archive[timestamp] = pckt
    
        def resend_tc(self, widget, selection):
            """Build and send the TC of the row currently selected in the log.

            :param widget: the button that emitted 'clicked' (unused)
            :param selection: tree selection to read; ``self.selection`` is used if ``None``

            Returns silently when no row is selected, and with a warning when the current
            pool has no TC connection.
            """
            if selection is None:
                selection = self.selection
            model, treepaths = selection.get_selected_rows()

            if not treepaths:
                return
            # get_selected_rows returns a list of paths; index the model with one path.
            row = model[treepaths[0]]
            tc_config = self.cmd_archive[row[0]]
            if self.pool_name not in self.poolmgr.tc_connections:
                self.poolmgr.logger.warning("Not connected to TC socket!")
                return
            tc, (st, sst, apid) = self.ccs.Tcbuild(*tc_config)
            self.poolmgr.tc_send(self.pool_name, tc.bytes)
            # apid is converted via int(str(apid), 0), i.e. base auto-detected from its text form.
            self.ccs.counters[int(str(apid), 0)] += 1
            self.poolmgr.logger.info('TC %s,%s sent to %s\n' % (st, sst, apid))
    
        def tree_selection_changed(self, selection, button):
            """Handler for the log selection's 'changed' signal; currently does nothing.

            *button* is the 'Send TC' button.  Toggling its sensitivity depending on
            whether a row is selected was sketched here but is disabled.
            """
    
        def treeview_update(self, widget, event, data=None):
            """On a size change of the log view, jump to its bottom while auto-scroll is on."""
            if not self.autoscroll:
                return
            vadjustment = widget.get_vadjustment()
            bottom = vadjustment.get_upper() - vadjustment.get_page_size()
            vadjustment.set_value(bottom)
    
        def edge_reached(self, widget, event, data=None):
            """Turn auto-scroll on; connected to 'edge-reached' and 'edge-overshot'."""
            self.autoscroll = 1
    
        def scroll_bar(self, widget):
            """Turn auto-scroll off.

            A little crude, but intended to catch scrollbar-drag events too.
            """
            self.autoscroll = 0
    
        def scroll_event(self, widget, event, data=None):
            """Turn auto-scroll off; connected to the scrolled window's 'scroll-event'."""
            self.autoscroll = 0
    
    
    class CommandWindow(Gtk.Window):
        """Plain top-level window; ``TcGui.on_click`` fills it with a TC's parameter form."""

        def __init__(self, parent=None):
            """Create the window.  *parent* is accepted but not used."""
            super().__init__()
    
    
    # Script entry point: open the TC window and run the GTK main loop until it is closed.
    if __name__ == "__main__":
        # NOTE(review): TcGui.__init__ requires the positional arguments ``cfg`` and
        # ``ccs``; called like this it raises TypeError.  Confirm how both objects are
        # meant to be created for stand-alone use.
        win = TcGui()
        win.connect("delete-event", Gtk.main_quit)
        win.show_all()
        Gtk.main()