  • wfi_de_communication.py 1.54 KiB
    """
    Examples for Athena WFI DE communications
    """
    
    import communication as com
    import packet_config_ATHENA_DE as de
    
    # set up socket and connect
    decon = com.Connector('', 12345, msgdecoding='hex')
    decon.connect()
    
    # example commands
    # test echo interface (0x20)
    decon.send(b'\x20\xDE\xAD')
    
    # HK interface 0x33
    decon.send(de.HkCmdRead(0x1000))  # get PCM MODE register
    decon.send(de.HkCmdWrite(0x1000, 0x0001))  # set PCM MODE register
    
    # CMD interface 0x34
    decon.send(de.CmdWrite(0x3C00, 0x0001))  # write sequencer register
    # write sequencer register without fetching the cmd response from the socket
    decon.send(de.CmdWrite(0x3C00, 1), rx=False)
    decon.send(de.CmdRead(0x3C00))  # read sequencer register
    
    # SCI interface 0x35
    decon.send(de.SciCmd(100))  # set science data output rate
    
    # dump cmd log (decon.log)
    logfile = '/path/to/de_cmd.log'
    decon.dump_log(logfile)
    
    # automatically log to file
    decon.setup_storage(logfile)
    
    # run rx thread on socket, received data is put in recvd_data_buf queue
    decon.start_receiver()
    decon.receiver.recvd_data_buf
    
    
    # custom TM processing function; must take bytestring as arg *data*, and timestamp kwarg *ts*
    def msg_to_hex_string(data, ts=''):
        try:
            return '{}: {}\n'.format(ts, data.hex(' ', 1))
        except Exception as err:
            print(err)
            return '# ERROR #\n'
    
    
    # optionally, add custom TM processing
    # this logs the received data hex-formatted in outfile
    decon.start_receiver(procfunc=msg_to_hex_string, outfile='/path/to/de_rx.log', ofmode='w')
    
    # processed data is also collected in
    decon.receiver.proc_data
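
The custom TM processing hook above only requires a callable that takes the bytestring *data* and the keyword argument *ts* and returns the string to be logged. As a minimal sketch of another such function, the variant below prefixes each line with an interface label. The name `msg_to_labelled_hex` and the `INTERFACE_NAMES` table are hypothetical; the IDs are taken from the comments in the script, and treating the first byte of a received message as the interface ID is an assumption that the script does not confirm.

```python
# Interface IDs as named in the comments of the example script.
INTERFACE_NAMES = {0x20: 'ECHO', 0x33: 'HK', 0x34: 'CMD', 0x35: 'SCI'}


def msg_to_labelled_hex(data, ts=''):
    """Return '<ts>: [<interface>] <hex bytes>' followed by a newline.

    ASSUMPTION: the first byte of *data* is the interface ID.
    """
    if not data:
        # nothing to decode, keep the log line well-formed
        return '{}: [EMPTY]\n'.format(ts)
    name = INTERFACE_NAMES.get(data[0], 'UNKNOWN')
    # bytes.hex(sep, bytes_per_sep) requires Python 3.8 or later
    return '{}: [{}] {}\n'.format(ts, name, data.hex(' ', 1))


if __name__ == '__main__':
    # format the echo test message from the script, with a placeholder timestamp
    print(msg_to_labelled_hex(b'\x20\xde\xad', ts='t0'), end='')
```

Since it follows the same calling convention as `msg_to_hex_string`, it could presumably be passed as `procfunc` in the same way, although this has not been checked against the `communication` module.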