"""Parse and verify some user input."""
import importlib
import inspect
import math
import pathlib
import re
import typing
import click
import context_verbose
import Levenshtein
from context_verbose import Printer
from mendevi.cst.labels import LABELS
from mendevi.database.create import create_database, is_sqlite
from mendevi.database.meta import extract_names
from mendevi.utils import unfold_video_files
[docs]
class CallBackType(click.ParamType):
    """Click parameter type that loads a function from a user python file.

    The accepted value is a path to a python file, optionally followed by
    ``?func_name`` (for example ``script.py?my_func``) to select one of the
    functions defined in that file.
    """

    name = "callback"

    def convert(self, value: str, param: click.Option, ctx: click.Context) -> typing.Callable:
        """Import the user file and return the requested function.

        Parameters
        ----------
        value : str
            The path of the python file, with an optional ``?func_name`` suffix.
            A value that is already callable is returned unchanged.
        param : click.Option
            The click parameter being parsed, forwarded to ``self.fail``.
        ctx : click.Context
            The click context, forwarded to ``self.fail``.

        Returns
        -------
        func : typing.Callable
            The selected function, defined in the user file itself.

        Notes
        -----
        ``self.fail`` is called when the path does not exist, when the file
        cannot be imported as a python module, when the function selection is
        ambiguous, or when the requested name is not defined in the file.
        The user file is executed at import time.
        """
        # ``import importlib`` alone does not guarantee the submodule is loaded
        import importlib.util

        # click may hand back an already converted value (e.g. a default)
        if callable(value):
            return value

        # extract clues
        func_name = None
        if (match := re.search(r"\.py\?(?P<func>[a-zA-Z_]\w*)$", value)) is not None:
            func_name = match["func"]
            value = value[:-len(func_name)-1]  # drop the trailing "?func_name"
        path = pathlib.Path(value)
        if not path.exists():
            self.fail(f"{path!r} does not exists", param, ctx)

        # import the user file
        spec = importlib.util.spec_from_file_location(path.stem, path)
        if spec is None or spec.loader is None:  # directory or unknown file suffix
            self.fail(f"{path!r} is not an importable python file", param, ctx)
        modulevar = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(modulevar)

        # find the function
        resolved = path.resolve()

        def is_local(func: typing.Callable) -> bool:
            """Return True if func is defined in the user file."""
            try:
                file = inspect.getfile(func)
            except TypeError:  # got builtin_function_or_method
                return False
            return pathlib.Path(file).resolve() == resolved

        # keep only the functions written in the file, not the ones it imports
        funcs = {
            name: func
            for name, func in inspect.getmembers(modulevar, inspect.isfunction)
            if is_local(func)
        }
        if not funcs:
            self.fail(f"no function is defined in {path!r}", param, ctx)
        if func_name is None:
            if len(funcs) != 1:
                self.fail(
                    f"please specifiy the function to use {', '.join(sorted(funcs))}", param, ctx,
                )
            func_name = next(iter(funcs))
        if func_name not in funcs:
            self.fail(
                f"the func name {func_name!r} is not in {', '.join(sorted(funcs))}", param, ctx,
            )
        return funcs[func_name]
[docs]
class CopyRamType(click.ParamType):
    """Click parameter type that parses the ram copy option into a float."""

    name = "ram"

    def convert(self, value: str | float, param: click.Option, ctx: click.Context) -> float:
        """Convert the ram copy option into a non-negative float.

        Parameters
        ----------
        value : str or float
            Either ``"yes"`` (mapped to ``0.0``), ``"no"`` (mapped to ``math.inf``),
            both case insensitive, or anything ``float`` can parse.
            A float is validated and returned as is.
        param : click.Option
            The click parameter being parsed, forwarded to ``self.fail``.
        ctx : click.Context
            The click context, forwarded to ``self.fail``.

        Returns
        -------
        ram : float
            A non-negative float, possibly infinite, never NaN.

        Notes
        -----
        ``self.fail`` is called when the value cannot be parsed as a float,
        is not a float, is NaN, or is negative.
        """
        if isinstance(value, str):
            value = value.lower()
            if (converted := {"yes": 0.0, "no": math.inf}.get(value)) is not None:
                return converted
            try:
                value = float(value)
            except ValueError as err:
                self.fail(str(err), param, ctx)  # click expects a string message
        if not isinstance(value, float):
            self.fail(f"{value!r} has to be float", param, ctx)
        # NaN compares False with everything, so it would pass the sign test below
        if math.isnan(value):
            self.fail(f"{value!r} is not a number", param, ctx)
        if value < 0.0:
            self.fail(f"{value!r} is not positive", param, ctx)
        return value
[docs]
class PixelParamType(click.ParamType):
    """Click parameter type that validates a planar yuv pixel format name."""

    name = "pixel"

    def convert(self, value: str, param: click.Option, ctx: click.Context) -> str:
        """Lowercase the pixel format and check it against the accepted pattern.

        Parameters
        ----------
        value : str
            The pixel format name, case insensitive, like ``yuv420p`` or ``yuv444p10le``.
        param : click.Option
            The click parameter being parsed, forwarded to ``self.fail``.
        ctx : click.Context
            The click context, forwarded to ``self.fail``.

        Returns
        -------
        pix_fmt : str
            The lowercased pixel format name.
        """
        pix_fmt = value.lower()
        # "yuv4", two subsampling digits, "p", then an optional 10 or 12 bits suffix
        if re.fullmatch(r"yuv4[24][024]p(?:10le|12le)?", pix_fmt) is None:
            self.fail(f"{pix_fmt!r} is not a valid pixel format", param, ctx)
        return pix_fmt
[docs]
class ResolutionParamType(click.ParamType):
    """Click parameter type that parses a video resolution given as text."""

    name = "resolution"

    def convert(self, value: str, param: click.Option, ctx: click.Context) -> tuple[int, int]:
        """Extract the two integers of a video shape.

        The first two groups of digits found in the lowercased value are used.
        Their order is kept when the text ``he`` is captured before the first
        number or ``w`` is captured before the second one, otherwise they are
        swapped. For instance ``"1920x1080"`` gives ``(1080, 1920)``.

        Parameters
        ----------
        value : str
            The textual resolution, with two numbers separated by at least one non digit.
        param : click.Option
            The click parameter being parsed, forwarded to ``self.fail``.
        ctx : click.Context
            The click context, forwarded to ``self.fail``.

        Returns
        -------
        shape : tuple[int, int]
            The two dimensions, presumably (height, width) -- to be confirmed by the callers.
        """
        found = re.search(
            r"(?P<t1>he)?\D*(?P<d1>\d+)\D+?(?P<t2>w)?\D*(?P<d2>\d+)", value.lower(),
        )
        if found is None:
            self.fail(f"{value!r} is not a valid video shape", param, ctx)
        first, second = int(found["d1"]), int(found["d2"])
        # an explicit marker means the numbers are already in the output order
        keep_order = found["t1"] == "he" or found["t2"] == "w"
        return (first, second) if keep_order else (second, first)
def _guess_database(files: tuple[str]) -> str | None:
"""Try to find if one of the file is a database file."""
candidates: set[str] = set()
for file in files:
if is_sqlite(file):
candidates.add(file)
if len(candidates) == 1:
return candidates.pop()
if len(candidates) > 1:
msg = f"only one database must be provided, {candidates} are given"
raise ValueError(msg)
for file_ in files:
file = pathlib.Path(file_) / "mendevi.db"
if is_sqlite(file):
candidates.add(str(file))
if len(candidates) == 1:
return candidates.pop()
return None
[docs]
def parse_expr(expr: str | None, prt: Printer, start_msg: str) -> None:
    """Check that all the variables of the expression are known labels.

    Parameters
    ----------
    expr : str, optional
        The expression to verify. Nothing is done if it is None.
    prt : context_verbose.Printer
        The Printer instance used to display the validated expression.
    start_msg : str
        The prefix of the displayed message.

    Raises
    ------
    ValueError
        If the expression uses a name that is not in ``LABELS``.
        The message lists all the labels, sorted by Levenshtein distance to the unknown names.
    """
    if expr is None:
        return
    assert isinstance(expr, str), expr.__class__.__name__

    # the names used in the expression that are not known labels
    unknown = extract_names(expr) - set(LABELS)
    if not unknown:
        prt.print(f"{start_msg}: {expr}")
        return

    def closeness(label: str) -> tuple[int, str]:
        """Sort key: distance to the nearest unknown name, then the label itself."""
        return (min(Levenshtein.distance(label, name) for name in unknown), label)

    suggestions = sorted(LABELS, key=closeness)
    msg = (
        f"{', '.join(sorted(unknown))} are not recognized variables.\n"
        f"Do you mean {', '.join(suggestions)} ?"
    )
    raise ValueError(msg)
[docs]
def parse_videos_database(
    prt: context_verbose.Printer,
    videos: tuple[str],
    database: str | None = None,
    *,
    _quiet: bool = False,
) -> tuple[list[pathlib.Path], pathlib.Path]:
    """Unfold the video files and locate the database, creating it if needed.

    Parameters
    ----------
    prt : context_verbose.Printer
        The Printer instance used to display the videos and the database found.
    videos : tuple[str]
        The full pseudo pathlike video pointers.
    database : str, optional
        The path of the database file, or of the folder that contains ``mendevi.db``.
        If not provided, it is guessed from ``videos``, then searched as ``mendevi.db``
        next to the unfolded video files.
    _quiet : bool, default=False
        If True, nothing is printed.

    Returns
    -------
    videos : list[pathlib.Path]
        The unfolded video files, as yielded by ``unfold_video_files``.
    database : pathlib.Path
        The database path, it exists when the function returns.

    Raises
    ------
    ValueError
        If no database is found and the place to create it is ambiguous.
    """
    assert isinstance(videos, tuple), videos.__class__.__name__
    assert all(isinstance(v, str) for v in videos), videos

    # the database has to be guessed before unfolding, from the raw arguments
    db_hint = database or _guess_database(videos)

    # videos
    files = list(unfold_video_files(videos))
    if not _quiet:
        if len(files) == 1:
            prt.print(f"video : {files[0]}")
        elif len(files) > 1:
            prt.print(f"videos : {len(files)} files founded")

    # database
    if db_hint:
        db_path = pathlib.Path(db_hint).expanduser()
        if db_path.is_dir():
            db_path = db_path / "mendevi.db"
    else:
        # one candidate per folder that contains at least one video
        candidates = {file.parent / "mendevi.db" for file in files}
        existing = sorted(cand for cand in candidates if cand.is_file())
        if existing:
            db_path = existing[-1]  # the last one in sorted order
        elif len(candidates) == 1:  # if no ambiguity, we can create it
            (db_path,) = candidates
        else:
            msg = "please provide the database path"
            raise ValueError(msg)

    if db_path.exists():
        if not _quiet:
            prt.print(f"database : {db_path}")
    else:
        create_database(db_path)
        if not _quiet:
            prt.print(f"database : {db_path} (just created)")
    return files, db_path