Skip to content
Snippets Groups Projects
Commit 37e13603 authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

first code dump

parent 32cbd660
No related branches found
No related tags found
No related merge requests found
Showing
with 793 additions and 0 deletions
import sys
from umnp.microcontroller.communication.communicator import Communicator
from umnp.microcontroller.measurementdevice import MeasurementDevice
from umnp.microcontroller.network.ethernet_w5500 import EthernetW5500
from umnp.microcontroller.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.sensors.sht25 import SHT25
from umnp.microcontroller.tasks.periodictask import PeriodicTask
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
else:
from umnp.microcontroller.umock import machine
import asyncio
def test_function(*args):
print("test_function called with: ", *args)
result = " - ".join(*args)
print(f"test_function returns '{result}'")
return result
def main():
# configure network
device = MeasurementDevice()
spi = machine.SPI(0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18))
ether = EthernetW5500(spi, 17, 20, mac=device.generated_mac_raw(), dhcp=True)
device.add_network_adapter(ether)
i2c = machine.I2C(id=1, scl=machine.Pin(27), sda=machine.Pin(26))
sht25 = SHT25(i2c)
sender = UDPSender(ether.ip, ether.netmask, 7777)
receiver = UDPReceiver(ether.ip, 7776)
comm = Communicator(receiver=receiver, sender=sender, device_id=device.identifier)
x = PeriodicTask(test_function, print, 1000)
comm.add_task(x, "test_function")
# start
asyncio.run(comm.start())
from fs import FileSystemInformation
current_dir = ""
def main():
fs = FileSystemInformation()
fs.print_fs_tree()
main()
import os
def fs_entries_recurse(current_directory="", total_path=""):
files = []
dirs = []
all_entries = []
for fstats in os.ilistdir(current_directory):
fn = fstats[0]
ft = fstats[1]
all_entries.append(total_path + "/" + fn)
if ft == 0x4000:
dirs.append(total_path + "/" + fn)
f, current_directory, a = fs_entries_recurse(fn, total_path + "/" + fn)
files.extend(f)
dirs.extend(current_directory)
all_entries.extend(a)
else:
files.append(total_path + "/" + fn)
return files, dirs, all_entries
class FileSystemInformation:
def __init__(self):
self._size_cache = {}
self._files = []
self._directories = []
self._all_entries = []
self._vfs_stats = []
self._usage = None
self._size_free_kb = None
self._blocks_free = None
self._max_fn_len = None
self._blocks_used = None
self._block_size = None
self._size_total_kb = None
self._longest_path = 0
self.update_file_system_info()
self._files, self._directories, self._all_entries = fs_entries_recurse()
for fn in self._all_entries:
self._longest_path = max(self._longest_path, len(fn))
def invalidate(self):
self._size_cache = {}
self._vfs_stats = []
self._files = []
self._directories = []
self._all_entries = []
def get_size(self, path):
if path in self._size_cache:
return self._size_cache[path]
mode, inode, device_id, link_count, uid, gid, size, atime, mtime, ctime = os.stat(path)
b = 0
if mode == 0x8000:
b = size
self._size_cache[path] = b
return b
def update_file_system_info(self):
if len(self._vfs_stats) == 0:
self._vfs_stats = os.statvfs("/")
b_size, _, b_count, b_count_free, _, _, _, _, _, max_fn_len = self._vfs_stats
self._usage = 1 - b_count_free / b_count
self._size_free_kb = b_size * b_count_free / 1024
self._blocks_free = b_count_free
self._blocks_used = b_count - b_count_free
self._block_size = b_size
self._size_total_kb = b_size * b_count / 1024
self._max_fn_len = max_fn_len
def print_fs_tree(self):
for fn in self._all_entries:
spacer = " " * (self._longest_path + 2 - len(fn))
size = self.get_size(fn)
size_spacer = " " * (6 - len(str(size)))
print(f"{fn}{spacer} {size_spacer}{size} bytes")
import sys
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
else:
from umnp.microcontroller.umock import machine
UNIQUE_ID_MAC = machine.unique_id()[:6]
UNIQUE_ID_MAC_STR = ':'.join('%02x' % x for x in UNIQUE_ID_MAC)
import sys
import time
from umnp.microcontroller.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.tasks.periodictask import PeriodicTask
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
# noinspection PyUnresolvedReferences
import machine
else:
import asyncio
from umnp.microcontroller.umock import machine
class Communicator:
def __init__(self, sender: UDPSender, receiver: UDPReceiver, device_id, max_msgs: int = 10):
self._receive_lock = asyncio.Lock()
self._send_lock = asyncio.Lock()
self._messages_received = []
self._messages_send_queue = []
self._max_msgs = max_msgs
self._sender = sender
self._receiver = receiver
self._tasks = {}
self._device_id = device_id
async def queue_incoming_message(self, msg, source):
async with self._receive_lock:
self._messages_received.append((msg, source, time.time()))
while len(self._messages_received) > self._max_msgs:
self._messages_received.pop(0)
async def get_newest_message(self):
async with self._receive_lock:
if len(self._messages_received):
return self._messages_received.pop(0)
else:
return None
async def receive_task(self):
while True:
await self._receiver.receive(self)
await asyncio.sleep(0.500)
async def send_task(self):
device_id = self._device_id
rtc = machine.RTC()
while True:
msg = None
async with self._send_lock:
if len(self._messages_send_queue) > 0:
msg = self._messages_send_queue.pop()
if msg is not None:
msg = msg.replace(",", ";")
now = rtc.datetime()
now = "%04d-%02d-%02dT%02d:%02d:%02d" % (now[0], now[1], now[2], now[4], now[5], now[6])
await self._sender.broadcast("%s,%s,%s" % (device_id, now, msg))
await asyncio.sleep(0.5)
async def send_data_message(self, data: str):
async with self._send_lock:
self._messages_send_queue.append(data)
async def control_task(self):
while True:
async with self._receive_lock:
# print("Control: %d" % len(self.msgs))
msg = await self.get_newest_message()
if msg is not None:
pass
# print("Controll::msg::", msg)
await asyncio.sleep(0.5)
async def start(self):
receiver = asyncio.create_task(self.receive_task())
sender = asyncio.create_task(self.send_task())
controller = asyncio.create_task(self.control_task())
tasks = []
for name, task in self._tasks.items():
result = asyncio.create_task(task.run())
tasks.append(result)
for t in tasks:
await t
await receiver
await sender
await controller
def add_task(self, task: PeriodicTask, name: str):
self._tasks[name] = task
import sys
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import framebuf
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
from micropython import const
else:
from umnp.microcontroller.umock import machine, network, framebuf
from umnp.microcontroller.umock.micropython import const
_BL = const(13)
_DC = const(8)
_RST = const(12)
_MOSI = const(11)
_SCK = const(10)
_CS = const(9)
_WIDTH = const(240)
_HEIGHT = const(240)
def colour(red, green, blue): # Convert RGB888 to RGB565 # copied from? FIXME
return (
(((green & 0b00011100) << 3) + ((blue & 0b11111000) >> 3) << 8) +
(red & 0b11111000) + ((green & 0b11100000) >> 5)
)
class PicoLCD13(framebuf.FrameBuffer):
def __init__(self):
self.width = _WIDTH
self.height = _HEIGHT
self.cs = machine.Pin(_CS, machine.Pin.OUT)
self.rst = machine.Pin(_RST, machine.Pin.OUT)
self.cs(1)
self.spi = machine.SPI(1, 100_000_000, polarity=0, phase=0,
sck=machine.Pin(_SCK),
mosi=machine.Pin(_MOSI),
miso=None)
self.dc = machine.Pin(_DC, machine.Pin.OUT)
self.dc(1)
self.buffer = bytearray(self.height * self.width * 2)
super().__init__(self.buffer, self.width, self.height, framebuf.RGB565)
self.red = 0x07E0
self.green = 0x001f
self.blue = 0xf800
self.white = 0xffff
self._bg_color = 0
self._initialise_display()
self.clear()
self.update()
@property
def bg_color(self):
return self._bg_color
def _write_cmd(self, cmd):
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
def _write_data(self, buf):
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(bytearray([buf]))
self.cs(1)
def _config(self, cmd, data):
self._write_cmd(cmd)
for d in data:
self._write_data(d)
def clear(self, r=0, g=0, b=0):
self.fill(colour(r, g, b))
def _initialise_display(self):
self.rst(1)
self.rst(0)
self.rst(1)
self._config(0x36, (0x70,)) # Address Order setup
self._config(0x3a, (0x05,)) # Interface Pixel Format
self._config(0xB2, (0x0c, 0x0c, 0x00, 0x33, 0x33)) # Porch setting (back, front, 0, b partial, f part)
self._config(0xb7, (0x35,)) # gate control
self._config(0xbb, (0x19,)) # VCOMS setting
self._config(0xc0, (0x2c,)) # LCM Control
self._config(0xc2, (0x01,)) # VDV and VRH Command Enable
self._config(0xc3, (0x12,)) # VRH Set
self._config(0xc4, (0x20,)) # VRV Set
self._config(0xc6, (0x0f,)) # Frame Rate Control in normal mode
self._config(0xd0, (0xa4, 0xa1,)) # Power Control 1
self._config(0xe0, # Positive voltage gamma control
(0xD0, 0x04, 0x0D, 0x11, 0x13, 0x2B, 0x3F, 0x54, 0x4C, 0x18, 0x0D, 0x0B, 0x1F, 0x23))
self._config(0xe1, # Negative voltage gamma control
(0xD0, 0x04, 0x0C, 0x11, 0x13, 0x2C, 0x3F, 0x44, 0x51, 0x2F, 0x1F, 0x1F, 0x20, 0x23))
self._write_cmd(0x21) # Display inversion on
self._write_cmd(0x11) # Turn off sleep mode
self._write_cmd(0x29) # Display on
def update(self):
self._config(0x2a, (0x00, 0x00, 0x00, 0xef)) # column address set
self._config(0x2b, (0x00, 0x00, 0x00, 0xef)) # row address set
self._write_cmd(0x2C) # memory set
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(self.buffer)
self.cs(1)
import sys
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
from machine import I2C
else:
from umnp.microcontroller.umock.machine import I2C
I2C_ADDRESS_EEPROM24LC32A = 0x50
class EEPROM24LC32A:
def __init__(self, i2c: I2C, i2c_address=I2C_ADDRESS_EEPROM24LC32A):
self.__block_size = 32
self.__max_size = 4096
self.__block_count = self.__max_size // self.__block_size
self.__i2c_address = i2c_address
self.__i2c = i2c
self.count = self.__block_count
if self.__i2c_address not in self.__i2c.scan():
raise RuntimeError(f"No I2C device at address {self.__i2c_address} found")
def __calculate_address(self, block_idx, offset=0):
address = block_idx * self.__block_size + offset
if address > self.__max_size or address < 0:
raise RuntimeError(f"Invalid address {address}, out of range [0, {self.__max_size}]")
result = bytearray(2)
result[0] = address >> 8
result[1] = address & 0xFF
def __set_read_address(self, address):
self.__i2c.writeto(self.__i2c_address, address)
def readblocks(self, block_num, buf, offset=0):
"""
Parameters
----------
block_num
buf
offset
Returns
-------
According to https: // docs.micropython.org / en / latest / library / os.html # os.AbstractBlockDev
if offset = 0:
read multiple aligned blocks (count given by the length of buf, which is a multiple of the block size)
else:
read arbitrary number of bytes from an arbitrary location
We do not differentiate between those two cases, as the EEPROM only supports random reads (either single byte
or sequential)
"""
bytes_to_read = len(buf)
if offset == 0 and bytes_to_read % self.__block_size != 0:
raise RuntimeError("Buffer length not a multiple of the block size")
if bytes_to_read + block_num * self.__block_size + offset > self.__max_size:
raise RuntimeError("Error: operation would read beyond the maximum address")
address = self.__calculate_address(block_num, offset)
self.__set_read_address(address)
# we don't want to overwrite buf, but write into it
buf[0:bytes_to_read] = self.__i2c.readfrom(self.__i2c_address, bytes_to_read)
def __write_page(self, address, data):
self.__i2c.writevto(self.__i2c_address, (address, data))
def writeblocks(self, block_num: int, buf: bytearray, offset=0) -> None:
"""
Parameters
----------
block_num: int
buf: bytearray
offset: int
Returns
-------
According to https://docs.micropython.org/en/latest/library/os.html#os.AbstractBlockDev
if offset = 0:
read multiple aligned blocks (count given by the length of buf, which is a multiple of the block size)
else:
read arbitrary number of bytes from an arbitrary location
We need to differentiate, because the EEPROM supports two modes:
* single byte write: writes at random addresses
* page writes: write 32 bytes at once at a random address, but wrap around at the page boundary,
potentially overwriting data - so we need to ensure that we start a page write at a page boundary so we
don't cross page boundaries
"""
bytes_to_write = len(buf)
if bytes_to_write + block_num * self.__block_size + offset > self.__max_size:
raise RuntimeError("Error: operation would read beyond the maximum address")
if offset == 0:
block_count, remainder = divmod(bytes_to_write, self.__block_size)
if remainder != 0:
raise RuntimeError("Buffer length not a multiple of the block size")
for i in range(block_count):
address = self.__calculate_address(block_num + i, 0)
start = i * self.__block_size
end = start + self.__block_size
self.__write_page(address, buf[start:end])
else:
block_count, remainder = divmod(bytes_to_write, self.__block_size)
if remainder != self.__block_size - offset:
raise RuntimeError("Buffer length not a multiple of the block size + offset")
address = self.__calculate_address(block_num, offset)
self.__write_page(address, buf[:self.__block_size - offset])
for i in range(block_count):
address = self.__calculate_address(block_num + i + 1, 0)
start = i * self.__block_size + self.__block_size - offset
end = start + self.__block_size
self.__write_page(address, buf[start:end])
def ioctl(self, op, arg):
if op == 4: # block count
return self.__block_count
if op == 5: # block size
return self.__block_size
# Needed for littlefs
if op == 6: # erase
return 0
import binascii
import sys
import time
from umnp.protocol.constants import MSG_STRING_ENCODING
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
else:
from umnp.microcontroller.umock import machine
class MeasurementDevice:
def __init__(self):
self._boot_time = time.time()
self._identifier_raw = machine.unique_id()
self._identifier = binascii.hexlify(self.identifier_raw).decode(MSG_STRING_ENCODING)
self._network = None
@property
def boot_time(self):
return self._boot_time
def add_network_adapter(self, adapter):
self._network = adapter
def generated_mac_string(self) -> str:
machine_id = self.identifier_raw[:6]
return ':'.join(f'{digit:02x}' for digit in machine_id)
def generated_mac_raw(self) -> bytes:
return self.identifier_raw[:6]
@property
def identifier_raw(self) -> bytes:
return self._identifier_raw
@property
def identifier(self) -> str:
return self._identifier
LISTEN_TIMEOUT_MS = 1
def calculate_broadcast_ip(ip: str, mask: str) -> str:
if ip is None or mask is None:
return ""
ip = [int(x) for x in ip.split(".")]
mask = [int(x) for x in mask.split(".")]
if len(ip) != 4 or len(mask) != 4:
return ""
bc = [(i | ~j) & 0xff for i, j in zip(ip, mask)]
bc = ".".join(str(x) for x in bc)
return bc
# micropython code
# can't use abstract baseclass here
class EthernetAdapter:
def __init__(self):
pass
@property
def ip(self):
return
@property
def netmask(self):
return
def enable_dhcp(self):
pass
from umnp.microcontroller.network.ethernet import EthernetAdapter
import sys
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
import network
else:
from umnp.microcontroller.umock import machine, network
class EthernetW5500(EthernetAdapter):
def __init__(self, spi: machine.SPI, pin1: int, pin2: int, mac: bytes, dhcp=False):
super().__init__()
self._spi = spi
self._pin1 = machine.Pin(pin1)
self._pin2 = machine.Pin(pin2)
self._nic = network.WIZNET5K(spi, self._pin1, self._pin2)
# self._nic.active(True)
self._nic.config(mac=mac)
if dhcp:
self.enable_dhcp()
def get_dhcp(self):
while True:
print("Requesting IP via DHCP")
try:
self._nic.ifconfig('dhcp')
return
except OSError:
pass
def enable_dhcp(self):
self.get_dhcp()
print(self._nic.ifconfig())
@property
def netmask(self):
return self._nic.ifconfig()[1]
@property
def ip(self):
return self._nic.ifconfig()[0]
import sys
import socket
import select
from umnp.microcontroller.network import LISTEN_TIMEOUT_MS, calculate_broadcast_ip
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
else:
import asyncio
class UDPReceiver:
def __init__(self, listen_ip: str, listen_port: int, timeout: int = LISTEN_TIMEOUT_MS):
self.socket = None
self._listen_port = listen_port
self.listen_ip = listen_ip
self.timeout = timeout
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setblocking(False)
self.socket.bind((listen_ip, listen_port))
self.poller = select.poll()
self.poller.register(self.socket, select.POLLIN)
async def receive(self, controller):
timeout = self.timeout
while True:
if self.poller.poll(timeout):
buffer, address = self.socket.recvfrom(1024)
await controller.queue_incoming_message(buffer, address)
await asyncio.sleep(0)
class UDPSender:
def __init__(self, ip, netmask, target_port: int):
self.socket = None
self.ip = ip
self.netmask = netmask
self._target_port = target_port
if ip and netmask:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.broadcast_ip = calculate_broadcast_ip(ip, netmask)
async def broadcast(self, msg):
# print("sending %s" % msg)
if isinstance(msg, str):
msg = msg.encode('utf-8')
if self.socket:
self.socket.sendto(msg, (self.broadcast_ip, self._target_port))
try:
from machine import I2C
except ImportError:
from umnp.microcontroller.umock.machine import I2C
import time
import asyncio
# SHT25_READ_T_HOLD =
# SHT25_READ_RH_HOLD =
# SHT25_WRITE_USER_REGISTER =
# SHT25_READ_USER_REGISTER =
SHT25_READ_T_NO_HOLD = 0xF3
SHT25_READ_RH_NO_HOLD = 0xF5
SHT25_SOFT_RESET = 0xFE
NO_HOLD_WAIT_TIME = 18.5 / 1000 / 1000
SHT25_MEASUREMENT_TYPE_T = 0
SHT25_MEASUREMENT_TYPE_RH = 1
class SHT25:
def __init__(self, i2c: I2C):
self._i2c = i2c
self._address = 64
self._initialised = False
self._wait_rh_ms = 100
self._wait_t_ms = 100
self._nan = float("NAN")
devices = self._i2c.scan()
if self._address not in devices:
print("Device not found")
return
if self.reset():
print("Initialised")
self._initialised = True
else:
print("Could not initialise device")
@property
def initialised(self) -> bool:
return self._initialised
def _command(self, command: int) -> int:
pass
def _crc8(self, data):
# CRC-8-Dallas/Maxim for I2C with 0x31 polynomial
crc = 0x0
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc = crc << 1
crc &= 0xFF
return crc
def reset(self) -> bool:
cmd = bytearray(1)
cmd[0] = SHT25_SOFT_RESET
n_ack = self._i2c.writeto(self._address, cmd)
if n_ack == 1:
return True
return False
def _decode(self, buffer):
lsb = buffer[1]
msb = buffer[0]
measurement_type = (lsb & 0x2) >> 1
lsb = lsb & ~0x03
if self._crc8(buffer[:2]) != buffer[2]:
return self._nan, self._nan
return lsb + (msb << 8), measurement_type
def _translate_temperature(self, buffer) -> float:
t_raw, measurement_type = self._decode(buffer)
if measurement_type != SHT25_MEASUREMENT_TYPE_T:
return self._nan
return -46.85 + 175.72 * t_raw / 2**16
def _translate_rh(self, buffer):
rh_raw, measurement_type = self._decode(buffer)
if measurement_type != SHT25_MEASUREMENT_TYPE_RH:
return self._nan
return -6 + 125 * rh_raw / 2**16
async def measure(self):
temperature = self._nan
rh = self._nan
if not self.initialised:
# print("Not initialised")
return temperature, rh
cmd = bytearray(1)
cmd[0] = SHT25_READ_T_NO_HOLD
n_ack = self._i2c.writeto(self._address, cmd)
# print("T req n_ack" ,n_ack)
if n_ack != 1:
return temperature, rh
time.sleep(NO_HOLD_WAIT_TIME * 2)
await asyncio.sleep(self._wait_t_ms / 1000)
data = self._i2c.readfrom(self._address, 3)
temperature = self._translate_temperature(data)
data = None
cmd[0] = SHT25_READ_RH_NO_HOLD
n_ack = self._i2c.writeto(self._address, cmd, False)
time.sleep(NO_HOLD_WAIT_TIME * 2)
self._i2c.readfrom(self._address, 0) # send stop signal
await asyncio.sleep(self._wait_rh_ms / 1000)
data = self._i2c.readfrom(self._address, 3)
rh = self._translate_rh(data)
return round(temperature, 3), round(rh, 3)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment