Skip to content
Snippets Groups Projects
Commit 436e8672 authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

mass update

- renames
- initial ae33 support
- cleanups
parent 8bc4ee03
No related branches found
No related tags found
No related merge requests found
Showing
with 236 additions and 18 deletions
import binascii
from protocol.data_message import DataMessage
from umnp.microcontroller.umock.machine import SPI, Pin
from umnp.microcontroller.umock.network import WIZNET5K
from umnp.protocol import Message
from umnp.protocol.message import Message
from umnp.protocol.messagetype import MessageType
x = DataMessage("abasdc", None, None)
w = WIZNET5K(SPI(), Pin(1), Pin(1), mac=b'')
print(w.config('mac'))
x = Message(MessageType.MSG_DEVICE_DATA, "abasdc", None)
w = WIZNET5K(SPI(), Pin(1), Pin(1), mac=b"")
print(w.config("mac"))
w.config(mac=3)
print(w.config('mac'))
print(w.config("mac"))
encoded = x.encode()
print("====")
print(binascii.hexlify(encoded))
......
import sys
from umnp.microcontroller.communication.communicator import Communicator
from umnp.microcontroller.communication.udp_communicator import UDPCommunicator
from umnp.microcontroller.devices.network.ethernet_w5500 import EthernetW5500
from umnp.microcontroller.devices.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.measurementdevice import MeasurementDevice
from umnp.microcontroller.network.ethernet_w5500 import EthernetW5500
from umnp.microcontroller.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.sensors.sht25 import SHT25
from umnp.microcontroller.tasks.periodictask import PeriodicTask
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
else:
......@@ -27,7 +28,9 @@ def test_function(*args):
def main():
# configure network
device = MeasurementDevice()
spi = machine.SPI(0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18))
spi = machine.SPI(
0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18)
)
ether = EthernetW5500(spi, 17, 20, mac=device.generated_mac_raw(), dhcp=True)
device.add_network_adapter(ether)
......@@ -36,7 +39,9 @@ def main():
sender = UDPSender(ether.ip, ether.netmask, 7777)
receiver = UDPReceiver(ether.ip, 7776)
comm = Communicator(receiver=receiver, sender=sender, device_id=device.identifier)
comm = UDPCommunicator(
receiver=receiver, sender=sender, device_id=device.identifier
)
x = PeriodicTask(test_function, print, 1000)
comm.add_task(x, "test_function")
......
import sys
from umnp.microcontroller.communication.udp_communicator import UDPCommunicator
from umnp.microcontroller.devices.network.ethernet_w5500 import EthernetW5500
from umnp.microcontroller.devices.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.measurementdevice import MeasurementDevice
from umnp.microcontroller.sensors.lps28dfw import LPS28DFW
from umnp.microcontroller.sensors.sht45 import SHT45
from umnp.microcontroller.tasks.periodictask import PeriodicTask
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
else:
from umnp.microcontroller.umock import machine
import asyncio
def test_function(*args):
print("test_function called with: ", *args)
result = " - ".join(*args)
print(f"test_function returns '{result}'")
return result
def main():
# configure network
device = MeasurementDevice()
spi = machine.SPI(
0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18)
)
ether = EthernetW5500(spi, 17, 20, mac=device.generated_mac_raw(), dhcp=True)
device.add_network_adapter(ether)
i2c = machine.I2C(id=1, scl=machine.Pin(27), sda=machine.Pin(26))
sht45 = SHT45(i2c)
p_sensor = LPS28DFW(i2c)
sender = UDPSender(ether.ip, ether.netmask, 7777)
receiver = UDPReceiver(ether.ip, 7776)
comm = UDPCommunicator(
receiver=receiver, sender=sender, device_id=device.identifier
)
x = PeriodicTask(test_function, print, 1000)
comm.add_task(x, "test_function")
# start
asyncio.run(comm.start())
File moved
import datetime
import logging
import os
import time
from umnp.communication.serial_connection import SerialConnection
from umnp.devices.aethalometer.ae33 import AE33
now = datetime.datetime.now()
if now.microsecond >= 500 * 1000:
now = now + datetime.timedelta(seconds=1)
now = now.replace(microsecond=0)
now_string = now.isoformat().replace(":", "-")
logger = logging.getLogger("ae33-test")
logger.setLevel(logging.DEBUG)
log_file_fn = os.path.join("logs", f"ae33-{now_string}.log")
log_file = logging.FileHandler(log_file_fn)
log_file.setLevel(logging.DEBUG)
logger.addHandler(log_file)
def main():
conn_opts = {"baudrate": 115200, "dsrdtr": True, "rtscts": False, "xonoff": False}
connection = SerialConnection("/dev/ttyUSB2", options=conn_opts)
ae33 = AE33(connection)
while True:
current = ae33.current_measurement()
if current:
logging.info(current)
time.sleep(1)
if __name__ == "__main__":
main()
SERIAL_DEFAULT_BAUD_RATE = 112512
SERIAL_DEFAULT_WAIT_TIME_S = 0.01
class ConnectionProblemException(Exception):
pass
import asyncio
from umnp.communication import SERIAL_DEFAULT_BAUD_RATE, SERIAL_DEFAULT_WAIT_TIME_S
class AbstractSerialConnection:
def __init__(self, address, options: dict | None = None):
self.__connection = None
self.__lock = asyncio.Lock()
self.__address = address
if options is None:
self.__options = {}
else:
self.__options = options
self.__baud_rate = self.get_option("baudrate", SERIAL_DEFAULT_BAUD_RATE)
self.__wait_time = self.get_option("wait time", SERIAL_DEFAULT_WAIT_TIME_S)
self.__line_sep_read = self.get_option("read separator", b"\r")
self.__line_sep_write = self.get_option("write separator", b"\r")
self.__max_connection_attempts = self.get_option("max connection attempts", 3)
def connect(self):
raise NotImplementedError
def disconnect(self):
raise NotImplementedError
def sync_command(self, command, expected_lines=None, show_reply=False):
raise NotImplementedError
def send_command(self, command):
pass
def read_line(self, timeout=None):
raise NotImplementedError
def get_option(self, name: str, default=None):
if self.__options is None:
return default
return self.__options.get(name)
import logging
import time
import serial
from umnp.communication import ConnectionProblemException
from umnp.communication.abstract_serial_connection import AbstractSerialConnection
class SerialConnection(AbstractSerialConnection):
def __init__(self, address, options=None):
super().__init__(address, options)
def connect(self):
if self.__connection:
return
attempts = 0
connection = None
max_attempts = self.__max_connection_attempts
while attempts < max_attempts:
try:
connection = serial.Serial(
port=self.__address,
baudrate=self.__baud_rate,
parity=self.get_option("parity", serial.PARITY_NONE),
stopbits=self.get_option("stopbits", serial.STOPBITS_ONE),
timeout=self.__wait_time,
xonxoff=self.get_option("xonoff", False),
rtscts=self.get_option("rtscts", False),
dsrdtr=self.get_option("dsrdtr", False),
)
except serial.serialutil.SerialException as e:
raise ConnectionProblemException(e)
if connection:
break
attempts += 1
time.sleep(0.1)
if not connection:
logging.error(f"could not connect to {self.__address}")
return
time.sleep(0.1)
self.__connection = connection
connection.flush()
connection.reset_input_buffer()
connection.reset_output_buffer()
logging.info(f"connected to {self.__address}, baud rate: {self.__baud_rate}")
logging.debug(connection)
File moved
from umnp.communication.abstract_serial_connection import AbstractSerialConnection
class SerialConnection:
def __init__(self):
pass
class AE33:
def __init__(self, connection: AbstractSerialConnection):
self.__last_measurement = None
self.__connection = connection
pass
def request_measurement(self) -> str | None:
return self.__connection.sync_command("$A33:D1\r", 1)
def current_measurement(self) -> str | None:
current = self.request_measurement()
if current == self.__last_measurement:
return None
self.__last_measurement = current
return current
File moved
import sys
import time
from umnp.microcontroller.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.devices.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.tasks.periodictask import PeriodicTask
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
# noinspection PyUnresolvedReferences
import machine
else:
......@@ -14,9 +15,10 @@ else:
from umnp.microcontroller.umock import machine
class Communicator:
def __init__(self, sender: UDPSender, receiver: UDPReceiver, device_id, max_msgs: int = 10):
class UDPCommunicator:
def __init__(
self, sender: UDPSender, receiver: UDPReceiver, device_id, max_msgs: int = 10
):
self._receive_lock = asyncio.Lock()
self._send_lock = asyncio.Lock()
......@@ -59,7 +61,14 @@ class Communicator:
if msg is not None:
msg = msg.replace(",", ";")
now = rtc.datetime()
now = "%04d-%02d-%02dT%02d:%02d:%02d" % (now[0], now[1], now[2], now[4], now[5], now[6])
now = "%04d-%02d-%02dT%02d:%02d:%02d" % (
now[0],
now[1],
now[2],
now[4],
now[5],
now[6],
)
await self._sender.broadcast("%s,%s,%s" % (device_id, now, msg))
await asyncio.sleep(0.5)
......@@ -97,4 +106,3 @@ class Communicator:
def add_task(self, task: PeriodicTask, name: str):
self._tasks[name] = task
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment