Skip to content
Snippets Groups Projects
Commit da2cf79a authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

updated a33 tests

parent 436e8672
No related branches found
No related tags found
No related merge requests found
...@@ -6,22 +6,22 @@ import time ...@@ -6,22 +6,22 @@ import time
from umnp.communication.serial_connection import SerialConnection from umnp.communication.serial_connection import SerialConnection
from umnp.devices.aethalometer.ae33 import AE33 from umnp.devices.aethalometer.ae33 import AE33
now = datetime.datetime.now() # now = datetime.datetime.now()
if now.microsecond >= 500 * 1000: # if now.microsecond >= 500 * 1000:
now = now + datetime.timedelta(seconds=1) # now = now + datetime.timedelta(seconds=1)
now = now.replace(microsecond=0) # now = now.replace(microsecond=0)
now_string = now.isoformat().replace(":", "-") # now_string = now.isoformat().replace(":", "-")
logger = logging.getLogger("ae33-test") # logger = logging.getLogger("ae33-test")
logger.setLevel(logging.DEBUG) # logger.setLevel(logging.DEBUG)
log_file_fn = os.path.join("logs", f"ae33-{now_string}.log") # log_file_fn = os.path.join("logs", f"ae33-{now_string}.log")
log_file = logging.FileHandler(log_file_fn) # log_file = logging.FileHandler(log_file_fn)
log_file.setLevel(logging.DEBUG) # log_file.setLevel(logging.DEBUG)
logger.addHandler(log_file) # logger.addHandler(log_file)
def main(): def main():
conn_opts = {"baudrate": 115200, "dsrdtr": True, "rtscts": False, "xonoff": False} conn_opts = {"baudrate": 115200, "dsrdtr": True, "rtscts": False, "xonoff": False}
connection = SerialConnection("/dev/ttyUSB2", options=conn_opts) connection = SerialConnection("/dev/ttyUSB0", options=conn_opts)
ae33 = AE33(connection) ae33 = AE33(connection)
while True: while True:
current = ae33.current_measurement() current = ae33.current_measurement()
......
SERIAL_DEFAULT_BAUD_RATE = 112512 SERIAL_DEFAULT_BAUD_RATE = 115200
SERIAL_DEFAULT_WAIT_TIME_S = 0.01 SERIAL_DEFAULT_WAIT_TIME_S = 0.5
class ConnectionProblemException(Exception): class ConnectionProblemException(Exception):
......
...@@ -6,7 +6,7 @@ from umnp.communication import SERIAL_DEFAULT_BAUD_RATE, SERIAL_DEFAULT_WAIT_TIM ...@@ -6,7 +6,7 @@ from umnp.communication import SERIAL_DEFAULT_BAUD_RATE, SERIAL_DEFAULT_WAIT_TIM
class AbstractSerialConnection: class AbstractSerialConnection:
def __init__(self, address, options: dict | None = None): def __init__(self, address, options: dict | None = None):
self.__connection = None self.__connection = None
self.__lock = asyncio.Lock()
self.__address = address self.__address = address
if options is None: if options is None:
self.__options = {} self.__options = {}
...@@ -19,6 +19,33 @@ class AbstractSerialConnection: ...@@ -19,6 +19,33 @@ class AbstractSerialConnection:
self.__line_sep_write = self.get_option("write separator", b"\r") self.__line_sep_write = self.get_option("write separator", b"\r")
self.__max_connection_attempts = self.get_option("max connection attempts", 3) self.__max_connection_attempts = self.get_option("max connection attempts", 3)
@property
def connection(self):
return self.__connection
@property
def max_connection_attempts(self):
return self.__max_connection_attempts
@property
def address(self):
return self.__address
@property
def wait_time(self):
return self.__wait_time
@property
def line_sep_read(self):
return self.__line_sep_read
@property
def line_sep_write(self):
return self.__line_sep_write
@connection.setter
def connection(self, conn):
self.__connection = conn
def connect(self): def connect(self):
raise NotImplementedError raise NotImplementedError
...@@ -29,7 +56,7 @@ class AbstractSerialConnection: ...@@ -29,7 +56,7 @@ class AbstractSerialConnection:
raise NotImplementedError raise NotImplementedError
def send_command(self, command): def send_command(self, command):
pass raise NotImplementedError
def read_line(self, timeout=None): def read_line(self, timeout=None):
raise NotImplementedError raise NotImplementedError
...@@ -37,4 +64,4 @@ class AbstractSerialConnection: ...@@ -37,4 +64,4 @@ class AbstractSerialConnection:
def get_option(self, name: str, default=None): def get_option(self, name: str, default=None):
if self.__options is None: if self.__options is None:
return default return default
return self.__options.get(name) return self.__options.get(name, default)
import asyncio
import logging import logging
import threading
import time import time
import serial import serial
from umnp.communication import ConnectionProblemException from umnp.communication import ConnectionProblemException, SERIAL_DEFAULT_BAUD_RATE, SERIAL_DEFAULT_WAIT_TIME_S
from umnp.communication.abstract_serial_connection import AbstractSerialConnection from umnp.communication.abstract_serial_connection import AbstractSerialConnection
class SerialConnection(AbstractSerialConnection): class SerialConnection(AbstractSerialConnection):
def __init__(self, address, options=None): def __init__(self, address, options=None):
super().__init__(address, options) super().__init__(address, options)
self.connect()
self.__lock = threading.RLock()
@property
def baud_rate(self):
return self.get_option("baudrate", SERIAL_DEFAULT_BAUD_RATE)
def connect(self): def connect(self):
if self.__connection: if self.connection:
return return
attempts = 0 attempts = 0
connection = None connection = None
max_attempts = self.__max_connection_attempts max_attempts = self.max_connection_attempts
while attempts < max_attempts: while attempts < max_attempts:
try: try:
connection = serial.Serial( connection = serial.Serial(
port=self.__address, port=self.address,
baudrate=self.__baud_rate, baudrate=self.baud_rate,
parity=self.get_option("parity", serial.PARITY_NONE), parity=self.get_option("parity", serial.PARITY_NONE),
stopbits=self.get_option("stopbits", serial.STOPBITS_ONE), stopbits=self.get_option("stopbits", serial.STOPBITS_ONE),
timeout=self.__wait_time, timeout=self.get_option("wait time", SERIAL_DEFAULT_WAIT_TIME_S),
xonxoff=self.get_option("xonoff", False), xonxoff=self.get_option("xonoff", False),
rtscts=self.get_option("rtscts", False), rtscts=self.get_option("rtscts", False),
dsrdtr=self.get_option("dsrdtr", False), dsrdtr=self.get_option("dsrdtr", False),
...@@ -39,14 +47,120 @@ class SerialConnection(AbstractSerialConnection): ...@@ -39,14 +47,120 @@ class SerialConnection(AbstractSerialConnection):
time.sleep(0.1) time.sleep(0.1)
if not connection: if not connection:
logging.error(f"could not connect to {self.__address}") print(f"could not connect to {self.address}")
return return
time.sleep(0.1) time.sleep(0.1)
self.__connection = connection self.connection = connection
connection.flush() connection.flush()
connection.reset_input_buffer() connection.reset_input_buffer()
connection.reset_output_buffer() connection.reset_output_buffer()
logging.info(f"connected to {self.__address}, baud rate: {self.__baud_rate}") print(f"connected to {self.address}, baud rate: {self.baud_rate}")
logging.debug(connection) logging.debug(connection)
def sync_command(self, cmd, expected_lines=1, show_reply=True):
if not self.connection:
logging.error(f"could not send command '{cmd}': not connected")
return None
time.sleep(self.wait_time)
self.connection.flushInput()
self.connection.flushOutput()
if isinstance(self.line_sep_read, bytes):
empty = self.line_sep_read.decode('utf8')
elif isinstance(self.line_sep_read, str):
empty = self.line_sep_read
else:
empty = ""
result = ''
result_debug = "<no response>"
cmd_nice = cmd
print(
f"sending sync command '{cmd_nice}' to '{self.address}' and expecting {expected_lines} lines of results")
with self.__lock:
self.send_command(cmd)
# time.sleep(self.wait_time)
if expected_lines == 1:
result = self.read_line(timeout=self.wait_time)
else:
received_lines = 0
while received_lines < expected_lines:
line = self.read_line(timeout=self.wait_time)
if line and line != empty:
result += line
received_lines += 1
if result:
if isinstance(result, bytes):
result = result.decode("utf8")
result_debug = result.replace('\n', '\\n').replace('\r', '\\r')
else:
result_debug = result.replace('\n', '\\n').replace('\r', '\\r')
if show_reply:
print(f"received (processed) '{result_debug}' from {self.address} in response to '{cmd}'")
else:
print(f"received (processed) '{result_debug}' from {self.address} in response to '{cmd}'")
return result
def read_line(self, timeout=None):
# fixme: what do we do if we have no connection
if not self.connection:
return None
result = b''
# fixme: what do we do on an error?
with self.__lock:
sep = self.line_sep_read
sep = None
if sep:
start_time = time.time()
try:
while True:
newest_char = self.connection.read(1)
print(newest_char)
if newest_char is not None:
result += newest_char
if newest_char == sep:
break
if time.time() - start_time > timeout:
# break
pass
except serial.serialutil.SerialException:
pass
else:
try:
result = self.connection.readline()
print("Result:", result.decode('utf-8').strip())
except serial.serialutil.SerialException:
pass
logging.debug(f"received (raw) '{result}' from {self.address}")
return result.decode('utf8').strip()
def send_command(self, cmd):
if not self.connection:
logging.error(f"could not send command '{cmd}': not connected")
return None
print(f"sending command '{cmd}' to '{self.address}'")
if isinstance(cmd, str):
cmd = cmd.encode("utf-8")
if self.line_sep_write:
sep = self.line_sep_write
if isinstance(sep, str):
sep = sep.encode('utf-8')
cmd += sep
with self.__lock:
self.connection.write(cmd)
...@@ -13,7 +13,7 @@ class AE33: ...@@ -13,7 +13,7 @@ class AE33:
pass pass
def request_measurement(self) -> str | None: def request_measurement(self) -> str | None:
return self.__connection.sync_command("$A33:D1\r", 1) return self.__connection.sync_command(b"$AE33:D1\r", 1)
def current_measurement(self) -> str | None: def current_measurement(self) -> str | None:
current = self.request_measurement() current = self.request_measurement()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment