Source code for mendevi.measures.gpu
"""Query the GPUs activity."""
import numbers
import queue
import threading
import time
import pynvml # uv pip install nvidia-ml-py
# initialisation
# GPUS defaults to 0 and is only overwritten once NVML has been initialised.
GPUS: int = 0
try:
    pynvml.nvmlInit()
except pynvml.NVMLError:
    pass  # deliberately silent: the module stays importable, with GPUS == 0
else:
    GPUS = pynvml.nvmlDeviceGetCount()
[docs]
def measure() -> dict[str]:
    """Take one snapshot of every gpu counted in ``GPUS``.

    Returns
    -------
    capture : dict[str]
        * 'memory': The used memory of each gpu (in bytes).
        * 'gpu': The utilization rate of each gpu, rescaled in [0, 1].
        * 'power': The power usage of each gpu (in Watts).

        Each list has one item per gpu, sorted by device index.
        The lists are empty when ``GPUS`` is 0.
    """
    capture = {"memory": [], "gpu": [], "power": []}
    for index in range(GPUS):
        handle = pynvml.nvmlDeviceGetHandleByIndex(index)
        # query order is power, then utilization, then memory
        raw_power = pynvml.nvmlDeviceGetPowerUsage(handle)
        rates = pynvml.nvmlDeviceGetUtilizationRates(handle)
        mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        capture["power"].append(raw_power / 1000.0)  # in Watts
        capture["gpu"].append(rates.gpu / 100.0)  # in [0, 1]
        capture["memory"].append(mem_info.used)  # in bytes
    return capture
[docs]
class UsageGPU(threading.Thread):
    """Sample the gpus from a background thread, driven by a ``with`` block.

    Entering the context starts the (daemon) thread and waits for the first capture.
    Leaving the context stops the sampling loop and joins the thread.

    Examples
    --------
    >>> from mendevi.measures.gpu import UsageGPU
    >>> with UsageGPU() as gpu:
    ...     pass
    ...
    >>>
    """

    def __init__(self, sleep: numbers.Real = 50e-3) -> None:
        """Prepare the sampler, without starting it.

        Parameters
        ----------
        sleep : float, default=50e-3
            The targeted period between 2 consecutive captures (in s).
            It has to be a strictly positive real number.
        """
        super().__init__(daemon=True)
        assert isinstance(sleep, numbers.Real), sleep.__class__.__name__
        assert sleep > 0, sleep
        self._signal_queue = queue.Queue()
        self._stop_flag = False
        self.sleep = float(sleep)
        # the series are only allocated when at least one gpu has been counted
        self.res: dict | None = None
        if GPUS:
            self.res = {"ts": [], "gpu": [], "memory": [], "power": []}

    def run(self) -> None:
        """Record captures until the stop flag is set.

        A ``None`` is pushed in the signal queue as soon as the first capture
        is stored (or immediately if there is nothing to measure).
        If the loop fails, the exception is pushed in the queue, then raised again.
        """
        if self.res is None:
            # nothing to sample: release .__enter__() straight away
            self._signal_queue.put(None)
            return

        # forget the previous content of every series
        for series in self.res.values():
            series.clear()

        # the first timestamp comes before the first capture
        self.res["ts"].append(time.time())
        first_capture_pending = True
        keep_going = True
        try:
            while keep_going:  # the flag is checked last, so one capture is always taken
                for name, values in measure().items():
                    self.res[name].append(values)
                if first_capture_pending:
                    first_capture_pending = False
                    self._signal_queue.put(None)  # signal awaited by .__enter__()
                # sleep only the remaining part of the period
                wake_up = self.sleep + self.res["ts"][-1]
                time.sleep(max(0.0, wake_up - time.time()))
                self.res["ts"].append(time.time())
                keep_going = not self._stop_flag
        except Exception as err:
            self._signal_queue.put(err)  # forwarded to .__enter__() or .__exit__()
            raise

    def __enter__(self) -> dict:
        """Start the thread and wait for the first capture.

        Returns
        -------
        consumption : dict[str]
            The dictionary filled in place by the thread, or None if no gpu was counted.

            * 'gpu': The utilization rate of each gpu, one list per capture.
            * 'memory': The used memory of each gpu, one list per capture.
            * 'power': The power usage of each gpu, one list per capture.
            * 'ts': The ``time.time()`` timestamps, one before the first capture
              and one after each sampling period.

        Raises
        ------
        Exception
            The exception caught in the thread, if the first capture failed.
        """
        self.start()
        # blocks until the thread reports its first capture, or its failure
        outcome = self._signal_queue.get()
        if outcome is not None:
            raise outcome
        return self.res

    def __exit__(self, *_: object) -> None:
        """Stop the sampling loop, then raise the error left by the thread, if any."""
        self._stop_flag = True
        self.join()  # the thread completes its current period before ending
        try:
            failure = self._signal_queue.get_nowait()
        except queue.Empty:
            return
        raise failure