From c118c62575bc003bfcf10baa5c0b9ca917502814 Mon Sep 17 00:00:00 2001
From: Andreas Gattringer <andreas.gattringer@univie.ac.at>
Date: Sun, 21 Apr 2024 18:49:58 +0200
Subject: [PATCH] daq: improved message handling

---
 umnp-daq.py                        | 23 +++++++----
 umnp/daq/__init__.py               |  0
 umnp/daq/file_daq.py               | 65 ++++++++++++++++++++++++++++++
 umnp/devices/__init__.py           |  7 ++++
 umnp/protocol/__init__.py          |  6 ++-
 umnp/protocol/error_message.py     | 33 +++++++++++++++
 umnp/protocol/headers.py           |  6 +++
 umnp/protocol/info_message.py      | 33 +++++++++++++++
 umnp/protocol/message.py           | 13 +++++-
 umnp/protocol/messagetype.py       |  2 +
 umnp/protocol/register_messages.py |  6 ++-
 11 files changed, 182 insertions(+), 12 deletions(-)
 create mode 100644 umnp/daq/__init__.py
 create mode 100644 umnp/daq/file_daq.py
 create mode 100644 umnp/protocol/error_message.py
 create mode 100644 umnp/protocol/headers.py
 create mode 100644 umnp/protocol/info_message.py

diff --git a/umnp-daq.py b/umnp-daq.py
index cd6ef57..a8d20b5 100644
--- a/umnp-daq.py
+++ b/umnp-daq.py
@@ -1,6 +1,7 @@
 import datetime
 import socket
 
+from umnp.daq.file_daq import FileDAQ
 from umnp.microcontroller.devices.network.udp import DEFAULT_UMNP_DATA_IN_PORT
 from umnp.protocol import DataMessage
 from umnp.protocol.message import Message
@@ -9,11 +10,19 @@ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 sock.bind(("0.0.0.0", DEFAULT_UMNP_DATA_IN_PORT))
 
-print("receive-time,sender-id,T,rH,p,T(p)")
-while True:
-    data, addr = sock.recvfrom(2048)
 
-    msg = Message.from_bytes(data)
-    now = datetime.datetime.now(tz=datetime.timezone.utc)
-    if isinstance(msg, DataMessage):
-        print(f"{now},{msg.sender_id},{msg.payload()}")
+def main():
+    files = FileDAQ()
+    print("receive-time,sender-id,T,rH,p,T(p)")
+    while True:
+        data, addr = sock.recvfrom(2048)
+
+        msg = Message.from_bytes(data)
+        now = datetime.datetime.now(tz=datetime.timezone.utc)
+        if isinstance(msg, DataMessage):
+            print(f"{now},{msg.sender_id},{msg.payload()}")
+        files.save_message(msg)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/umnp/daq/__init__.py b/umnp/daq/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/umnp/daq/file_daq.py b/umnp/daq/file_daq.py
new file mode 100644
index 0000000..ffaaf0d
--- /dev/null
+++ b/umnp/daq/file_daq.py
@@ -0,0 +1,65 @@
+import datetime
+import os
+
+from umnp.devices import get_device_type_name
+from umnp.protocol.headers import MESSAGE_HEADERS
+from umnp.protocol.message import Message
+
+
+class FileDAQ:
+    def __init__(self, output_dir: str = "daq-data"):
+        if not os.path.exists(output_dir):
+            os.mkdir(output_dir)
+        self.__output_dir = output_dir
+        self.__open_files = {}
+        self.__open_fns = {}
+
+    def filename(self, msg: Message, today: str):
+        device_type_name = get_device_type_name(msg.sender_type)
+        message_type_name = msg.get_message_type_name()
+        ident = f"{today}-{device_type_name}-{msg.sender_id}-{message_type_name}"
+        fn = os.path.join(self.__output_dir, f"{ident}.csv")
+        return fn, ident, today
+
+    def start_file(
+        self, msg: Message, when: datetime.datetime, file_id: str, filename: str
+    ):
+        self.__open_files[file_id] = open(filename, "w")
+        f = self.__open_files[file_id]
+        self.__open_fns[file_id] = filename
+        header = MESSAGE_HEADERS.get(msg.sender_type, {}).get(msg.type)
+        if header:
+            f.write("receive-time,sender-id," + ",".join(header) + "\n")
+        return f
+
+    def save_message(self, msg: Message):
+        msg_type = msg.type
+        when = datetime.datetime.now(tz=datetime.timezone.utc)
+        today = when.strftime("%Y-%m-%d")
+        filename, file_id, today = self.filename(msg, today)
+        f = None
+        if file_id in self.__open_fns:
+
+            if self.__open_fns[file_id] == filename:
+                f = self.__open_files[file_id]
+            else:
+                # date has changed in the meantime
+                f = self.__open_files[file_id]
+                f.close()
+                f = self.start_file(msg, when, file_id, filename)
+
+        else:
+            if (
+                os.path.exists(filename)
+                and self.__open_files.get(file_id, None) is None
+            ):
+                self.__open_files[file_id] = open(filename, "a")
+                self.__open_fns[file_id] = filename
+                f = self.__open_files[file_id]
+            else:
+                f = self.start_file(msg, when, file_id, filename)
+                f.flush()
+
+        if f:
+            f.write(f"{when},{msg.sender_id},{msg.payload()}\n")
+            f.flush()
diff --git a/umnp/devices/__init__.py b/umnp/devices/__init__.py
index 1be4e0a..63fa0da 100644
--- a/umnp/devices/__init__.py
+++ b/umnp/devices/__init__.py
@@ -1,2 +1,9 @@
 DEVICE_TYPE_UNKNOWN = 0
 DEVICE_TYPE_RHTP = 1
+
+
+DEVICE_TYPE_NAMES = {DEVICE_TYPE_RHTP: "RHTP", DEVICE_TYPE_UNKNOWN: "unknown"}
+
+
+def get_device_type_name(device_type: int) -> str:
+    return DEVICE_TYPE_NAMES.get(device_type, "invalid-device")
diff --git a/umnp/protocol/__init__.py b/umnp/protocol/__init__.py
index 6ef410d..a7f0d18 100644
--- a/umnp/protocol/__init__.py
+++ b/umnp/protocol/__init__.py
@@ -1,5 +1,9 @@
 from umnp.protocol.data_message import DataMessage
+from umnp.protocol.error_message import ErrorMessage
+from umnp.protocol.info_message import InfoMessage
 from umnp.protocol.messagetype import MessageType
 from umnp.protocol.register_messages import register_messages
 
-register_messages(MessageType.MSG_DEVICE_DATA, DataMessage)
+register_messages(MessageType.MSG_DEVICE_DATA, DataMessage, "data")
+register_messages(MessageType.MSG_TYPE_INFO, InfoMessage, "info")
+register_messages(MessageType.MSG_TYPE_ERROR, ErrorMessage, "error")
diff --git a/umnp/protocol/error_message.py b/umnp/protocol/error_message.py
new file mode 100644
index 0000000..314332f
--- /dev/null
+++ b/umnp/protocol/error_message.py
@@ -0,0 +1,33 @@
+from umnp.protocol.common.timestamp import TimeStamp
+from umnp.protocol.constants import MSG_STRING_ENCODING
+from umnp.protocol.message import Message, MessageHeader
+from umnp.protocol.messagetype import MessageType
+
+
+class ErrorMessage(Message):
+    def __init__(
+        self, data: str, sender_id: bytes, sender_type: int, send_time: TimeStamp = None
+    ):
+        self._payload = data
+        self._encoded_data = data.encode(MSG_STRING_ENCODING)
+        super().__init__(
+            MessageType.MSG_DEVICE_ERROR,
+            self._encoded_data,
+            sender_id,
+            sender_type,
+            send_time,
+        )
+
+    @staticmethod
+    def _decode_payload(transferred_data):
+        return transferred_data.decode(MSG_STRING_ENCODING)
+
+    def payload(self) -> str:
+        return self._payload
+
+    @classmethod
+    def decode(cls, payload: bytes, header: MessageHeader) -> "ErrorMessage":
+        decoded_payload = cls._decode_payload(payload)
+        return cls(
+            decoded_payload, header.sender_id, header.sender_type, header.timestamp
+        )
diff --git a/umnp/protocol/headers.py b/umnp/protocol/headers.py
new file mode 100644
index 0000000..d868e09
--- /dev/null
+++ b/umnp/protocol/headers.py
@@ -0,0 +1,6 @@
+from umnp.devices import DEVICE_TYPE_RHTP
+from umnp.protocol.messagetype import MessageType
+
+MESSAGE_HEADERS = {
+    DEVICE_TYPE_RHTP: {MessageType.MSG_DEVICE_DATA: ["T", "rH", "p", "T(p)"]}
+}
diff --git a/umnp/protocol/info_message.py b/umnp/protocol/info_message.py
new file mode 100644
index 0000000..b8a45b2
--- /dev/null
+++ b/umnp/protocol/info_message.py
@@ -0,0 +1,33 @@
+from umnp.protocol.common.timestamp import TimeStamp
+from umnp.protocol.constants import MSG_STRING_ENCODING
+from umnp.protocol.message import Message, MessageHeader
+from umnp.protocol.messagetype import MessageType
+
+
+class InfoMessage(Message):
+    def __init__(
+        self, data: str, sender_id: bytes, sender_type: int, send_time: TimeStamp = None
+    ):
+        self._payload = data
+        self._encoded_data = data.encode(MSG_STRING_ENCODING)
+        super().__init__(
+            MessageType.MSG_DEVICE_INFO,
+            self._encoded_data,
+            sender_id,
+            sender_type,
+            send_time,
+        )
+
+    @staticmethod
+    def _decode_payload(transferred_data):
+        return transferred_data.decode(MSG_STRING_ENCODING)
+
+    def payload(self) -> str:
+        return self._payload
+
+    @classmethod
+    def decode(cls, payload: bytes, header: MessageHeader) -> "InfoMessage":
+        decoded_payload = cls._decode_payload(payload)
+        return cls(
+            decoded_payload, header.sender_id, header.sender_type, header.timestamp
+        )
diff --git a/umnp/protocol/message.py b/umnp/protocol/message.py
index bd87987..f18431f 100644
--- a/umnp/protocol/message.py
+++ b/umnp/protocol/message.py
@@ -17,6 +17,7 @@ from umnp.protocol.message_header import MessageHeader
 
 class Message:
     _registered_types = {}
