Source code for mendevi.utils

#!/usr/bin/env python3

"""Provide simple tools."""

import base64
import functools
import hashlib
import logging
import multiprocessing.pool
import numbers
import pathlib
import platform
import queue
import re
import threading
import time
import typing

from cutcutcodec.core.io import VIDEO_SUFFIXES
import cutcutcodec
import tqdm

from mendevi.g5kpower import g5kpower
from mendevi.psutil import Usage
from mendevi.rapl import RAPL

PATHLIKE = str | bytes | pathlib.Path


class Activity(threading.Thread):
    """Measure the computer activity of a section.

    The measure runs in a daemon thread that holds a ``RAPL`` and a ``Usage``
    context open between ``__enter__`` and ``__exit__``.
    The dictionary returned by ``__enter__`` is filled in place on exit.

    Examples
    --------
    >>> import pprint, time
    >>> from mendevi.utils import Activity
    >>> with Activity() as activity:
    ...     time.sleep(1)
    ...
    >>> pprint.pprint(activity)  # doctest: +ELLIPSIS
    {...}
    >>>
    """

    def __init__(self, sleep: numbers.Real=50e-3):
        """Initialize the perf context.

        Parameters
        ----------
        sleep : float, default=50e-3
            The time interval between 2 measures (in s).
            It is forwarded to both the RAPL and the usage samplers.
        """
        assert isinstance(sleep, numbers.Real), sleep.__class__.__name__
        assert sleep > 0, sleep
        super().__init__(daemon=True)
        self._rapl_catcher = RAPL(sleep=sleep, no_fail=True)
        self._usage_catcher = Usage(sleep=sleep)
        # __exit__ pushes one item in this queue to release the worker thread
        self._exit_queue = queue.Queue()
        self.sleep = float(sleep)
        self.res: dict = {}

    def run(self):
        """Perform the measures.

        Record the start timestamp, keep the samplers open until ``__exit__``
        releases the thread, then copy the sampled values into ``self.res``.
        """
        self.res["start"] = time.time()
        with self._rapl_catcher as rapl, self._usage_catcher as usage:
            self._exit_queue.get()  # block until __exit__ signals the end of the section
        self.res |= {
            "ps_core": usage["cpu"],
            "ps_cores": usage["cpus"],
            "ps_dt": usage["dt"],
            "ps_ram": usage["ram"],
        }
        # the RAPL context may yield None (it is created with no_fail=True)
        if rapl is not None:
            self.res |= {
                "rapl_dt": rapl["dt"],
                "rapl_energy": rapl["energy"],
                "rapl_power": rapl["power"],
                "rapl_powers": rapl["powers"],
            }

    def __enter__(self) -> dict:
        r"""Start to measure.

        Returns
        -------
        activity: dict[str]
            * duration: float, the real measure duration.
            * ps_core: float, the mean cumulated usage of all the logical cpus.
            * ps_cores: list[list[float]], tensor of detailed usage of each logical core in %.
            * ps_dt: list[float], the duration of each interval (in s).
            * ps_ram: list[int], list of the sampled ram usage in bytes in each point.
            * rapl_dt: list[float], the duration of each interval (in s).
            * rapl_energy: float, the total energy consumption (in J).
            * rapl_power: float, the average power, energy divided by the duration (in w).
            * rapl_powers: list[float], the average power in watt in each interval.
            * start: float, absolute timestamp.
            * wattmeter_dt: list[float], the duration of each interval (in s).
            * wattmeter_energy: float, the total energy consumption (in J).
            * wattmeter_power: float, the average power, energy divided by the duration (in w).
            * wattmeter_powers: list[float], the sampled power in watt in each point.

        Notes
        -----
        The returned dictionary is updated in place when we exit the code block.
        Only the successful fields are created.
        """
        self.start()
        return self.res

    def __exit__(self, *_):
        """Stop the measure and update the dictionary returned by __enter__."""
        # Take the stop timestamp first so that the teardown of the samplers
        # is not counted in the measured duration.
        stop = time.time()
        self._exit_queue.put(None)
        self.join()

        # "start" is written by the worker thread: it is only guaranteed to exist
        # once the thread has been joined (a very short section could otherwise
        # reach this point before run() had recorded it).
        # If the section ended before run() recorded "start", clamp to zero.
        self.res["duration"] = max(0.0, stop - self.res["start"])

        # request wattmeter power, the fields are skipped if the request fails
        try:
            wattmeter = g5kpower(platform.node(), self.res["start"], self.res["duration"])
        except ValueError:
            return
        self.res |= {
            "wattmeter_dt": wattmeter["dt"],
            "wattmeter_energy": wattmeter["energy"],
            "wattmeter_power": wattmeter["power"],
            "wattmeter_powers": wattmeter["powers"],
        }
[docs] def compute_video_hash( videos: PATHLIKE | typing.Iterable[PATHLIKE] ) -> bytes | dict[pathlib.Path, bytes]: r"""Compute the checksum of the video. For :math:`n` hash of :math:`b` bits, the proba of the colision :math:`C` is :math:`p(C) = 1 - \left(\frac{2^k-1}{2^k}\right)^{\frac{n(n-1)}{2}}`. The md5 hash uses :math:`b = 128` bits. If we add one video per second durring 10 years, the proba of colision is about :math:`p(C) \approx 1.46*10^{-22}`. That's why the md5 hash is used to identify the video files. Parameters ---------- videos : pathlike or list[pathlike] The single or set of video you want to compute the signature. Returns ------- signatures The md5 checksum of the video file. In the case of a multiple file, a dictionary containing the file and the hash is returned rather a single hash. If the file does not exists, return None. """ def _hash(video: PATHLIKE) -> pathlib.Path: video = pathlib.Path(video) if match := re.search(r"[2-7a-z]{26}", video.stem): return video, signature_to_hash(match.group()) if not (video := video.expanduser()).is_file(): return video, None with open(video, "rb") as raw: return video, hashlib.file_digest(raw, "md5").digest() if isinstance(videos, list | tuple | set | frozenset): with multiprocessing.pool.ThreadPool() as pool: return dict(tqdm.tqdm( pool.imap_unordered(_hash, videos), desc="compute videos checksum", dynamic_ncols=True, leave=False, smoothing=1e-6, total=len(videos), unit="video", )) return _hash(videos)[1]
@functools.cache
def get_pix_fmt(*args):
    """Cached alias of ``cutcutcodec.get_pix_fmt``.

    The positional arguments are forwarded untouched.
    The result is memoized by ``functools.cache``, so the arguments
    have to be hashable and the cache is not invalidated here.
    """
    pix_fmt = cutcutcodec.get_pix_fmt(*args)
    return pix_fmt
def get_project_root() -> pathlib.Path:
    """Return the absolute folder that contains this module.

    The path of this file is resolved (absolute, symlinks followed)
    and its parent directory is returned.

    Examples
    --------
    >>> from mendevi.utils import get_project_root
    >>> root = get_project_root()
    >>> root.is_dir()
    True
    >>> root.name
    'mendevi'
    >>> sorted(p.name for p in root.iterdir())  # doctest: +ELLIPSIS
    ['__init__.py', '__main__.py', ...]
    >>>
    """
    module_file = pathlib.Path(__file__).resolve()
    return module_file.parent
@functools.cache
def get_rate_video(*args):
    """Cached alias of ``cutcutcodec.get_rate_video``.

    The positional arguments are forwarded untouched.
    The result is memoized by ``functools.cache``, so the arguments
    have to be hashable and the cache is not invalidated here.
    """
    rate = cutcutcodec.get_rate_video(*args)
    return rate
@functools.cache
def get_resolution(*args):
    """Cached alias of ``cutcutcodec.get_resolution``.

    The positional arguments are forwarded untouched.
    The result is memoized by ``functools.cache``, so the arguments
    have to be hashable and the cache is not invalidated here.
    """
    resolution = cutcutcodec.get_resolution(*args)
    return resolution
def hash_to_signature(checksum: bytes) -> str:
    r"""Encode the binary md5 hash value as an urlsafe string.

    Inverse of :py:func:`signature_to_hash`.

    Parameters
    ----------
    checksum : bytes
        The 128 bit binary hash value.

    Returns
    -------
    signature : str
        The 26 ascii [2-7a-z] symbols string of the converted checksum.

    Examples
    --------
    >>> from mendevi.utils import hash_to_signature
    >>> hash_to_signature(b"\xd4\x1d\x8c\xd9\x8f\x00\xb2\x04\xe9\x80\t\x98\xec\xf8B~")
    '2qoyzwmpaczaj2mabgmoz6ccpy'
    >>>
    """
    assert isinstance(checksum, bytes), checksum.__class__.__name__
    assert len(checksum) == 16, len(checksum)
    # 16 bytes give 26 base32 symbols followed by 6 "=" padding symbols
    encoded = base64.b32encode(checksum).decode()
    return encoded.rstrip("=").lower()
def signature_to_hash(signature: str) -> bytes:
    r"""Decode the string signature back into the md5 checksum.

    Inverse of :py:func:`hash_to_signature`.

    Parameters
    ----------
    signature : str
        The 26 ascii [2-7a-z] symbols string of the converted checksum.

    Returns
    -------
    checksum : bytes
        The 128 bit binary hash value.

    Examples
    --------
    >>> from mendevi.utils import signature_to_hash
    >>> signature_to_hash("2qoyzwmpaczaj2mabgmoz6ccpy")
    b'\xd4\x1d\x8c\xd9\x8f\x00\xb2\x04\xe9\x80\t\x98\xec\xf8B~'
    >>>
    """
    assert isinstance(signature, str), signature.__class__.__name__
    assert re.fullmatch(r"[2-7a-z]{26}", signature), signature
    # restore the upper case alphabet and the 6 padding symbols expected by base32
    padded = signature.upper() + "=" * 6
    return base64.b32decode(padded.encode())
def unfold_video_files(
    paths: typing.Iterable[PATHLIKE]
) -> typing.Iterable[pathlib.Path]:
    """Explore recursively the folders to find the video path.

    Parameters
    ----------
    paths : list[pathlike]
        All the folders, files, glob or recursive glob expression.

        * An existing file is yielded as it is, whatever its suffix.
        * An existing folder is walked recursively, only the files
          whose suffix is in ``VIDEO_SUFFIXES`` are yielded.
        * A name containing ``*`` in an existing folder is expanded as a glob.
        * A path containing a ``**`` component is expanded as a recursive glob.
        * Anything else is reported with a warning and skipped.

    Yields
    ------
    filename : pathlib.Path
        The path of the video.
    """
    import os  # function-scope import: only needed to decode bytes paths

    assert hasattr(paths, "__iter__"), paths.__class__.__name__
    for path in paths:
        # pathlib refuses bytes, os.fsdecode accepts str, bytes and os.PathLike
        path = pathlib.Path(os.fsdecode(path)).expanduser()
        if path.is_file():
            yield path
        elif path.is_dir():
            for root, _, files in path.walk():
                for name in files:
                    candidate = root / name
                    if candidate.suffix.lower() in VIDEO_SUFFIXES:
                        yield candidate
        elif "*" in path.name and path.parent.is_dir():
            yield from unfold_video_files(path.parent.glob(path.name))
        elif "**" in (parts := path.parts):
            idx = parts.index("**")
            # Path.glob only accepts a path-like pattern since python 3.13,
            # a string pattern works on every version.
            pattern = str(pathlib.Path(*parts[idx:]))
            yield from unfold_video_files(pathlib.Path(*parts[:idx]).glob(pattern))
        else:
            logging.warning("the path %s is not correct", path)