Skip to content
Snippets Groups Projects
Commit c118c625 authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

daq: improved message handling

parent 1f50273e
No related branches found
No related tags found
No related merge requests found
import datetime
import socket
from umnp.daq.file_daq import FileDAQ
from umnp.microcontroller.devices.network.udp import DEFAULT_UMNP_DATA_IN_PORT
from umnp.protocol import DataMessage
from umnp.protocol.message import Message
......@@ -9,11 +10,19 @@ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("0.0.0.0", DEFAULT_UMNP_DATA_IN_PORT))
print("receive-time,sender-id,T,rH,p,T(p)")
while True:
data, addr = sock.recvfrom(2048)
msg = Message.from_bytes(data)
now = datetime.datetime.now(tz=datetime.timezone.utc)
if isinstance(msg, DataMessage):
print(f"{now},{msg.sender_id},{msg.payload()}")
def main():
files = FileDAQ()
print("receive-time,sender-id,T,rH,p,T(p)")
while True:
data, addr = sock.recvfrom(2048)
msg = Message.from_bytes(data)
now = datetime.datetime.now(tz=datetime.timezone.utc)
if isinstance(msg, DataMessage):
print(f"{now},{msg.sender_id},{msg.payload()}")
files.save_message(msg)
if __name__ == "__main__":
main()
import datetime
import os
from umnp.devices import get_device_type_name
from umnp.protocol.headers import MESSAGE_HEADERS
from umnp.protocol.message import Message
class FileDAQ:
def __init__(self, output_dir: str = "daq-data"):
if not os.path.exists(output_dir):
os.mkdir(output_dir)
self.__output_dir = output_dir
self.__open_files = {}
self.__open_fns = {}
def filename(self, msg: Message, today: str):
device_type_name = get_device_type_name(msg.sender_type)
message_type_name = msg.get_message_type_name()
ident = f"{today}-{device_type_name}-{msg.sender_id}-{message_type_name}"
fn = os.path.join(self.__output_dir, f"{ident}.csv")
return fn, ident, today
def start_file(
self, msg: Message, when: datetime.datetime, file_id: str, filename: str
):
self.__open_files[file_id] = open(filename, "w")
f = self.__open_files[file_id]
self.__open_fns[file_id] = filename
header = MESSAGE_HEADERS.get(msg.sender_type, {}).get(msg.type)
if header:
f.write("receive-time,sender-id," + ",".join(header) + "\n")
return f
def save_message(self, msg: Message):
msg_type = msg.type
when = datetime.datetime.now(tz=datetime.timezone.utc)
today = when.strftime("%Y-%m-%d")
filename, file_id, today = self.filename(msg, today)
f = None
if file_id in self.__open_fns:
if self.__open_fns[file_id] == filename:
f = self.__open_files[file_id]
else:
# date has changed in the meantime
f = self.__open_files[file_id]
f.close()
f = self.start_file(msg, when, file_id, filename)
else:
if (
os.path.exists(filename)
and self.__open_files.get(file_id, None) is None
):
self.__open_files[file_id] = open(filename, "a")
self.__open_fns[file_id] = filename
f = self.__open_files[file_id]
else:
f = self.start_file(msg, when, file_id, filename)
f.flush()
if f:
f.write(f"{when},{msg.sender_id},{msg.payload()}\n")
f.flush()
DEVICE_TYPE_UNKNOWN = 0
DEVICE_TYPE_RHTP = 1
DEVICE_TYPE_NAMES = {DEVICE_TYPE_RHTP: "RHTP", DEVICE_TYPE_UNKNOWN: "unknown"}
def get_device_type_name(device_type: int) -> str:
return DEVICE_TYPE_NAMES.get(device_type, "invalid-device")
from umnp.protocol.data_message import DataMessage
from umnp.protocol.error_message import ErrorMessage
from umnp.protocol.info_message import InfoMessage
from umnp.protocol.messagetype import MessageType
from umnp.protocol.register_messages import register_messages
register_messages(MessageType.MSG_DEVICE_DATA, DataMessage)
register_messages(MessageType.MSG_DEVICE_DATA, DataMessage, "data")
register_messages(MessageType.MSG_TYPE_INFO, InfoMessage, "info")
register_messages(MessageType.MSG_TYPE_ERROR, ErrorMessage, "error")
from umnp.protocol.common.timestamp import TimeStamp
from umnp.protocol.constants import MSG_STRING_ENCODING
from umnp.protocol.message import Message, MessageHeader
from umnp.protocol.messagetype import MessageType
class ErrorMessage(Message):
def __init__(
self, data: str, sender_id: bytes, sender_type: int, send_time: TimeStamp = None
):
self._payload = data
self._encoded_data = data.encode(MSG_STRING_ENCODING)
super().__init__(
MessageType.MSG_DEVICE_ERROR,
self._encoded_data,
sender_id,
sender_type,
send_time,
)
@staticmethod
def _decode_payload(transferred_data):
return transferred_data.decode(MSG_STRING_ENCODING)
def payload(self) -> str:
return self._payload
@classmethod
def decode(cls, payload: bytes, header: MessageHeader) -> "ErrorMessage":
decoded_payload = cls._decode_payload(payload)
return cls(
decoded_payload, header.sender_id, header.sender_type, header.timestamp
)
from umnp.devices import DEVICE_TYPE_RHTP
from umnp.protocol.messagetype import MessageType
MESSAGE_HEADERS = {
DEVICE_TYPE_RHTP: {MessageType.MSG_DEVICE_DATA: ["T", "rH", "p", "T(p)"]}
}
from umnp.protocol.common.timestamp import TimeStamp
from umnp.protocol.constants import MSG_STRING_ENCODING
from umnp.protocol.message import Message, MessageHeader
from umnp.protocol.messagetype import MessageType
class InfoMessage(Message):
def __init__(
self, data: str, sender_id: bytes, sender_type: int, send_time: TimeStamp = None
):
self._payload = data
self._encoded_data = data.encode(MSG_STRING_ENCODING)
super().__init__(
MessageType.MSG_DEVICE_INFO,
self._encoded_data,
sender_id,
sender_type,
send_time,
)
@staticmethod
def _decode_payload(transferred_data):
return transferred_data.decode(MSG_STRING_ENCODING)
def payload(self) -> str:
return self._payload
@classmethod
def decode(cls, payload: bytes, header: MessageHeader) -> "InfoMessage":
decoded_payload = cls._decode_payload(payload)
return cls(
decoded_payload, header.sender_id, header.sender_type, header.timestamp
)
......@@ -17,6 +17,7 @@ from umnp.protocol.message_header import MessageHeader
class Message:
_registered_types = {}
_registered_names = {}
@property
def message_types(self):
......@@ -47,16 +48,20 @@ class Message:
)
@classmethod
def add_message_type(cls, msg_type: int, msg: typing.Type["Message"]):
def add_message_type(cls, msg_type: int, msg: typing.Type["Message"], name: str):
if msg_type in cls._registered_types:
return
print(f"Registering message type {msg_type}")
print(f"Registering message type {msg_type} '{name}'")
cls._registered_types[msg_type] = msg
cls._registered_names[msg_type] = name
@classmethod
def get_message_type(cls, msg_type: int):
return cls._registered_types[msg_type]
def get_message_type_name(self) -> str:
return self._registered_names[self.type]
@property
def type(self) -> int:
return self._header.message_type
......@@ -65,6 +70,10 @@ class Message:
def version(self) -> int:
return self._header.version
@property
def sender_type(self) -> int:
return self._sender_type
@property
def sender_id(self) -> str:
return ":".join(f"{byte:02x}" for byte in self._sender_id)
......
......@@ -3,6 +3,8 @@
class MessageType:
MSG_DEVICE_DATA = 1
MSG_TYPE_INFO = 16
MSG_TYPE_ERROR = 32
MSG_TYPE_UNKNOWN = 65535
_allowed_message_types = [MSG_DEVICE_DATA, MSG_TYPE_UNKNOWN]
......
......@@ -5,6 +5,8 @@ except ImportError:
pass
from umnp.protocol.message import Message
MESSAGE_NAMES = {}
def register_messages(msg_type: int, msg: typing.Type[Message]):
msg.add_message_type(msg_type, msg)
def register_messages(msg_type: int, msg: typing.Type[Message], name: str):
msg.add_message_type(msg_type, msg, name)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment