diff --git a/umnp/microcontroller/sensors/lps28dfw/__init__.py b/umnp/microcontroller/sensors/lps28dfw/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..31d0b7904f1c8ac80f9d4003a0004a151897d431 --- /dev/null +++ b/umnp/microcontroller/sensors/lps28dfw/__init__.py @@ -0,0 +1,205 @@ +import asyncio +import struct +import time + +try: + from machine import I2C +except ImportError: + from umnp.microcontroller.umock.machine import I2C + +LPS28DFW_DEFAULT_ADDRESS = 0x5C +LPS28DFW_READ = 0xB9 +LPS28DFW_WRITE = 0xB8 + +LPS28DFW_CTRL_REG1 = 0x10 +LPS28DFW_CTRL_REG2 = 0x11 +LPS28DFW_CTRL_REG3 = 0x12 +LPS28DFW_CTRL_REG4 = 0x13 + +LPS28DFW_WHO_AM_I = 0x0F + +LPS28DFW_TEMP_OUT_H = 0x2B +LPS28DFW_TEMP_OUT_L = 0x2C + + +LPS28DFW_PRESS_OUT_H = 0x2A +LPS28DFW_PRESS_OUT_L = 0x29 +LPS28DFW_PRESS_OUT_XL = 0x28 +LPS28DFW_STATUS = 0x27 + +VALID_FREQUENCIES_HZ = [0, 1, 4, 10, 25, 50, 75, 100, 200] +VALID_AVERAGING = [4, 8, 16, 32, 64, 128, 512] + + +class LPS28DFW: + def __init__(self, i2c: I2C, i2c_address=LPS28DFW_DEFAULT_ADDRESS): + self._i2c = i2c + self._i2c_address = i2c_address + self._initialised = False + self._nan = float("NAN") + + devices = self._i2c.scan() + + if self._i2c_address not in devices: + raise RuntimeError("LPS28DFW: Device not found") + + if self.initialise(): + self._initialised = True + else: + raise RuntimeError("Could not initialise device") + + self._divisions = self._get_divisions() + + def initialise(self) -> bool: + self.set_frequency(25) + self.set_averaging(512) + return True + + def read_address(self, address, n_bytes): + return self._i2c.readfrom_mem(self._i2c_address, address, n_bytes) + + def write_address(self, address, buffer) -> None: + self._i2c.writeto_mem(self._i2c_address, address, buffer) + + def _get_divisions(self) -> int: + reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1) + mode = self.get_bit(reg2, 6) + if mode == 0: + return 4096 + else: + return 2048 + + async def one_shot(self): + reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1) + reg1 = self.unset_bit(reg1, 3) + reg1 = self.unset_bit(reg1, 4) + reg1 = self.unset_bit(reg1, 5) + reg1 = self.unset_bit(reg1, 6) + self.write_single_byte(LPS28DFW_CTRL_REG1, reg1) + + reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1) + reg2 = self.set_bit(reg2, 0) + self.write_single_byte(LPS28DFW_CTRL_REG2, reg2) + await asyncio.sleep(0.01) + data = self.read_address_to_int(LPS28DFW_PRESS_OUT_XL, 3) + pressure = data / self._get_divisions() + return pressure + + def set_frequency(self, frequency): + if frequency not in VALID_FREQUENCIES_HZ: + raise RuntimeError(f"Invalid frequency {frequency}") + idx = VALID_FREQUENCIES_HZ.index(frequency) + reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1) + reg1 &= 0b111 + reg1 = (idx << 3) + reg1 + self.write_single_byte(LPS28DFW_CTRL_REG1, reg1) + + def set_averaging(self, count): + if count not in VALID_AVERAGING: + raise RuntimeError(f"Invalid averaging count {count}") + idx = VALID_AVERAGING.index(count) + reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1) + reg1 &= 0b11111000 + reg1 += idx + self.write_single_byte(LPS28DFW_CTRL_REG1, reg1) + + def debug(self): + whoami = self.read_address(LPS28DFW_WHO_AM_I, 1) + print("Who am I", whoami) + + reg1 = self.read_address(LPS28DFW_CTRL_REG1, 1) + print("Register 1", reg1) + + reg2 = self.read_address(LPS28DFW_CTRL_REG2, 1) + print("Register 2:", reg2) + + reg3 = self.read_address(LPS28DFW_CTRL_REG3, 1) + print("Register 3", reg3) + + def write_single_byte(self, register, value): + buffer = bytearray(1) + buffer[0] = value + self.write_address(register, buffer) + + @staticmethod + def set_bit(value: int, bit: int) -> int: + mask = 1 << bit + value |= mask + return value + + @staticmethod + def unset_bit(value: int, bit: int) -> int: + mask = 1 << bit + value &= ~mask + + return value + + @staticmethod + def get_bit(value: int, bit: int) -> int: + mask = 1 << bit + return value & mask + + def read_address_to_int(self, address, n_bytes): + data = self.read_address(address, n_bytes) + if n_bytes == 1: + return int.from_bytes(data, "little") + elif n_bytes == 2: + return struct.unpack("<h", data)[0] + elif n_bytes == 3: + value = struct.unpack("<I", data + "\0")[0] + if value & 0x800000: + return value - 0x1000000 + else: + return value + raise RuntimeError(f"can't convert {n_bytes} byte integers") + + def boot(self): + # fresh the content of the internal registers stored in the non-volatile memory block + reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1) + reg2 = self.set_bit(reg2, 7) + + self.write_single_byte(LPS28DFW_CTRL_REG2, reg2) + time.sleep(0.1) + self._divisions = self._get_divisions() + + def software_reset(self): + # resets the volatile registers to the default value + reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1) + reg2 = self.set_bit(reg2, 2) + self.write_single_byte(LPS28DFW_CTRL_REG2, reg2) + time.sleep(0.1) + self._divisions = self._get_divisions() + + def pressure_status(self): + status = self.read_address_to_int(LPS28DFW_STATUS, 1) + avail_mask = 1 << 0 + overrun_mask = 1 << 4 + return status & avail_mask == avail_mask, status & overrun_mask == overrun_mask + + def temperature_status(self): + status = self.read_address_to_int(LPS28DFW_STATUS, 1) + avail_mask = 1 << 1 + overrun_mask = 1 << 5 + return status & avail_mask == avail_mask, status & overrun_mask == overrun_mask + + async def pressure(self): + while True: + avail, overrun = self.pressure_status() + if avail: + break + await asyncio.sleep(0.01) + + pressure = self.read_address_to_int(LPS28DFW_PRESS_OUT_XL, 3) + pressure = pressure / self._divisions + return pressure + + async def temperature(self): + while True: + avail, overrun = self.temperature_status() + if avail: + break + await asyncio.sleep(0.01) + + temperature = self.read_address_to_int(LPS28DFW_TEMP_OUT_L, 2) + temperature = temperature / 100 + return temperature