Source code for mendevi.encode

#!/usr/bin/env python3

"""Perform encoding measures."""

import datetime
import hashlib
import math
import pathlib
import re
import shlex
import shutil
import sqlite3
import subprocess
import tempfile

from context_verbose import Printer
from flufl.lock import Lock
import cutcutcodec
import numpy as np
import tqdm

from mendevi.convert import get_convert_cmd
from mendevi.database.serialize import list_to_binary, tensor_to_binary
from mendevi.measures import Activity
from mendevi.utils import best_profile, compute_video_hash, hash_to_signature


ENCODERS = {"libx264", "libx265", "libvpx-vp9", "libsvtav1", "vvc"}


[docs] def encode(src: pathlib.Path, **kwargs) -> tuple[pathlib.Path, str, dict[str]]: """Transcode an existing video. Parameters ---------- src : pathlib.Path The source video file to be transcoded. **kwargs : dict Transmitted to :py:func:`get_transcode_cmd`. Returns ------- dst : pathlib.Path The transcoded video path. The stem contains the md5 hash of the file content. cmd : str The ffmpeg command. activity : dict[str] The computeur activity during the transcoding process. """ assert isinstance(src, pathlib.Path), src.__class__.__name__ assert src.is_file(), src # find tempfile name signature = hashlib.md5( bytes(src) + " ".join(str(kwargs[k]) for k in sorted(kwargs)).encode("utf-8") ).hexdigest() dst = pathlib.Path(tempfile.gettempdir()) / f"{signature}.mp4" # get cmd cmd = get_transcode_cmd(src, dst, **kwargs) # display prt_cmd = " ".join( map(shlex.quote, [{str(src): "src.mp4", str(dst): "dst.mp4"}.get(c, c) for c in cmd]) ) with Printer(prt_cmd, color="green") as prt: prt.print(f"input video: {src.name}") load = tqdm.tqdm( dynamic_ncols=True, leave=False, smoothing=1e-6, total=round(float(cutcutcodec.get_duration_video(src)), 2), unit="s", ) # transcode with Activity() as activity, subprocess.Popen(cmd, stderr=subprocess.PIPE) as process: signature = b"" is_finish = False while not is_finish: while ( match := re.search( br"time=(?P<h>\d+):(?P<m>\d{1,2}):(?P<s>\d{1,2}\.\d*)", signature ) ) is None: if not (buff := process.stderr.read(32)): is_finish = True break signature += buff else: signature = signature[match.endpos:] elapsed = round( 3600.0*float(match["h"]) + 60.0*float(match["m"]) + float(match["s"]), 2, ) load.total = max(load.total, elapsed) load.update(elapsed-load.n) load.close() if process.returncode or not dst.stat().st_size: dst.unlink(missing_ok=True) raise RuntimeError(f"failed to execute {cmd}") # print prt.print(f"avg cpu usage: {activity['ps_core']:.1f} %") prt.print(f"avg ram usage: {1e-9*np.mean(activity['ps_ram']):.2g} Go") if "rapl_power" in activity: prt.print(f"avg rapl power: {activity['rapl_power']:.2g} W") if "wattmeter_power" in activity: prt.print(f"avg wattmeter power: {activity['wattmeter_power']:.2g} W") # compute file hash signature = hash_to_signature(compute_video_hash(dst)) prt.print(f"output video: sample_{signature}.mp4") # move file final_dst = src.parent / f"sample_{signature}.mp4" if not final_dst.exists(): shutil.copy(dst, src.parent / f"sample_{signature}_partial.mp4") shutil.move(src.parent / f"sample_{signature}_partial.mp4", final_dst) final_dst.chmod(0o777) dst.unlink() return final_dst, prt_cmd, activity
[docs] def encode_and_store( database: pathlib.Path, env_id: int, src: pathlib.Path, **kwargs, ): """Transcode a video file and store the result in the database. Parameters ---------- database : pathlike The path of the existing database to be updated. env_id : int The primary integer key of the environment. src : pathlib.Path The path of the video to be decoded. **kwargs Transmitted to :py:func:`encode`. Examples -------- >>> import pathlib, tempfile >>> from mendevi.database.complete import add_environment >>> from mendevi.database.create import create_database >>> from mendevi.encode import encode_and_store >>> src = pathlib.Path("/data/dataset/video/despacito.mp4") >>> create_database(database := pathlib.Path(tempfile.mktemp(suffix=".sqlite"))) >>> env_id = add_environment(database) >>> encode_and_store( ... database, env_id, src, ... encoder="libx264", profile="sd", effort="fast", quality=0.5, threads=8 ... ) >>> database.unlink() >>> """ # transcode the video dst, cmd, activity = encode(src, **kwargs) with ( sqlite3.connect(database) as sql_database, Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=600)), ): cursor = sql_database.cursor() # fill video table try: cursor.execute( "INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)", (kwargs["src_vid_id"], src.name) ) except sqlite3.IntegrityError: pass dst_vid_id: bytes = compute_video_hash(dst) try: cursor.execute( "INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)", (dst_vid_id, dst.name) ) except sqlite3.IntegrityError: pass # fill activity table activity = { "act_duration": activity["duration"], "act_ps_core": tensor_to_binary(activity["ps_cores"]), "act_ps_dt": list_to_binary(activity["ps_dt"]), "act_ps_ram": list_to_binary(activity["ps_ram"]), "act_rapl_dt": list_to_binary(activity.get("rapl_dt", None)), "act_rapl_power": list_to_binary(activity.get("rapl_powers", None)), "act_start": activity["start"], "act_wattmeter_dt": list_to_binary(activity.get("wattmeter_dt", None)), "act_wattmeter_power": list_to_binary(activity.get("wattmeter_powers", None)), } keys = list(activity) (act_id,) = cursor.execute( ( f"INSERT INTO t_act_activity ({', '.join(keys)}) " f"VALUES ({', '.join('?'*len(keys))}) RETURNING act_id" ), [activity[k] for k in keys], ).fetchone() # fill encode table values = { "enc_act_id": act_id, "enc_cmd": cmd, "enc_dst_vid_id": dst_vid_id, "enc_effort": kwargs["effort"], "enc_encoder": kwargs["encoder"], "enc_env_id": env_id, "enc_fps": float(kwargs["fps"]), "enc_height": kwargs["resolution"][0], "enc_pix_fmt": kwargs["pix_fmt"], "enc_quality": kwargs["quality"], "enc_src_vid_id": kwargs["src_vid_id"], "enc_threads": kwargs["threads"], "enc_vbr": int(kwargs["vbr"]), "enc_width": kwargs["resolution"][1], } keys = list(values) cursor.execute( f"INSERT INTO t_enc_encode ({', '.join(keys)}) VALUES ({', '.join('?'*len(keys))})", [values[k] for k in keys] )
[docs] def get_transcode_cmd(src: pathlib.Path, dst: pathlib.Path, **kwargs) -> list[str]: """Return the ffmpeg encode cmd.""" def libsvtav1_lp(threads: int) -> int: """Convert threads in parralel level.""" # https://gitlab.com/AOMediaCodec/SVT-AV1/-/blob/master/Source/Lib/Globals/enc_handle.c#L598 # lp=1 -> threads=1 # lp=2 -> threads=2 # lp=3 -> threads=8 # lp=4 -> threads=12 # lp=5 -> threads=16 # lp=6 -> threads=20 return { # threads to lp 1: 1, 2: 2, 3: 2, 4: 2, 5: 2, 6: 3, 7: 3, 8: 3, 9: 3, 10: 3, 11: 4, 12: 4, 13: 4, 14: 4, 15: 5, 16: 5, 17: 5, }.get(threads, 6) # header cmd: list[str] = ["ffmpeg", "-y", "-i", str(src)] # filter if ( filter_cmd := get_convert_cmd( src, additional_filter=kwargs["filter"], fps=kwargs["fps"], pix_fmt=kwargs["pix_fmt"], resolution=kwargs["resolution"], ) ): cmd.extend(["-vf", filter_cmd]) # transcode cmd.append("-c:v") match kwargs["encoder"]: case "libx264": if kwargs["vbr"]: quality = ["-crf", str(round(kwargs["quality"]*51.0, 1))] else: rate = f"{quality_to_rate(kwargs)}k" quality = [ # https://trac.ffmpeg.org/wiki/Encode/H.264#CBRConstantBitRate "-b:v", rate, "-minrate", rate, "-maxrate", rate, "-bufsize", rate, "-x264-params", "nal-hrd=cbr", ] cmd.extend([ # https://ffmpeg.party/x264/ "libx264", *quality, "-preset", kwargs["effort"], "-tune", "ssim", "-threads", str(kwargs["threads"]), "-thread_type", "frame", ]) case "libx265": if kwargs["vbr"]: cmd.extend([ # https://x265.readthedocs.io/en/master/cli.html "libx265", "-crf", str(round(kwargs["quality"]*51.0, 1)), "-preset", kwargs["effort"], "-tune", "ssim", "-x265-params", ( f"frame-threads={kwargs['threads']}:" f"pools={kwargs['threads']}:" f"wpp={1 if kwargs['threads'] != 1 else 0}" ), ]) else: rate = quality_to_rate(kwargs) cmd.extend([ # https://x265.readthedocs.io/en/master/cli.html "libx265", "-b:v", f"{rate}k", "-preset", kwargs["effort"], "-tune", "ssim", "-x265-params", ( f"vbv-maxrate={rate}:vbv-bufsize={rate}:" f"frame-threads={kwargs['threads']}:" f"pools={kwargs['threads']}:" f"wpp={1 if kwargs['threads'] != 1 else 0}" ), ]) case "libvpx-vp9": # https://trac.ffmpeg.org/wiki/Encode/VP9 # https://wiki.webmproject.org/ffmpeg/vp9-encoding-guide # https://developers.google.com/media/vp9/settings if kwargs["vbr"]: quality = ["-crf", str(round(kwargs["quality"]*63.0)), "-b:v", "0"] else: rate = f"{quality_to_rate(kwargs)}k" quality = ["-b:v", rate, "-minrate", rate, "-maxrate", rate, "-lag-in-frames", "0"] cmd.extend([ "libvpx-vp9", *quality, # in [-16, 16] "-speed", {"slow": "-2", "medium": "1", "fast": "8"}[kwargs["effort"]], "-tune", "ssim", "-row-mt", "0", "-threads", str(kwargs["threads"]), ]) case "libsvtav1": if kwargs["vbr"]: cmd.extend([ "libsvtav1", "-crf", str(round(kwargs["quality"]*63.0)), "-preset", {"slow": "4", "medium": "6", "fast": "8"}[kwargs["effort"]], # "-tune", "ssim", # not same result as -svtav1-params tune=2 "-svtav1-params", f"film-grain=0:lp={libsvtav1_lp(kwargs['threads'])}:tune=2", ]) else: rate = f"{min(100_000, quality_to_rate(kwargs))}k" cmd.extend([ "libsvtav1", "-b:v", rate, "-minrate", rate, "-bufsize", rate, "-preset", {"slow": "4", "medium": "6", "fast": "8"}[kwargs["effort"]], "-tune", "ssim", # -svtav1-params tune=2 not supported in cbr "-svtav1-params", f"rc=1:film-grain=0:lp={libsvtav1_lp(kwargs['threads'])}", ]) case "vvc": # https://github.com/fraunhoferhhi/vvenc/wiki/FFmpeg-Integration if kwargs["vbr"]: quality = ["-qp", str(round(kwargs["quality"]*63.0))] else: rate = quality_to_rate(kwargs) quality = ["-b:v", f"{rate}k", "-maxrate", f"{round(1.5*rate)+1}k"] bit = int(re.search(r"(?P<bit>\d+)le", kwargs["pix_fmt"] + "8le")["bit"]) cmd.extend([ "vvc", *quality, "-preset", kwargs["effort"], "-qpa", "1", "-vvenc-params", f"internalbitdepth={bit}", "-threads", str(kwargs['threads']), ]) # final cmd.append(str(dst)) return cmd
[docs] def quality_to_rate(kwargs: dict[str]) -> int: """Return the absolute target bitrate in kbit/s. Based on https://twitch-overlay.fr/quelle-connexion-internet-choisir-pour-streamer-sur-twitch/ and https://bitmovin.com/blog/video-bitrate-streaming-hls-dash/ You can plot the bitrate with: mendevi plot mendevi.db -x bitrate -y psnr -f 'mode = "vbr"' """ quality = kwargs["quality"] assert isinstance(quality, float), quality.__class__.__name__ assert 0.0 <= quality <= 1.0, quality match (profile := best_profile(*kwargs["resolution"])): case "sd": mini, maxi = 400, 2100 case "hd": mini, maxi = 1500, 6000 case "fhd": mini, maxi = 3000, 9000 case "uhd4k": mini, maxi = 10000, 51000 case _: raise NotImplementedError(f"please define a bitrate rule for the profile {profile}") mini, maxi = math.log10(float(mini)), math.log10(float(maxi)) return round(10.0**(maxi-quality*(maxi-mini)))