+    _registered_names = {}
 
     @property
     def message_types(self):
@@ -47,16 +48,20 @@ class Message:
         )
 
     @classmethod
-    def add_message_type(cls, msg_type: int, msg: typing.Type["Message"]):
+    def add_message_type(cls, msg_type: int, msg: typing.Type["Message"], name: str):
         if msg_type in cls._registered_types:
             return
-        print(f"Registering message type {msg_type}")
+        print(f"Registering message type {msg_type} '{name}'")
         cls._registered_types[msg_type] = msg
+        cls._registered_names[msg_type] = name
 
     @classmethod
     def get_message_type(cls, msg_type: int):
         return cls._registered_types[msg_type]
 
+    def get_message_type_name(self) -> str:
+        return self._registered_names[self.type]
+
     @property
     def type(self) -> int:
         return self._header.message_type
@@ -65,6 +70,10 @@ class Message:
     def version(self) -> int:
         return self._header.version
 
+    @property
+    def sender_type(self) -> int:
+        return self._sender_type
+
     @property
     def sender_id(self) -> str:
         return ":".join(f"{byte:02x}" for byte in self._sender_id)
diff --git a/umnp/protocol/messagetype.py b/umnp/protocol/messagetype.py
index bd4b1ca..b180fd9 100644
--- a/umnp/protocol/messagetype.py
+++ b/umnp/protocol/messagetype.py
@@ -3,6 +3,8 @@
 
 class MessageType:
     MSG_DEVICE_DATA = 1
+    MSG_TYPE_INFO = 16
+    MSG_TYPE_ERROR = 32
     MSG_TYPE_UNKNOWN = 65535
 
     _allowed_message_types = [MSG_DEVICE_DATA, MSG_TYPE_UNKNOWN]
diff --git a/umnp/protocol/register_messages.py b/umnp/protocol/register_messages.py
index 2b19405..3e8c8dc 100644
--- a/umnp/protocol/register_messages.py
+++ b/umnp/protocol/register_messages.py
@@ -5,6 +5,8 @@ except ImportError:
     pass
 from umnp.protocol.message import Message
 
+MESSAGE_NAMES = {}
 
-def register_messages(msg_type: int, msg: typing.Type[Message]):
-    msg.add_message_type(msg_type, msg)
+
+def register_messages(msg_type: int, msg: typing.Type[Message], name: str):
+    msg.add_message_type(msg_type, msg, name)
-- 
GitLab