"""All measurement helper."""
import contextlib
import numbers
import platform
import queue
import threading
import time
import typing
import numpy as np
from .adecwatts import ADECWattmeter
from .g5kpower import g5kpower
from .gpu import UsageGPU
from .psutil import Usage
from .rapl import RAPL
G5K_API_WAIT = 5.0  # waiting time (in s) before data from the Grid'5000 API becomes accessible
class Activity(threading.Thread):
"""Measure the computer activity of a section.
Examples
--------
>>> import pprint, time
>>> from mendevi.measures import Activity
>>> with Activity() as activity:
... time.sleep(0.1)
... activity.start_slice("lbl 1")
... time.sleep(0.2)
... _ = activity.stop_slice()
... activity.start_slice("lbl 2")
... time.sleep(0.3)
... activity.start_slice("lbl 3")
... time.sleep(0.4)
...
>>> slices, full = activity.get_slices(), activity.get_full()
>>> round(full["duration"], 2)
1.0
>>> sorted(slices)
['lbl 1', 'lbl 2', 'lbl 3']
>>> round(slices["lbl 2"]["duration"], 2)
0.3
>>>
"""
def __init__(self, sleep: numbers.Real = 50e-3) -> None:
"""Init the perf context.
Parameters
----------
sleep : float, default=50e-3
The time interval between 2 measures (in s).
"""
assert isinstance(sleep, numbers.Real), sleep.__class__.__name__
assert sleep > 0, sleep
super().__init__(daemon=True)
self._adec_catcher = ADECWattmeter(no_fail=True)
self._rapl_catcher = RAPL(sleep=sleep, no_fail=True)
self._usage_catcher = Usage(sleep=sleep)
self._usage_gpu_catcher = UsageGPU(sleep=sleep)
self._exit_queue = queue.Queue()
self._slices: dict[typing.Hashable, list[float, float]] = {} # {lbl: [start, stop], ...}
self._slices_current_lbl: typing.Hashable | None = None # the currently processing label
self._as_dict_called: bool = False
self.sleep = float(sleep)
self.res: dict = {}
[docs]
def get_full(self) -> dict | tuple[dict, dict]:
"""Collect the data and return the full activity.
Returns
-------
activity: dict[str]
* duration: float, the real measure duration.
* gpu_core: list[list[float]], tensor of detailed usage of each GPU in [0, 1].
* gpu_core_avg: float, the sum of the average usage of each GPU.
* gpu_dt: list[float], the duration of each interval (in s).
* gpu_power: list[list[float]], the sampled power in each point (in W).
* gpu_power_avg: float, the total power consumption of all GPU (in W).
* ps_core: list[list[float]], tensor of detailed usage of each logical core in %.
* ps_core_avg: float, the mean cummulated usage of all the logical cpus.
* ps_dt: list[float], the duration of each interval (in s).
* ps_ram: list[int], list of the sampled ram usage in bytes in each point.
* ps_ram_avg: float, the average ram usage in bytes in each point.
* ps_temp: dict[str, list[float]], the temperature of each device (in C).
* ps_temp_avg: dict[str, float], the average temperature of each device (in C).
* rapl_dt: list[float], the duration of each interval (in s).
* rapl_power: list[float], the average power in each interval (in W).
* rapl_power_avg: float, the average power, energy divided by the duration (in W).
* start: float, absolute timestamp.
* wattmeter_dt: list[float], the duration of each interval (in s).
* wattmeter_power: list[float], the sampled power in watt in each point.
* wattmeter_power_avg: float, the average power, energy divided by the duration (in W).
"""
self.get_slices()
return self.res[1]
[docs]
def get_slices(self) -> dict[str, dict]:
"""For each slices initialized by start_slice, are returned here.
Returns
-------
slices : dict[str, dict]
For each label given in ``start_slice``,
return the same dict as :py:meth:`get_full` for this specific slice.
"""
# stop measurements
if self._as_dict_called:
return self.res[0]
self._as_dict_called = True
self.join()
# request wattmeter power
if "wattmeter_dt" not in self.res: # ADEC and GRID'5000 are mutualy exclusive
time.sleep(max(
0.0,
G5K_API_WAIT + self.res["start"] + self.res["duration"] - time.time(),
))
try:
wattmeter = g5kpower(platform.node(), self.res["start"], self.res["duration"])
except ValueError:
wattmeter = None
else:
self.res |= {
"wattmeter_dt": wattmeter["dt"],
"wattmeter_power": wattmeter["power"],
}
# cut slices
dtlbl_datalbls = (
("ps_dt", ("ps_core", "ps_ram", "ps_temp")),
("rapl_dt", ("rapl_power",)),
("gpu_dt", ("gpu_core", "gpu_memory", "gpu_power")),
("wattmeter_dt", ("wattmeter_power",)),
)
dtlbl_datalbls = tuple((dt, fields) for dt, fields in dtlbl_datalbls if dt in self.res)
if self._slices:
slices = { # each slice, not concats
slicelbl: {
**{
fieldlbl: data
for dt_lbl, data_lbls in dtlbl_datalbls
for fieldlbl, data in self._extract_slice(
dt_lbl, data_lbls, start, stop,
).items()
},
"start": start,
"duration": stop-start,
}
for slicelbl, (start, stop) in self._slices.items()
}
# add the avg values
slices = { # for slices
slicelbl: {
**data, # keep the expanded data
**{ # add the averages
f"{flbl}_avg": self._trapez_integral_avg(data[dtlbl], data[flbl])
for dtlbl, fieldlbls in dtlbl_datalbls for flbl in fieldlbls
},
}
for slicelbl, data in slices.items()
}
self.res = (slices, self.res)
else:
self.res = ({}, self.res)
return self.res[0]
[docs]
def run(self) -> None:
"""Perform the measures."""
self.res["start"] = time.time()
with (
self._adec_catcher as adec,
self._rapl_catcher as rapl,
self._usage_catcher as usage,
self._usage_gpu_catcher as gpu,
):
self._exit_queue.get() # wait
self.res |= {
"ps_core": usage["cpu"],
"ps_dt": usage["dt"],
"ps_ram": usage["ram"],
"ps_temp": usage["temp"],
}
if adec is not None:
self.res |= {
"wattmeter_dt": adec["dt"],
"wattmeter_power": [sum(p) for p in adec["p_eff"]],
}
if rapl is not None:
self.res |= {
"rapl_dt": rapl["dt"],
"rapl_power": rapl["power"],
}
if gpu is not None:
self.res |= {
"gpu_core": gpu["gpu"],
"gpu_dt": gpu["dt"],
"gpu_memory": gpu["memory"],
"gpu_power": gpu["power"],
}
[docs]
def start_slice(self, lbl: typing.Hashable) -> None:
"""Add a new named slice of activity.
Parameters
----------
lbl : hashable
The section name, as a key of the final dictionay.
If it is called several times, the slice with the same label are concatenated.
Notes
-----
If this method is never called, the final result will not be pack into a dict.
"""
assert hasattr(lbl, "__hash__"), lbl.__class__.__name__
assert lbl is not None, "'None' is a forbidden label"
if lbl in self._slices:
msg = f"the method start_slice has already been called with the label {lbl!r}"
raise KeyError(msg)
# end last slice
if self._slices_current_lbl is not None and self._slices[self._slices_current_lbl]:
self._slices[self._slices_current_lbl][1] = (
self._slices[self._slices_current_lbl][1] or time.time()
)
# start new slice
self._slices[lbl] = [time.time(), None]
self._slices_current_lbl = lbl
[docs]
def stop_slice(self) -> typing.Hashable:
"""Close the last slice.
Returns
-------
lbl : hashable
The name of the closed slice.
"""
# check
if self._slices_current_lbl is None:
msg = "no slice capture in process"
raise RuntimeError(msg)
# end slice
self._slices[self._slices_current_lbl][1] = time.time()
# reset
lbl = self._slices_current_lbl
self._slices_current_lbl = None
return lbl
def _extract_slice(self, dt_lbl: str, data_lbls: list[str], start: float, stop: float) -> dict:
"""Extract data for the slice time.
Paramaters
----------
dt_lbl : str
The label of the reference time durations.
data_lbls : list[str]
Labels of interset, to be cut.
start, stop : float
The absolute timestamps of the start and stop slice.
Returns
-------
sub_slice : dict
The cut and selected data.
Only this labels and dt_lbl are in the final return dict.
"""
assert isinstance(dt_lbl, str), dt_lbl.__class__.__name__
assert dt_lbl in self.res, sorted(self.res)
assert isinstance(data_lbls, list | tuple), data_lbls.__class__.__name__
assert all(lbl in self.res for lbl in data_lbls), sorted(self.res)
assert isinstance(start, float), start.__class__.__name__
assert isinstance(stop, float), stop.__class__.__name__
# get slice start and stop indicies
dts = np.asarray(self.res[dt_lbl], dtype=np.float64)
timestamps = self.res["start"] + dts.cumsum()
timestamps = np.asarray([0.0, *timestamps.tolist(), np.inf], dtype=timestamps.dtype)
idx_start = int((timestamps < start).view(np.uint8).argmin() - 1)
idx_stop = int((timestamps >= stop).view(np.uint8).argmax())
# restrict timestamps diff (dt) interval
timestamps = timestamps[idx_start:idx_stop]
timestamps[0], timestamps[-1] = start, stop
# cut slices
def cut_slice(data: list | dict, idx_start: int, idx_stop: int) -> list:
"""Select the slice."""
match data:
case list() | tuple():
if not data:
return []
if isinstance(data[0], list | tuple): # shape (n_times_point, n_sensors)
return list(zip(
*(
cut_slice(subdata, idx_start, idx_stop)
for subdata in zip(*data, strict=True)
),
strict=True,
))
# return [cut_slice(subdata, idx_start, idx_stop) for subdata in data]
if isinstance(data[0], numbers.Real):
return [data[0], *data, data[-1]][idx_start:idx_stop]
raise TypeError
case dict():
return {
lbl: cut_slice(subdata, idx_start, idx_stop)
for lbl, subdata in data.items()
}
raise TypeError
return {
**{lbl: cut_slice(self.res[lbl], idx_start, idx_stop) for lbl in data_lbls},
dt_lbl: (timestamps[1:] - timestamps[:-1]).tolist(), # absolute to dt
}
def _trapez_integral_avg(self, times_delta: list[float], data: list | dict) -> float | dict:
"""Compute the average value of a data serie."""
match data:
case list() | tuple():
if not data:
return np.nan
if isinstance(data[0], list | tuple): # shape (n_times_point, n_sensors)
return sum(
self._trapez_integral_avg(times_delta, d) for d in zip(*data, strict=True)
)
if isinstance(data[0], numbers.Real):
if duration := sum(times_delta):
if len(times_delta) == len(data): # simple integral with rectangle
return sum(
dt * val for dt, val in zip(times_delta, data, strict=True)
) / duration
if len(times_delta) == len(data) - 1: # trapeze integration
return 0.5 * sum( # trapeze integration
dt * (bmin + bmax)
for dt, bmin, bmax
in zip(times_delta, data[:-1], data[1:], strict=True)
) / duration # average of integral
msg = (
f"the dt length {len(times_delta)} "
f"and the data length {len(data)} are not compatible"
)
raise ValueError(msg)
return np.nan
raise TypeError
case dict():
return {k: self._trapez_integral_avg(times_delta, d) for k, d in data.items()}
raise TypeError
def __enter__(self) -> typing.Self:
"""Start to measure.
Notes
-----
The returned dictionary is update inplace when we exit the code bloc.
Only the successfull field are created.
"""
self.start()
return self
def __exit__(self, *_: object) -> None:
"""Stop the measure and update the dictionary returnd by __enter__."""
# stop
self.res["duration"] = time.time() - self.res["start"]
self._exit_queue.put(None)
with contextlib.suppress(RuntimeError):
self.stop_slice()