Skip to content
Snippets Groups Projects
Commit dfb05c30 authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

added v2 of adc/dma test

parent df1f2e27
Branches
No related tags found
No related merge requests found
# This file is licensed under the Apache License, Version 2.0
# based on https://github.com/jbentham/pico/blob/main/rp_adc_test.py
# which is Copyright (c) 2021 Jeremy P Bentham and licensed under the Apache License, Version 2.0
import array
import math
import machine
import rp2
import asyncio
import uctypes
from uctypes import BF_POS, BF_LEN, UINT32, BFUINT32, struct
import gc
ADC_BASE = 0x4004C000
ADC_CS_FIELDS = {
"RROBIN": 16 << BF_POS | 5 << BF_LEN | BFUINT32,
"AINSEL": 12 << BF_POS | 3 << BF_LEN | BFUINT32,
"ERR_STICKY": 10 << BF_POS | 1 << BF_LEN | BFUINT32,
"ERR": 9 << BF_POS | 1 << BF_LEN | BFUINT32,
"READY": 8 << BF_POS | 1 << BF_LEN | BFUINT32,
"START_MANY": 3 << BF_POS | 1 << BF_LEN | BFUINT32,
"START_ONCE": 2 << BF_POS | 1 << BF_LEN | BFUINT32,
"TS_EN": 1 << BF_POS | 1 << BF_LEN | BFUINT32,
"EN": 0 << BF_POS | 1 << BF_LEN | BFUINT32,
}
ADC_FCS_FIELDS = {
"THRESH": 24 << BF_POS | 4 << BF_LEN | BFUINT32,
"LEVEL": 16 << BF_POS | 4 << BF_LEN | BFUINT32,
"OVER": 11 << BF_POS | 1 << BF_LEN | BFUINT32,
"UNDER": 10 << BF_POS | 1 << BF_LEN | BFUINT32,
"FULL": 9 << BF_POS | 1 << BF_LEN | BFUINT32,
"EMPTY": 8 << BF_POS | 1 << BF_LEN | BFUINT32,
"DREQ_EN": 3 << BF_POS | 1 << BF_LEN | BFUINT32,
"ERR": 2 << BF_POS | 1 << BF_LEN | BFUINT32,
"SHIFT": 1 << BF_POS | 1 << BF_LEN | BFUINT32,
"EN": 0 << BF_POS | 1 << BF_LEN | BFUINT32,
}
ADC_REGS = {
"CS_REG": 0x00 | UINT32,
"CS": (0x00, ADC_CS_FIELDS),
"RESULT_REG": 0x04 | UINT32,
"FCS_REG": 0x08 | UINT32,
"FCS": (0x08, ADC_FCS_FIELDS),
"FIFO_REG": 0x0C | UINT32,
"DIV_REG": 0x10 | UINT32,
"INTR_REG": 0x14 | UINT32,
"INTE_REG": 0x18 | UINT32,
"INTF_REG": 0x1C | UINT32,
"INTS_REG": 0x20 | UINT32,
}
ADC_DEVICE = struct(ADC_BASE, ADC_REGS)
src = 0x4004C00C # dev.RESULT_REG # 0x4004C00F
def t(x):
return (27 - (x - 0.706) / 0.001721)
SAMPLES = 1000
SAMPLING_RATE_HZ = 22000
class AdcDevice:
def __init__(self, device_nr, rate=SAMPLING_RATE_HZ, samples=SAMPLES):
self.__lock = asyncio.Lock()
self.__channel = device_nr
self.__adc_device = struct(ADC_BASE, ADC_REGS)
self._buffer = None
self._sample_count = 0
self._rate = rate
self.set_sample_count(samples)
self.set_sampling_rate(rate)
self.__setup_adc()
self._dma = rp2.DMA()
self._dma_ctrl = self._dma.pack_ctrl(inc_read=False,
inc_write=True,
size=1,
treq_sel=36,
irq_quiet=True,
write_err=True,
read_err=True)
def set_sampling_rate(self, rate):
async with self.__lock:
self._rate = rate
self.__adc_device.DIV_REG = (48000000 // self._rate - 1) << 8
def set_sample_count(self, samples):
async with self.__lock:
self._sample_count = samples
self._buffer = array.array("H", (0 for _ in range(samples)))
def __stop_adc(self):
dev = self.__adc_device
# drain FIFO
while dev.FCS.LEVEL:
_ = dev.FIFO_REG
# clear ADC Control and Status register
dev.CS_REG = 0
# clear ADC FIFO control register
dev.FCS_REG = 0
# disable ADC device
dev.CS.EN = 0
# disable temperature sensor
dev.CS.TS_EN = 0
def __setup_adc(self):
dev = self.__adc_device
# reset ADC Control and Status register
dev.CS_REG = 0
# reset ADC FIFO control register
dev.FCS_REG = 0
# enable ADC device
dev.CS.EN = 1
# Power on temperature sensor
# dev.CS.TS_EN = 1
# Select analog mux input
dev.CS.AINSEL = self.__channel
# Set to single conversion mode (self-clearing)
dev.CS.START_ONCE = 1
# Set clock divider:
# 0 for back-to-back conversions (9 cycles)
# non-zero for 1 + INT + FRAC/256
dev.DIV_REG = (48000000 // self._rate - 1) << 8
# FIFO control and status
# write result to FIFO after each conversion
dev.FCS.EN = 1
# assert DMA requests when FIFO contains data
dev.FCS.DREQ_EN = 1
# DREQ/IRQ asserted when level >= threshold
dev.FCS.THRESH = 1
# clear FIFO overflow/underflow
dev.FCS.OVER = 1
dev.FCS.UNDER = 1
# discard data from FIFO
dev.CS.START_MANY = 0
while dev.FCS.LEVEL:
_ = dev.FIFO_REG
# start measuring
dev.CS.START_MANY = 1
async def measure(self):
self.__setup_adc()
self._dma.config(read=src,
write=uctypes.addressof(self._buffer),
count=SAMPLES,
ctrl=self._dma_ctrl,
trigger=True)
self._dma.active(1)
while self._dma.count > 0:
await asyncio.sleep(0)
self.__stop_adc()
return [x for x in self._buffer]
def calculate_frequency(voltages_centered, sampling_rate):
consecutive_sign = 0
counts = []
old_sign = voltages_centered > 0
for v in voltages_centered:
sign = v > 0
if sign != old_sign:
counts.append(consecutive_sign)
consecutive_sign = 0
old_sign = sign
consecutive_sign += 1
if len(counts) < 3:
return -1
counts = counts[1:-1]
f = sampling_rate / (2 * sum(counts) / len(counts))
count = None
return f
def calculate_peak_to_peak(voltages_centered):
positive_peaks = []
negative_peaks = []
maximum = -9999
minimum = 9999
old_sign = voltages_centered > 0
for v in voltages_centered:
sign = v > 0
if sign != old_sign:
if maximum > -9999:
positive_peaks.append(maximum)
if minimum < 9999:
negative_peaks.append(minimum)
maximum = -9999
minimum = 9999
if v > 0 and v > maximum:
maximum = v
if v < 0 and v < minimum:
minimum = v
if len(positive_peaks) < 3 or len(negative_peaks) < 3:
return 0
negative_peaks = negative_peaks[1: -1]
positive_peaks = positive_peaks[1: -1]
return sum(positive_peaks) / len(positive_peaks) - sum(negative_peaks) / len(negative_peaks)
async def main():
samples = SAMPLES
rate = SAMPLING_RATE_HZ
device = AdcDevice(1, rate=rate, samples=samples)
while True:
voltages = [v / 4096 * 3.3 for v in await device.measure()]
voltage_mean = sum(voltages) / SAMPLES
voltages_centered = [v - voltage_mean for v in voltages]
freq = calculate_frequency(voltages_centered, sampling_rate=SAMPLING_RATE_HZ)
v_to_v = calculate_peak_to_peak(voltages_centered)
print(f"{freq:0.2f} Hz, {v_to_v:0.2f} V peak to peak")
gc.collect()
if __name__ == "__main__":
asyncio.run(main())
gc.collect()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment