Source code for mendevi.utils

"""Provide simple tools."""

import base64
import contextlib
import functools
import hashlib
import logging
import math
import multiprocessing.pool
import numbers
import pathlib
import re
import shutil
import tempfile
import typing
from fractions import Fraction

import cutcutcodec
import tqdm
from cutcutcodec.core.io import VIDEO_SUFFIXES

from mendevi.cst.profiles import PROFILES

HASHSIZE = 16  # md5 has number of bytes
PATHLIKE = str | bytes | pathlib.Path


[docs] @functools.cache def best_profile(height: numbers.Integral, width: numbers.Integral) -> str: """Return the closest profile name. Examples -------- >>> from mendevi.utils import best_profile >>> best_profile(1080, 1920) 'fhd' >>> """ assert isinstance(height, numbers.Integral), height.__class__.__name__ assert isinstance(width, numbers.Integral), width.__class__.__name__ size = math.sqrt(float(height * width)) dist_to_profile = { abs(math.sqrt(float(v["resolution"][0]*v["resolution"][1])) - size): p for p, v in PROFILES.items() } return dist_to_profile[min(dist_to_profile)]
[docs] def compute_video_hash( videos: PATHLIKE | typing.Iterable[PATHLIKE], *, fast: bool=True, ) -> bytes | dict[pathlib.Path, bytes]: r"""Compute the checksum of the video. For :math:`n` hash of :math:`b` bits, the proba of the colision :math:`C` is :math:`p(C) = 1 - \left(\frac{2^k-1}{2^k}\right)^{\frac{n(n-1)}{2}}`. The md5 hash uses :math:`b = 128` bits. If we add one video per second durring 10 years, the proba of colision is about :math:`p(C) \approx 1.46*10^{-22}`. That's why the md5 hash is used to identify the video files. Parameters ---------- videos : pathlike or list[pathlike] The single or set of video you want to compute the signature. fast : boolean, default=True If the hash appears in the file name, it is returned immediately. If False, the actual checksum is systematically recalculated. Returns ------- signatures The md5 checksum of the video file. In the case of a multiple file, a dictionary containing the file and the hash is returned rather a single hash. If the file does not exists, return None. """ assert isinstance(fast, bool), fast.__class__.__name__ def _hash(video_fast: tuple[PATHLIKE, bool]) -> pathlib.Path: video, fast = video_fast video = pathlib.Path(video) # [2-7a-z]{26} for mendevi > 1.2.2 if fast and (match := re.search(r"[0-9a-f]{32}|[2-7a-z]{26}", video.stem)): return video, signature_to_hash(match.group()) if not (video := video.expanduser()).is_file(): return video, None with video.open("rb") as raw: return video, hashlib.file_digest(raw, "md5").digest() if isinstance(videos, list | tuple | set | frozenset): with multiprocessing.pool.ThreadPool() as pool: return dict(tqdm.tqdm( pool.imap_unordered(_hash, ((v, fast) for v in videos)), desc="compute videos checksum", dynamic_ncols=True, leave=False, smoothing=1e-8, total=len(videos), unit="video", )) return _hash((videos, fast))[1]
[docs] @contextlib.contextmanager def cp_shm(file: pathlib.Path, threshold: float) -> pathlib.Path: """Copy the file into /dev/shm is there is enougth free space, otherwise in /tmp.""" assert isinstance(file, pathlib.Path), file.__class__.__name__ assert file.exists(), file shm = pathlib.Path("/dev/shm") tmp = pathlib.Path(tempfile.gettempdir()) file_copy = None if ( file.parts[:len(shm.parts)] != shm.parts # avoid copy to the same device and shutil.disk_usage(shm).free > threshold * file.stat().st_size ): file_copy = shm / file.name shutil.copy(file, file_copy) elif file.parts[:len(tmp.parts)] != tmp.parts: file_copy = tmp / file.name shutil.copy(file, file_copy) try: yield file_copy or file finally: if file_copy is not None: file_copy.unlink()
[docs] @functools.cache def get_pix_fmt(*args: tuple) -> str: """Alias to cutcutcodec func.""" return cutcutcodec.get_pix_fmt(*args)
[docs] @functools.cache def get_project_root() -> pathlib.Path: """Return the absolute project root folder. Examples -------- >>> from mendevi.utils import get_project_root >>> root = get_project_root() >>> root.is_dir() True >>> root.name 'mendevi' >>> sorted(p.name for p in root.iterdir()) # doctest: +ELLIPSIS ['__init__.py', '__main__.py', ...] >>> """ return pathlib.Path(__file__).resolve().parent
[docs] @functools.cache def get_rate_video(*args: tuple) -> Fraction: """Alias to cutcutcodec func.""" return cutcutcodec.get_rate_video(*args)
[docs] @functools.cache def get_resolution(*args: tuple) -> tuple[int, int]: """Alias to cutcutcodec func.""" return cutcutcodec.get_resolution(*args)
[docs] def hash_to_signature(checksum: bytes) -> str: r"""Convert the md5 binary hash value into an urlsafe string. Bijection of :py:func:`signature_to_hash`. Parameters ---------- checksum : bytes The 128 bit binary hash value. Returns ------- signature : str The 26 ascii [2-7a-z] symbols string of the converted checksum. Examples -------- >>> from mendevi.utils import hash_to_signature >>> hash_to_signature(b"\xd4\x1d\x8c\xd9\x8f\x00\xb2\x04\xe9\x80\t\x98\xec\xf8B~") 'd41d8cd98f00b204e9800998ecf8427e' >>> """ assert isinstance(checksum, bytes), checksum.__class__.__name__ assert len(checksum) == HASHSIZE, len(checksum) return checksum.hex()
[docs] def signature_to_hash(signature: str) -> bytes: r"""Convert the string signature into the md5 checksum. Bijection of :py:func:`hash_to_signature`. Parameters ---------- signature : str The 26 ascii [2-7a-z] symbols string of the converted checksum. Returns ------- checksum : bytes The 128 bit binary hash value. Examples -------- >>> from mendevi.utils import signature_to_hash >>> signature_to_hash("d41d8cd98f00b204e9800998ecf8427e") b'\xd4\x1d\x8c\xd9\x8f\x00\xb2\x04\xe9\x80\t\x98\xec\xf8B~' >>> signature_to_hash("2qoyzwmpaczaj2mabgmoz6ccpy") # mendevi < 1.2.2 b'\xd4\x1d\x8c\xd9\x8f\x00\xb2\x04\xe9\x80\t\x98\xec\xf8B~' >>> """ assert isinstance(signature, str), signature.__class__.__name__ try: # version >= 1.2.2 assert re.fullmatch(r"[0-9a-f]{32}", signature), signature except AssertionError: # retrocompatibility version < 1.2.2 assert re.fullmatch(r"[2-7a-z]{26}", signature), signature return base64.b32decode(f"{signature.upper()}======".encode()) else: return bytes.fromhex(signature)
[docs] def unfold_video_files( paths: typing.Iterable[PATHLIKE], ) -> typing.Iterable[pathlib.Path]: """Explore recursively the folders to find the video path. Parameters ---------- paths : list[pathlike] All the folders, files, glob or recursive glob expression. Yields ------ filename : pathlib.Path The path of the video. """ assert hasattr(paths, "__iter__"), paths.__class__.__name__ for path_general in paths: path = pathlib.Path(path_general).expanduser() if path.is_file(): yield path elif path.is_dir(): for root, _, files in path.walk(): for file_name in files: file = root / file_name if file.suffix.lower() in VIDEO_SUFFIXES: yield file elif "*" in path.name and path.parent.is_dir(): yield from unfold_video_files(path.parent.glob(path.name)) elif "**" in (parts := path.parts): idx = parts.index("**") yield from unfold_video_files( pathlib.Path(*parts[:idx]).glob(pathlib.Path(*parts[idx:])), ) else: logging.getLogger(__name__).warning("the path %s is not correct", path)
[docs] def uniform(data: object) -> float: """Return a uniform floating-point number between [0 and 1[. This can be very useful for reliably rejecting data based on a specified probability. """ hash_bytes = hashlib.md5(str(data).encode(encoding="utf-8"), usedforsecurity=True).digest() return float(f"0.{abs(int.from_bytes(hash_bytes))}")