"""Get the power consumption of as usb plugded ADEC WATTS wattmeters."""
import math
import queue
import struct
import threading
import time
import serial # uv pip install pyserial
from serial.tools.list_ports import comports
HEADER: bytes = b"\xfe\xfd\xfc\xfb" # comes from adecwatts c code 'read_device_values'
BAUD: int = 921600
STEP = 2.0**-32.0 # time counter step time
[docs]
class ADECWattmeter(threading.Thread):
"""Extract the activity of a ADEC-WATTS wattmeter connected to serial usb port.
Examples
--------
>>> from mendevi.measures.adecwatts import ADECWattmeter
>>> with ADECWattmeter(no_fail=True) as usage:
... pass
...
>>>
"""
def __init__(self, port: str | None = None, *, no_fail: bool = False) -> None:
"""Initialize the usage context.
Parameters
----------
port : str, default=autodetect
The port to listen on. In gerenal it is "/dev/ttyUSB0".
By default, it is autodetected using :py:meth`guess_port`.
no_fail : bool, default=False
If False, raise RuntimeError if it fails to get the ADEC measure.
Otherwise (if True), return None instead of failing.
"""
assert isinstance(no_fail, bool), no_fail
super().__init__(daemon=True)
if port is None:
try:
port = self.guess_port()
except RuntimeError:
if not no_fail:
raise
else:
assert isinstance(port, str), port.__class__.__name__
self._signal_queue = queue.Queue()
self._stop_flag = False
self.port = port
self.res: dict | None = {"ts": [], "u_rms": [], "i_rms": [], "p_eff": []} if port else None
def _find_header(self, port: serial.Serial) -> float:
"""Read the stream until it encounters the header."""
buffer: bytes = self._read_port(port, len(HEADER))
while buffer != HEADER:
buffer = buffer[1:] + self._read_port(port)
return time.time()
def _read_frame(self, port: serial.Serial) -> dict[str]:
"""Read a full frame after the header has been consumed."""
# get frame informations from header
data = self._read_port(port, 24)
(
frame_size, # payload size (uint16 le)
_, # frame id number (uint16 le)
delta, # delay since previous frame, unit 2**-32 s (uint32)
samplecount, # number of accumulated sample (uint32)
u1s_cum, # sum of the u1**2 tensions (float32)
u2s_cum, # sum of the u2**2 tensions (float32)
u3s_cum, # sum of the u3**2 tensions (float32)
) = struct.unpack("<HHIIfff", data)
# print(frame_size, frame_id, delta, u1s_cum, u2s_cum, u3s_cum)
if frame_size < 30:
msg = f"the frame size is absurde {frame_size}"
raise ValueError(msg)
if samplecount == 0:
msg = "the first frame (samplecount=0) has to be rejected"
raise ValueError(msg)
# frame crc verification
data += self._read_port(port, frame_size-21) # contains crc
checksum = data[0] # faster than functools.reduce(operator.xor, data[:-1]), ...
for item in data[1:-1]:
checksum ^= item
if checksum != data[-1]:
msg = f"the frame checksum {checksum} doese not match the excpected checksum {data[-1]}"
raise ValueError(msg)
data = data[24:-1]
# extract cumulated current and cummulated efficient powers
is_cum, p_cum = [], [] # each cumulative square current an efficient power for each channel
while data:
is_cum.extend(struct.unpack("<ffffff", data[:24]))
p_cum.extend(struct.unpack("<ffffff", data[24:48]))
data = data[48:]
# final normalization
tot = float(samplecount)
return {
"dt": float(delta)*STEP,
"u_rms": [math.sqrt(u1s_cum / tot), math.sqrt(u2s_cum / tot), math.sqrt(u3s_cum / tot)],
"i_rms": [math.sqrt(is_cum_ / tot) for is_cum_ in is_cum],
"p_eff": [p_cum_ / tot for p_cum_ in p_cum],
}
@staticmethod
def _read_port(port: serial.Serial, nbr: int = 1) -> bytes:
"""Raise an exception if the port is not readable."""
try:
if len(data := port.read(nbr)) == nbr:
return data
# device reports readiness to read but returned no data...
except serial.SerialException as err:
time.sleep(0.1) # wait for better stabilisation
msg = f"device {port.port!r} busy"
raise ValueError(msg) from err
msg = f"failed to read on device {port.port!r}, data to short {len(data)} vs {nbr}"
raise TimeoutError(msg)
def _yield_frames(self, port: serial.Serial) -> dict[str]:
"""Like _read_frame but skip the corrupted one."""
n_corrupted_max = 5
errors = [None]
while True:
for _ in range(n_corrupted_max):
try:
self._find_header(port)
yield self._read_frame(port)
except ValueError as err: # reject the first frame
errors[0] = err
else:
break
else:
msg = f"{n_corrupted_max} corrupted packets recovered subsequently"
raise RuntimeError(msg) from errors.pop()
[docs]
@staticmethod
def guess_port() -> str:
"""Automaticaly find the port in the list of reachable ports."""
known_vid_pid = [
(0x10C4, 0xEA60), # CP210x
(0x1A86, 0x7523), # CH340
(0x0403, 0x6001), # FTDI FT232
(0x0403, 0x6011), # FTDI FT4232
]
ports = comports()
if len( # first selection
selection := {
port.device for port in comports()
if any(port.vid == vid and port.pid == pid for vid, pid in known_vid_pid)
} or {p.device for p in ports},
) == 1:
return selection.pop()
# fallback on default values
if not (selection := {p for p in selection if "USB" in p or "ACM" in p}):
msg = "no usb port found for the wattmeter, please provide it"
raise RuntimeError(msg)
return min(selection)
[docs]
def run(self) -> None:
"""Perform the measures."""
is_warming = True
if self.res is None:
self._signal_queue.put(None)
return
# reset the measures
for data in self.res.values():
data.clear()
try:
# open the port
with serial.Serial(
self.port,
baudrate=BAUD,
timeout=1.0,
dsrdtr=False, # already default values, just to be shure
rtscts=False, # already default values, just to be shure
) as port:
port.dtr = port.rts = False # like in adecwatts c code
port.reset_input_buffer() # can trigger other daemons doing stuff on this port
self.res["ts"].append(time.time())
for frame in self._yield_frames(port):
if is_warming:
is_warming = False
self._signal_queue.put(None) # send signal catched by .__enter__()
self.res["ts"].append(self.res["ts"][-1] + frame["dt"]) # increment timestamp
for lbl in ("u_rms", "i_rms", "p_eff"):
self.res[lbl].append(frame[lbl])
if self._stop_flag:
break
except Exception as err:
self._signal_queue.put(err)
raise
def __enter__(self) -> dict:
"""Start to measure.
Returns
-------
Consumption: dict[str]
* 'ts': The absolute timestamps between 2 mesurements (in s), shape (n+1,).
* 'u_rms': The voltage of the 3 phases (in V), shape (n, 3).
* 'i_rms': The current of each channal (in A), shape (n, c).
* 'p_eff': The efficient power of each channal (in W), shape (n, c).
"""
self.start()
if (err := self._signal_queue.get()) is not None: # wait until the first frame is catched
raise err
return self.res
def __exit__(self, *_: object) -> None:
"""Stop the measure and update the dictionary returnd by __enter__."""
self._stop_flag = True
self.join() # wait the last update of self.run
try:
err = self._signal_queue.get_nowait()
except queue.Empty:
pass
else:
raise err