#!/usr/bin/env python3
"""Perform decoding measures."""
import datetime
import os
import pathlib
import shlex
import sqlite3
import subprocess
from context_verbose import Printer
from flufl.lock import Lock
import numpy as np
from mendevi.database.serialize import list_to_binary, tensor_to_binary
from mendevi.utils import Activity, get_resolution
[docs]
def decode(src: pathlib.Path, **kwargs) -> tuple[str, dict[str]]:
"""Decode an existing video.
Parameters
----------
src : pathlib.Path
The source video file to be decoded.
**kwargs : dict
Transmitted to :py:func:`get_decode_cmd`.
Returns
-------
cmd : str
The ffmpeg command.
activity : dict[str]
The computeur activity during the decoding process.
"""
cmd = get_decode_cmd(src, kwargs.get("resolution", None))
prt_cmd = " ".join(map(shlex.quote, [{str(src): "src.mp4"}.get(c, c) for c in cmd]))
with Printer(prt_cmd, color="green") as prt:
prt.print(f"video: {src.name}")
with Activity() as activity:
subprocess.run(cmd, check=True, capture_output=True)
# print
prt.print(f"avg cpu usage: {activity['ps_core']:.1f} %")
prt.print(f"avg ram usage: {1e-9*np.mean(activity['ps_ram']):.2g} Go")
if "rapl_power" in activity:
prt.print(f"avg rapl power: {activity['rapl_power']:.2g} W")
if "wattmeter_power" in activity:
prt.print(f"avg wattmeter power: {activity['wattmeter_power']:.2g} W")
return prt_cmd, activity
[docs]
def decode_and_store(
database: pathlib.Path,
env_id: int,
src: pathlib.Path,
**kwargs,
):
"""Decode a video file and store the result in the database.
Parameters
----------
database : pathlike
The path of the existing database to be updated.
**kwargs
Transmitted to :py:func:`decode`.
"""
# decode the video
cmd, activity = decode(src, **kwargs)
with (
sqlite3.connect(database) as sql_database,
Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=60)),
):
cursor = sql_database.cursor()
# fill video table
try:
cursor.execute(
"INSERT INTO t_vid_video (vid_id, vid_name) VALUES (?, ?)",
(kwargs["dec_vid_id"], src.name)
)
except sqlite3.IntegrityError:
pass
# fill activity table
activity = {
"act_duration": activity["duration"],
"act_ps_core": tensor_to_binary(activity["ps_cores"]),
"act_ps_dt": list_to_binary(activity["ps_dt"]),
"act_ps_ram": list_to_binary(activity["ps_ram"]),
"act_rapl_dt": list_to_binary(activity.get("rapl_dt", None)),
"act_rapl_power": list_to_binary(activity.get("rapl_powers", None)),
"act_start": activity["start"],
"act_wattmeter_dt": list_to_binary(activity.get("wattmeter_dt", None)),
"act_wattmeter_power": list_to_binary(activity.get("wattmeter_powers", None)),
}
keys = list(activity)
(act_id,) = cursor.execute(
(
f"INSERT INTO t_act_activity ({', '.join(keys)}) "
f"VALUES ({', '.join('?'*len(keys))}) RETURNING act_id"
),
[activity[k] for k in keys],
).fetchone()
# fill decode table
values = {
"dec_act_id": act_id,
"dec_cmd": cmd,
"dec_env_id": env_id,
"dec_height": kwargs.get("resolution", (None, None))[0],
"dec_pix_fmt": "rgb24",
"dec_vid_id": kwargs["dec_vid_id"],
"dec_width": kwargs.get("resolution", (None, None))[1],
}
keys = list(values)
cursor.execute(
f"INSERT INTO t_dec_decode ({', '.join(keys)}) VALUES ({', '.join('?'*len(keys))})",
[values[k] for k in keys]
)
[docs]
def get_decode_cmd(video: pathlib.Path, resolution: tuple[int, int] | None) -> list[str]:
"""Return the ffmpeg decode cmd.
Parameters
----------
video : pathlib.Path
The video to be decoded.
It is required to know the resolution in order to adapt the filter.
resolution : tuple[int, int], optional
The new (heigh, width) video shape.
Returns
-------
filter : str
The full ffmpeg decode bash command arguments.
Examples
--------
>>> import cutcutcodec
>>> from mendevi.decode import get_decode_cmd
>>> media = cutcutcodec.utils.get_project_root().parent / "media" / "video" / "intro.webm"
>>> get_decode_cmd(media, resolution=None) # doctest: +ELLIPSIS
['ffmpeg', '-i', '.../media/video/intro.webm', '-vf', 'format=rgb24', '-f', 'null', '/dev/null']
>>> get_decode_cmd(media, resolution=(480, 720))
[..., '-vf', 'scale=h=480:w=720:sws_flags=bicubic,format=rgb24', '-f', 'null', '/dev/null']
>>>
"""
if resolution is not None:
assert isinstance(resolution, tuple), resolution.__class__.__name__
assert len(resolution) == 2, resolution
assert isinstance(resolution[0], int) and isinstance(resolution[1], int), resolution
assert (resolution[0], resolution[1]) > (0, 0), resolution
filters: list[str] = []
if resolution is not None and get_resolution(video) != resolution:
filters = [f"scale=h={resolution[0]}:w={resolution[1]}:sws_flags=bicubic"]
filters.append("format=rgb24") # to match monitor pixel conversion
filters = ",".join(filters)
return ["ffmpeg", "-i", str(video), "-vf", filters, "-f", "null", os.devnull]