Skip to content
Snippets Groups Projects
Commit d7902dc7 authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

new umnp program prototype for rHtp sensors

parent 2bdcaa8b
No related branches found
No related tags found
No related merge requests found
import sys
from umnp.microcontroller.devices.network.ethernet_w5500 import EthernetW5500
from umnp.microcontroller.measurementdevice import MeasurementDevice
from umnp.microcontroller.sensors.lps28dfw import LPS28DFW
from umnp.microcontroller.sensors.sht45 import SHT45
from umnp.microcontroller.tasks.periodictask import PeriodicTask
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
else:
from umnp.microcontroller.umock import machine
async def aggregate_and_send(
device: MeasurementDevice, sht45: SHT45, p_sensor: LPS28DFW
):
comm = device.communicator
t, rh = await sht45.measure()
p, p_t = await p_sensor.measure()
data = f"{t},{rh},{p},{p_t}"
await comm.send_data_message(data)
async def main():
# configure network
device = MeasurementDevice()
spi = machine.SPI(
0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18)
)
i2c = machine.I2C(id=1, scl=machine.Pin(27), sda=machine.Pin(26))
ether = EthernetW5500(
spi, machine.Pin(17), machine.Pin(20), mac=device.generated_mac_raw(), dhcp=True
)
device.add_network_adapter(ether)
comm = device.create_communicator()
sht45 = SHT45(i2c)
p_sensor = LPS28DFW(i2c)
task = PeriodicTask(aggregate_and_send, None, 1000, device, sht45, p_sensor)
comm.add_task(task, "aggregate_and_send")
# start
asyncio.run(comm.start())
if __name__ == "__main__":
asyncio.run(main())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment