Source code for mendevi.database.meta

#!/usr/bin/env python3

"""Help to get the good extractor."""

import ast
import collections
import collections.abc
import importlib
import importlib.util
import pathlib
import tempfile
import uuid

from mendevi.database import extract

# Result record returned by ``get_extractor``:
#   label  -- human readable description of the quantity,
#   func   -- extraction callable, or a python formula given as a string,
#   is_log -- True / False for a log / linear axis, None when not continuous.
ExtractContext = collections.namedtuple("ExtractContext", "label func is_log")


def extract_names(expr: str) -> set[str]:
    """Return the free symbols used by a python expression.

    A symbol is kept when it is read by the expression and is neither
    bound by the expression itself (comprehension target, walrus target,
    ``lambda`` parameter, ``del``), nor used as a directly called function
    (the ``bar`` of ``bar(foo)``).

    Parameters
    ----------
    expr : str
        The source code to analyse.

    Returns
    -------
    names : set[str]
        The name of the symbols, possibly empty.

    Raises
    ------
    SyntaxError
        If ``expr`` can not be parsed as python source code.

    Notes
    -----
    The filtering is done by name, not by scope: a name that is rejected
    somewhere in the expression is rejected everywhere in it.

    Examples
    --------
    >>> from mendevi.database.meta import extract_names
    >>> extract_names("foo")
    {'foo'}
    >>> extract_names("[i**2 for i in foo]")
    {'foo'}
    >>> extract_names("foo.bar")
    {'foo'}
    >>> extract_names("bar(foo)")
    {'foo'}
    >>> extract_names("foo.bar()")
    {'foo'}
    >>> sorted(extract_names("table[key](value)"))
    ['key', 'table', 'value']
    >>> extract_names("1 + 2")
    set()
    >>>
    """
    try:
        nodes = list(ast.walk(ast.parse(expr, mode="exec")))
    except SyntaxError as err:
        raise SyntaxError(
            f"the argument {expr!r} is not a valid python expression",
        ) from err

    # names that are bound inside the expression itself
    bound = {
        node.id
        for node in nodes
        if isinstance(node, ast.Name) and isinstance(node.ctx, (ast.Store, ast.Del))
    }
    # parameters of lambda (or def) are local, not external symbols
    bound |= {node.arg for node in nodes if isinstance(node, ast.arg)}

    # Only the name that is directly called is a function name.
    # The names inside a more complex callee, like ``table[key](value)``
    # or ``get(key)(value)``, are real variables and must be kept.
    called = {
        node.func.id
        for node in nodes
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
    }

    candidates = {node.id for node in nodes if isinstance(node, ast.Name)}
    return candidates - bound - called
[docs] def get_extractor(name: str, safe: bool = False) -> ExtractContext: """Get the way to deserialize a raw value. Parameters ---------- name : str The label name. safe : boolean, default=False If True, retrun a stupid value instead of raising KeyError. Returns ------- label : str The description of the physical quantity. This description can be used to label the axes of a graph. func : callable | str The function that performs the verification and deserialisation task, or the formula that allows you to find this quantity. is_log : boolean or None True to display in log space, False for linear. The value None means the axis is not continuous. """ assert isinstance(name, str), name.__class__.__name__ assert isinstance(safe, bool), safe.__class__.__name__ extractor = None match name: # catched by mendevi.cst.labels.extract_labels case "act_duration": return ExtractContext( "Video processing activity duration in seconds", extract.extract_act_duration, False, ) case "bitrate" | "rate": return ExtractContext( r"Video bitrate in $bit.s^{-1}$", "None if size is None or video_duration is None else 8.0 * size / video_duration", True, ) case "codec": return ExtractContext( "Codec name", extract.extract_codec, None, ) case "cores": return ExtractContext( "Average cumulative utilisation rate of logical cores", extract.extract_cores, False, ) case "decode_cmd" | "dec_cmd": return ExtractContext( "The ffmpeg command used for decoding", extract.extract_decode_cmd, None, ) case "decode_scenario" | "dec_scenario": return ExtractContext( "Unique string specific to the decoding scenario", 'f"cmd: {decode_cmd}, hostname: {hostname}"', None, ) case "effort" | "preset": return ExtractContext( "Effort provided as a parameter to the encoder", extract.extract_effort, None, ) case "encode_cmd" | "enc_cmd": return ExtractContext( "The ffmpeg command used for encoding", extract.extract_encode_cmd, None, ) case "encode_scenario" | "enc_scenario": return ExtractContext( "Unique string 
specific to the encoding scenario", 'f"cmd: {encode_cmd}, video_name: {video_name}, hostname: {hostname}"', None, ) case "encoder": return ExtractContext( "Name of the encoder", extract.extract_encoder, None, ) case "energy": return ExtractContext( "Total energy consumption in Joules", "float((powers[0] * powers[1]).sum())", True, ) case "energy_per_frame": return ExtractContext( "Average energy consumption per frame in Joules", "energy / nbr_frames", True, ) case "frames": extractor = ExtractContext( "The metadata of each frame", extract.extract_frames, None, ) case "height": extractor = ExtractContext( "Height of images in pixels", extract.extract_height, False, ) case "gamut" | "prim" | "primaries" | "color_primaries": extractor = ExtractContext( "The tristimulus primaries colors name", extract.extract_gamut, None, ) case "hostname": extractor = ExtractContext( "The machine name", extract.extract_hostname, None, ) case "lpips": extractor = ExtractContext( "Learned Perceptual Image Patch Similarity (LPIPS)", extract.extract_lpips, False, ) case "lpips_alex": extractor = ExtractContext( "Learned Perceptual Image Patch Similarity (LPIPS) with alex", extract.extract_lpips_alex, False, ) case "lpips_vgg": extractor = ExtractContext( "Learned Perceptual Image Patch Similarity (LPIPS) with vgg", extract.extract_lpips_vgg, False, ) case "power": extractor = ExtractContext( "Average power consumption in Watts", "energy / float(powers[0].sum())", False, ) case "powers": extractor = ExtractContext( "The interval duration and the average power in each intervals", extract.extract_powers, None, ) case "mode": extractor = ExtractContext( "Bitrate mode, constant (cbr) or variable (vbr)", extract.extract_mode, None, ) case "nb_frames" | "nbr_frames": extractor = ExtractContext( "The real number of frames of the video file", "len(frames)", True, ) case "profile": extractor = ExtractContext( "Profile of the video", ( "None if height is None and width is None else " 
"best_profile(height or width, width or height)" ), None, ) case "psnr": extractor = ExtractContext( "Peak Signal to Noise Ratio (PSNR)", extract.extract_psnr, False, ) case "quality": extractor = ExtractContext( "Quality level passed to the encoder", extract.extract_quality, False, ) case "range": extractor = ExtractContext( "Video encoding color range, 'tv' or 'pc'", extract.extract_range, None, ) case ( "reference_video_name" | "reference_vid_name" | "reference_name" | "ref_video_name" | "ref_vid_name" | "ref_name" ): extractor = ExtractContext( "Input video basename", extract.extract_reference_video_name, None, ) case "rms_sobel": extractor = ExtractContext( "Spacial root mean square sobel gradient complexity", extract.extract_rms_sobel, False, ) case "rms_time_diff": extractor = ExtractContext( "Temporal root means square time difference complexity", extract.extract_rms_time_diff, False, ) case "shape": extractor = ExtractContext( "The image shapes height x width in pixels", "(height, width)", None, ) case "ssim": extractor = ExtractContext( "Structural Similarity (SSIM)", extract.extract_ssim, False, ) case "ssim_comp" | "comp_ssim" | "ssim_rev" | "rev_ssim": extractor = ExtractContext( "Complementary of Structural Similarity (1-SSIM)", "1.0 - ssim", True, ) case "threads": extractor = ExtractContext( "Number of threads provided as a parameter to the encoder", extract.extract_threads, False, ) case "transfer" | "trans" | "color_transfer" | "eotf": extractor = ExtractContext( "The non-linear transfer function name", extract.extract_transfer, None, ) case "vmaf": extractor = ExtractContext( "Video Multi-Method Assessment Fusion (VMAF)", extract.extract_vmaf, False, ) case "video_duration" | "vid_duration": extractor = ExtractContext( "Video duration in seconds", extract.extract_video_duration, False, ) case "video_hash" | "vid_hash" | "video_md5" | "vid_md5": extractor = ExtractContext( "The hexadecimal md5 video file checksum", extract.extract_video_hash, 
None, ) case "video_name" | "vid_name" | "name": extractor = ExtractContext( "Video basename", extract.extract_video_name, None, ) case "video_size" | "vid_size" | "size": extractor = ExtractContext( "The total video file size in bytes", extract.extract_video_size, True, ) case "width": extractor = ExtractContext( "Width of images in pixels", extract.extract_height, False, ) if extractor is not None: return extractor if safe: return ExtractContext(name, name, False) raise KeyError(f"{name} is not recognised")
[docs] def merge_extractors( labels: set[str], alias: dict[str, str] | None = None, select: str | None = None, return_callable: bool = False, ) -> tuple[set[str]]: r'''Return the source code of the function that extracts all variables. Parameters ---------- labels : set[str] The returned variable names. These are the keys to the output dictionary. alias : dict[str, str], optional By default, the label extraction method is defined by the function :py:func:`get_extractor`. This list of aliases allows any unknown key to define a customised access method. select : str, optional A Python Boolean expression that raises a RejectError exception if it evaluates to False. return_callable : boolean, default=False By default, returns the source code of the function. If this option is set to True, an executable function is returned. Returns ------- lbls_atom : set[str] The name of the primary value to be extracted for the SQL query. func : list[str] or callable The function that consumes a line from the SQL query, and returns the dictionary of extracted values. 
Examples -------- >>> from mendevi.database.meta import merge_extractors >>> print("\n".join(merge_extractors({"rate", "enc_scenario"}, select="'yeti' in hostname")[1])) def line_extractor(raw: dict[str]) -> dict[str]: """Get the labels: enc_scenario, rate, reject.""" hostname = extract.extract_hostname(raw) reject = not ('yeti' in hostname) if reject: raise RejectError("this line must be filtered") encode_cmd = extract.extract_encode_cmd(raw) size = extract.extract_video_size(raw) video_name = extract.extract_video_name(raw) video_duration = extract.extract_video_duration(raw) enc_scenario = f"cmd: {encode_cmd}, video_name: {video_name}, hostname: {hostname}" rate = None if size is None or video_duration is None else 8.0 * size / video_duration return { 'enc_scenario': enc_scenario, 'rate': rate, 'reject': reject, } ''' assert isinstance(labels, set), labels.__class__.__name__ assert all(isinstance(lbl, str) for lbl in labels), labels.__class__.__name__ alias = (alias or {}).copy() assert isinstance(alias, dict), alias.__class__.__name__ assert all(isinstance(k, str) for k in alias), alias assert "reject" not in alias, "'reject' is a forbidden key for 'alias'" if select is not None: assert isinstance(select, str), select.__class__.__name__ # recursively extracts all steps # 1) initialisation of the tree leaves for lbl in labels: alias[lbl] = alias.get(lbl, get_extractor(lbl).func) labels = sorted(alias) if select is not None: alias["reject"] = f"not ({select})" # 2) recursive exploration of the tree to find the roots leaves = {leave for leave, expr in alias.items() if not callable(expr)} while leaves: new_alias = { lbl: get_extractor(lbl).func for branch in leaves for lbl in extract_names(alias[branch]) if lbl not in alias } alias |= new_alias leaves = {leave for leave, expr in new_alias.items() if not callable(expr)} # 3) keep all roots in meomry lbls_atom = {root for root, func in alias.items() if callable(func)} # organising lines in the correct order def 
get_roots(alias: dict[str], leave: str) -> str: """Explore the tree and return a root.""" if callable(alias[leave]): yield leave elif subtree := { root for lbl in extract_names(alias[leave]) if lbl in alias for root in get_roots(alias, lbl) }: yield from sorted(subtree) else: # case not callable but not subtree ever yield leave # 1) extract 'reject' first, go strait on it tree: list[tuple[str, str]] = [] # orderd dict while "reject" in alias: root = next(iter(get_roots(alias, "reject"))) tree.append((root, alias.pop(root))) # 2) extract the others while alias: for lbl in labels: if lbl in alias: root = next(iter(get_roots(alias, lbl))) tree.append((root, alias.pop(root))) # print the main functions code = [ "def line_extractor(raw: dict[str]) -> dict[str]:", f' """Get the labels: {", ".join(sorted(labels))}."""', ] for lbl, func in tree: if callable(func): code.append(f" {lbl} = extract.{func.__name__}(raw)") else: code.append(f" {lbl} = {func}") if lbl == "reject": code.extend([ " if reject:", ' raise RejectError("this line must be filtered")', ]) code.extend([ " return {", *(f" {lbl!r}: {lbl}," for lbl in labels), " }", ]) if not return_callable: return lbls_atom, code # import the source code as a function code = [ "from mendevi.utils import best_profile", "import mendevi.database.extract as extract", "", *code, ] path = pathlib.Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.py" with open(path, "w", encoding="utf-8") as file: file.write("\n".join(code)) spec = importlib.util.spec_from_file_location(path.stem, path) modulevar = importlib.util.module_from_spec(spec) spec.loader.exec_module(modulevar) path.unlink() return lbls_atom, modulevar.line_extractor