diff --git a/programs/dmatest_v2.py b/programs/dmatest_v2.py new file mode 100644 index 0000000000000000000000000000000000000000..d0b83a5b0b1782115efe4da51d745d8a50dfd611 --- /dev/null +++ b/programs/dmatest_v2.py @@ -0,0 +1,249 @@ +# This file is licensed under the Apache License, Version 2.0 +# based on https://github.com/jbentham/pico/blob/main/rp_adc_test.py +# which is Copyright (c) 2021 Jeremy P Bentham and licensed under the Apache License, Version 2.0 + +import array +import math + +import machine +import rp2 +import asyncio +import uctypes +from uctypes import BF_POS, BF_LEN, UINT32, BFUINT32, struct +import gc + +ADC_BASE = 0x4004C000 + +ADC_CS_FIELDS = { + "RROBIN": 16 << BF_POS | 5 << BF_LEN | BFUINT32, + "AINSEL": 12 << BF_POS | 3 << BF_LEN | BFUINT32, + "ERR_STICKY": 10 << BF_POS | 1 << BF_LEN | BFUINT32, + "ERR": 9 << BF_POS | 1 << BF_LEN | BFUINT32, + "READY": 8 << BF_POS | 1 << BF_LEN | BFUINT32, + "START_MANY": 3 << BF_POS | 1 << BF_LEN | BFUINT32, + "START_ONCE": 2 << BF_POS | 1 << BF_LEN | BFUINT32, + "TS_EN": 1 << BF_POS | 1 << BF_LEN | BFUINT32, + "EN": 0 << BF_POS | 1 << BF_LEN | BFUINT32, +} +ADC_FCS_FIELDS = { + "THRESH": 24 << BF_POS | 4 << BF_LEN | BFUINT32, + "LEVEL": 16 << BF_POS | 4 << BF_LEN | BFUINT32, + "OVER": 11 << BF_POS | 1 << BF_LEN | BFUINT32, + "UNDER": 10 << BF_POS | 1 << BF_LEN | BFUINT32, + "FULL": 9 << BF_POS | 1 << BF_LEN | BFUINT32, + "EMPTY": 8 << BF_POS | 1 << BF_LEN | BFUINT32, + "DREQ_EN": 3 << BF_POS | 1 << BF_LEN | BFUINT32, + "ERR": 2 << BF_POS | 1 << BF_LEN | BFUINT32, + "SHIFT": 1 << BF_POS | 1 << BF_LEN | BFUINT32, + "EN": 0 << BF_POS | 1 << BF_LEN | BFUINT32, +} + +ADC_REGS = { + "CS_REG": 0x00 | UINT32, + "CS": (0x00, ADC_CS_FIELDS), + "RESULT_REG": 0x04 | UINT32, + "FCS_REG": 0x08 | UINT32, + "FCS": (0x08, ADC_FCS_FIELDS), + "FIFO_REG": 0x0C | UINT32, + "DIV_REG": 0x10 | UINT32, + "INTR_REG": 0x14 | UINT32, + "INTE_REG": 0x18 | UINT32, + "INTF_REG": 0x1C | UINT32, + "INTS_REG": 0x20 | UINT32, +} +ADC_DEVICE = struct(ADC_BASE, ADC_REGS) +src = 0x4004C00C # dev.RESULT_REG # 0x4004C00F + + +def t(x): + return (27 - (x - 0.706) / 0.001721) + + +SAMPLES = 1000 +SAMPLING_RATE_HZ = 22000 + + +class AdcDevice: + def __init__(self, device_nr, rate=SAMPLING_RATE_HZ, samples=SAMPLES): + self.__lock = asyncio.Lock() + self.__channel = device_nr + self.__adc_device = struct(ADC_BASE, ADC_REGS) + + self._buffer = None + self._sample_count = 0 + self._rate = rate + + self.set_sample_count(samples) + self.set_sampling_rate(rate) + + self.__setup_adc() + self._dma = rp2.DMA() + + self._dma_ctrl = self._dma.pack_ctrl(inc_read=False, + inc_write=True, + size=1, + treq_sel=36, + irq_quiet=True, + write_err=True, + read_err=True) + + def set_sampling_rate(self, rate): + async with self.__lock: + self._rate = rate + self.__adc_device.DIV_REG = (48000000 // self._rate - 1) << 8 + + def set_sample_count(self, samples): + async with self.__lock: + self._sample_count = samples + self._buffer = array.array("H", (0 for _ in range(samples))) + + def __stop_adc(self): + dev = self.__adc_device + + # drain FIFO + while dev.FCS.LEVEL: + _ = dev.FIFO_REG + + # clear ADC Control and Status register + dev.CS_REG = 0 + # clear ADC FIFO control register + dev.FCS_REG = 0 + + # disable ADC device + dev.CS.EN = 0 + # disable temperature sensor + dev.CS.TS_EN = 0 + + def __setup_adc(self): + dev = self.__adc_device + + # reset ADC Control and Status register + dev.CS_REG = 0 + # reset ADC FIFO control register + dev.FCS_REG = 0 + + # enable ADC device + dev.CS.EN = 1 + + # Power on temperature sensor + # dev.CS.TS_EN = 1 + + # Select analog mux input + dev.CS.AINSEL = self.__channel + + # Set to single conversion mode (self-clearing) + dev.CS.START_ONCE = 1 + + # Set clock divider: + # 0 for back-to-back conversions (9 cycles) + # non-zero for 1 + INT + FRAC/256 + dev.DIV_REG = (48000000 // self._rate - 1) << 8 + + # FIFO control and status + # write result to FIFO after each conversion + dev.FCS.EN = 1 + + # assert DMA requests when FIFO contains data + dev.FCS.DREQ_EN = 1 + + # DREQ/IRQ asserted when level >= threshold + dev.FCS.THRESH = 1 + + # clear FIFO overflow/underflow + dev.FCS.OVER = 1 + dev.FCS.UNDER = 1 + + # discard data from FIFO + dev.CS.START_MANY = 0 + while dev.FCS.LEVEL: + _ = dev.FIFO_REG + + # start measuring + dev.CS.START_MANY = 1 + + async def measure(self): + self.__setup_adc() + self._dma.config(read=src, + write=uctypes.addressof(self._buffer), + count=SAMPLES, + ctrl=self._dma_ctrl, + trigger=True) + + self._dma.active(1) + while self._dma.count > 0: + await asyncio.sleep(0) + self.__stop_adc() + return [x for x in self._buffer] + + +def calculate_frequency(voltages_centered, sampling_rate): + consecutive_sign = 0 + counts = [] + + old_sign = voltages_centered > 0 + for v in voltages_centered: + sign = v > 0 + if sign != old_sign: + counts.append(consecutive_sign) + consecutive_sign = 0 + + old_sign = sign + consecutive_sign += 1 + + if len(counts) < 3: + return -1 + + counts = counts[1:-1] + f = sampling_rate / (2 * sum(counts) / len(counts)) + count = None + return f + + +def calculate_peak_to_peak(voltages_centered): + positive_peaks = [] + negative_peaks = [] + maximum = -9999 + minimum = 9999 + + old_sign = voltages_centered > 0 + for v in voltages_centered: + sign = v > 0 + if sign != old_sign: + if maximum > -9999: + positive_peaks.append(maximum) + if minimum < 9999: + negative_peaks.append(minimum) + maximum = -9999 + minimum = 9999 + + if v > 0 and v > maximum: + maximum = v + if v < 0 and v < minimum: + minimum = v + + if len(positive_peaks) < 3 or len(negative_peaks) < 3: + return 0 + + negative_peaks = negative_peaks[1: -1] + positive_peaks = positive_peaks[1: -1] + return sum(positive_peaks) / len(positive_peaks) - sum(negative_peaks) / len(negative_peaks) + + +async def main(): + samples = SAMPLES + rate = SAMPLING_RATE_HZ + device = AdcDevice(1, rate=rate, samples=samples) + + while True: + voltages = [v / 4096 * 3.3 for v in await device.measure()] + voltage_mean = sum(voltages) / SAMPLES + voltages_centered = [v - voltage_mean for v in voltages] + freq = calculate_frequency(voltages_centered, sampling_rate=SAMPLING_RATE_HZ) + v_to_v = calculate_peak_to_peak(voltages_centered) + print(f"{freq:0.2f} Hz, {v_to_v:0.2f} V peak to peak") + gc.collect() + + +if __name__ == "__main__": + asyncio.run(main()) + gc.collect()