Source code for mendevi.cmd

"""Allow to handle ffmpeg command line."""

import contextlib
import io
import pathlib
import re
import shlex
import subprocess
import typing

import cutcutcodec
import tqdm

from mendevi.measures import Activity


[docs] class CmdFFMPEG: """Allow easy manipulation of a complete ffmpeg expression. ffmpeg -y -hide_banner -loglevel verbose Attributes ---------- general : list[str] The options immediately after ffmpeg and immediately before the decoder (read and write). decode : list[str] The options between -c:v and -i (read and write). video : pathlib.Path The input video path (readonly). vid_filter : str The filter string after -vf (read and write). encode : list[str] The encoder name and options after -c:v (read and write). output : list[str] The final arguments of the ffmpeg cmd (read and write). """ def __init__( self, video: pathlib.Path | str, **kwargs: dict[str], ) -> None: """Initialise the ffmpeg cmd. Parameters ---------- video : pathlike The input video path. **kwargs: dict See above. general : list[str], str, optional The options immediately after ffmpeg and immediately before the decoder. decode : list[str], str, optional The options between ``-c:v`` and ``-i``. vid_filter : str, optional The filter string after ``-vf``. encode: list[str], str, optional The encoder name and options after ``-c:v``. output: list[str], str, default="-f null -" The final arguments of the ffmpeg command, after the encoder description. """ # initialisation video = pathlib.Path(video).expanduser() self._video = video self._general = self._decode = self._vid_filter = self._encode = self._output = None # parse and check, using setter self.general = kwargs.get("general") self.decode = kwargs.get("decode") self.vid_filter = kwargs.get("vid_filter", "") self.encode = kwargs.get("encode") self.output = kwargs.get("output")
[docs] def copy(self) -> typing.Self: """Return an independant copy of self.""" return CmdFFMPEG( video=self.video, general=self.general, decode=self.decode, vid_filter=self.vid_filter, encode=self.encode, output=self.output, )
@property def decode(self) -> list[str]: """Return the options between -c:v and -i.""" return self._decode.copy() @decode.setter def decode(self, decode: list[str] | str | None) -> None: """Update the options between -c:v and -i.""" match decode: case None: self._decode = [] case str(): self._decode = shlex.split(decode) case list(): assert all(isinstance(cmd, str) for cmd in decode), decode self._decode = decode.copy() case _: msg = f"'decode' has to be None, str or list, not {decode.__class__.__name__}" raise TypeError(msg) @property def encode(self) -> list[str]: """Return the encoder name and options after -c:v.""" return self._encode.copy() @encode.setter def encode(self, encode: list[str] | str | None) -> None: """Update the encoder name and options after -c:v.""" match encode: case None: self._encode = [] case str(): self._encode = shlex.split(encode) case list(): assert all(isinstance(cmd, str) for cmd in encode), encode self._encode = encode.copy() case _: msg = f"'encode' has to be None, str or list, not {encode.__class__.__name__}" raise TypeError(msg) @property def general(self) -> list[str]: """Return the options immediately after ffmpeg and immediately before the decoder.""" if self._general is None: return ["-hide_banner", "-y", "-loglevel", "verbose"] return self._general.copy() @general.setter def general(self, general: list[str] | str | None) -> None: """Update the options immediately after ffmpeg and immediately before the decoder.""" match general: case None: self._general = None case str(): self._general = shlex.split(general) case list(): assert all(isinstance(cmd, str) for cmd in general), general self._general = general.copy() case _: msg = f"'general' has to be None, str or list, not {general.__class__.__name__}" raise TypeError(msg) @property def output(self) -> list[str]: """Return the final arguments of the ffmpeg command, after the encoder.""" return self._output.copy() @output.setter def output(self, output: list[str] | str | None) -> None: """Update the final arguments of the ffmpeg command, after the encoder.""" match output: case None: self._output = ["-f", "null", "-"] case str(): self._output = shlex.split(output) case list(): assert all(isinstance(cmd, str) for cmd in output), output self._output = output.copy() case _: msg = f"'output' has to be None, str or list, not {output.__class__.__name__}" raise TypeError(msg)
[docs] def run(self, cmd: list[str] | str | None = None) -> tuple[str, dict[str]]: """Execute the command and return the stderr output. If a cmd is provided, it is used instead of self. """ # verification if cmd is None: cmd = list(self) elif isinstance(cmd, str): cmd = shlex.split(cmd) assert isinstance(cmd, list), cmd.__class__.__name__ # preparation of progress bar total = None if self.video.exists(): with contextlib.suppress(cutcutcodec.core.exceptions.MissingStreamError): total = round(float(cutcutcodec.get_duration_video(self.video)), 2) load = tqdm.tqdm( dynamic_ncols=True, leave=False, smoothing=1e-8, total=total, unit="s", ) # utilitaries def read_lines(stream: io.BufferedReader) -> bytes: r"""Yield each line (separated by \n or \r).""" line: bytes = b"" while buff := stream.read(64): *prev, line = re.split(br"\n|\r", line + buff) yield from prev if line: yield line # run command output: bytes = b"" with Activity() as activity, subprocess.Popen(cmd, stderr=subprocess.PIPE) as process: activity.start_slice("cmd") # more acurate than activity.get_full() for line in read_lines(process.stderr): if ( match := re.search( br"time=(?P<h>\d+):(?P<m>\d{1,2}):(?P<s>\d{1,2}\.\d*)", line, ) ) is None: output += line + b"\n" else: elapsed = round( 3600.0*float(match["h"]) + 60.0*float(match["m"]) + float(match["s"]), 2, ) load.total = None if load.total is None else max(load.total, elapsed) load.update(elapsed-load.n) activity.stop_slice() # if ommited, implicitely called avec process.__exit__ load.close() if process.returncode: msg = f"failed to execute {cmd}:\n{output}" raise RuntimeError(msg) return output.decode("utf-8"), activity.get_slices()["cmd"]
@property def vid_filter(self) -> str: """Return the filter string after -vf.""" return self._vid_filter @vid_filter.setter def vid_filter(self, vid_filter: str) -> None: """Update the filter string after -vf.""" assert isinstance(vid_filter, str), vid_filter.__class__.__name__ self._vid_filter = vid_filter @property def video(self) -> pathlib.Path: """Return the input video path.""" return self._video def __iter__(self) -> str: """Iterate over each parameter to be compatible with list(self).""" yield "ffmpeg" yield from self.general if self._decode: yield "-c:v" yield from self._decode yield "-i" yield str(self._video) if self._vid_filter: yield "-vf" yield self._vid_filter if self._encode: yield "-c:v" yield from self._encode yield from self._output def __str__(self) -> str: """Return the full shell cmd. Examples -------- >>> from mendevi.cmd import CmdFFMPEG >>> print(CmdFFMPEG(video="src.mp4")) ffmpeg -hide_banner -y -loglevel verbose -i src.mp4 -f null - >>> """ return " ".join(map(shlex.quote, self))