From 6ee2144f00514c510d79ba8f7f6b0c22757614b9 Mon Sep 17 00:00:00 2001 From: Andreas Gattringer <andreas.gattringer@univie.ac.at> Date: Wed, 6 Dec 2023 08:51:39 +0100 Subject: [PATCH] added SHT45 support --- .../microcontroller/sensors/sht45/__init__.py | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 umnp/microcontroller/sensors/sht45/__init__.py diff --git a/umnp/microcontroller/sensors/sht45/__init__.py b/umnp/microcontroller/sensors/sht45/__init__.py new file mode 100644 index 0000000..83401c0 --- /dev/null +++ b/umnp/microcontroller/sensors/sht45/__init__.py @@ -0,0 +1,94 @@ +try: + from machine import I2C +except ImportError: + from umnp.microcontroller.umock.machine import I2C + +import asyncio +import time + +SHT45_HIRES = 0xFD +NO_HOLD_WAIT_TIME = 18.5 / 1000 / 1000 +SHT45_WAIT_TIME_MS = 9 +SHT45_RESET_TIME_MS = 1 +SHT45_SOFT_RESET = 0x94 +SHT45_DEFAULT_ADDRESS = 0x44 + + +class SHT45: + def __init__(self, i2c: I2C, i2c_address=SHT45_DEFAULT_ADDRESS): + self._i2c = i2c + self._i2c_address = i2c_address + self._initialised = False + self._nan = float("NAN") + + devices = self._i2c.scan() + + if self._i2c_address not in devices: + raise RuntimeError("SHT45: Device not found") + + if self.reset(): + self._initialised = True + else: + print("Could not initialise device") + + @property + def initialised(self) -> bool: + return self._initialised + + def _command(self, command: int) -> int: + pass + + def _calulate_crc8(self, data, polynom=0x31, init=0xFF): + # CRC-8-Dallas/Maxim for I2C with 0x31 polynomial + crc = init + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x80: + crc = (crc << 1) ^ polynom + else: + crc = crc << 1 + crc &= 0xFF + return crc + + def reset(self) -> bool: + cmd = bytearray(1) + cmd[0] = SHT45_SOFT_RESET + n_ack = self._i2c.writeto(self._i2c_address, cmd) + time.sleep(SHT45_RESET_TIME_MS / 1000) + if n_ack == 1: + return True + return False + + def _decode_and_verify_value(self, buffer): + if self._calulate_crc8(buffer[:2]) != buffer[2]: + return self._nan + + return buffer[1] + (buffer[0] << 8) + + def _translate_temperature(self, buffer) -> float: + t_raw = self._decode_and_verify_value(buffer) + return -45 + 175 * t_raw / (2**16 - 1) + + def _translate_rh(self, buffer): + rh_raw = self._decode_and_verify_value(buffer) + return -6 + 125 * rh_raw / (2**16 - 1) + + async def measure(self): + temperature = self._nan + rh = self._nan + if not self.initialised: + return temperature, rh + + cmd = bytearray(1) + cmd[0] = SHT45_HIRES + n_ack = self._i2c.writeto(self._i2c_address, cmd) + if n_ack != 1: + return temperature, rh + await asyncio.sleep(SHT45_WAIT_TIME_MS / 1000) + + data = self._i2c.readfrom(self._i2c_address, 6) + temperature = self._translate_temperature(data[0:3]) + rh = self._translate_rh(data[3:6]) + + return temperature, rh -- GitLab