"""Perform encoding measures."""
import contextlib
import datetime
import logging
import math
import pathlib
import shlex
import shutil
import sqlite3
import uuid
import cutcutcodec
import numpy as np
import orjson
from context_verbose import Printer
from flufl.lock import Lock
from mendevi.cmd import CmdFFMPEG
from mendevi.convert import filter_best_order
from mendevi.database.serialize import list_to_binary, tensor_to_binary
from mendevi.decode import available_decoders
from mendevi.encoder import (
_encode_av1_nvenc,
_encode_av1_vaapi,
_encode_h264_nvenc,
_encode_h264_vaapi,
_encode_hevc_nvenc,
_encode_hevc_vaapi,
_encode_libaomav1,
_encode_libopenh264,
_encode_librav1e,
_encode_libsvtav1,
_encode_libvpx_vp9,
_encode_libx264,
_encode_libx265,
_encode_vp9_vaapi,
_encode_vvc,
)
from mendevi.utils import best_profile, compute_video_hash, cp_shm, hash_to_signature
ENCODERS_CMD = {
"av1_nvenc": _encode_av1_nvenc,
"av1_vaapi": _encode_av1_vaapi,
"h264_nvenc": _encode_h264_nvenc,
"h264_vaapi": _encode_h264_vaapi,
"hevc_nvenc": _encode_hevc_nvenc,
"hevc_vaapi": _encode_hevc_vaapi,
"libaom-av1": _encode_libaomav1,
"libopenh264": _encode_libopenh264,
"librav1e": _encode_librav1e,
"libsvtav1": _encode_libsvtav1,
"libvpx-vp9": _encode_libvpx_vp9,
"libx264": _encode_libx264,
"libx265": _encode_libx265,
"vp9_vaapi": _encode_vp9_vaapi,
"vvc": _encode_vvc,
}
[docs]
def encode(src: pathlib.Path, **kwargs: dict) -> tuple[pathlib.Path, str, str, dict[str]]:
"""Transcode an existing video.
Parameters
----------
src : pathlib.Path
The source video file to be transcoded.
**kwargs : dict
Transmitted to :py:func:`get_transcode_cmd`.
Returns
-------
dst : pathlib.Path
The transcoded video path. The stem contains the md5 hash of the file content.
cmd : str
The ffmpeg command.
log : str
The output generated by the cmd.
activity : dict[str]
The computeur activity during the transcoding process.
"""
assert isinstance(src, pathlib.Path), src.__class__.__name__
with cp_shm(src, threshold=kwargs["ram"]) as src_ram: # copy input video into ram
# find tempfile name
dst = src_ram.parent / f"{uuid.uuid4().hex}.mp4"
# get cmd
cmd = get_transcode_cmd(src_ram, dst, **kwargs)
if kwargs.get("callback") is None:
user_cmd = None
else:
user_cmd = kwargs["callback"](cmd, **kwargs.copy()) # copy for user safe
if isinstance(user_cmd, str):
user_cmd = shlex.split(user_cmd)
assert isinstance(user_cmd, list), user_cmd.__class__.__name__
# display
prt_cmd = " ".join(
map(
shlex.quote,
(
{
str(src_ram): str(src_ram.with_name("src.mp4")),
str(dst): str(dst.with_name("dst.mp4")),
}.get(c, c)
for c in user_cmd or cmd
),
),
)
with Printer(prt_cmd, color="green") as prt:
prt.print(f"input video: {src_ram}")
# transcode
log, activity = cmd.run(user_cmd)
# print
prt.print(f"avg cpu usage: {activity['ps_core_avg']:.1f} %")
prt.print(f"avg ram usage: {1e-9*np.mean(activity['ps_ram_avg']):.2g} Go")
if "rapl_power_avg" in activity:
prt.print(f"avg rapl power: {activity['rapl_power_avg']:.2g} W")
if "gpu_power_avg" in activity:
prt.print(f"avg GPU power: {activity['gpu_power_avg']:.2g} W")
if "wattmeter_power_avg" in activity:
prt.print(f"avg wattmeter power: {activity['wattmeter_power_avg']:.2g} W")
# compute file hash
signature = hash_to_signature(compute_video_hash(dst, fast=False))
prt.print(f"output video: sample_{signature}.mp4")
# move file
final_dst = src.parent / f"sample_{signature}.mp4"
if not final_dst.exists():
shutil.copy(dst, src.parent / f"sample_{signature}_partial.mp4")
shutil.move(src.parent / f"sample_{signature}_partial.mp4", final_dst)
final_dst.chmod(0o777)
dst.unlink()
return final_dst, prt_cmd, log, activity
[docs]
def encode_and_store(
database: pathlib.Path,
env_id: int,
src: pathlib.Path,
**kwargs: dict,
) -> None:
"""Transcode a video file and store the result in the database.
Parameters
----------
database : pathlike
The path of the existing database to be updated.
env_id : int
The primary integer key of the environment.
src : pathlib.Path
The path of the video to be decoded.
**kwargs
Transmitted to :py:func:`encode`.
"""
# transcode the video
dst, cmd, log, activity = encode(src, **kwargs)
with (
Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=600)),
sqlite3.connect(database) as conn,
):
cursor = conn.cursor()
# fill video table
with contextlib.suppress(sqlite3.IntegrityError):
cursor.execute(
"INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)",
(kwargs["src_vid_id"], src.name),
)
dst_vid_id: bytes = compute_video_hash(dst)
with contextlib.suppress(sqlite3.IntegrityError):
cursor.execute(
"INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)",
(dst_vid_id, dst.name),
)
# fill activity table
activity = {
"act_duration": activity["duration"],
"act_gpu_dt": list_to_binary(activity.get("gpu_dt", None)),
"act_gpu_power": tensor_to_binary(activity.get("gpu_power", None)),
"act_ps_core": tensor_to_binary(activity["ps_core"]),
"act_ps_dt": list_to_binary(activity["ps_dt"]),
"act_ps_temp": orjson.dumps(
activity["ps_temp"], option=orjson.OPT_INDENT_2|orjson.OPT_SORT_KEYS,
),
"act_ps_ram": list_to_binary(activity["ps_ram"]),
"act_rapl_dt": list_to_binary(activity.get("rapl_dt", None)),
"act_rapl_power": list_to_binary(activity.get("rapl_power", None)),
"act_start": activity["start"],
"act_wattmeter_dt": list_to_binary(activity.get("wattmeter_dt", None)),
"act_wattmeter_power": list_to_binary(activity.get("wattmeter_power", None)),
}
keys = list(activity)
(act_id,) = cursor.execute(
(
f"INSERT INTO t_act_activity ({', '.join(keys)}) "
f"VALUES ({', '.join('?'*len(keys))}) RETURNING act_id"
),
[activity[k] for k in keys],
).fetchone()
# fill encode table
values = {
"enc_act_id": act_id,
"enc_cmd": cmd,
"enc_dst_vid_id": dst_vid_id,
"enc_effort": kwargs["effort"],
"enc_encoder": kwargs["encoder"],
"enc_env_id": env_id,
"enc_fps": float(kwargs["fps"]),
"enc_height": kwargs["resolution"][0],
"enc_log": log,
"enc_mode": kwargs["mode"],
"enc_pix_fmt": kwargs["pix_fmt"],
"enc_quality": kwargs["quality"],
"enc_src_vid_id": kwargs["src_vid_id"],
"enc_threads": kwargs["threads"],
"enc_width": kwargs["resolution"][1],
}
keys = list(values)
cursor.execute(
f"INSERT INTO t_enc_encode ({', '.join(keys)}) VALUES ({', '.join('?'*len(keys))})",
[values[k] for k in keys],
)
[docs]
def get_transcode_cmd(src: pathlib.Path, dst: pathlib.Path, **kwargs: dict) -> CmdFFMPEG:
"""Return the ffmpeg encode cmd."""
cmd = CmdFFMPEG(src, output=str(dst))
cmd.vid_filter = filter_best_order(
src,
additional_filter=kwargs["filter"],
fps=kwargs["fps"],
pix_fmt=kwargs["pix_fmt"],
resolution=kwargs["resolution"],
)
general, vid_filter, cmd.encode = ENCODERS_CMD[kwargs["encoder"]](**kwargs)
cmd.general = cmd.general + general
try:
family, decoder = next(iter(
available_decoders(cutcutcodec.get_codec_video(src), cutcutcodec.get_pix_fmt(src)),
))
except StopIteration:
logging.getLogger(__name__).info("no decoder found for %s", src)
if family == "vaapi":
cmd.general = [*cmd.general, "-hwaccel", "vaapi"]
else:
cmd.decode = decoder
if vid_filter:
cmd.vid_filter = (f"{cmd.vid_filter}," if cmd.vid_filter else "") + vid_filter
return cmd
[docs]
def quality_to_rate(kwargs: dict[str]) -> int:
"""Return the absolute target bitrate in kbit/s.
Based on https://twitch-overlay.fr/quelle-connexion-internet-choisir-pour-streamer-sur-twitch/
and https://bitmovin.com/blog/video-bitrate-streaming-hls-dash/
You can plot the bitrate with: mendevi plot mendevi.db -x bitrate -y psnr -f 'mode = "vbr"'
The flow margin is taken to be twice as small and twice as large as the recommendations.
"""
quality = kwargs["quality"]
assert isinstance(quality, float), quality.__class__.__name__
assert 0.0 <= quality <= 1.0, quality
match (profile := best_profile(*kwargs["resolution"])):
case "sd":
mini, maxi = 400, 2100 # kbit/s
case "hd":
mini, maxi = 1500, 6000
case "fhd":
mini, maxi = 3000, 9000
case "uhd4k":
mini, maxi = 10000, 51000
case _:
msg = f"please define a bitrate rule for the profile {profile}"
raise NotImplementedError(msg)
mini, maxi = mini // 2, maxi * 2 # apply margin
mini, maxi = math.log10(float(mini)), math.log10(float(maxi))
return round(10.0**(maxi-quality*(maxi-mini)))