From 8bc4ee036d0d005d5eb450a540a496ccaef33183 Mon Sep 17 00:00:00 2001
From: Andreas Gattringer <andreas.gattringer@univie.ac.at>
Date: Wed, 6 Dec 2023 08:51:57 +0100
Subject: [PATCH] added lps28dfw support

---
 .../sensors/lps28dfw/__init__.py              | 205 ++++++++++++++++++
 1 file changed, 205 insertions(+)
 create mode 100644 umnp/microcontroller/sensors/lps28dfw/__init__.py

diff --git a/umnp/microcontroller/sensors/lps28dfw/__init__.py b/umnp/microcontroller/sensors/lps28dfw/__init__.py
new file mode 100644
index 0000000..31d0b79
--- /dev/null
+++ b/umnp/microcontroller/sensors/lps28dfw/__init__.py
@@ -0,0 +1,205 @@
+import asyncio
+import struct
+import time
+
+try:
+    from machine import I2C
+except ImportError:
+    from umnp.microcontroller.umock.machine import I2C
+
+LPS28DFW_DEFAULT_ADDRESS = 0x5C
+LPS28DFW_READ = 0xB9
+LPS28DFW_WRITE = 0xB8
+
+LPS28DFW_CTRL_REG1 = 0x10
+LPS28DFW_CTRL_REG2 = 0x11
+LPS28DFW_CTRL_REG3 = 0x12
+LPS28DFW_CTRL_REG4 = 0x13
+
+LPS28DFW_WHO_AM_I = 0x0F
+
+LPS28DFW_TEMP_OUT_H = 0x2B
+LPS28DFW_TEMP_OUT_L = 0x2C
+
+
+LPS28DFW_PRESS_OUT_H = 0x2A
+LPS28DFW_PRESS_OUT_L = 0x29
+LPS28DFW_PRESS_OUT_XL = 0x28
+LPS28DFW_STATUS = 0x27
+
+VALID_FREQUENCIES_HZ = [0, 1, 4, 10, 25, 50, 75, 100, 200]
+VALID_AVERAGING = [4, 8, 16, 32, 64, 128, 512]
+
+
+class LPS28DFW:
+    def __init__(self, i2c: I2C, i2c_address=LPS28DFW_DEFAULT_ADDRESS):
+        self._i2c = i2c
+        self._i2c_address = i2c_address
+        self._initialised = False
+        self._nan = float("NAN")
+
+        devices = self._i2c.scan()
+
+        if self._i2c_address not in devices:
+            raise RuntimeError("LPS28DFW: Device not found")
+
+        if self.initialise():
+            self._initialised = True
+        else:
+            raise RuntimeError("Could not initialise device")
+
+        self._divisions = self._get_divisions()
+
+    def initialise(self) -> bool:
+        self.set_frequency(25)
+        self.set_averaging(512)
+        return True
+
+    def read_address(self, address, n_bytes):
+        return self._i2c.readfrom_mem(self._i2c_address, address, n_bytes)
+
+    def write_address(self, address, buffer) -> None:
+        self._i2c.writeto_mem(self._i2c_address, address, buffer)
+
+    def _get_divisions(self) -> int:
+        reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
+        mode = self.get_bit(reg2, 6)
+        if mode == 0:
+            return 4096
+        else:
+            return 2048
+
+    async def one_shot(self):
+        reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1)
+        reg1 = self.unset_bit(reg1, 3)
+        reg1 = self.unset_bit(reg1, 4)
+        reg1 = self.unset_bit(reg1, 5)
+        reg1 = self.unset_bit(reg1, 6)
+        self.write_single_byte(LPS28DFW_CTRL_REG1, reg1)
+
+        reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
+        reg2 = self.set_bit(reg2, 0)
+        self.write_single_byte(LPS28DFW_CTRL_REG2, reg2)
+        await asyncio.sleep(0.01)
+        data = self.read_address_to_int(LPS28DFW_PRESS_OUT_XL, 3)
+        pressure = data / self._get_divisions()
+        return pressure
+
+    def set_frequency(self, frequency):
+        if frequency not in VALID_FREQUENCIES_HZ:
+            raise RuntimeError(f"Invalid frequency {frequency}")
+        idx = VALID_FREQUENCIES_HZ.index(frequency)
+        reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1)
+        reg1 &= 0b111
+        reg1 = (idx << 3) + reg1
+        self.write_single_byte(LPS28DFW_CTRL_REG1, reg1)
+
+    def set_averaging(self, count):
+        if count not in VALID_AVERAGING:
+            raise RuntimeError(f"Invalid averaging count {count}")
+        idx = VALID_AVERAGING.index(count)
+        reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1)
+        reg1 &= 0b11111000
+        reg1 += idx
+        self.write_single_byte(LPS28DFW_CTRL_REG1, reg1)
+
+    def debug(self):
+        whoami = self.read_address(LPS28DFW_WHO_AM_I, 1)
+        print("Who am I", whoami)
+
+        reg1 = self.read_address(LPS28DFW_CTRL_REG1, 1)
+        print("Register 1", reg1)
+
+        reg2 = self.read_address(LPS28DFW_CTRL_REG2, 1)
+        print("Register 2:", reg2)
+
+        reg3 = self.read_address(LPS28DFW_CTRL_REG3, 1)
+        print("Register 3", reg3)
+
+    def write_single_byte(self, register, value):
+        buffer = bytearray(1)
+        buffer[0] = value
+        self.write_address(register, buffer)
+
+    @staticmethod
+    def set_bit(value: int, bit: int) -> int:
+        mask = 1 << bit
+        value |= mask
+        return value
+
+    @staticmethod
+    def unset_bit(value: int, bit: int) -> int:
+        mask = 1 << bit
+        value &= ~mask
+
+        return value
+
+    @staticmethod
+    def get_bit(value: int, bit: int) -> int:
+        mask = 1 << bit
+        return value & mask
+
+    def read_address_to_int(self, address, n_bytes):
+        data = self.read_address(address, n_bytes)
+        if n_bytes == 1:
+            return int.from_bytes(data, "little")
+        elif n_bytes == 2:
+            return struct.unpack("<h", data)[0]
+        elif n_bytes == 3:
+            value = struct.unpack("<I", data + "\0")[0]
+            if value & 0x800000:
+                return value - 0x1000000
+            else:
+                return value
+        raise RuntimeError(f"can't convert {n_bytes} byte integers")
+
+    def boot(self):
+        # fresh the content of the internal registers stored in the non-volatile memory block
+        reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
+        reg2 = self.set_bit(reg2, 7)
+
+        self.write_single_byte(LPS28DFW_CTRL_REG2, reg2)
+        time.sleep(0.1)
+        self._divisions = self._get_divisions()
+
+    def software_reset(self):
+        # resets the volatile registers to the default value
+        reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
+        reg2 = self.set_bit(reg2, 2)
+        self.write_single_byte(LPS28DFW_CTRL_REG2, reg2)
+        time.sleep(0.1)
+        self._divisions = self._get_divisions()
+
+    def pressure_status(self):
+        status = self.read_address_to_int(LPS28DFW_STATUS, 1)
+        avail_mask = 1 << 0
+        overrun_mask = 1 << 4
+        return status & avail_mask == avail_mask, status & overrun_mask == overrun_mask
+
+    def temperature_status(self):
+        status = self.read_address_to_int(LPS28DFW_STATUS, 1)
+        avail_mask = 1 << 1
+        overrun_mask = 1 << 5
+        return status & avail_mask == avail_mask, status & overrun_mask == overrun_mask
+
+    async def pressure(self):
+        while True:
+            avail, overrun = self.pressure_status()
+            if avail:
+                break
+            await asyncio.sleep(0.01)
+
+        pressure = self.read_address_to_int(LPS28DFW_PRESS_OUT_XL, 3)
+        pressure = pressure / self._divisions
+        return pressure
+
+    async def temperature(self):
+        while True:
+            avail, overrun = self.temperature_status()
+            if avail:
+                break
+            await asyncio.sleep(0.01)
+
+        temperature = self.read_address_to_int(LPS28DFW_TEMP_OUT_L, 2)
+        temperature = temperature / 100
+        return temperature
-- 
GitLab