diff --git a/programs/main_test.py b/programs/main_test.py new file mode 100644 index 0000000000000000000000000000000000000000..84af29af4a5f6487ad6fd984636901dab0042cf3 --- /dev/null +++ b/programs/main_test.py @@ -0,0 +1,44 @@ +import sys + +from umnp.microcontroller.communication.communicator import Communicator +from umnp.microcontroller.measurementdevice import MeasurementDevice +from umnp.microcontroller.network.ethernet_w5500 import EthernetW5500 +from umnp.microcontroller.network.udp import UDPSender, UDPReceiver +from umnp.microcontroller.sensors.sht25 import SHT25 +from umnp.microcontroller.tasks.periodictask import PeriodicTask + +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import machine + # noinspection PyUnresolvedReferences + import uasyncio as asyncio +else: + from umnp.microcontroller.umock import machine + import asyncio + + +def test_function(*args): + print("test_function called with: ", *args) + result = " - ".join(*args) + print(f"test_function returns '{result}'") + return result + + +def main(): + # configure network + device = MeasurementDevice() + spi = machine.SPI(0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18)) + ether = EthernetW5500(spi, 17, 20, mac=device.generated_mac_raw(), dhcp=True) + device.add_network_adapter(ether) + + i2c = machine.I2C(id=1, scl=machine.Pin(27), sda=machine.Pin(26)) + sht25 = SHT25(i2c) + + sender = UDPSender(ether.ip, ether.netmask, 7777) + receiver = UDPReceiver(ether.ip, 7776) + comm = Communicator(receiver=receiver, sender=sender, device_id=device.identifier) + + x = PeriodicTask(test_function, print, 1000) + comm.add_task(x, "test_function") + # start + asyncio.run(comm.start()) diff --git a/tools/backup.py b/tools/backup.py new file mode 100644 index 0000000000000000000000000000000000000000..b30ff8f04a955b51b90c3be08a65166e33eebe77 --- /dev/null +++ b/tools/backup.py @@ -0,0 +1,11 @@ +from fs import FileSystemInformation + +current_dir = "" + + +def main(): + fs = FileSystemInformation() + fs.print_fs_tree() + + +main() diff --git a/tools/fs/__init__.py b/tools/fs/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..5fb9dc5512a9b3a2a06b12e2bf52e305182a2635 --- /dev/null +++ b/tools/fs/__init__.py @@ -0,0 +1,83 @@ +import os + + +def fs_entries_recurse(current_directory="", total_path=""): + files = [] + dirs = [] + all_entries = [] + + for fstats in os.ilistdir(current_directory): + fn = fstats[0] + ft = fstats[1] + all_entries.append(total_path + "/" + fn) + if ft == 0x4000: + dirs.append(total_path + "/" + fn) + + f, current_directory, a = fs_entries_recurse(fn, total_path + "/" + fn) + files.extend(f) + dirs.extend(current_directory) + all_entries.extend(a) + else: + files.append(total_path + "/" + fn) + return files, dirs, all_entries + + +class FileSystemInformation: + def __init__(self): + self._size_cache = {} + self._files = [] + self._directories = [] + self._all_entries = [] + self._vfs_stats = [] + self._usage = None + self._size_free_kb = None + self._blocks_free = None + self._max_fn_len = None + self._blocks_used = None + self._block_size = None + self._size_total_kb = None + self._longest_path = 0 + self.update_file_system_info() + + self._files, self._directories, self._all_entries = fs_entries_recurse() + for fn in self._all_entries: + self._longest_path = max(self._longest_path, len(fn)) + + def invalidate(self): + self._size_cache = {} + self._vfs_stats = [] + self._files = [] + self._directories = [] + self._all_entries = [] + + def get_size(self, path): + if path in self._size_cache: + return self._size_cache[path] + + mode, inode, device_id, link_count, uid, gid, size, atime, mtime, ctime = os.stat(path) + b = 0 + if mode == 0x8000: + b = size + self._size_cache[path] = b + return b + + def update_file_system_info(self): + if len(self._vfs_stats) == 0: + self._vfs_stats = os.statvfs("/") + + b_size, _, b_count, b_count_free, _, _, _, _, _, max_fn_len = self._vfs_stats + self._usage = 1 - b_count_free / b_count + self._size_free_kb = b_size * b_count_free / 1024 + self._blocks_free = b_count_free + self._blocks_used = b_count - b_count_free + self._block_size = b_size + self._size_total_kb = b_size * b_count / 1024 + self._max_fn_len = max_fn_len + + def print_fs_tree(self): + for fn in self._all_entries: + spacer = " " * (self._longest_path + 2 - len(fn)) + size = self.get_size(fn) + size_spacer = " " * (6 - len(str(size))) + + print(f"{fn}{spacer} {size_spacer}{size} bytes") diff --git a/umnp/__init__.py b/umnp/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..39839d622b5a3dd7f9537169b49787ed93394754 --- /dev/null +++ b/umnp/__init__.py @@ -0,0 +1,10 @@ +import sys + +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import machine +else: + from umnp.microcontroller.umock import machine + +UNIQUE_ID_MAC = machine.unique_id()[:6] +UNIQUE_ID_MAC_STR = ':'.join('%02x' % x for x in UNIQUE_ID_MAC) diff --git a/umnp/microcontroller/__init__.py b/umnp/microcontroller/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/communication/__init__.py b/umnp/microcontroller/communication/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/communication/communicator.py b/umnp/microcontroller/communication/communicator.py new file mode 100644 index 0000000000000000000000000000000000000000..dfa41f9cad4c12efc3320062cee0196c50df10fc --- /dev/null +++ b/umnp/microcontroller/communication/communicator.py @@ -0,0 +1,100 @@ +import sys +import time + +from umnp.microcontroller.network.udp import UDPSender, UDPReceiver +from umnp.microcontroller.tasks.periodictask import PeriodicTask + +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import uasyncio as asyncio + # noinspection PyUnresolvedReferences + import machine +else: + import asyncio + from umnp.microcontroller.umock import machine + + +class Communicator: + def __init__(self, sender: UDPSender, receiver: UDPReceiver, device_id, max_msgs: int = 10): + + self._receive_lock = asyncio.Lock() + self._send_lock = asyncio.Lock() + + self._messages_received = [] + self._messages_send_queue = [] + self._max_msgs = max_msgs + self._sender = sender + self._receiver = receiver + self._tasks = {} + self._device_id = device_id + + async def queue_incoming_message(self, msg, source): + async with self._receive_lock: + self._messages_received.append((msg, source, time.time())) + while len(self._messages_received) > self._max_msgs: + self._messages_received.pop(0) + + async def get_newest_message(self): + async with self._receive_lock: + if len(self._messages_received): + return self._messages_received.pop(0) + else: + return None + + async def receive_task(self): + while True: + await self._receiver.receive(self) + await asyncio.sleep(0.500) + + async def send_task(self): + device_id = self._device_id + rtc = machine.RTC() + + while True: + msg = None + async with self._send_lock: + if len(self._messages_send_queue) > 0: + msg = self._messages_send_queue.pop() + + if msg is not None: + msg = msg.replace(",", ";") + now = rtc.datetime() + now = "%04d-%02d-%02dT%02d:%02d:%02d" % (now[0], now[1], now[2], now[4], now[5], now[6]) + await self._sender.broadcast("%s,%s,%s" % (device_id, now, msg)) + + await asyncio.sleep(0.5) + + async def send_data_message(self, data: str): + async with self._send_lock: + self._messages_send_queue.append(data) + + async def control_task(self): + while True: + async with self._receive_lock: + # print("Control: %d" % len(self.msgs)) + msg = await self.get_newest_message() + if msg is not None: + pass + # print("Controll::msg::", msg) + + await asyncio.sleep(0.5) + + async def start(self): + receiver = asyncio.create_task(self.receive_task()) + sender = asyncio.create_task(self.send_task()) + controller = asyncio.create_task(self.control_task()) + tasks = [] + for name, task in self._tasks.items(): + result = asyncio.create_task(task.run()) + tasks.append(result) + + for t in tasks: + await t + + await receiver + await sender + await controller + + def add_task(self, task: PeriodicTask, name: str): + self._tasks[name] = task + diff --git a/umnp/microcontroller/cpc/__init__.py b/umnp/microcontroller/cpc/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/display/__init__.py b/umnp/microcontroller/display/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/display/lcd.py b/umnp/microcontroller/display/lcd.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/display/picolcd13.py b/umnp/microcontroller/display/picolcd13.py new file mode 100644 index 0000000000000000000000000000000000000000..52208ee1dc831fac3d7ba1d49e1ab9643eb34580 --- /dev/null +++ b/umnp/microcontroller/display/picolcd13.py @@ -0,0 +1,120 @@ +import sys + +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import framebuf + # noinspection PyUnresolvedReferences + import machine + # noinspection PyUnresolvedReferences + from micropython import const +else: + from umnp.microcontroller.umock import machine, network, framebuf + from umnp.microcontroller.umock.micropython import const + +_BL = const(13) +_DC = const(8) +_RST = const(12) +_MOSI = const(11) +_SCK = const(10) +_CS = const(9) +_WIDTH = const(240) +_HEIGHT = const(240) + + +def colour(red, green, blue): # Convert RGB888 to RGB565 # copied from? FIXME + return ( + (((green & 0b00011100) << 3) + ((blue & 0b11111000) >> 3) << 8) + + (red & 0b11111000) + ((green & 0b11100000) >> 5) + ) + + +class PicoLCD13(framebuf.FrameBuffer): + def __init__(self): + self.width = _WIDTH + self.height = _HEIGHT + self.cs = machine.Pin(_CS, machine.Pin.OUT) + self.rst = machine.Pin(_RST, machine.Pin.OUT) + + self.cs(1) + self.spi = machine.SPI(1, 100_000_000, polarity=0, phase=0, + sck=machine.Pin(_SCK), + mosi=machine.Pin(_MOSI), + miso=None) + self.dc = machine.Pin(_DC, machine.Pin.OUT) + self.dc(1) + self.buffer = bytearray(self.height * self.width * 2) + super().__init__(self.buffer, self.width, self.height, framebuf.RGB565) + + self.red = 0x07E0 + self.green = 0x001f + self.blue = 0xf800 + self.white = 0xffff + self._bg_color = 0 + + self._initialise_display() + + self.clear() + self.update() + + @property + def bg_color(self): + return self._bg_color + + def _write_cmd(self, cmd): + self.cs(1) + self.dc(0) + self.cs(0) + self.spi.write(bytearray([cmd])) + self.cs(1) + + def _write_data(self, buf): + self.cs(1) + self.dc(1) + self.cs(0) + self.spi.write(bytearray([buf])) + self.cs(1) + + def _config(self, cmd, data): + self._write_cmd(cmd) + for d in data: + self._write_data(d) + + def clear(self, r=0, g=0, b=0): + self.fill(colour(r, g, b)) + + def _initialise_display(self): + self.rst(1) + self.rst(0) + self.rst(1) + + self._config(0x36, (0x70,)) # Address Order setup + self._config(0x3a, (0x05,)) # Interface Pixel Format + self._config(0xB2, (0x0c, 0x0c, 0x00, 0x33, 0x33)) # Porch setting (back, front, 0, b partial, f part) + self._config(0xb7, (0x35,)) # gate control + self._config(0xbb, (0x19,)) # VCOMS setting + self._config(0xc0, (0x2c,)) # LCM Control + self._config(0xc2, (0x01,)) # VDV and VRH Command Enable + self._config(0xc3, (0x12,)) # VRH Set + self._config(0xc4, (0x20,)) # VRV Set + self._config(0xc6, (0x0f,)) # Frame Rate Control in normal mode + self._config(0xd0, (0xa4, 0xa1,)) # Power Control 1 + self._config(0xe0, # Positive voltage gamma control + (0xD0, 0x04, 0x0D, 0x11, 0x13, 0x2B, 0x3F, 0x54, 0x4C, 0x18, 0x0D, 0x0B, 0x1F, 0x23)) + + self._config(0xe1, # Negative voltage gamma control + (0xD0, 0x04, 0x0C, 0x11, 0x13, 0x2C, 0x3F, 0x44, 0x51, 0x2F, 0x1F, 0x1F, 0x20, 0x23)) + + self._write_cmd(0x21) # Display inversion on + self._write_cmd(0x11) # Turn off sleep mode + self._write_cmd(0x29) # Display on + + def update(self): + self._config(0x2a, (0x00, 0x00, 0x00, 0xef)) # column address set + self._config(0x2b, (0x00, 0x00, 0x00, 0xef)) # row address set + self._write_cmd(0x2C) # memory set + + self.cs(1) + self.dc(1) + self.cs(0) + self.spi.write(self.buffer) + self.cs(1) diff --git a/umnp/microcontroller/eeprom/EEPROM_24LC32A.py b/umnp/microcontroller/eeprom/EEPROM_24LC32A.py new file mode 100644 index 0000000000000000000000000000000000000000..2f985040c20cdb421e7787dc28704d41dc2f3fe4 --- /dev/null +++ b/umnp/microcontroller/eeprom/EEPROM_24LC32A.py @@ -0,0 +1,133 @@ +import sys +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + from machine import I2C +else: + from umnp.microcontroller.umock.machine import I2C + + +I2C_ADDRESS_EEPROM24LC32A = 0x50 + + +class EEPROM24LC32A: + def __init__(self, i2c: I2C, i2c_address=I2C_ADDRESS_EEPROM24LC32A): + self.__block_size = 32 + self.__max_size = 4096 + self.__block_count = self.__max_size // self.__block_size + + self.__i2c_address = i2c_address + self.__i2c = i2c + self.count = self.__block_count + + if self.__i2c_address not in self.__i2c.scan(): + raise RuntimeError(f"No I2C device at address {self.__i2c_address} found") + + def __calculate_address(self, block_idx, offset=0): + address = block_idx * self.__block_size + offset + if address > self.__max_size or address < 0: + raise RuntimeError(f"Invalid address {address}, out of range [0, {self.__max_size}]") + result = bytearray(2) + result[0] = address >> 8 + result[1] = address & 0xFF + + + def __set_read_address(self, address): + self.__i2c.writeto(self.__i2c_address, address) + + def readblocks(self, block_num, buf, offset=0): + """ + Parameters + ---------- + block_num + buf + offset + + Returns + ------- + + According to https: // docs.micropython.org / en / latest / library / os.html # os.AbstractBlockDev + if offset = 0: + read multiple aligned blocks (count given by the length of buf, which is a multiple of the block size) + else: + read arbitrary number of bytes from an arbitrary location + + We do not differentiate between those two cases, as the EEPROM only supports random reads (either single byte + or sequential) + """ + bytes_to_read = len(buf) + if offset == 0 and bytes_to_read % self.__block_size != 0: + raise RuntimeError("Buffer length not a multiple of the block size") + if bytes_to_read + block_num * self.__block_size + offset > self.__max_size: + raise RuntimeError("Error: operation would read beyond the maximum address") + address = self.__calculate_address(block_num, offset) + self.__set_read_address(address) + + # we don't want to overwrite buf, but write into it + buf[0:bytes_to_read] = self.__i2c.readfrom(self.__i2c_address, bytes_to_read) + + def __write_page(self, address, data): + self.__i2c.writevto(self.__i2c_address, (address, data)) + + def writeblocks(self, block_num: int, buf: bytearray, offset=0) -> None: + """ + Parameters + ---------- + block_num: int + buf: bytearray + offset: int + + Returns + ------- + + According to https://docs.micropython.org/en/latest/library/os.html#os.AbstractBlockDev + if offset = 0: + read multiple aligned blocks (count given by the length of buf, which is a multiple of the block size) + else: + read arbitrary number of bytes from an arbitrary location + + We need to differentiate, because the EEPROM supports two modes: + * single byte write: writes at random addresses + * page writes: write 32 bytes at once at a random address, but wrap around at the page boundary, + potentially overwriting data - so we need to ensure that we start a page write at a page boundary so we + don't cross page boundaries + """ + + bytes_to_write = len(buf) + + if bytes_to_write + block_num * self.__block_size + offset > self.__max_size: + raise RuntimeError("Error: operation would read beyond the maximum address") + + if offset == 0: + block_count, remainder = divmod(bytes_to_write, self.__block_size) + if remainder != 0: + raise RuntimeError("Buffer length not a multiple of the block size") + for i in range(block_count): + address = self.__calculate_address(block_num + i, 0) + start = i * self.__block_size + end = start + self.__block_size + self.__write_page(address, buf[start:end]) + + else: + block_count, remainder = divmod(bytes_to_write, self.__block_size) + if remainder != self.__block_size - offset: + raise RuntimeError("Buffer length not a multiple of the block size + offset") + + address = self.__calculate_address(block_num, offset) + self.__write_page(address, buf[:self.__block_size - offset]) + + for i in range(block_count): + address = self.__calculate_address(block_num + i + 1, 0) + start = i * self.__block_size + self.__block_size - offset + end = start + self.__block_size + self.__write_page(address, buf[start:end]) + + +def ioctl(self, op, arg): + if op == 4: # block count + return self.__block_count + if op == 5: # block size + return self.__block_size + + # Needed for littlefs + if op == 6: # erase + return 0 diff --git a/umnp/microcontroller/eeprom/__init__.py b/umnp/microcontroller/eeprom/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/measurementdevice.py b/umnp/microcontroller/measurementdevice.py new file mode 100644 index 0000000000000000000000000000000000000000..56d587bb6ab3809a81024aec064813f19558e9b0 --- /dev/null +++ b/umnp/microcontroller/measurementdevice.py @@ -0,0 +1,41 @@ +import binascii +import sys +import time + +from umnp.protocol.constants import MSG_STRING_ENCODING + +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import machine +else: + from umnp.microcontroller.umock import machine + + +class MeasurementDevice: + def __init__(self): + self._boot_time = time.time() + self._identifier_raw = machine.unique_id() + self._identifier = binascii.hexlify(self.identifier_raw).decode(MSG_STRING_ENCODING) + self._network = None + + @property + def boot_time(self): + return self._boot_time + + def add_network_adapter(self, adapter): + self._network = adapter + + def generated_mac_string(self) -> str: + machine_id = self.identifier_raw[:6] + return ':'.join(f'{digit:02x}' for digit in machine_id) + + def generated_mac_raw(self) -> bytes: + return self.identifier_raw[:6] + + @property + def identifier_raw(self) -> bytes: + return self._identifier_raw + + @property + def identifier(self) -> str: + return self._identifier diff --git a/umnp/microcontroller/network/__init__.py b/umnp/microcontroller/network/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ab19e4a1980609b526bccebc79fabdb2689d736c --- /dev/null +++ b/umnp/microcontroller/network/__init__.py @@ -0,0 +1,16 @@ +LISTEN_TIMEOUT_MS = 1 + + +def calculate_broadcast_ip(ip: str, mask: str) -> str: + if ip is None or mask is None: + return "" + + ip = [int(x) for x in ip.split(".")] + mask = [int(x) for x in mask.split(".")] + + if len(ip) != 4 or len(mask) != 4: + return "" + + bc = [(i | ~j) & 0xff for i, j in zip(ip, mask)] + bc = ".".join(str(x) for x in bc) + return bc diff --git a/umnp/microcontroller/network/ethernet.py b/umnp/microcontroller/network/ethernet.py new file mode 100644 index 0000000000000000000000000000000000000000..8282bfd67688d0e07ad47a8529931a718ec6cc7e --- /dev/null +++ b/umnp/microcontroller/network/ethernet.py @@ -0,0 +1,17 @@ +# micropython code +# can't use abstract baseclass here + +class EthernetAdapter: + def __init__(self): + pass + + @property + def ip(self): + return + + @property + def netmask(self): + return + + def enable_dhcp(self): + pass diff --git a/umnp/microcontroller/network/ethernet_w5500.py b/umnp/microcontroller/network/ethernet_w5500.py new file mode 100644 index 0000000000000000000000000000000000000000..0b4c9691812717ab753388b915b93395e353fff6 --- /dev/null +++ b/umnp/microcontroller/network/ethernet_w5500.py @@ -0,0 +1,44 @@ +from umnp.microcontroller.network.ethernet import EthernetAdapter +import sys + +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import machine + # noinspection PyUnresolvedReferences + import network +else: + from umnp.microcontroller.umock import machine, network + + +class EthernetW5500(EthernetAdapter): + def __init__(self, spi: machine.SPI, pin1: int, pin2: int, mac: bytes, dhcp=False): + super().__init__() + self._spi = spi + self._pin1 = machine.Pin(pin1) + self._pin2 = machine.Pin(pin2) + self._nic = network.WIZNET5K(spi, self._pin1, self._pin2) + # self._nic.active(True) + self._nic.config(mac=mac) + if dhcp: + self.enable_dhcp() + + def get_dhcp(self): + while True: + print("Requesting IP via DHCP") + try: + self._nic.ifconfig('dhcp') + return + except OSError: + pass + + def enable_dhcp(self): + self.get_dhcp() + print(self._nic.ifconfig()) + + @property + def netmask(self): + return self._nic.ifconfig()[1] + + @property + def ip(self): + return self._nic.ifconfig()[0] diff --git a/umnp/microcontroller/network/udp.py b/umnp/microcontroller/network/udp.py new file mode 100644 index 0000000000000000000000000000000000000000..4f3c098844f8370e9f6332fb642e6ddf07648273 --- /dev/null +++ b/umnp/microcontroller/network/udp.py @@ -0,0 +1,51 @@ +import sys +import socket +import select + +from umnp.microcontroller.network import LISTEN_TIMEOUT_MS, calculate_broadcast_ip + +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import uasyncio as asyncio +else: + import asyncio + + +class UDPReceiver: + def __init__(self, listen_ip: str, listen_port: int, timeout: int = LISTEN_TIMEOUT_MS): + self.socket = None + self._listen_port = listen_port + self.listen_ip = listen_ip + self.timeout = timeout + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.setblocking(False) + self.socket.bind((listen_ip, listen_port)) + self.poller = select.poll() + self.poller.register(self.socket, select.POLLIN) + + async def receive(self, controller): + timeout = self.timeout + while True: + if self.poller.poll(timeout): + buffer, address = self.socket.recvfrom(1024) + await controller.queue_incoming_message(buffer, address) + await asyncio.sleep(0) + + +class UDPSender: + def __init__(self, ip, netmask, target_port: int): + self.socket = None + self.ip = ip + self.netmask = netmask + self._target_port = target_port + + if ip and netmask: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.broadcast_ip = calculate_broadcast_ip(ip, netmask) + + async def broadcast(self, msg): + # print("sending %s" % msg) + if isinstance(msg, str): + msg = msg.encode('utf-8') + if self.socket: + self.socket.sendto(msg, (self.broadcast_ip, self._target_port)) diff --git a/umnp/microcontroller/sensors/__init__.py b/umnp/microcontroller/sensors/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/sensors/sht25/__init__.py b/umnp/microcontroller/sensors/sht25/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c2304939f58542643c32b25f1b2246e0978ac5c8 --- /dev/null +++ b/umnp/microcontroller/sensors/sht25/__init__.py @@ -0,0 +1,123 @@ +try: + from machine import I2C +except ImportError: + from umnp.microcontroller.umock.machine import I2C + +import time +import asyncio + +# SHT25_READ_T_HOLD = +# SHT25_READ_RH_HOLD = +# SHT25_WRITE_USER_REGISTER = +# SHT25_READ_USER_REGISTER = + +SHT25_READ_T_NO_HOLD = 0xF3 +SHT25_READ_RH_NO_HOLD = 0xF5 +SHT25_SOFT_RESET = 0xFE +NO_HOLD_WAIT_TIME = 18.5 / 1000 / 1000 +SHT25_MEASUREMENT_TYPE_T = 0 +SHT25_MEASUREMENT_TYPE_RH = 1 + + +class SHT25: + def __init__(self, i2c: I2C): + self._i2c = i2c + self._address = 64 + self._initialised = False + self._wait_rh_ms = 100 + self._wait_t_ms = 100 + self._nan = float("NAN") + devices = self._i2c.scan() + if self._address not in devices: + print("Device not found") + return + if self.reset(): + print("Initialised") + self._initialised = True + else: + print("Could not initialise device") + + @property + def initialised(self) -> bool: + return self._initialised + + def _command(self, command: int) -> int: + pass + + def _crc8(self, data): + # CRC-8-Dallas/Maxim for I2C with 0x31 polynomial + crc = 0x0 + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x80: + crc = (crc << 1) ^ 0x31 + else: + crc = crc << 1 + crc &= 0xFF + + return crc + + def reset(self) -> bool: + cmd = bytearray(1) + cmd[0] = SHT25_SOFT_RESET + n_ack = self._i2c.writeto(self._address, cmd) + if n_ack == 1: + return True + return False + + def _decode(self, buffer): + lsb = buffer[1] + msb = buffer[0] + measurement_type = (lsb & 0x2) >> 1 + lsb = lsb & ~0x03 + + if self._crc8(buffer[:2]) != buffer[2]: + return self._nan, self._nan + + return lsb + (msb << 8), measurement_type + + def _translate_temperature(self, buffer) -> float: + t_raw, measurement_type = self._decode(buffer) + if measurement_type != SHT25_MEASUREMENT_TYPE_T: + return self._nan + + return -46.85 + 175.72 * t_raw / 2**16 + + def _translate_rh(self, buffer): + rh_raw, measurement_type = self._decode(buffer) + if measurement_type != SHT25_MEASUREMENT_TYPE_RH: + return self._nan + + return -6 + 125 * rh_raw / 2**16 + + async def measure(self): + temperature = self._nan + rh = self._nan + if not self.initialised: + # print("Not initialised") + return temperature, rh + + cmd = bytearray(1) + cmd[0] = SHT25_READ_T_NO_HOLD + n_ack = self._i2c.writeto(self._address, cmd) + # print("T req n_ack" ,n_ack) + if n_ack != 1: + return temperature, rh + + time.sleep(NO_HOLD_WAIT_TIME * 2) + await asyncio.sleep(self._wait_t_ms / 1000) + data = self._i2c.readfrom(self._address, 3) + temperature = self._translate_temperature(data) + data = None + + cmd[0] = SHT25_READ_RH_NO_HOLD + n_ack = self._i2c.writeto(self._address, cmd, False) + time.sleep(NO_HOLD_WAIT_TIME * 2) + self._i2c.readfrom(self._address, 0) # send stop signal + await asyncio.sleep(self._wait_rh_ms / 1000) + + data = self._i2c.readfrom(self._address, 3) + rh = self._translate_rh(data) + + return round(temperature, 3), round(rh, 3) diff --git a/umnp/microcontroller/tasks/__init__.py b/umnp/microcontroller/tasks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/microcontroller/tasks/periodictask.py b/umnp/microcontroller/tasks/periodictask.py new file mode 100644 index 0000000000000000000000000000000000000000..deb9c32ed2377be7b5ee3df5eb2edf6c0627a610 --- /dev/null +++ b/umnp/microcontroller/tasks/periodictask.py @@ -0,0 +1,34 @@ +import time +import sys +if sys.implementation.name == "micropython": + # noinspection PyUnresolvedReferences + import uasyncio as asyncio +else: + import asyncio + + +class PeriodicTask: + def __init__(self, function: callable, async_call_back: callable, every_ms: int, *args): + self._function = function + self._args = args + self._every_ms = every_ms + self._call_back = async_call_back + + async def run(self): + func = self._function + args = self._args + call_back = self._call_back + delay_seconds = 25/1000 + while True: + last = time.time() + print("A0", args) + print("A1", *args) + result = await func(*args) + print(result) + await call_back(result) + + while True: + now = time.time() + if now - last > 0: + break + await asyncio.sleep(delay_seconds) diff --git a/umnp/microcontroller/umock/__init__.py b/umnp/microcontroller/umock/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..139597f9cb07c5d48bed18984ec4747f4b4f3438 --- /dev/null +++ b/umnp/microcontroller/umock/__init__.py @@ -0,0 +1,2 @@ + + diff --git a/umnp/microcontroller/umock/framebuf/__init__.py b/umnp/microcontroller/umock/framebuf/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..fcea3fe449d5e1f3f5e14d56d3d5420203842e0c --- /dev/null +++ b/umnp/microcontroller/umock/framebuf/__init__.py @@ -0,0 +1,17 @@ +RGB565 = 1 + + +class FrameBuffer: + def __init__(self, buffer, width, height, fmt, stride=None): + self._buffer = buffer + self._width = width + self._height = height + self._format = fmt + self._stride = stride + if self._stride is None: + self._stride = width # or height? FIXME + self._fill = None + + def fill(self, *args): + self._fill = args + diff --git a/umnp/microcontroller/umock/machine/__init__.py b/umnp/microcontroller/umock/machine/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..4057afa6f0bc5f8d1b874f14541ee8497bb3fd3c --- /dev/null +++ b/umnp/microcontroller/umock/machine/__init__.py @@ -0,0 +1,65 @@ +import datetime +import uuid + +from umnp.protocol import MSG_BYTE_ORDER + + +def unique_id() -> bytes: + return uuid.getnode().to_bytes(6, MSG_BYTE_ORDER) + + +class RTC: + def __init__(self): + self._time_offset = 0 + + def datetime(self, set_time=None): + if set_time: + # save time offset to given time + self._time_offset = 0 + + print("RTC.datetime() setting not implemented") + return + + now = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=self._time_offset) + return now.year, now.month, now.day, now.hour, now.minute, now.second, now.microsecond / 1000 / 1000 + + +class SPI: + def __init__(self, *args, **kwargs): + pass + + def write(self, *args): + pass + + +class Pin: + OUT = 1 + + def __init__(self, pin: int, mode: int = -1): + self._pin = pin + self._mode = mode + + def __call__(self, value): + pass + + +class I2C: + def __init__(self, id: int, scl: Pin, sda: Pin): + self._id = id + self._scl = scl + self._sda = sda + + def scan(self) -> list[int]: + return [] + + def writeto(self, addr: int, buf: bytearray, sopt=True): + return 1 + + def readfrom_info(self, addr: int, buf, stop=True, ): + return None + + def readfrom(self, addr: int, nbytes, stop=True) -> bytes: + return b'' + + def writevto(self,add: int, vector, stop=True): + return 1 \ No newline at end of file diff --git a/umnp/microcontroller/umock/micropython.py b/umnp/microcontroller/umock/micropython.py new file mode 100644 index 0000000000000000000000000000000000000000..9a4c9c71e7642a9de69722bdbdafdbf88969e5f3 --- /dev/null +++ b/umnp/microcontroller/umock/micropython.py @@ -0,0 +1,2 @@ +def const(x: any) -> any: + return x diff --git a/umnp/microcontroller/umock/network/__init__.py b/umnp/microcontroller/umock/network/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..fe11856f5ea5f325ade7f8d592fb952a79605114 --- /dev/null +++ b/umnp/microcontroller/umock/network/__init__.py @@ -0,0 +1,30 @@ +from umnp.microcontroller.umock.machine import SPI, Pin + + +class WIZNET5K: + def __init__(self, spi: SPI, pin1: Pin, pin2: Pin, mac: bytes): + self._active: bool = False + self._mac = b'\xe6ad\x08Cx' # FIXME + self._ip = "127.0.0.1" + self._gateway = "127.0.0.2" + self._netmask = "255.255.255.0" + self._nameserver = "8.8.8.8" + self._spi = spi + self._pin1 = pin1 + self._pin2 = pin2 + self._mac = mac + + def active(self, active: bool) -> None: + self._active = active + + def config(self, *args, **kwargs): + if len(args) == 1 and args[0] == "mac": + return self._mac + if 'mac' in kwargs: + self._mac = kwargs['mac'] + + def ifconfig(self, *args, **kwargs): + if len(args) == 1 and args[0] == "dhcp": + return + if args is None and kwargs is None: + return self._ip, self._netmask, self._gateway, self._nameserver diff --git a/umnp/protocol/__init__.py b/umnp/protocol/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..eb786fa7fbcd5e2ae293a686c11484c91166a999 --- /dev/null +++ b/umnp/protocol/__init__.py @@ -0,0 +1,7 @@ +from umnp.protocol.constants import * +from umnp.protocol.messagetype import MSG_DEVICE_DATA + +from umnp.protocol.message import Message +from umnp.protocol.data_message import DataMessage + +Message.add_message_type(MSG_DEVICE_DATA, DataMessage) diff --git a/umnp/protocol/common/__init__.py b/umnp/protocol/common/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/protocol/common/timestamp.py b/umnp/protocol/common/timestamp.py new file mode 100644 index 0000000000000000000000000000000000000000..575a4e66f6ff8087f0b82d607f7dced7fbdcf473 --- /dev/null +++ b/umnp/protocol/common/timestamp.py @@ -0,0 +1,33 @@ +try: + import datetime +except ImportError: + from umnp.protocol.compat.datetime import datetime + + +class TimeStamp: + def __init__(self, when=None): + if when: + self._ts = int(round(datetime.datetime.now(datetime.timezone.utc).timestamp())) + else: + self._ts = when + + if not isinstance(self._ts, int): + raise ValueError("TimeStamp not an integer") + if not self._ts >= 0: + raise ValueError("TimeStamp < 0") + + @property + def value(self) -> int: + return self._ts + + +def valid_timestamp(when: TimeStamp | None) -> TimeStamp: + if when is None: + return TimeStamp() + + if not isinstance(when, TimeStamp): + raise ValueError("Expected None TimeStamp") + + return when + + diff --git a/umnp/protocol/compat/__init__.py b/umnp/protocol/compat/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/protocol/compat/datetime.py b/umnp/protocol/compat/datetime.py new file mode 100644 index 0000000000000000000000000000000000000000..62fafebd666a6af8c6cea63ab75a62b6fa484a0d --- /dev/null +++ b/umnp/protocol/compat/datetime.py @@ -0,0 +1,44 @@ +import time + +try: + # noinspection PyUnresolvedReferences + import machine +except ImportError: + import umnp.microcontroller.umock.machine as machine + + +class timezone: + def __init__(self): + pass + + @property + def utc(self): + return None + + +class datetime: + def __init__(self): + self.rtc = machine.RTC() + self._time = int(round(time.time())) + + date = self.rtc.datetime() + self.year, self.month, self.day, self.hour, self.minute, self.second, self.second_fraction = date + + def _update(self): + date = self.rtc.datetime() + self._time = time.time() + self.year, self.month, self.day, self.hour, self.minute, self.second, self.second_fraction = date + + @classmethod + def utcnow(cls): + return datetime() + + def timestamp(self): + return self._time + + @classmethod + def now(cls, *args): + return datetime() + + def timezone(self): + return timezone() diff --git a/umnp/protocol/compat/logging.py b/umnp/protocol/compat/logging.py new file mode 100644 index 0000000000000000000000000000000000000000..a3ea0a73d6a77c3a75d375a557b657d9798abb79 --- /dev/null +++ b/umnp/protocol/compat/logging.py @@ -0,0 +1,6 @@ +def info(msg: str) -> None: + print(f"INFO: {msg}") + + +def error(msg: str) -> None: + print(f"ERROR: {msg}") diff --git a/umnp/protocol/compat/time.py b/umnp/protocol/compat/time.py new file mode 100644 index 0000000000000000000000000000000000000000..f6d2c32f3368ab0cf33335bbb148d892b4418201 --- /dev/null +++ b/umnp/protocol/compat/time.py @@ -0,0 +1,7 @@ +try: + import typing +except ImportError: + pass + + + diff --git a/umnp/protocol/constants.py b/umnp/protocol/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..e8ffbf06c7b6a9c1d9f600a897d0a049c10431b9 --- /dev/null +++ b/umnp/protocol/constants.py @@ -0,0 +1,15 @@ +try: + # noinspection PyUnresolvedReferences + import typing +except ImportError: + pass + +MSG_STRING_ENCODING: str = 'utf-8' +MSG_PROTOCOL_VERSION: int = 0 +MSG_BYTE_ORDER: typing.Literal["little", "big"] = 'big' +MSG_VERSION_LENGTH: int = 2 +MSG_TYPE_LENGTH: int = 4 +MSG_PAYLOAD_LENGTH: int = 4 +MSG_TIMESTAMP_LENGTH: int = 4 +MSG_SENDER_ID_LENGTH: int = 6 +MSG_SENDER_TYPE_LENGTH: int = 2 diff --git a/umnp/protocol/data_message.py b/umnp/protocol/data_message.py new file mode 100644 index 0000000000000000000000000000000000000000..e1f77617b3f29aa5d180bcecfe8690f71beb1925 --- /dev/null +++ b/umnp/protocol/data_message.py @@ -0,0 +1,26 @@ +from umnp.protocol import MSG_DEVICE_DATA +from umnp.protocol.constants import MSG_STRING_ENCODING +from umnp.protocol.message import Message, MessageHeader +from umnp.protocol.common.timestamp import TimeStamp + + +class DataMessage(Message): + def __init__(self, data: str, sender_id: bytes, sender_type: int, send_time: TimeStamp = None): + self._payload = data + self._encoded_data = data.encode(MSG_STRING_ENCODING) + super().__init__(MSG_DEVICE_DATA, self._encoded_data, sender_id, sender_type, send_time) + + @staticmethod + def _decode_payload(transferred_data): + return transferred_data.decode(MSG_STRING_ENCODING) + + def payload(self) -> str: + return self._payload + + @classmethod + def decode(cls, payload: bytes, header: MessageHeader) -> 'DataMessage': + decoded_payload = cls._decode_payload(payload) + return cls(decoded_payload, header.sender_id, header.sender_type, header.timestamp) + + + diff --git a/umnp/protocol/device.py b/umnp/protocol/device.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/umnp/protocol/device_types.py b/umnp/protocol/device_types.py new file mode 100644 index 0000000000000000000000000000000000000000..48c5ccb81490ddce9aabf7684697b95e108e274f --- /dev/null +++ b/umnp/protocol/device_types.py @@ -0,0 +1,3 @@ +DEVICE_TYPE_UNKNOWN = 0 +DEVICE_TYPE_RHTP = 1 +DEVICE_TYPE_CPC_GRIMM = 11 diff --git a/umnp/protocol/message.py b/umnp/protocol/message.py new file mode 100644 index 0000000000000000000000000000000000000000..0417874e08d6f8a4c6db8ff43f98c7440498322e --- /dev/null +++ b/umnp/protocol/message.py @@ -0,0 +1,102 @@ + +try: + # noinspection PyUnresolvedReferences + import typing +except ImportError: + pass + +try: + import logging +except ImportError: + import umnp.protocol.compat.logging as logging + + +from umnp.protocol.constants import MSG_BYTE_ORDER, MSG_PAYLOAD_LENGTH +from umnp.protocol.message_header import MessageHeader +from umnp.protocol.common.timestamp import TimeStamp + + +class Message: + _registered_types = {} + + @property + def message_types(self): + return self._registered_types + + def __init__(self, + msg_type: int, + data: bytes, + sender_id: bytes, + sender_type: int, + send_time: typing.Optional[TimeStamp]): + """ + Parameters + ---------- + send_time: datetime.datetime, optional + The timestamp of sending. If None, the current time will be used. + """ + self._sender_id = sender_id + self._sender_type = sender_type + self._data = data + self._header = MessageHeader(msg_type, + sender_device_type=sender_type, + sender_device_id=sender_id, + send_time=send_time) + + + @classmethod + def add_message_type(cls, msg_type: int, msg: typing.Type['Message']): + if msg_type in cls._registered_types: + logging.info(f"Already registered {msg_type}") + return + cls._registered_types[msg_type] = msg + + @classmethod + def get_message_type(cls, msg_type: int): + return cls._registered_types[msg_type] + + @property + def type(self) -> int: + return self._header.message_type + + @property + def version(self) -> int: + return self._header.version + + @property + def data(self) -> bytes: + return self._data + + def encode(self) -> bytes: + header = MessageHeader(msg_type=self.type, + sender_device_type=self._sender_type, + sender_device_id=self._sender_id) + payload = self.data + payload_length = len(payload).to_bytes(length=MSG_PAYLOAD_LENGTH, byteorder=MSG_BYTE_ORDER) + return header.encode() + payload_length + payload + + @classmethod + def from_bytes(cls, data: bytes): + header = MessageHeader.decode(data) + offset = header.encoded_size_bytes + try: + payload_bytes = data[offset:offset + MSG_PAYLOAD_LENGTH] + payload_length = int.from_bytes(payload_bytes, MSG_BYTE_ORDER) + except (KeyError, ValueError): + logging.error("Invalid message: could not extract payload length") + return None + + offset += MSG_PAYLOAD_LENGTH + payload = data[offset:] + if len(payload) != payload_length: + logging.error("Invalid message: mismatch between specified and actual payload length") + return None + + msg = Message.get_message_type(header.message_type).decode(payload, header=header) + return msg + + + + + def __str__(self): + return f"Message of type {self.type}" diff --git a/umnp/protocol/message_header.py b/umnp/protocol/message_header.py new file mode 100644 index 0000000000000000000000000000000000000000..c0de102534408c1c737f6f148d558f0200366985 --- /dev/null +++ b/umnp/protocol/message_header.py @@ -0,0 +1,118 @@ +from umnp.protocol.common.timestamp import TimeStamp, valid_timestamp +from umnp.protocol.constants import MSG_PROTOCOL_VERSION, MSG_VERSION_LENGTH, MSG_BYTE_ORDER, MSG_TYPE_LENGTH, \ + MSG_TIMESTAMP_LENGTH, MSG_SENDER_ID_LENGTH, MSG_SENDER_TYPE_LENGTH + +try: + import logging +except ImportError: + import umnp.protocol.compat.logging as logging + +try: + # noinspection PyUnresolvedReferences + import typing +except ImportError: + pass + + +class MessageHeader: + def __init__(self, + msg_type: int, + sender_device_id: bytes, + sender_device_type: int, + send_time: typing.Optional[TimeStamp] = None, + version: int = MSG_PROTOCOL_VERSION, + ): + + self._message_type = msg_type + self._send_time = valid_timestamp(send_time) + self._timestamp = TimeStamp(self._send_time) + self._version = version + self._sender_device_id = sender_device_id + self._sender_device_type = sender_device_type + + @property + def message_type(self) -> int: + return self._message_type + + @property + def version(self): + return self._version + + @property + def sender_id(self) -> bytes: + return self._sender_device_id + + @property + def sender_type(self) -> int: + return self._sender_device_type + + @property + def timestamp(self) -> TimeStamp: + return self._timestamp + + def encode(self) -> bytes: + version = self.version.to_bytes(length=MSG_VERSION_LENGTH, byteorder=MSG_BYTE_ORDER) + sender_id = self._sender_device_id + sender_type = self._sender_device_type.to_bytes(length=MSG_SENDER_TYPE_LENGTH, byteorder=MSG_BYTE_ORDER) + message_type = self.message_type.to_bytes(length=MSG_TYPE_LENGTH, byteorder=MSG_BYTE_ORDER) + timestamp = self._timestamp.value.to_bytes(length=MSG_TIMESTAMP_LENGTH, byteorder=MSG_BYTE_ORDER) + return version + sender_id + sender_type + message_type + timestamp + + @property + def encoded_size_bytes(self) -> int: + return (MSG_VERSION_LENGTH + MSG_SENDER_ID_LENGTH + MSG_SENDER_TYPE_LENGTH + + MSG_TYPE_LENGTH + MSG_TIMESTAMP_LENGTH) + + @classmethod + def decode(cls, data: bytes): + offset = 0 + + if not (isinstance(data, bytes)): + logging.error("Invalid message header: not bytes") + return None + try: + protocol_bytes = data[:MSG_VERSION_LENGTH] + protocol_version = int.from_bytes(protocol_bytes, MSG_BYTE_ORDER) + cls._version = protocol_version + except (KeyError, ValueError): + logging.error("Invalid message header: could not extract version") + return None + offset += MSG_VERSION_LENGTH + + try: + message_sender_id_bytes = data[offset:offset + MSG_SENDER_ID_LENGTH] + message_sender_type_bytes = data[ + offset + MSG_SENDER_ID_LENGTH: offset + MSG_SENDER_ID_LENGTH + MSG_SENDER_TYPE_LENGTH] + message_sender_type = int.from_bytes(message_sender_type_bytes, MSG_BYTE_ORDER) + except(KeyError, ValueError): + logging.error("Invalid message sender information: could not extract data") + return None + + offset += MSG_SENDER_TYPE_LENGTH + MSG_SENDER_ID_LENGTH + + try: + message_type_bytes = data[offset:offset + MSG_TYPE_LENGTH] + message_type = int.from_bytes(message_type_bytes, MSG_BYTE_ORDER) + + except (KeyError, ValueError): + logging.error("Invalid message payload: could not extract version") + return None + + offset += MSG_TYPE_LENGTH + + if protocol_version < 0 or protocol_version > MSG_PROTOCOL_VERSION: + logging.error(f"Invalid protocol version {protocol_version}, outside of range [0, {MSG_PROTOCOL_VERSION}]") + return None + + try: + message_ts = data[offset: offset + MSG_TIMESTAMP_LENGTH] + timestamp = TimeStamp(int.from_bytes(message_ts, byteorder=MSG_BYTE_ORDER)) + except (KeyError, ValueError): + logging.error("Invalid message timestamp: could not extract version") + return None + + return cls(message_type, + sender_device_type=message_sender_type, + sender_device_id=message_sender_id_bytes, + version=protocol_version, + send_time=timestamp) diff --git a/umnp/protocol/messagetype.py b/umnp/protocol/messagetype.py new file mode 100644 index 0000000000000000000000000000000000000000..a27bcbc19dbbf06425a10ece88c673f0d8858bb4 --- /dev/null +++ b/umnp/protocol/messagetype.py @@ -0,0 +1,3 @@ +# This would be an enum, if micropython supported them + +MSG_DEVICE_DATA = 1 diff --git a/umnp/protocol/register_messages.py b/umnp/protocol/register_messages.py new file mode 100644 index 0000000000000000000000000000000000000000..2b19405e2a42349a4bcfdee4c5225535946575e8 --- /dev/null +++ b/umnp/protocol/register_messages.py @@ -0,0 +1,10 @@ +try: + # noinspection PyUnresolvedReferences + import typing +except ImportError: + pass +from umnp.protocol.message import Message + + +def register_messages(msg_type: int, msg: typing.Type[Message]): + msg.add_message_type(msg_type, msg) diff --git a/umnp/test.py b/umnp/test.py new file mode 100644 index 0000000000000000000000000000000000000000..0a347134c444bcfd2a8ae3eca9aeb10991f9a408 --- /dev/null +++ b/umnp/test.py @@ -0,0 +1,23 @@ +import binascii + + +from protocol.data_message import DataMessage +from umnp.microcontroller.umock.machine import SPI, Pin +from umnp.microcontroller.umock.network import WIZNET5K +from umnp.protocol import Message + +x = DataMessage("abasdc", None, None) +w = WIZNET5K(SPI(), Pin(1), Pin(1), mac=b'') +print(w.config('mac')) +w.config(mac=3) +print(w.config('mac')) +encoded = x.encode() +print("====") +print(binascii.hexlify(encoded)) +print("====") +# encoded = b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03abc' +print(encoded) + +y = Message.from_bytes(encoded) +print(y) +print(y._data) diff --git a/umnp/test_enum.py b/umnp/test_enum.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391