"""Allow to handle ffmpeg command line."""
import contextlib
import io
import pathlib
import re
import shlex
import subprocess
import typing
import cutcutcodec
import tqdm
from mendevi.measures import Activity
class CmdFFMPEG:
"""Allow easy manipulation of a complete ffmpeg expression.
ffmpeg -y -hide_banner -loglevel verbose
Attributes
----------
general : list[str]
The options immediately after ffmpeg and immediately before the decoder (read and write).
decode : list[str]
The options between -c:v and -i (read and write).
video : pathlib.Path
The input video path (readonly).
vid_filter : str
The filter string after -vf (read and write).
encode : list[str]
The encoder name and options after -c:v (read and write).
output : list[str]
The final arguments of the ffmpeg cmd (read and write).
"""
def __init__(
self,
video: pathlib.Path | str,
**kwargs: dict[str],
) -> None:
"""Initialise the ffmpeg cmd.
Parameters
----------
video : pathlike
The input video path.
**kwargs: dict
See above.
general : list[str], str, optional
The options immediately after ffmpeg and immediately before the decoder.
decode : list[str], str, optional
The options between ``-c:v`` and ``-i``.
vid_filter : str, optional
The filter string after ``-vf``.
encode: list[str], str, optional
The encoder name and options after ``-c:v``.
output: list[str], str, default="-f null -"
The final arguments of the ffmpeg command, after the encoder description.
"""
# initialisation
video = pathlib.Path(video).expanduser()
self._video = video
self._general = self._decode = self._vid_filter = self._encode = self._output = None
# parse and check, using setter
self.general = kwargs.get("general")
self.decode = kwargs.get("decode")
self.vid_filter = kwargs.get("vid_filter", "")
self.encode = kwargs.get("encode")
self.output = kwargs.get("output")
[docs]
def copy(self) -> typing.Self:
"""Return an independant copy of self."""
return CmdFFMPEG(
video=self.video,
general=self.general,
decode=self.decode,
vid_filter=self.vid_filter,
encode=self.encode,
output=self.output,
)
@property
def decode(self) -> list[str]:
"""Return the options between -c:v and -i."""
return self._decode.copy()
@decode.setter
def decode(self, decode: list[str] | str | None) -> None:
"""Update the options between -c:v and -i."""
match decode:
case None:
self._decode = []
case str():
self._decode = shlex.split(decode)
case list():
assert all(isinstance(cmd, str) for cmd in decode), decode
self._decode = decode.copy()
case _:
msg = f"'decode' has to be None, str or list, not {decode.__class__.__name__}"
raise TypeError(msg)
@property
def encode(self) -> list[str]:
"""Return the encoder name and options after -c:v."""
return self._encode.copy()
@encode.setter
def encode(self, encode: list[str] | str | None) -> None:
"""Update the encoder name and options after -c:v."""
match encode:
case None:
self._encode = []
case str():
self._encode = shlex.split(encode)
case list():
assert all(isinstance(cmd, str) for cmd in encode), encode
self._encode = encode.copy()
case _:
msg = f"'encode' has to be None, str or list, not {encode.__class__.__name__}"
raise TypeError(msg)
@property
def general(self) -> list[str]:
"""Return the options immediately after ffmpeg and immediately before the decoder."""
if self._general is None:
return ["-hide_banner", "-y", "-loglevel", "verbose"]
return self._general.copy()
@general.setter
def general(self, general: list[str] | str | None) -> None:
"""Update the options immediately after ffmpeg and immediately before the decoder."""
match general:
case None:
self._general = None
case str():
self._general = shlex.split(general)
case list():
assert all(isinstance(cmd, str) for cmd in general), general
self._general = general.copy()
case _:
msg = f"'general' has to be None, str or list, not {general.__class__.__name__}"
raise TypeError(msg)
@property
def output(self) -> list[str]:
"""Return the final arguments of the ffmpeg command, after the encoder."""
return self._output.copy()
@output.setter
def output(self, output: list[str] | str | None) -> None:
"""Update the final arguments of the ffmpeg command, after the encoder."""
match output:
case None:
self._output = ["-f", "null", "-"]
case str():
self._output = shlex.split(output)
case list():
assert all(isinstance(cmd, str) for cmd in output), output
self._output = output.copy()
case _:
msg = f"'output' has to be None, str or list, not {output.__class__.__name__}"
raise TypeError(msg)
[docs]
def run(self, cmd: list[str] | str | None = None) -> tuple[str, dict[str]]:
"""Execute the command and return the stderr output.
If a cmd is provided, it is used instead of self.
"""
# verification
if cmd is None:
cmd = list(self)
elif isinstance(cmd, str):
cmd = shlex.split(cmd)
assert isinstance(cmd, list), cmd.__class__.__name__
# preparation of progress bar
total = None
if self.video.exists():
with contextlib.suppress(cutcutcodec.core.exceptions.MissingStreamError):
total = round(float(cutcutcodec.get_duration_video(self.video)), 2)
load = tqdm.tqdm(
dynamic_ncols=True,
leave=False,
smoothing=1e-8,
total=total,
unit="s",
)
# utilitaries
def read_lines(stream: io.BufferedReader) -> bytes:
r"""Yield each line (separated by \n or \r)."""
line: bytes = b""
while buff := stream.read(64):
*prev, line = re.split(br"\n|\r", line + buff)
yield from prev
if line:
yield line
# run command
output: bytes = b""
with Activity() as activity, subprocess.Popen(cmd, stderr=subprocess.PIPE) as process:
activity.start_slice("cmd") # more acurate than activity.get_full()
for line in read_lines(process.stderr):
if (
match := re.search(
br"time=(?P<h>\d+):(?P<m>\d{1,2}):(?P<s>\d{1,2}\.\d*)", line,
)
) is None:
output += line + b"\n"
else:
elapsed = round(
3600.0*float(match["h"]) + 60.0*float(match["m"]) + float(match["s"]), 2,
)
load.total = None if load.total is None else max(load.total, elapsed)
load.update(elapsed-load.n)
activity.stop_slice() # if ommited, implicitely called avec process.__exit__
load.close()
if process.returncode:
msg = f"failed to execute {cmd}:\n{output}"
raise RuntimeError(msg)
return output.decode("utf-8"), activity.get_slices()["cmd"]
@property
def vid_filter(self) -> str:
"""Return the filter string after -vf."""
return self._vid_filter
@vid_filter.setter
def vid_filter(self, vid_filter: str) -> None:
"""Update the filter string after -vf."""
assert isinstance(vid_filter, str), vid_filter.__class__.__name__
self._vid_filter = vid_filter
@property
def video(self) -> pathlib.Path:
"""Return the input video path."""
return self._video
def __iter__(self) -> str:
"""Iterate over each parameter to be compatible with list(self)."""
yield "ffmpeg"
yield from self.general
if self._decode:
yield "-c:v"
yield from self._decode
yield "-i"
yield str(self._video)
if self._vid_filter:
yield "-vf"
yield self._vid_filter
if self._encode:
yield "-c:v"
yield from self._encode
yield from self._output
def __str__(self) -> str:
"""Return the full shell cmd.
Examples
--------
>>> from mendevi.cmd import CmdFFMPEG
>>> print(CmdFFMPEG(video="src.mp4"))
ffmpeg -hide_banner -y -loglevel verbose -i src.mp4 -f null -
>>>
"""
return " ".join(map(shlex.quote, self))