Source code for mendevi.decode

"""Perform decoding measures."""

import contextlib
import datetime
import functools
import logging
import pathlib
import re
import shlex
import sqlite3

import cutcutcodec
import numpy as np
import orjson
from context_verbose import Printer
from flufl.lock import Lock

from mendevi.cmd import CmdFFMPEG
from mendevi.convert import filter_best_order
from mendevi.database.serialize import list_to_binary, tensor_to_binary
from mendevi.utils import cp_shm


[docs] def decode(vid: pathlib.Path, **kwargs: dict) -> tuple[str, str | None, str, dict[str]]: """Decode an existing video. Parameters ---------- vid : pathlib.Path The source video file to be decoded. **kwargs : dict Transmitted to :py:func:`get_decode_cmd`. Returns ------- cmd : str The ffmpeg command. activity : dict[str] The computeur activity during the decoding process. """ assert isinstance(vid, pathlib.Path), vid.__class__.__name__ with cp_shm(vid, threshold=kwargs["ram"]) as vid_ram: # copy input video into ram # get cmd cmd = get_decode_cmd( vid_ram, kwargs.get("filter"), kwargs.get("resolution"), kwargs.get("family"), ) if kwargs.get("callback") is None: user_cmd = None else: user_cmd = kwargs["callback"](cmd, **kwargs.copy()) # copy for user safe if isinstance(user_cmd, str): user_cmd = shlex.split(user_cmd) assert isinstance(user_cmd, list), user_cmd.__class__.__name__ # display prt_cmd = " ".join(map( shlex.quote, ({str(vid_ram): str(vid_ram.with_name("vid.mp4"))}.get(c, c) for c in user_cmd or cmd), )) with Printer(prt_cmd, color="green") as prt: prt.print(f"input video: {vid_ram}") # decode log, activity = cmd.run(user_cmd) # print prt.print(f"avg cpu usage: {activity['ps_core_avg']:.1f} %") prt.print(f"avg ram usage: {1e-9*np.mean(activity['ps_ram_avg']):.2g} Go") if "rapl_power_avg" in activity: prt.print(f"avg rapl power: {activity['rapl_power_avg']:.2g} W") if "gpu_power_avg" in activity: prt.print(f"avg GPU power: {activity['gpu_power_avg']:.2g} W") if "wattmeter_power_avg" in activity: prt.print(f"avg wattmeter power: {activity['wattmeter_power_avg']:.2g} W") decoder = [*cmd.decode, None][0] return prt_cmd, decoder, log, activity
[docs] def decode_and_store( database: pathlib.Path, env_id: int, vid: pathlib.Path, **kwargs: dict, ) -> None: """Decode a video file and store the result in the database. Parameters ---------- database : pathlike The path of the existing database to be updated. env_id : int The primary integer key of the environment. vid : pathlib.Path The path of the video to be encoded. **kwargs Transmitted to :py:func:`decode`. """ # decode the video cmd, decoder, log, activity = decode(vid, **kwargs) with ( Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=600)), sqlite3.connect(database) as conn, ): cursor = conn.cursor() # fill video table with contextlib.suppress(sqlite3.IntegrityError): cursor.execute( "INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)", (kwargs["dec_vid_id"], vid.name), ) # fill activity table activity = { "act_duration": activity["duration"], "act_gpu_dt": list_to_binary(activity.get("gpu_dt", None)), "act_gpu_power": tensor_to_binary(activity.get("gpu_power", None)), "act_ps_core": tensor_to_binary(activity["ps_core"]), "act_ps_dt": list_to_binary(activity["ps_dt"]), "act_ps_temp": orjson.dumps( activity["ps_temp"], option=orjson.OPT_INDENT_2|orjson.OPT_SORT_KEYS, ), "act_ps_ram": list_to_binary(activity["ps_ram"]), "act_rapl_dt": list_to_binary(activity.get("rapl_dt", None)), "act_rapl_power": list_to_binary(activity.get("rapl_power", None)), "act_start": activity["start"], "act_wattmeter_dt": list_to_binary(activity.get("wattmeter_dt", None)), "act_wattmeter_power": list_to_binary(activity.get("wattmeter_power", None)), } keys = list(activity) (act_id,) = cursor.execute( ( f"INSERT INTO t_act_activity ({', '.join(keys)}) " f"VALUES ({', '.join('?'*len(keys))}) RETURNING act_id" ), [activity[k] for k in keys], ).fetchone() # fill decode table values = { "dec_act_id": act_id, "dec_cmd": cmd, "dec_decoder": decoder, "dec_env_id": env_id, "dec_height": kwargs.get("resolution", (None, None))[0], "dec_log": log, "dec_pix_fmt": "rgb24", "dec_vid_id": kwargs["dec_vid_id"], "dec_width": kwargs.get("resolution", (None, None))[1], } keys = list(values) cursor.execute( f"INSERT INTO t_dec_decode ({', '.join(keys)}) VALUES ({', '.join('?'*len(keys))})", [values[k] for k in keys], )
[docs] def get_decode_cmd( video: pathlib.Path, additional_filter: str, resolution: tuple[int, int] | None = None, family: str | None = None, ) -> CmdFFMPEG: """Return the ffmpeg decode cmd. Parameters ---------- video : pathlib.Path The video to be decoded. It is required to know the resolution in order to adapt the filter. additional_filter : str The additional video filter, (can be an empty string). resolution : tuple[int, int], optional The new (heigh, width) video shape. family : str, optional If provided, force to use a specific decoder type. Returns ------- filter : str The full ffmpeg decode bash command arguments. Examples -------- >>> import cutcutcodec >>> from mendevi.decode import get_decode_cmd >>> video = cutcutcodec.utils.get_project_root() / "media" / "video" / "intro.webm" >>> print(get_decode_cmd(video, additional_filter="", resolution=None)) # doctest: +ELLIPSIS ffmpeg -hide_banner -y -loglevel verbose -threads 1 -i /...intro.webm -vf format=rgb24 -f null - >>> print(get_decode_cmd(video, additional_filter="", resolution=(480, 720))) ffmpeg ... -i ...intro.webm -vf scale=h=480:w=720:sws_flags=bicubic,format=rgb24 -f null - >>> """ cmd = CmdFFMPEG(video) cmd.general = [*cmd.general, "-threads", "1"] cmd.vid_filter = filter_best_order( video, additional_filter=additional_filter, fps=None, pix_fmt="rgb24", resolution=resolution, ) if family is not None: for family_, decoder in available_decoders( cutcutcodec.get_codec_video(video), cutcutcodec.get_pix_fmt(video), ): if family == family_: if family == "vaapi": cmd.general = [*cmd.general, "-hwaccel", "vaapi"] else: cmd.decode = decoder break else: msg = f"impossible to find a {family} decoder for {video}" raise RuntimeError(msg) return cmd
[docs] @functools.cache def create_video_sample(codec: str, pix_fmt: str) -> pathlib.Path: """Generate a small sample with the given codec and pixel format.""" sample = pathlib.Path("/dev/shm") / f"{codec}_{pix_fmt}.mp4" if sample.exists(): return sample # cmd = CmdFFMPEG( # error in libvpx-vp9 v1.16.0, ok in v1.15.2 # video="/dev/urandom", # general=[ # "-y", # "-f", "rawvideo", # "-s", "256x256", # "-pix_fmt", pix_fmt, # "-r", "1", # "-to", "2", # ], # output=str(sample), # ) cmd = CmdFFMPEG( video=f"nullsrc=size=256x256:rate=1,format={pix_fmt}", general=[ "-y", "-f", "lavfi", "-r", "1", "-to", "2", ], output=str(sample), ) if ( encode := { "av1": "libsvtav1", "h264": "libx264", "hevc": "libx265", "vp9": "libvpx-vp9", "vvc": "vvc", }.get(codec) ) is None: msg = f"create a sample with the codec {codec} is not yet supported" raise NotImplementedError(msg) cmd.encode = encode cmd.run() # create the test file (never fail) return sample
[docs] @functools.cache def try_decode_sample_encoder(sample: pathlib.Path, decoder: str) -> bool: """Return True if ffmpeg can decode that video.""" cmd = ( CmdFFMPEG(general=["-loglevel", "verbose", "-hwaccel", "vaapi"], video=sample) if decoder == "vaapi" else CmdFFMPEG(decode=decoder, video=sample) ) try: out, _ = cmd.run() except RuntimeError: return False if decoder == "vaapi": if not (pix_fmts := set(re.findall(r"pix_?fmt\s?:\s?(\w+)", out))): msg = ( "failed to detect the pixel format used by the decoder, " "maybe the ffmpeg api has changed, please adapt the regex" ) raise ValueError(msg) return any(p in pix_fmts for p in ("vaapi", "nv12", "p010")) return True
[docs] def available_decoders(codec: str, pix_fmt: str) -> tuple[str, str | None]: """Yield the ffmpeg encoder to decode the video from an accelerated device.""" sample = create_video_sample(codec, pix_fmt) # general: -hwaccel cuda -hwaccel_output_format cuda match codec: case "av1": candidates = [ ("cuvid", "av1_cuvid"), ("vaapi", "vaapi"), ("cpu", "libdav1d"), ("cpu", "libaom-av1"), ] case "h264": candidates = [ ("cuvid", "h264_cuvid"), ("vaapi", "vaapi"), ("cpu", "h264"), ("cpu", "libopenh264"), ] case "hevc": candidates = [("cuvid", "hevc_cuvid"), ("vaapi", "vaapi"), ("cpu", "hevc")] case "vvc": candidates = [("cuvid", "vvc_cuvid"), ("vaapi", "vaapi"), ("cpu", "vvc")] case "vp9": candidates = [ ("cuvid", "vp9_cuvid"), ("vaapi", "vaapi"), ("cpu", "libvpx-vp9"), ("cpu", "vp9"), ] case _: logging.getLogger(__name__).info("no decoder tested for the codec %s", codec) candidates = [] for family, decoder in candidates: if try_decode_sample_encoder(sample, decoder): yield family, decoder