Skip to content
Snippets Groups Projects
Commit 2bdcaa8b authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

MeasurementDevice: methods to add communicator, sender and receiver

parent 1b7925b6
Branches
No related tags found
No related merge requests found
......@@ -2,6 +2,13 @@ import binascii
import sys
import time
from umnp.microcontroller.communication.udp_communicator import UDPCommunicator
from umnp.microcontroller.devices.network.udp import (
UDPSender,
UDPReceiver,
DEFAULT_UMNP_DATA_IN_PORT,
DEFAULT_UMNP_COMMAND_IN_PORT,
)
from umnp.protocol.constants import MSG_STRING_ENCODING
if sys.implementation.name == "micropython":
......@@ -15,8 +22,13 @@ class MeasurementDevice:
def __init__(self):
self._boot_time = time.time()
self._identifier_raw = machine.unique_id()
self._identifier = binascii.hexlify(self.identifier_raw).decode(MSG_STRING_ENCODING)
self._identifier = binascii.hexlify(self.identifier_raw).decode(
MSG_STRING_ENCODING
)
self._network = None
self._sender = None
self._receiver = None
self._communicator = None
@property
def boot_time(self):
......@@ -27,7 +39,7 @@ class MeasurementDevice:
def generated_mac_string(self) -> str:
machine_id = self.identifier_raw[:6]
return ':'.join(f'{digit:02x}' for digit in machine_id)
return ":".join(f"{digit:02x}" for digit in machine_id)
def generated_mac_raw(self) -> bytes:
return self.identifier_raw[:6]
......@@ -39,3 +51,36 @@ class MeasurementDevice:
@property
def identifier(self) -> str:
return self._identifier
def create_sender(self, port: int = DEFAULT_UMNP_DATA_IN_PORT):
if not self._network:
raise ValueError("No network adapter added")
if not self._sender:
self._sender = UDPSender(
self._network.ip, self._network.netmask, send_to_port=port
)
return self._sender
def create_receiver(self, port: int = DEFAULT_UMNP_COMMAND_IN_PORT):
if not self._network:
raise ValueError("No network adapter added")
if not self._receiver:
self._receiver = UDPReceiver(self._network.ip, listen_port=port)
return self._receiver
def create_communicator(
self,
data_port: int = DEFAULT_UMNP_DATA_IN_PORT,
cmd_port: int = DEFAULT_UMNP_COMMAND_IN_PORT,
):
if not self._communicator:
s = self.create_sender(port=data_port)
r = self.create_receiver(port=cmd_port)
self._communicator = UDPCommunicator(
receiver=r, sender=s, device_id=self.identifier
)
return self._communicator
@property
def communicator(self) -> UDPCommunicator | None:
return self._communicator
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment