Skip to content
Snippets Groups Projects
Commit 23484217 authored by Andreas Gattringer's avatar Andreas Gattringer
Browse files

next mass update

parent a91fa95b
No related branches found
No related tags found
No related merge requests found
Showing
with 595 additions and 27 deletions
import sys
from umnp.microcontroller.communication.udp_communicator import UDPCommunicator
from umnp.microcontroller.devices.network.ethernet_w5500 import EthernetW5500
from umnp.microcontroller.devices.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.measurementdevice import MeasurementDevice
from umnp.microcontroller.tasks.periodictask import PeriodicTask
from umnp.protocol.common import UDP_DATA_PORT, UDP_CMD_PORT
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
import ntptime
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
else:
from umnp.microcontroller.umock import machine
import asyncio
def report_time(*args) -> str:
return ""
def main():
# configure network
device = MeasurementDevice()
spi = machine.SPI(
0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18)
)
ether = EthernetW5500(
spi, machine.Pin(17), machine.Pin(20), mac=device.generated_mac_raw(), dhcp=True
)
uart = machine.UART(1, 11)
device.add_network_adapter(ether)
sender = UDPSender(ether.ip, ether.netmask, UDP_DATA_PORT)
receiver = UDPReceiver(ether.ip, UDP_CMD_PORT)
communicator = UDPCommunicator(
receiver=receiver, sender=sender, device_id=device.identifier
)
timer = PeriodicTask(report_time, print, 1000)
communicator.add_task(task=timer, name="timer")
# start
asyncio.run(communicator.start())
if __name__ == "__main__":
main()
import sys
from umnp.microcontroller.communication.udp_communicator import UDPCommunicator
from umnp.microcontroller.devices.network.ethernet_w5500 import EthernetW5500
from umnp.microcontroller.devices.network.udp import UDPSender, UDPReceiver
from umnp.microcontroller.measurementdevice import MeasurementDevice
from umnp.microcontroller.tasks.periodictask import PeriodicTask
from umnp.protocol.common import UDP_DATA_PORT, UDP_CMD_PORT
if sys.implementation.name == "micropython":
# noinspection PyUnresolvedReferences
import machine
# noinspection PyUnresolvedReferences
import ntptime
# noinspection PyUnresolvedReferences
import uasyncio as asyncio
else:
from umnp.microcontroller.umock import machine
import asyncio
def report_time(*args) -> str:
return ""
def main():
# configure network
device = MeasurementDevice()
spi = machine.SPI(
0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18)
)
ether = EthernetW5500(
spi, machine.Pin(17), machine.Pin(20), mac=device.generated_mac_raw(), dhcp=True
)
ntptime.settime()
device.add_network_adapter(ether)
sender = UDPSender(ether.ip, ether.netmask, UDP_DATA_PORT)
receiver = UDPReceiver(ether.ip, UDP_CMD_PORT)
communicator = UDPCommunicator(
receiver=receiver, sender=sender, device_id=device.identifier
)
timer = PeriodicTask(report_time, print, 1000)
communicator.add_task(task=timer, name="timer")
# start
asyncio.run(communicator.start())
if __name__ == "__main__":
main()
......@@ -32,7 +32,10 @@ def main():
spi = machine.SPI(
0, 2_000_000, mosi=machine.Pin(19), miso=machine.Pin(16), sck=machine.Pin(18)
)
ether = EthernetW5500(spi, 17, 20, mac=device.generated_mac_raw(), dhcp=True)
ether = EthernetW5500(
spi, machine.Pin(17), machine.Pin(20), mac=device.generated_mac_raw(), dhcp=True
)
print(ether.mac)
device.add_network_adapter(ether)
i2c = machine.I2C(id=1, scl=machine.Pin(27), sda=machine.Pin(26))
......@@ -49,3 +52,7 @@ def main():
comm.add_task(x, "test_function")
# start
asyncio.run(comm.start())
if __name__ == "__main__":
main()
import time
import serial
conn = serial.Serial(
port="/dev/ttyUSB0",
baudrate=115200,
dsrdtr=True,
xonxoff=False,
rtscts=False,
timeout=0.5,
)
print("Connected")
time.sleep(1)
print()
print("get newest log")
conn.write(b"$AE33:L1\r")
print(conn.readline())
time.sleep(1)
print()
print("get tape advances")
conn.write(b"$AE33:A\r")
print(conn.readlines())
time.sleep(1)
print()
print("get setup file")
conn.write(b"$AE33:SG\r")
print(conn.readlines())
print()
time.sleep(1)
print("start measurement")
conn.write(b"$AE33:X1\r")
print(conn.readlines())
print()
import time
import serial
conn = serial.Serial(
port="/dev/ttyUSB0",
baudrate=115200,
dsrdtr=True,
xonxoff=False,
rtscts=False,
timeout=0.5,
)
print("Connected")
for i in range(5):
conn.write(b"$AE33:D1\r")
print(conn.readline())
time.sleep(0.2)
import datetime
import os
import subprocess
def upload_file(port: str, local_path: str, remote_path: str | None = None):
if remote_path is None:
remote_path = local_path
if not os.path.exists(local_path):
raise ValueError("fFile {fn} does not exist")
if remote_path != local_path:
print(f"Uploading file {local_path} as {remote_path}")
else:
print(f"Uploading file {local_path}")
cmd = ["ampy", "--port", port, "put", local_path, remote_path]
subprocess.run(cmd)
def list_remote_file_dates_and_directories(port):
cmd = ["ampy", "--port", port, "run", "tools/list-files.py"]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
files = {}
dirs = []
for line in proc.stdout:
line = line.decode("utf-8")
fn, size, _, f_type, date = line.split()
while fn.startswith("/"):
fn = fn[1:]
if f_type == "f":
files[fn] = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S")
elif f_type == "d":
dirs.append(fn)
return files, dirs
def make_remote_directory(path: str, port: str):
if not path:
return
print(f"Creating directory {path}")
cmd = ["ampy", "--port", port, "mkdir", path]
subprocess.run(cmd)
import os
def get_directory_tree_from_directories(directories: list[str]) -> list[str]:
dirs = list(set([os.path.split(d)[0] for d in directories]))
all_dirs = dirs
for d in dirs:
while d:
d = os.path.split(d)[0]
if d in all_dirs:
continue
all_dirs.append(d)
return list(sorted(list(set(dirs))))
import subprocess
def parse_git_status(state: str) -> str:
if len(state) == 0 or len(state) > 2:
raise ValueError
if "M" in state:
return "modified"
if "??" in state:
return "untracked"
if "D" in state:
return "deleted"
if "A" in state:
return "added"
if "T" in state:
return "type changed"
if "R" in state:
return "renamed"
if "C" in state:
return "copied"
def get_git_unclean_files() -> dict[str:str]:
"""
Returns a dictionary of unclean files of the git repository, in which the current directory resides.
Raises
------
ValueError
If 'git status' can't be executed or fails with an error
Returns
-------
dict[str: str]
Keys of this dictionary are the paths of 'dirty' files (e.g. uncommitted and untracked files),
and a description as value.
"""
git_status = {}
cmd = ["git", "status", "--porcelain=v1"]
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc.wait()
except FileNotFoundError:
raise ValueError("git executable not found")
if proc.returncode != 0:
error = "\n".join([x.decode("utf-8") for x in proc.stderr])
raise ValueError(f"Error running {' '.join(cmd)}: {error}")
for line in proc.stdout:
fields = line.decode("utf-8").split()
status = fields[0]
file = " ".join(fields[1:])
if file.startswith('"') and file.endswith('"'):
file = file[1:-1]
git_status[file] = parse_git_status(status)
return git_status
import ast
import os
def get_module_names(node: ast.Import):
names = [n.name for n in node.names]
return names
def get_imported_files(
fn,
modules=None,
files=None,
filter_by: str | None = None,
ignore: str | None = None,
):
if modules is None:
modules = []
if files is None:
files = []
current_modules = []
with open(fn) as f:
root = ast.parse(f.read(), fn)
for node in ast.iter_child_nodes(root):
if isinstance(node, ast.Import):
current_modules.extend(get_module_names(node))
elif isinstance(node, ast.ImportFrom):
current_modules.append(node.module)
current_modules = sorted(list(set(current_modules)))
for module in current_modules:
if module in modules:
pass
module_fn = module.replace(".", os.sep)
if (filter_by and module_fn.startswith(filter_by)) or filter_by is None:
if ignore is not None and ignore in module_fn:
continue
if os.path.exists(module_fn) and os.path.isdir(module_fn):
module_fn = os.path.join(module_fn, "__init__.py")
else:
module_fn = module_fn + ".py"
modules.append(module)
files.append(module_fn)
files, modules = get_imported_files(
module_fn, modules, files, filter_by=filter_by, ignore=ignore
)
files = list(sorted(list(set(files))))
modules = list(sorted(list(set(modules))))
return files, modules
#! /usr/bin/env/python3
"""
upload-script.py
Uploads a Python script, including imported dependencies to a microcontroller using ampy.
Unless otherwise specified, the upload will only occur, if the files do not contain any changes not yet committed
to git, i.e. the working tree (or the parts of it that will be uploaded) are not dirty.
If a file to be uploaded already exists on the microcontroller, it will only be uploaded if newer.
Example:
python ./upload-script.py programs/sensor_calibration.py --allow-dirty --port "/dev/ttyACM0"
"""
import argparse
import datetime
import os
import sys
# this is ugly and probably not recommended, but enables imports relative to this file
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from helpers.ampy import (
upload_file,
list_remote_file_dates_and_directories,
make_remote_directory,
)
from helpers.filesystem import get_directory_tree_from_directories
from helpers.git import get_git_unclean_files
from helpers.imports import get_imported_files
def remote_file_older_or_missing(path: str, remote_file_date: datetime.datetime | None):
if not remote_file_date:
return True
disk_date = datetime.datetime.fromtimestamp(os.path.getmtime(path))
if disk_date > remote_file_date:
return True
return False
def make_directories(port: str, local_dirs: list[str], remote_dirs: list[str]):
for path in local_dirs:
if path in remote_dirs:
print(f"Skipping creation of '{path}': directory already exists")
continue
make_remote_directory(path, port=port)
def upload_files(local_files, script, port: str, ignore_file_times=False):
remote_file_dates, remote_directories = list_remote_file_dates_and_directories(port)
local_directories = get_directory_tree_from_directories(local_files)
make_directories(port, local_directories, remote_directories)
for file_path in local_files:
remote_date = remote_file_dates.get(file_path)
local_newer = remote_file_older_or_missing(file_path, remote_date)
if ignore_file_times or local_newer:
upload_file(port=port, local_path=file_path)
else:
print(f"Skipping '{file_path}': remote file is newer")
remote_script_date = remote_file_dates.get("main.py")
if remote_file_older_or_missing(script, remote_script_date):
upload_file(port, script, "main.py")
else:
print(f"Skipping upload '{script}' as main.py: remote file is newer")
def check_for_unclean_files(files_to_upload: list[str], args):
if not args.no_git:
return
errors = 0
unclean_files = get_git_unclean_files()
for fn in files_to_upload:
if fn in unclean_files:
state = unclean_files[fn]
print(f"Warning: file '{fn}': {state}")
errors += 1
if errors == 0:
return
print("Warning: files to upload contain uncommitted changes.")
if args.allow_dirty:
print("Continuing with dirty working tree")
return
print("Please either commit and retry, or re-run this script with --dirty")
exit(1)
def main(args):
fn = args.script
files, modules = get_imported_files(fn, filter_by="umnp", ignore="mock")
check_for_unclean_files(files, args)
upload_files(files, fn, port=args.port, ignore_file_times=args.ignore_time)
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog=sys.argv[0])
parser.add_argument(
"--allow-dirty",
action="store_true",
help="allow upload even when changes to uploaded files are not yet committed to git",
)
parser.add_argument(
"--ignore-time",
action="store_true",
help="ignore file timestamps and always upload, even when the "
"files on the disk are older than on the device",
)
parser.add_argument(
"--no-git", action="store_true", help="don't check for git status"
)
parser.add_argument(
"--port", help="address or port of the attached micro-controller", required=True
)
parser.add_argument(
"script",
help="Name of the python script file to upload. Will be renamed to main.py",
)
args_ = parser.parse_args()
if not os.path.exists(args_.script):
print(f"File '{args_.script}' does not exist")
exit(1)
if not os.path.exists(args_.port):
print(f"Can't connect to '{args_.port}': no such file")
exit(1)
main(args_)
from fs import FileSystemInformation
current_dir = ""
def main():
fs = FileSystemInformation()
fs.print_fs_tree()
main()
import datetime
def utc_now_rounded_second() -> datetime.datetime:
now = datetime.datetime.now(tz=datetime.timezone.utc)
if now.microsecond >= 500 * 1000:
now = now + datetime.timedelta(seconds=1)
now = now.replace(microsecond=0)
return now
import logging
import os
def setup_logger(name: str, log_dir: str | None = None):
now = utc_now_rounded_second()
now_string = now.isoformat().replace(":", "-")
now_string = now_string.split("+")[0]
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
if log_dir:
if not os.path.exists(log_dir):
os.mkdir(log_dir)
log_file_fn = os.path.join(log_dir, f"{name}-{now_string}.log")
log_file = logging.FileHandler(log_file_fn)
log_file.setLevel(logging.DEBUG)
logger.addHandler(log_file)
console_logger = logging.StreamHandler()
logger.addHandler(console_logger)
return logger
from umnp.communication.abstract_serial_connection import AbstractSerialConnection
AE33_BAUDRATE = 115200
class SerialConnection:
def __init__(self):
......
import sys
from umnp.communication.abstract_serial_connection import AbstractSerialConnection
if sys.implementation == "micropython":
import machine
else:
from umnp.microcontroller.umock import machine
class UARTSerial(AbstractSerialConnection):
def __init__(self, address: machine.UART):
super().__init__(address)
......@@ -13,18 +13,25 @@ else:
class EthernetW5500(EthernetAdapter):
def __init__(self, spi: machine.SPI, pin1: int, pin2: int, mac: bytes, dhcp=False):
def __init__(
self,
spi: machine.SPI,
pin1: machine.Pin,
pin2: machine.Pin,
mac: bytes,
dhcp=False,
):
super().__init__()
self._spi = spi
self._pin1 = machine.Pin(pin1)
self._pin2 = machine.Pin(pin2)
self._nic = network.WIZNET5K(spi, self._pin1, self._pin2, mac)
# self._nic.active(True)
self._pin1 = pin1
self._pin2 = pin2
self._nic = network.WIZNET5K(spi, self._pin1, self._pin2)
self._nic.active(True)
self._nic.config(mac=mac)
if dhcp:
self.enable_dhcp()
def get_dhcp(self):
def enable_dhcp(self):
while True:
print("Requesting IP via DHCP")
try:
......@@ -32,10 +39,7 @@ class EthernetW5500(EthernetAdapter):
return
except OSError:
pass
def enable_dhcp(self):
self.get_dhcp()
print(self._nic.ifconfig())
print(self._nic.ifconfig())
@property
def netmask(self):
......@@ -44,3 +48,7 @@ class EthernetW5500(EthernetAdapter):
@property
def ip(self):
return self._nic.ifconfig()[0]
@property
def mac(self):
return self._nic.config("mac")
......@@ -84,3 +84,42 @@ class I2C:
def writeto_mem(self, i2c_address, address, buffer):
pass
class UART:
def __init__(self, id: int): ...
def init(
self,
baudrate=9600,
bits=8,
parity=None,
stop=1,
tx=None,
rx=None,
rts=None,
cts=None,
txbuf=None,
rxbuf=None,
timeout=None,
timeout_chars=None,
invert=None,
flow=None,
): ...
def any(self) -> int: ...
def deinit(self): ...
def read(self, nbytes=None): ...
def readinto(self, buf, nbytes=None): ...
def readline(self): ...
def write(self, buf: bytes): ...
def sendbread(self): ...
def flush(self): ...
def txdone(self): ...
......@@ -2,9 +2,9 @@ from umnp.microcontroller.umock.machine import SPI, Pin
class WIZNET5K:
def __init__(self, spi: SPI, pin1: Pin, pin2: Pin, mac: bytes):
def __init__(self, spi: SPI, pin1: Pin, pin2: Pin):
self._active: bool = False
self._mac = b'\xe6ad\x08Cx' # FIXME
self._mac = b"\xe6ad\x08Cx" # FIXME
self._ip = "127.0.0.1"
self._gateway = "127.0.0.2"
self._netmask = "255.255.255.0"
......@@ -12,7 +12,7 @@ class WIZNET5K:
self._spi = spi
self._pin1 = pin1
self._pin2 = pin2
self._mac = mac
self._mac = None
def active(self, active: bool) -> None:
self._active = active
......@@ -20,8 +20,8 @@ class WIZNET5K:
def config(self, *args, **kwargs):
if len(args) == 1 and args[0] == "mac":
return self._mac
if 'mac' in kwargs:
self._mac = kwargs['mac']
if "mac" in kwargs:
self._mac = kwargs["mac"]
def ifconfig(self, *args, **kwargs):
if len(args) == 1 and args[0] == "dhcp":
......
UDP_DATA_PORT = 7777
UDP_METADATA_PORT = 7778
UDP_CMD_PORT = 7776
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment