Skip to content
Snippets Groups Projects
Commit 8bc4ee03 authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

added lps28dfw support

parent 6ee2144f
No related branches found
No related tags found
No related merge requests found
import asyncio
import struct
import time
try:
from machine import I2C
except ImportError:
from umnp.microcontroller.umock.machine import I2C
LPS28DFW_DEFAULT_ADDRESS = 0x5C
LPS28DFW_READ = 0xB9
LPS28DFW_WRITE = 0xB8
LPS28DFW_CTRL_REG1 = 0x10
LPS28DFW_CTRL_REG2 = 0x11
LPS28DFW_CTRL_REG3 = 0x12
LPS28DFW_CTRL_REG4 = 0x13
LPS28DFW_WHO_AM_I = 0x0F
LPS28DFW_TEMP_OUT_H = 0x2B
LPS28DFW_TEMP_OUT_L = 0x2C
LPS28DFW_PRESS_OUT_H = 0x2A
LPS28DFW_PRESS_OUT_L = 0x29
LPS28DFW_PRESS_OUT_XL = 0x28
LPS28DFW_STATUS = 0x27
VALID_FREQUENCIES_HZ = [0, 1, 4, 10, 25, 50, 75, 100, 200]
VALID_AVERAGING = [4, 8, 16, 32, 64, 128, 512]
class LPS28DFW:
def __init__(self, i2c: I2C, i2c_address=LPS28DFW_DEFAULT_ADDRESS):
self._i2c = i2c
self._i2c_address = i2c_address
self._initialised = False
self._nan = float("NAN")
devices = self._i2c.scan()
if self._i2c_address not in devices:
raise RuntimeError("LPS28DFW: Device not found")
if self.initialise():
self._initialised = True
else:
raise RuntimeError("Could not initialise device")
self._divisions = self._get_divisions()
def initialise(self) -> bool:
self.set_frequency(25)
self.set_averaging(512)
return True
def read_address(self, address, n_bytes):
return self._i2c.readfrom_mem(self._i2c_address, address, n_bytes)
def write_address(self, address, buffer) -> None:
self._i2c.writeto_mem(self._i2c_address, address, buffer)
def _get_divisions(self) -> int:
reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
mode = self.get_bit(reg2, 6)
if mode == 0:
return 4096
else:
return 2048
async def one_shot(self):
reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1)
reg1 = self.unset_bit(reg1, 3)
reg1 = self.unset_bit(reg1, 4)
reg1 = self.unset_bit(reg1, 5)
reg1 = self.unset_bit(reg1, 6)
self.write_single_byte(LPS28DFW_CTRL_REG1, reg1)
reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
reg2 = self.set_bit(reg2, 0)
self.write_single_byte(LPS28DFW_CTRL_REG2, reg2)
await asyncio.sleep(0.01)
data = self.read_address_to_int(LPS28DFW_PRESS_OUT_XL, 3)
pressure = data / self._get_divisions()
return pressure
def set_frequency(self, frequency):
if frequency not in VALID_FREQUENCIES_HZ:
raise RuntimeError(f"Invalid frequency {frequency}")
idx = VALID_FREQUENCIES_HZ.index(frequency)
reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1)
reg1 &= 0b111
reg1 = (idx << 3) + reg1
self.write_single_byte(LPS28DFW_CTRL_REG1, reg1)
def set_averaging(self, count):
if count not in VALID_AVERAGING:
raise RuntimeError(f"Invalid averaging count {count}")
idx = VALID_AVERAGING.index(count)
reg1 = self.read_address_to_int(LPS28DFW_CTRL_REG1, 1)
reg1 &= 0b11111000
reg1 += idx
self.write_single_byte(LPS28DFW_CTRL_REG1, reg1)
def debug(self):
whoami = self.read_address(LPS28DFW_WHO_AM_I, 1)
print("Who am I", whoami)
reg1 = self.read_address(LPS28DFW_CTRL_REG1, 1)
print("Register 1", reg1)
reg2 = self.read_address(LPS28DFW_CTRL_REG2, 1)
print("Register 2:", reg2)
reg3 = self.read_address(LPS28DFW_CTRL_REG3, 1)
print("Register 3", reg3)
def write_single_byte(self, register, value):
buffer = bytearray(1)
buffer[0] = value
self.write_address(register, buffer)
@staticmethod
def set_bit(value: int, bit: int) -> int:
mask = 1 << bit
value |= mask
return value
@staticmethod
def unset_bit(value: int, bit: int) -> int:
mask = 1 << bit
value &= ~mask
return value
@staticmethod
def get_bit(value: int, bit: int) -> int:
mask = 1 << bit
return value & mask
def read_address_to_int(self, address, n_bytes):
data = self.read_address(address, n_bytes)
if n_bytes == 1:
return int.from_bytes(data, "little")
elif n_bytes == 2:
return struct.unpack("<h", data)[0]
elif n_bytes == 3:
value = struct.unpack("<I", data + "\0")[0]
if value & 0x800000:
return value - 0x1000000
else:
return value
raise RuntimeError(f"can't convert {n_bytes} byte integers")
def boot(self):
# fresh the content of the internal registers stored in the non-volatile memory block
reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
reg2 = self.set_bit(reg2, 7)
self.write_single_byte(LPS28DFW_CTRL_REG2, reg2)
time.sleep(0.1)
self._divisions = self._get_divisions()
def software_reset(self):
# resets the volatile registers to the default value
reg2 = self.read_address_to_int(LPS28DFW_CTRL_REG2, 1)
reg2 = self.set_bit(reg2, 2)
self.write_single_byte(LPS28DFW_CTRL_REG2, reg2)
time.sleep(0.1)
self._divisions = self._get_divisions()
def pressure_status(self):
status = self.read_address_to_int(LPS28DFW_STATUS, 1)
avail_mask = 1 << 0
overrun_mask = 1 << 4
return status & avail_mask == avail_mask, status & overrun_mask == overrun_mask
def temperature_status(self):
status = self.read_address_to_int(LPS28DFW_STATUS, 1)
avail_mask = 1 << 1
overrun_mask = 1 << 5
return status & avail_mask == avail_mask, status & overrun_mask == overrun_mask
async def pressure(self):
while True:
avail, overrun = self.pressure_status()
if avail:
break
await asyncio.sleep(0.01)
pressure = self.read_address_to_int(LPS28DFW_PRESS_OUT_XL, 3)
pressure = pressure / self._divisions
return pressure
async def temperature(self):
while True:
avail, overrun = self.temperature_status()
if avail:
break
await asyncio.sleep(0.01)
temperature = self.read_address_to_int(LPS28DFW_TEMP_OUT_L, 2)
temperature = temperature / 100
return temperature
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment