Source code for mendevi.measures

"""All measurement helper."""

import numbers
import platform
import queue
import threading
import time

from .g5kpower import g5kpower
from .gpu import UsageGPU
from .psutil import Usage
from .rapl import RAPL

G5K_API_WAIT = 2.0  # waiting time (in seconds) before data from the grid5000 API becomes accessible


[docs] class Activity(threading.Thread): """Measure the computer activity of a section. Examples -------- >>> import pprint, time >>> from mendevi.measures import Activity >>> with Activity() as activity: ... time.sleep(1) ... >>> pprint.pprint(activity) >>> """ def __init__(self, sleep: numbers.Real = 50e-3) -> None: """Init the perf context. Parameters ---------- sleep : float, default=50e-3 The time interval between 2 measures (in s). """ assert isinstance(sleep, numbers.Real), sleep.__class__.__name__ assert sleep > 0, sleep super().__init__(daemon=True) self._rapl_catcher = RAPL(sleep=sleep, no_fail=True) self._usage_catcher = Usage(sleep=sleep) self._usage_gpu_catcher = UsageGPU(sleep=sleep) self._exit_queue = queue.Queue() self.sleep = float(sleep) self.res: dict = {}
[docs] def run(self) -> None: """Perform the measures.""" self.res["start"] = time.time() with ( self._rapl_catcher as rapl, self._usage_catcher as usage, self._usage_gpu_catcher as gpu, ): self._exit_queue.get() # wait self.res |= { "ps_core": usage["cpu"], "ps_cores": usage["cpus"], "ps_dt": usage["dt"], "ps_ram": usage["ram"], } if rapl is not None: self.res |= { "rapl_dt": rapl["dt"], "rapl_energy": rapl["energy"], "rapl_power": rapl["power"], "rapl_powers": rapl["powers"], } if gpu is not None: self.res |= { "gpu_cores": gpu["gpus"], "gpu_dt": gpu["dt"], "gpu_energy": gpu["energy"], "gpu_memory": gpu["memory"], "gpu_power": gpu["power"], "gpu_powers": gpu["powers"], }
def __enter__(self) -> dict: r"""Start to measure. Returns ------- activity: dict[str] * duration: float, the real measure duration. * gpu_cores: list[list[float]], tensor of detailed usage of each GPU in [0, 1]. * gpu_dt: list[float], the duration of each interval (in s). * gpu_energy: float, the total energy consumption of all GPU (in J). * gpu_power: float, the total power consumption of all GPU (in W). * gpu_powers: list[list[float]], the sampled power in each point (in W). * ps_core: float, the mean cummulated usage of all the logical cpus. * ps_cores: list[list[float]], tensor of detailed usage of each logical core in %. * ps_dt: list[float], the duration of each interval (in s). * ps_ram: list[int], list of the sampled ram usage in bytes in each point. * rapl_dt: list[float], the duration of each interval (in s). * rapl_energy: float, the total energy consumption (in J). * rapl_power: float, the average power, energy divided by the duration (in W). * rapl_powers: list[float], the average power in each interval (in W). * start: float, absolute timestamp. * wattmeter_dt: list[float], the duration of each interval (in s). * wattmeter_energy: float, the total energy consumption (in J). * wattmeter_power: float, the average power, energy divided by the duration (in W). * wattmeter_powers: list[float], the sampled power in watt in each point. Notes ----- The returned dictionary is update inplace when we exit the code bloc. Only the successfull field are created. """ self.start() return self.res def __exit__(self, *_: object) -> None: """Stop the measure and update the dictionary returnd by __enter__.""" # stop self.res["duration"] = time.time() - self.res["start"] self._exit_queue.put(None) self.join() # request wattmeter power time.sleep(max(0.0, G5K_API_WAIT + self.res["start"] + self.res["duration"] - time.time())) try: wattmeter = g5kpower(platform.node(), self.res["start"], self.res["duration"]) except ValueError: wattmeter = None else: self.res |= { "wattmeter_dt": wattmeter["dt"], "wattmeter_energy": wattmeter["energy"], "wattmeter_power": wattmeter["power"], "wattmeter_powers": wattmeter["powers"], }