diff --git a/umnp-daq.py b/umnp-daq.py index cd6ef576f4d1d7438426669cefb6b4a93b5d9ac2..a8d20b5858329e35640897f9ce6a83d3bfba0f9d 100644 --- a/umnp-daq.py +++ b/umnp-daq.py @@ -1,6 +1,7 @@ import datetime import socket +from umnp.daq.file_daq import FileDAQ from umnp.microcontroller.devices.network.udp import DEFAULT_UMNP_DATA_IN_PORT from umnp.protocol import DataMessage from umnp.protocol.message import Message @@ -9,11 +10,19 @@ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("0.0.0.0", DEFAULT_UMNP_DATA_IN_PORT)) -print("receive-time,sender-id,T,rH,p,T(p)") -while True: - data, addr = sock.recvfrom(2048) - msg = Message.from_bytes(data) - now = datetime.datetime.now(tz=datetime.timezone.utc) - if isinstance(msg, DataMessage): - print(f"{now},{msg.sender_id},{msg.payload()}") +def main(): + files = FileDAQ() + print("receive-time,sender-id,T,rH,p,T(p)") + while True: + data, addr = sock.recvfrom(2048) + + msg = Message.from_bytes(data) + now = datetime.datetime.now(tz=datetime.timezone.utc) + if isinstance(msg, DataMessage): + print(f"{now},{msg.sender_id},{msg.payload()}") + files.save_message(msg) + + +if __name__ == "__main__": + main() diff --git a/umnp/daq/__init__.py b/umnp/daq/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/daq/file_daq.py b/umnp/daq/file_daq.py new file mode 100644 index 0000000000000000000000000000000000000000..ffaaf0d75238f902e844cd89f02fede99b509be8 --- /dev/null +++ b/umnp/daq/file_daq.py @@ -0,0 +1,65 @@ +import datetime +import os + +from umnp.devices import get_device_type_name +from umnp.protocol.headers import MESSAGE_HEADERS +from umnp.protocol.message import Message + + +class FileDAQ: + def __init__(self, output_dir: str = "daq-data"): + if not os.path.exists(output_dir): + os.mkdir(output_dir) + self.__output_dir = output_dir + self.__open_files = {} + self.__open_fns = {} + + def filename(self, msg: Message, today: str): + device_type_name = get_device_type_name(msg.sender_type) + message_type_name = msg.get_message_type_name() + ident = f"{today}-{device_type_name}-{msg.sender_id}-{message_type_name}" + fn = os.path.join(self.__output_dir, f"{ident}.csv") + return fn, ident, today + + def start_file( + self, msg: Message, when: datetime.datetime, file_id: str, filename: str + ): + self.__open_files[file_id] = open(filename, "w") + f = self.__open_files[file_id] + self.__open_fns[file_id] = filename + header = MESSAGE_HEADERS.get(msg.sender_type, {}).get(msg.type) + if header: + f.write("receive-time,sender-id," + ",".join(header) + "\n") + return f + + def save_message(self, msg: Message): + msg_type = msg.type + when = datetime.datetime.now(tz=datetime.timezone.utc) + today = when.strftime("%Y-%m-%d") + filename, file_id, today = self.filename(msg, today) + f = None + if file_id in self.__open_fns: + + if self.__open_fns[file_id] == filename: + f = self.__open_files[file_id] + else: + # date has changed in the meantime + f = self.__open_files[file_id] + f.close() + f = self.start_file(msg, when, file_id, filename) + + else: + if ( + os.path.exists(filename) + and self.__open_files.get(file_id, None) is None + ): + self.__open_files[file_id] = open(filename, "a") + self.__open_fns[file_id] = filename + f = self.__open_files[file_id] + else: + f = self.start_file(msg, when, file_id, filename) + f.flush() + + if f: + f.write(f"{when},{msg.sender_id},{msg.payload()}\n") + f.flush() diff --git a/umnp/devices/__init__.py b/umnp/devices/__init__.py index 1be4e0a26df5111a5d10d35d36b1e887fffcac57..63fa0dae66e3e0805f51e577fd74f94e4780b598 100644 --- a/umnp/devices/__init__.py +++ b/umnp/devices/__init__.py @@ -1,2 +1,9 @@ DEVICE_TYPE_UNKNOWN = 0 DEVICE_TYPE_RHTP = 1 + + +DEVICE_TYPE_NAMES = {DEVICE_TYPE_RHTP: "RHTP", DEVICE_TYPE_UNKNOWN: "unknown"} + + +def get_device_type_name(device_type: int) -> str: + return DEVICE_TYPE_NAMES.get(device_type, "invalid-device") diff --git a/umnp/protocol/__init__.py b/umnp/protocol/__init__.py index 6ef410d6c535a3cd42e03fc3be65d2031a3fa9ff..a7f0d18c8257f43157f4dbb7517ee04769b7c9fe 100644 --- a/umnp/protocol/__init__.py +++ b/umnp/protocol/__init__.py @@ -1,5 +1,9 @@ from umnp.protocol.data_message import DataMessage +from umnp.protocol.error_message import ErrorMessage +from umnp.protocol.info_message import InfoMessage from umnp.protocol.messagetype import MessageType from umnp.protocol.register_messages import register_messages -register_messages(MessageType.MSG_DEVICE_DATA, DataMessage) +register_messages(MessageType.MSG_DEVICE_DATA, DataMessage, "data") +register_messages(MessageType.MSG_TYPE_INFO, InfoMessage, "info") +register_messages(MessageType.MSG_TYPE_ERROR, ErrorMessage, "error") diff --git a/umnp/protocol/error_message.py b/umnp/protocol/error_message.py new file mode 100644 index 0000000000000000000000000000000000000000..314332f45e76a8d7e67af3d50a5984c321cdd8d6 --- /dev/null +++ b/umnp/protocol/error_message.py @@ -0,0 +1,33 @@ +from umnp.protocol.common.timestamp import TimeStamp +from umnp.protocol.constants import MSG_STRING_ENCODING +from umnp.protocol.message import Message, MessageHeader +from umnp.protocol.messagetype import MessageType + + +class ErrorMessage(Message): + def __init__( + self, data: str, sender_id: bytes, sender_type: int, send_time: TimeStamp = None + ): + self._payload = data + self._encoded_data = data.encode(MSG_STRING_ENCODING) + super().__init__( + MessageType.MSG_DEVICE_ERROR, + self._encoded_data, + sender_id, + sender_type, + send_time, + ) + + @staticmethod + def _decode_payload(transferred_data): + return transferred_data.decode(MSG_STRING_ENCODING) + + def payload(self) -> str: + return self._payload + + @classmethod + def decode(cls, payload: bytes, header: MessageHeader) -> "ErrorMessage": + decoded_payload = cls._decode_payload(payload) + return cls( + decoded_payload, header.sender_id, header.sender_type, header.timestamp + ) diff --git a/umnp/protocol/headers.py b/umnp/protocol/headers.py new file mode 100644 index 0000000000000000000000000000000000000000..d868e09b4aa50d589defdecf6e8506f20e2d2f5f --- /dev/null +++ b/umnp/protocol/headers.py @@ -0,0 +1,6 @@ +from umnp.devices import DEVICE_TYPE_RHTP +from umnp.protocol.messagetype import MessageType + +MESSAGE_HEADERS = { + DEVICE_TYPE_RHTP: {MessageType.MSG_DEVICE_DATA: ["T", "rH", "p", "T(p)"]} +} diff --git a/umnp/protocol/info_message.py b/umnp/protocol/info_message.py new file mode 100644 index 0000000000000000000000000000000000000000..b8a45b2ef4f1c4bbd87c05b080f15eca58d9fdde --- /dev/null +++ b/umnp/protocol/info_message.py @@ -0,0 +1,33 @@ +from umnp.protocol.common.timestamp import TimeStamp +from umnp.protocol.constants import MSG_STRING_ENCODING +from umnp.protocol.message import Message, MessageHeader +from umnp.protocol.messagetype import MessageType + + +class InfoMessage(Message): + def __init__( + self, data: str, sender_id: bytes, sender_type: int, send_time: TimeStamp = None + ): + self._payload = data + self._encoded_data = data.encode(MSG_STRING_ENCODING) + super().__init__( + MessageType.MSG_DEVICE_INFO, + self._encoded_data, + sender_id, + sender_type, + send_time, + ) + + @staticmethod + def _decode_payload(transferred_data): + return transferred_data.decode(MSG_STRING_ENCODING) + + def payload(self) -> str: + return self._payload + + @classmethod + def decode(cls, payload: bytes, header: MessageHeader) -> "InfoMessage": + decoded_payload = cls._decode_payload(payload) + return cls( + decoded_payload, header.sender_id, header.sender_type, header.timestamp + ) diff --git a/umnp/protocol/message.py b/umnp/protocol/message.py index bd879870c6602d4281a114b0f5bf22032ddb0430..f18431f9982b2b8f4135f42b6e2d67df98a1f225 100644 --- a/umnp/protocol/message.py +++ b/umnp/protocol/message.py @@ -17,6 +17,7 @@ from umnp.protocol.message_header import MessageHeader class Message: _registered_types = {} + _registered_names = {} @property def message_types(self): @@ -47,16 +48,20 @@ class Message: ) @classmethod - def add_message_type(cls, msg_type: int, msg: typing.Type["Message"]): + def add_message_type(cls, msg_type: int, msg: typing.Type["Message"], name: str): if msg_type in cls._registered_types: return - print(f"Registering message type {msg_type}") + print(f"Registering message type {msg_type} '{name}'") cls._registered_types[msg_type] = msg + cls._registered_names[msg_type] = name @classmethod def get_message_type(cls, msg_type: int): return cls._registered_types[msg_type] + def get_message_type_name(self) -> str: + return self._registered_names[self.type] + @property def type(self) -> int: return self._header.message_type @@ -65,6 +70,10 @@ class Message: def version(self) -> int: return self._header.version + @property + def sender_type(self) -> int: + return self._sender_type + @property def sender_id(self) -> str: return ":".join(f"{byte:02x}" for byte in self._sender_id) diff --git a/umnp/protocol/messagetype.py b/umnp/protocol/messagetype.py index bd4b1caf9e8b4b9ac6f673de347cc456a8dea929..b180fd99ead62f6528c8ff060b28eeb787c39ed3 100644 --- a/umnp/protocol/messagetype.py +++ b/umnp/protocol/messagetype.py @@ -3,6 +3,8 @@ class MessageType: MSG_DEVICE_DATA = 1 + MSG_TYPE_INFO = 16 + MSG_TYPE_ERROR = 32 MSG_TYPE_UNKNOWN = 65535 _allowed_message_types = [MSG_DEVICE_DATA, MSG_TYPE_UNKNOWN] diff --git a/umnp/protocol/register_messages.py b/umnp/protocol/register_messages.py index 2b19405e2a42349a4bcfdee4c5225535946575e8..3e8c8dce7b43ac741d37206012ed651f125ea1a6 100644 --- a/umnp/protocol/register_messages.py +++ b/umnp/protocol/register_messages.py @@ -5,6 +5,8 @@ except ImportError: pass from umnp.protocol.message import Message +MESSAGE_NAMES = {} -def register_messages(msg_type: int, msg: typing.Type[Message]): - msg.add_message_type(msg_type, msg) + +def register_messages(msg_type: int, msg: typing.Type[Message], name: str): + msg.add_message_type(msg_type, msg, name)