# Source code for mendevi.encode

"""Perform encoding measures."""

import contextlib
import datetime
import logging
import math
import pathlib
import shlex
import shutil
import sqlite3
import uuid

import cutcutcodec
import numpy as np
import orjson
from context_verbose import Printer
from flufl.lock import Lock

from mendevi.cmd import CmdFFMPEG
from mendevi.convert import filter_best_order
from mendevi.database.serialize import list_to_binary, tensor_to_binary
from mendevi.decode import available_decoders
from mendevi.encoder import (
    _encode_av1_nvenc,
    _encode_av1_vaapi,
    _encode_h264_nvenc,
    _encode_h264_vaapi,
    _encode_hevc_nvenc,
    _encode_hevc_vaapi,
    _encode_libaomav1,
    _encode_libopenh264,
    _encode_librav1e,
    _encode_libsvtav1,
    _encode_libvpx_vp9,
    _encode_libx264,
    _encode_libx265,
    _encode_vp9_vaapi,
    _encode_vvc,
)
from mendevi.utils import best_profile, compute_video_hash, cp_shm, hash_to_signature

# Dispatch table: each encoder name (the value of the ``encoder`` keyword argument)
# is associated with its builder function, imported from ``mendevi.encoder``.
# :py:func:`get_transcode_cmd` calls the selected builder with the encoding keyword arguments
# and unpacks three values from the result: the arguments appended to ``cmd.general``,
# an optional video filter appended to ``cmd.vid_filter``, and the value of ``cmd.encode``.
ENCODERS_CMD = {
    "av1_nvenc": _encode_av1_nvenc,
    "av1_vaapi": _encode_av1_vaapi,
    "h264_nvenc": _encode_h264_nvenc,
    "h264_vaapi": _encode_h264_vaapi,
    "hevc_nvenc": _encode_hevc_nvenc,
    "hevc_vaapi": _encode_hevc_vaapi,
    "libaom-av1": _encode_libaomav1,
    "libopenh264": _encode_libopenh264,
    "librav1e": _encode_librav1e,
    "libsvtav1": _encode_libsvtav1,
    "libvpx-vp9": _encode_libvpx_vp9,
    "libx264": _encode_libx264,
    "libx265": _encode_libx265,
    "vp9_vaapi": _encode_vp9_vaapi,
    "vvc": _encode_vvc,
}


def encode(src: pathlib.Path, **kwargs: dict) -> tuple[pathlib.Path, str, str, dict[str]]:
    """Transcode an existing video.

    Parameters
    ----------
    src : pathlib.Path
        The source video file to be transcoded.
    **kwargs : dict
        Transmitted to :py:func:`get_transcode_cmd`.
        The key ``ram`` is required, it is forwarded as the ``threshold`` of ``cp_shm``.
        If the optional key ``callback`` is provided and is not None,
        it is called as ``callback(cmd, **kwargs)`` and has to return the command
        to be executed in place of ``cmd``, as a string or as a list of arguments.

    Returns
    -------
    dst : pathlib.Path
        The transcoded video path, next to ``src``.
        The stem contains the md5 hash of the file content.
    cmd : str
        The ffmpeg command, in which the real input and output file names
        are replaced by ``src.mp4`` and ``dst.mp4``.
    log : str
        The output generated by the cmd.
    activity : dict[str]
        The computer activity during the transcoding process.

    Notes
    -----
    The temporary output file is always removed, even if the transcoding fails.
    """
    assert isinstance(src, pathlib.Path), src.__class__.__name__

    with cp_shm(src, threshold=kwargs["ram"]) as src_ram:  # copy input video into ram
        # find tempfile name
        dst = src_ram.parent / f"{uuid.uuid4().hex}.mp4"

        try:
            # get cmd
            cmd = get_transcode_cmd(src_ram, dst, **kwargs)
            if kwargs.get("callback") is None:
                user_cmd = None
            else:
                user_cmd = kwargs["callback"](cmd, **kwargs.copy())  # copy for user safe
                if isinstance(user_cmd, str):
                    user_cmd = shlex.split(user_cmd)
                assert isinstance(user_cmd, list), user_cmd.__class__.__name__

            # display, the random file names are replaced by constant ones
            aliases = {
                str(src_ram): str(src_ram.with_name("src.mp4")),
                str(dst): str(dst.with_name("dst.mp4")),
            }
            prt_cmd = " ".join(shlex.quote(aliases.get(c, c)) for c in user_cmd or cmd)
            with Printer(prt_cmd, color="green") as prt:
                prt.print(f"input video: {src_ram}")

                # transcode
                log, activity = cmd.run(user_cmd)

                # print
                prt.print(f"avg cpu usage: {activity['ps_core_avg']:.1f} %")
                prt.print(f"avg ram usage: {1e-9*np.mean(activity['ps_ram_avg']):.2g} Go")
                for key, label in (
                    ("rapl_power_avg", "rapl"),
                    ("gpu_power_avg", "GPU"),
                    ("wattmeter_power_avg", "wattmeter"),
                ):
                    if key in activity:  # these probes are optional
                        prt.print(f"avg {label} power: {activity[key]:.2g} W")

                # compute file hash
                signature = hash_to_signature(compute_video_hash(dst, fast=False))
                prt.print(f"output video: sample_{signature}.mp4")

            # move file, through an intermediate name so that
            # the final name never designates a partially copied file
            final_dst = src.parent / f"sample_{signature}.mp4"
            if not final_dst.exists():
                partial = src.parent / f"sample_{signature}_partial.mp4"
                shutil.copy(dst, partial)
                shutil.move(partial, final_dst)
                final_dst.chmod(0o777)
        finally:
            # never leave the temporary file behind, even after a failure
            dst.unlink(missing_ok=True)

    return final_dst, prt_cmd, log, activity


def encode_and_store(
    database: pathlib.Path,
    env_id: int,
    src: pathlib.Path,
    **kwargs: dict,
) -> None:
    """Transcode a video file and store the result in the database.

    Parameters
    ----------
    database : pathlike
        The path of the existing database to be updated.
    env_id : int
        The primary integer key of the environment.
    src : pathlib.Path
        The path of the video to be transcoded.
    **kwargs
        Transmitted to :py:func:`encode`.
        The keys ``src_vid_id``, ``effort``, ``encoder``, ``fps``, ``mode``, ``pix_fmt``,
        ``quality``, ``resolution`` and ``threads`` are also stored in the encode table.

    Notes
    -----
    The database is accessed under the protection of the ``.dblock`` file lock,
    located next to the database. The three tables are filled in a single transaction,
    and the connection is closed before the lock is released.
    """
    # transcode the video
    dst, cmd, log, activity = encode(src, **kwargs)

    # prepare all that is independent of the database before taking the lock,
    # in order to hold it for as little time as possible
    dst_vid_id: bytes = compute_video_hash(dst)
    act_values = {
        "act_duration": activity["duration"],
        "act_gpu_dt": list_to_binary(activity.get("gpu_dt")),
        "act_gpu_power": tensor_to_binary(activity.get("gpu_power")),
        "act_ps_core": tensor_to_binary(activity["ps_core"]),
        "act_ps_dt": list_to_binary(activity["ps_dt"]),
        "act_ps_temp": orjson.dumps(
            activity["ps_temp"], option=orjson.OPT_INDENT_2|orjson.OPT_SORT_KEYS,
        ),
        "act_ps_ram": list_to_binary(activity["ps_ram"]),
        "act_rapl_dt": list_to_binary(activity.get("rapl_dt")),
        "act_rapl_power": list_to_binary(activity.get("rapl_power")),
        "act_start": activity["start"],
        "act_wattmeter_dt": list_to_binary(activity.get("wattmeter_dt")),
        "act_wattmeter_power": list_to_binary(activity.get("wattmeter_power")),
    }
    act_keys = list(act_values)

    with (
        Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=600)),
        # the sqlite3 connection context manager only handles the transaction,
        # the connection itself has to be closed explicitly
        contextlib.closing(sqlite3.connect(database)) as conn,
        conn,  # commit on success, rollback on error
    ):
        cursor = conn.cursor()

        # fill video table, the videos may already be registered
        with contextlib.suppress(sqlite3.IntegrityError):
            cursor.execute(
                "INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)",
                (kwargs["src_vid_id"], src.name),
            )
        with contextlib.suppress(sqlite3.IntegrityError):
            cursor.execute(
                "INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)",
                (dst_vid_id, dst.name),
            )

        # fill activity table
        (act_id,) = cursor.execute(
            (
                f"INSERT INTO t_act_activity ({', '.join(act_keys)}) "
                f"VALUES ({', '.join('?'*len(act_keys))}) RETURNING act_id"
            ),
            [act_values[k] for k in act_keys],
        ).fetchone()

        # fill encode table
        values = {
            "enc_act_id": act_id,
            "enc_cmd": cmd,
            "enc_dst_vid_id": dst_vid_id,
            "enc_effort": kwargs["effort"],
            "enc_encoder": kwargs["encoder"],
            "enc_env_id": env_id,
            "enc_fps": float(kwargs["fps"]),
            "enc_height": kwargs["resolution"][0],
            "enc_log": log,
            "enc_mode": kwargs["mode"],
            "enc_pix_fmt": kwargs["pix_fmt"],
            "enc_quality": kwargs["quality"],
            "enc_src_vid_id": kwargs["src_vid_id"],
            "enc_threads": kwargs["threads"],
            "enc_width": kwargs["resolution"][1],
        }
        keys = list(values)
        cursor.execute(
            f"INSERT INTO t_enc_encode ({', '.join(keys)}) VALUES ({', '.join('?'*len(keys))})",
            [values[k] for k in keys],
        )


def get_transcode_cmd(src: pathlib.Path, dst: pathlib.Path, **kwargs: dict) -> CmdFFMPEG:
    """Return the ffmpeg encode cmd.

    Parameters
    ----------
    src : pathlib.Path
        The video to be transcoded.
    dst : pathlib.Path
        The output file of the command.
    **kwargs : dict
        The encoding parameters.
        The keys ``filter``, ``fps``, ``pix_fmt`` and ``resolution`` are given
        to ``filter_best_order``. The key ``encoder`` selects the entry of ``ENCODERS_CMD``,
        which is called with all these keyword arguments.

    Returns
    -------
    cmd : CmdFFMPEG
        The configured command, it is not executed here.
    """
    cmd = CmdFFMPEG(src, output=str(dst))
    cmd.vid_filter = filter_best_order(
        src,
        additional_filter=kwargs["filter"],
        fps=kwargs["fps"],
        pix_fmt=kwargs["pix_fmt"],
        resolution=kwargs["resolution"],
    )
    general, vid_filter, cmd.encode = ENCODERS_CMD[kwargs["encoder"]](**kwargs)
    cmd.general = cmd.general + general

    # select the first available decoder
    try:
        family, decoder = next(iter(
            available_decoders(cutcutcodec.get_codec_video(src), cutcutcodec.get_pix_fmt(src)),
        ))
    except StopIteration:
        # no decoder is imposed, cmd.decode and cmd.general are left untouched
        logging.getLogger(__name__).info("no decoder found for %s", src)
    else:
        # family and decoder are only defined when a decoder has been found
        if family == "vaapi":
            cmd.general = [*cmd.general, "-hwaccel", "vaapi"]
        else:
            cmd.decode = decoder

    # the filter required by the encoder comes after the conversion filter
    if vid_filter:
        cmd.vid_filter = (f"{cmd.vid_filter}," if cmd.vid_filter else "") + vid_filter
    return cmd


def quality_to_rate(kwargs: dict[str]) -> int:
    """Return the absolute target bitrate in kbit/s.

    Based on https://twitch-overlay.fr/quelle-connexion-internet-choisir-pour-streamer-sur-twitch/
    and https://bitmovin.com/blog/video-bitrate-streaming-hls-dash/

    You can plot the bitrate with:
    mendevi plot mendevi.db -x bitrate -y psnr -f 'mode = "vbr"'

    The flow margin is taken to be twice as small and twice as large as the recommendations.
    The bitrate is interpolated in logarithmic scale between these two bounds:
    the quality 0.0 gives the highest bitrate and the quality 1.0 gives the lowest one.

    Parameters
    ----------
    kwargs : dict[str]
        It has to contain the key ``quality``, a float in [0, 1],
        and the key ``resolution``, which is unpacked as the arguments of ``best_profile``.

    Returns
    -------
    rate : int
        The rounded bitrate in kbit/s.

    Raises
    ------
    NotImplementedError
        If no bitrate rule is defined for the profile matching the resolution.
    """
    quality = kwargs["quality"]
    assert isinstance(quality, float), quality.__class__.__name__
    assert 0.0 <= quality <= 1.0, quality

    # the recommended (lowest, highest) bitrates of each profile, in kbit/s
    recommended = {
        "sd": (400, 2100),
        "hd": (1500, 6000),
        "fhd": (3000, 9000),
        "uhd4k": (10000, 51000),
    }
    profile = best_profile(*kwargs["resolution"])
    if profile not in recommended:
        msg = f"please define a bitrate rule for the profile {profile}"
        raise NotImplementedError(msg)
    low, high = recommended[profile]

    # apply margin, then switch to the logarithmic scale
    log_low = math.log10(float(low // 2))
    log_high = math.log10(float(high * 2))
    return round(10.0**(log_high-quality*(log_high-log_low)))