From f156f85205ca0fdeb3fd9e94a74cb792520aa91a Mon Sep 17 00:00:00 2001
From: Marko Mecina <marko.mecina@univie.ac.at>
Date: Fri, 21 Jul 2023 13:14:42 +0200
Subject: [PATCH] add generic proc_func

+ minor usability improvements
---
 Ccs/communication.py | 71 ++++++++++++++++++++++++++++++++++++--------
 1 file changed, 59 insertions(+), 12 deletions(-)

diff --git a/Ccs/communication.py b/Ccs/communication.py
index 3bb4b21..766f290 100644
--- a/Ccs/communication.py
+++ b/Ccs/communication.py
@@ -70,10 +70,11 @@ class Connector:
         self.conn.settimeout(self.response_to)
 
     def _close(self, servershtdwn):
-        self.conn.close()
-        print('Closed connection to {}:{}'.format(self.host, self.port))
+        if self.conn.fileno() != -1:
+            self.conn.close()
+            print('Closed connection to {}:{}'.format(self.host, self.port))
 
-        if servershtdwn:
+        if servershtdwn and self.sockfd.fileno() != -1:
             print('Closing server {}'.format(self.sockfd.getsockname()))
             self.sockfd.close()
 
@@ -115,7 +116,7 @@ class Connector:
 
             resp = b''
             if rx:
-                resp = self._recv_response()
+                resp += self._recv_response()
 
             self.log.append((t, msg, resp))
 
@@ -159,10 +160,12 @@ class Connector:
         self.conn.settimeout(seconds)
         self.response_to = seconds
 
-    def start_receiver(self, procfunc=None):
+    def start_receiver(self, procfunc=None, outfile=None, ofmode='w'):
         """
 
-        :param procfunc: function that must return a list
+        :param procfunc:
+        :param outfile:
+        :param ofmode:
         :return:
         """
         if self.conn is None:
@@ -170,11 +173,19 @@ class Connector:
             return
 
         if self.receiver is None:
-            self.receiver = Receiver([self.conn], procfunc=procfunc)
+            self.receiver = Receiver([self.conn], procfunc=procfunc, outfile=outfile, ofmode=ofmode)
             self.receiver.start()
         else:
             print('Receiver already initialised')
 
+    def stop_receiver(self):
+        if self.receiver is None:
+            print('No receiver to stop')
+            return
+
+        self.receiver.stop()
+        self.receiver = None
+
 
 class Receiver:
     """
@@ -185,7 +196,7 @@ class Receiver:
     SEL_TIMEOUT = 2
     RECV_BUF_SIZE = 1024**3
 
-    def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE):
+    def __init__(self, sockfds, procfunc=None, recv_buf_size=RECV_BUF_SIZE, outfile=None, ofmode='w'):
 
         self.sockfds = sockfds
         self.recvd_data_buf = queue.Queue(recv_buf_size)
@@ -194,13 +205,18 @@ class Receiver:
         self._proc_thread = None
         self.proc_data = []
 
+        if outfile is not None:
+            self.proc_data_fd = open(outfile, ofmode)
+        else:
+            self.proc_data_fd = None
+
         self._isrunning = False
 
     def start(self):
         if (self._recv_thread is None) or (not self._recv_thread.is_alive()):
             self._start_recv()
         else:
-            print('Already running!')
+            print('Recv already running!')
 
         if self._procfunc is not None:
             if (self._proc_thread is None) or (not self._proc_thread.is_alive()):
@@ -212,6 +228,7 @@ class Receiver:
     def _start_recv(self):
         self._isrunning = True
         self._recv_thread = threading.Thread(target=self._recv_worker, name='recv_worker')
+        # self._recv_thread.daemon = True
         self._recv_thread.start()
 
     def _recv_worker(self):
@@ -255,14 +272,27 @@ class Receiver:
         while self._isrunning:
             try:
                 t, data = self.recvd_data_buf.get(timeout=1)
-                self.proc_data += self._procfunc(data)
+                procdata = self._procfunc(data, ts=t)
+                self.proc_data.append(procdata)
+
+                if self.proc_data_fd is not None:
+                    try:
+                        self.proc_data_fd.write(str(procdata))
+                    except Exception as err:
+                        self.proc_data_fd.write('# {} #\n'.format(err))
+                        continue
+                    finally:
+                        self.proc_data_fd.flush()
+
             except queue.Empty:
                 continue
             except Exception as err:
-                print(err)
-                print('Processing stopped')
+                print('Processing error:', err)
                 break
 
+        print('Processing stopped')
+        self.proc_data_fd.close()
+
 
 def _msgdecoder(msg, fmt, sep=''):
     if fmt == 'hex':
@@ -281,3 +311,20 @@ def hexify(bs, sep=''):
 
 def toascii(bs, errors='replace'):
     return bs.decode('ascii', errors=errors)
+
+
+def proc_func_generic(data, ts=None):
+    """
+    Generic function that takes data and returns it in a list. Example function to process raw data from Receiver input queue recvd_data_buf for proc_data storage.
+
+    :param data: raw data
+    :param ts: timestamp from input queue associated with the raw data
+    :return:
+    """
+
+    if ts is None:
+        ts = ''
+    else:
+        ts = '{:.6f}'.format(ts)
+
+    return [(ts, str(data))]
-- 
GitLab