Skip to content
Snippets Groups Projects
Commit f156f852 authored by Marko Mecina's avatar Marko Mecina
Browse files

add generic proc_func

+ minor usability improvements
parent c1a613c8
No related branches found
No related tags found
No related merge requests found
...@@ -70,10 +70,11 @@ class Connector: ...@@ -70,10 +70,11 @@ class Connector:
self.conn.settimeout(self.response_to) self.conn.settimeout(self.response_to)
def _close(self, servershtdwn): def _close(self, servershtdwn):
self.conn.close() if self.conn.fileno() != -1:
print('Closed connection to {}:{}'.format(self.host, self.port)) self.conn.close()
print('Closed connection to {}:{}'.format(self.host, self.port))
if servershtdwn: if servershtdwn and self.sockfd.fileno() != -1:
print('Closing server {}'.format(self.sockfd.getsockname())) print('Closing server {}'.format(self.sockfd.getsockname()))
self.sockfd.close() self.sockfd.close()
...@@ -115,7 +116,7 @@ class Connector: ...@@ -115,7 +116,7 @@ class Connector:
resp = b'' resp = b''
if rx: if rx:
resp = self._recv_response() resp += self._recv_response()
self.log.append((t, msg, resp)) self.log.append((t, msg, resp))
...@@ -159,10 +160,12 @@ class Connector: ...@@ -159,10 +160,12 @@ class Connector:
self.conn.settimeout(seconds) self.conn.settimeout(seconds)
self.response_to = seconds self.response_to = seconds
def start_receiver(self, procfunc=None): def start_receiver(self, procfunc=None, outfile=None, ofmode='w'):
""" """
:param procfunc: function that must return a list :param procfunc:
:param outfile:
:param ofmode:
:return: :return:
""" """
if self.conn is None: if self.conn is None:
...@@ -170,11 +173,19 @@ class Connector: ...@@ -170,11 +173,19 @@ class Connector:
return return
if self.receiver is None: if self.receiver is None:
self.receiver = Receiver([self.conn], procfunc=procfunc) self.receiver = Receiver([self.conn], procfunc=procfunc, outfile=outfile, ofmode=ofmode)
self.receiver.start() self.receiver.start()
else: else:
print('Receiver already initialised') print('Receiver already initialised')
def stop_receiver(self):
if self.receiver is None:
print('No receiver to stop')
return
self.receiver.stop()
self.receiver = None
class Receiver: class Receiver:
""" """
...@@ -185,7 +196,7 @@ class Receiver: ...@@ -185,7 +196,7 @@ class Receiver:
SEL_TIMEOUT = 2 SEL_TIMEOUT = 2
RECV_BUF_SIZE = 1024**3 RECV_BUF_SIZE = 1024**3
def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE): def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w'):
self.sockfds = sockfds self.sockfds = sockfds
self.recvd_data_buf = queue.Queue(recv_buf_size) self.recvd_data_buf = queue.Queue(recv_buf_size)
...@@ -194,13 +205,18 @@ class Receiver: ...@@ -194,13 +205,18 @@ class Receiver:
self._proc_thread = None self._proc_thread = None
self.proc_data = [] self.proc_data = []
if outfile is not None:
self.proc_data_fd = open(outfile, ofmode)
else:
self.proc_data_fd = None
self._isrunning = False self._isrunning = False
def start(self): def start(self):
if (self._recv_thread is None) or (not self._recv_thread.is_alive()): if (self._recv_thread is None) or (not self._recv_thread.is_alive()):
self._start_recv() self._start_recv()
else: else:
print('Already running!') print('Recv already running!')
if self._procfunc is not None: if self._procfunc is not None:
if (self._proc_thread is None) or (not self._proc_thread.is_alive()): if (self._proc_thread is None) or (not self._proc_thread.is_alive()):
...@@ -212,6 +228,7 @@ class Receiver: ...@@ -212,6 +228,7 @@ class Receiver:
def _start_recv(self): def _start_recv(self):
self._isrunning = True self._isrunning = True
self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker') self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker')
# self._recv_thread.daemon = True
self._recv_thread.start() self._recv_thread.start()
def _recv_worker(self): def _recv_worker(self):
...@@ -255,14 +272,27 @@ class Receiver: ...@@ -255,14 +272,27 @@ class Receiver:
while self._isrunning: while self._isrunning:
try: try:
t, data = self.recvd_data_buf.get(timeout=1) t, data = self.recvd_data_buf.get(timeout=1)
self.proc_data += self._procfunc(data) procdata = self._procfunc(data, ts=t)
self.proc_data.append(procdata)
if self.proc_data_fd is not None:
try:
self.proc_data_fd.write(str(procdata))
except Exception as err:
self.proc_data_fd.write('# {} #\n'.format(err))
continue
finally:
self.proc_data_fd.flush()
except queue.Empty: except queue.Empty:
continue continue
except Exception as err: except Exception as err:
print(err) print('Processing error:', err)
print('Processing stopped')
break break
print('Processing stopped')
self.proc_data_fd.close()
def _msgdecoder(msg, fmt, sep=''): def _msgdecoder(msg, fmt, sep=''):
if fmt == 'hex': if fmt == 'hex':
...@@ -281,3 +311,20 @@ def hexify(bs, sep=''): ...@@ -281,3 +311,20 @@ def hexify(bs, sep=''):
def toascii(bs, errors='replace'): def toascii(bs, errors='replace'):
return bs.decode('ascii', errors=errors) return bs.decode('ascii', errors=errors)
def proc_func_generic(data, ts=None):
"""
Generic function that takes data and returns it in a list. Example function to process raw data from Receiver input queue recvd_data_buf for proc_data storage.
:param data: raw data
:param ts: timestamp from input queue associated with the raw data
:return:
"""
if ts is None:
ts = ''
else:
ts = '{:.6f}'.format(ts)
return [(ts, str(data))]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment