"""Get the power consumption of as usb plugded ADEC WATTS wattmeters."""
import math
import struct
import threading
import time
import serial # uv pip install pyserial
from serial.tools.list_ports import comports
HEADER: bytes = b"\xfe\xfd\xfc\xfb" # comes from adecwatts c code 'read_device_values'
BAUD: int = 921600
STEP = 2.0**-32.0 # time counter step time
[docs]
class ADECWattmeter(threading.Thread):
"""Extract the activity of a ADEC-WATTS wattmeter connected to serial usb port.
Examples
--------
>>> import time
>>> from mendevi.measures.adec import ADECWattmeter
>>> with ADECWattmeter() as usage:
... time.sleep(1)
...
>>>
"""
def __init__(self, port: str | None = None, *, no_fail: bool = False) -> None:
"""Initialize the usage context.
Parameters
----------
port : str, default=autodetect
The port to listen on. In gerenal it is "/dev/ttyUSB0".
By default, it is autodetected using :py:meth`guess_port`.
no_fail : bool, default=True
If False, raise RuntimeError if it fails to get the ADEC measure.
Otherwise (if True), return None instead of failing.
"""
assert isinstance(no_fail, bool), no_fail
super().__init__(daemon=True)
if port is None:
try:
port = self.guess_port()
except RuntimeError:
if not no_fail:
raise
else:
assert isinstance(port, "/dev/ttyUSB0"), port.__class__.__name__
self._stop_flag = False
self.port = port
self.res: dict | None = {"dt": [], "u_rms": [], "i_rms": [], "p_eff": []} if port else None
def _find_header(self, port: serial.Serial) -> float:
"""Read the stream until it encounters the header."""
buffer: bytes = self._read_port(port, len(HEADER))
while buffer != HEADER:
buffer = buffer[1:] + self._read_port(port)
return time.time()
def _read_frame(self, port: serial.Serial) -> dict[str]:
"""Read a full frame after the header has been consumed."""
# get frame informations from header
data = self._read_port(port, 24)
(
frame_size, # payload size (uint16 le)
_, # frame id number (uint16 le)
delta, # delay since previous frame, unit 2**-32 s (uint32)
samplecount, # number of accumulated sample (uint32)
u1s_cum, # sum of the u1**2 tensions (float32)
u2s_cum, # sum of the u2**2 tensions (float32)
u3s_cum, # sum of the u3**2 tensions (float32)
) = struct.unpack("<HHIIfff", data)
# print(frame_size, frame_id, delta, u1s_cum, u2s_cum, u3s_cum)
if frame_size < 30:
msg = f"the frame size is absurde {frame_size}"
raise ValueError(msg)
if samplecount == 0:
msg = "the first frame (samplecount=0) has to be rejected"
raise ValueError(msg)
# frame crc verification
data += self._read_port(port, frame_size-21) # contains crc
checksum = data[0] # faster than functools.reduce(operator.xor, data[:-1]), ...
for item in data[1:-1]:
checksum ^= item
if checksum != data[-1]:
msg = f"the frame checksum {checksum} doese not match the excpected checksum {data[-1]}"
raise ValueError(msg)
data = data[24:-1]
# extract cumulated current and cummulated efficient powers
is_cum, p_cum = [], [] # each cumulative square current an efficient power for each channel
while data:
is_cum.extend(struct.unpack("<ffffff", data[:24]))
p_cum.extend(struct.unpack("<ffffff", data[24:48]))
data = data[48:]
# final normalization
tot = float(samplecount)
return {
"dt": float(delta)*STEP,
"u_rms": [math.sqrt(u1s_cum / tot), math.sqrt(u2s_cum / tot), math.sqrt(u3s_cum / tot)],
"i_rms": [math.sqrt(is_cum_ / tot) for is_cum_ in is_cum],
"p_eff": [p_cum_ / tot for p_cum_ in p_cum],
}
@staticmethod
def _read_port(port: serial.Serial, nbr: int = 1) -> bytes:
"""Raise an exception if the port is not readable."""
if len(data := port.read(nbr)) == nbr:
return data
msg = f"failed to read on device {port.port!r}"
raise TimeoutError(msg)
[docs]
@staticmethod
def guess_port() -> str:
"""Automaticaly find the port in the list of reachable ports."""
known_vid_pid = [
(0x10C4, 0xEA60), # CP210x
(0x1A86, 0x7523), # CH340
(0x0403, 0x6001), # FTDI FT232
(0x0403, 0x6011), # FTDI FT4232
]
ports = comports()
if len( # first selection
selection := {
port.device for port in comports()
if any(port.vid == vid and port.pid == pid for vid, pid in known_vid_pid)
} or {p.device for p in ports},
) == 1:
return selection.pop()
# fallback on default values
if not (selection := {p for p in selection if "USB" in p or "ACM" in p}):
msg = "no usb port found for the wattmeter, please provide it"
raise RuntimeError(msg)
return min(selection)
[docs]
def run(self) -> None:
"""Perform the measures."""
if self.res is None:
return
# open the port
with serial.Serial(
self.port,
baudrate=BAUD,
timeout=1.0,
dsrdtr=False, # already default values, juste to be shure
rtscts=False, # already default values, juste to be shure
) as port:
port.dtr = port.rts = False # like in adecwatts c code
port.reset_input_buffer()
self._find_header(port) # reject the first frame
self._find_header(port) # and the second one as well
while not self._stop_flag:
self._find_header(port)
for lbl, value in self._read_frame(port).items():
self.res[lbl].append(value)
def __enter__(self) -> dict:
"""Start to measure.
Returns
-------
Consumption: dict[str]
* 'dt': The duration of the sample measure (in s).
* 'u_rms': The voltage of the 3 phases (in V).
* 'i_rms': The current of each channal (in A).
* 'p_eff': The efficient power of each channal (in W).
"""
self.start()
return self.res
def __exit__(self, *_: object) -> None:
"""Stop the measure and update the dictionary returnd by __enter__."""
self._stop_flag = True
self.join() # wait the last update of self.run
if __name__ == "__main__":
with ADECWattmeter() as usage:
time.sleep(10)
import pprint
pprint.pprint(usage)