Source code for mendevi.database.meta

"""Help to get the good extractor."""

import ast
import collections
import collections.abc
import enum
import importlib
import importlib.util
import pathlib
import tempfile
import typing
import uuid

import networkx as nx

from mendevi.database import extract


class Scale(enum.Flag):
    """Tell whether a variable lives on a continuous or a discrete axis."""

    # Explicit powers of two, identical to what enum.auto() assigns on a Flag.
    LINEAR = 1
    LOGARITHMIC = 2
    DISCRETE = 4
class Extractor(collections.abc.Mapping):
    """Correctly define a variable in a context.

    The object is a read-only mapping with exactly three keys, ``legend``,
    ``func`` and ``scale``, so it can be unpacked with ``**`` as keyword arguments.

    Examples
    --------
    >>> from mendevi.database.meta import Scale, Extractor
    >>> Extractor("legend", "func", Scale.DISCRETE)
    Extractor('legend', 'func', Scale.DISCRETE)
    >>> {**_}
    {'legend': 'legend', 'func': 'func', 'scale': <Scale.DISCRETE: 4>}
    >>>
    """

    # Single source of truth for the mapping keys, in iteration order.
    _KEYS: typing.ClassVar[tuple[str, ...]] = ("legend", "func", "scale")

    def __init__(self, legend: str, func: typing.Callable | str, scale: Scale) -> None:
        """Create a dictlike named tuple.

        Parameters
        ----------
        legend : str
            The human readable description of the variable.
        func : callable or str
            The function, or the python expression, that computes the variable.
        scale : Scale
            The kind of variable.
        """
        assert isinstance(legend, str), legend.__class__.__name__
        assert callable(func) or isinstance(func, str), func.__class__.__name__
        assert isinstance(scale, Scale), scale.__class__.__name__
        self._legend, self._func, self._scale = legend, func, scale

    @property
    def legend(self) -> str:
        """Return the description."""
        return self._legend

    @property
    def func(self) -> typing.Callable | str:
        """Return the function or the expression to compute the variable."""
        return self._func

    @property
    def scale(self) -> Scale:
        """Return the kind of variable."""
        return self._scale

    def __getitem__(self, item: str) -> object:
        """Return the attribute, raise KeyError for any other key."""
        return {
            "legend": self._legend,
            "func": self._func,
            "scale": self._scale,
        }[item]

    def __iter__(self) -> typing.Iterator[str]:
        """Yield the keys."""
        yield from self._KEYS

    def __len__(self) -> int:
        """Return the numbers of attributes."""
        return len(self._KEYS)

    def __repr__(self) -> str:
        """Return a nice representation."""
        return f"{self.__class__.__name__}({self._legend!r}, {self._func!r}, {self._scale})"
# Every known variable: either extracted from a raw SQL line by a function of
# ``mendevi.database.extract``, or derived from other variables by a python expression.
ALL_EXTRACTORS = {
    "act_duration": Extractor(
        "Video processing activity duration in seconds",
        extract.extract_act_duration,
        Scale.LINEAR,
    ),
    "act_duration_per_frame": Extractor(
        "Average video processing activity duration per frame in seconds",
        # plain ratio: the bytes -> bits factor 8.0 only makes sense for the bitrate
        "None if act_duration is None or nbr_frames is None else act_duration / nbr_frames",
        Scale.LOGARITHMIC,
    ),
    "bitrate": Extractor(
        r"Video bitrate in $bit.s^{-1}$",
        "None if size is None or video_duration is None else 8.0 * size / video_duration",
        Scale.LOGARITHMIC,
    ),
    "category": Extractor(
        "3GPP TR 26.955, 5G Video Codec Characteristics",
        (
            # NOTE(review): the trailing comments look like the disabled "; Key: ..." suffix
            # of each category value, kept as a reminder of the sequence key -- confirm.
            "{\n"
            ' "aov": "Gaming",\n'  # ; Key: S5-R01",\n'
            ' "baolei_balloon": "Gaming",\n'  # ; Key: S5-R04",\n'
            ' "baolei_man": "Gaming",\n'  # ; Key: S5-R02",\n'
            ' "baolei_woman": "Gaming",\n'  # ; Key: S5-R03",\n'
            ' "baolei_yard": "Gaming",\n'  # ; Key: S5-R05",\n'
            ' "boat": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "bode_museum": "8K-TV",\n'  # ; Key: S3-R16",\n'
            ' "brest_sedof": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "cosmos": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "cs_go": "Gaming",\n'  # ; Key: S5-R12",\n'
            ' "elevator": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "fountain": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "graphics_mix_simple": "ScreenContent",\n'  # ; Key: S3-R09",\n'
            ' "graphics_mix_transitions": "ScreenContent",\n'  # ; Key: S3-R13",\n'
            ' "heroes_of_the_storm": "Gaming",\n'  # ; Key: S5-R08",\n'
            ' "jianling_beach": "Gaming",\n'  # ; Key: S5-R07",\n'
            ' "jianling_temple": "Gaming",\n'  # ; Key: S5-R06",\n'
            ' "life_untouched": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "meridian": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "mine_craft": "Gaming",\n'  # ; Key: S5-R11",\n'
            ' "mission_control": "ScreenContent",\n'  # ; Key: S3-R17",\n'
            ' "mooving_text": "ScreenContent",\n'  # ; Key: S3-R01",\n'
            ' "neon": "Messaging",\n'  # ; Key: S3-R16",\n'
            ' "neptune_fountain_2": "8K-TV",\n'  # ; Key: S3-R16",\n'
            ' "neptune_fountain_3": "8K-TV",\n'  # ; Key: S3-R16",\n'
            ' "nocturne": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "oberbaum_spree": "8K-TV",\n'  # ; Key: S3-R16",\n'
            ' "park_joy": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "project_cars": "Gaming",\n'  # ; Key: S5-R09",\n'
            ' "quadriga_tree": "8K-TV",\n'  # ; Key: S3-R16",\n'
            ' "rain_fruits": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "riverbank": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "skater": "Messaging",\n'  # ; Key: S3-R16",\n'
            ' "soccer": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "sol_levante": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "sparks": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "star_craft": "Gaming",\n'  # ; Key: S5-R13",\n'
            ' "subway_tree": "8K-TV",\n'  # ; Key: S3-R16",\n'
            ' "text_mix_transition": "ScreenContent",\n'  # ; Key: S3-R05",\n'
            ' "tiergarten_parkway": "8K-TV",\n'  # ; Key: S3-R16",\n'
            ' "tunnel_flag": "4K-TV",\n'  # ; Key: S5-R5",\n'
            ' "world_of_warcraft": "Gaming",\n'  # ; Key: S5-R10",\n'
            "}.get(reference_video_stem, reference_video_stem)"
        ),
        Scale.DISCRETE,
    ),
    "codec": Extractor("Codec name", extract.extract_codec, Scale.DISCRETE),
    "cores": Extractor(
        "Average cumulative utilisation rate of logical cores",
        extract.extract_cores,
        Scale.LINEAR,
    ),
    "decode_cmd": Extractor(
        "The ffmpeg command used for decoding",
        extract.extract_decode_cmd,
        Scale.DISCRETE,
    ),
    "decode_ram": Extractor(
        "Is the decoded video stored in RAM?",
        '"yes" if "/dev/shm" in decode_cmd else "no"',
        Scale.DISCRETE,
    ),
    "decode_scenario": Extractor(
        "Unique string specific to the decoding scenario",
        'f"cmd: {decode_cmd}, hostname: {hostname}"',
        Scale.DISCRETE,
    ),
    "decoder": Extractor("Name of the decoder", extract.extract_decoder, Scale.DISCRETE),
    "decoder_family": Extractor(
        "The type of decoder",
        '"cuvid" if str(decoder).endswith("_cuvid") else "cpu"',
        Scale.DISCRETE,
    ),
    "effort": Extractor(
        "Effort provided as a parameter to the encoder",
        extract.extract_effort,
        Scale.DISCRETE,
    ),
    "encode_cmd": Extractor(
        "The ffmpeg command used for encoding",
        extract.extract_encode_cmd,
        Scale.DISCRETE,
    ),
    "encode_ram": Extractor(
        "Is the encoded video stored in RAM?",
        '"yes" if "/dev/shm" in encode_cmd else "no"',
        Scale.DISCRETE,
    ),
    "encode_scenario": Extractor(
        "Unique string specific to the encoding scenario",
        'f"cmd: {encode_cmd}, video_name: {video_name}, hostname: {hostname}"',
        Scale.DISCRETE,
    ),
    "encoder": Extractor("Name of the encoder", extract.extract_encoder, Scale.DISCRETE),
    "encoder_family": Extractor(
        "The type of encoder",
        '"nvenc" if str(encoder).endswith("_nvenc") else "cpu"',
        Scale.DISCRETE,
    ),
    "energy": Extractor(
        "Total energy consumption in Joules",
        "None if powers is None else float((powers[0] * powers[1]).sum())",
        Scale.LOGARITHMIC,
    ),
    "energy_per_frame": Extractor(
        "Average energy consumption per frame in Joules",
        "None if energy is None or nbr_frames is None else energy / nbr_frames",
        Scale.LOGARITHMIC,
    ),
    "frames": Extractor("The metadata of each frame", extract.extract_frames, Scale.DISCRETE),
    "gamut": Extractor(
        "The tristimulus primaries colors name",
        extract.extract_gamut,
        Scale.DISCRETE,
    ),
    "height": Extractor("Height of images in pixels", extract.extract_height, Scale.LINEAR),
    "hostname": Extractor("The machine name", extract.extract_hostname, Scale.DISCRETE),
    "lpips": Extractor(
        "Learned Perceptual Image Patch Similarity (LPIPS)",
        extract.extract_lpips,
        Scale.LINEAR,
    ),
    "lpips_alex": Extractor(
        "Learned Perceptual Image Patch Similarity (LPIPS) with alex",
        extract.extract_lpips_alex,
        Scale.LINEAR,
    ),
    "lpips_vgg": Extractor(
        "Learned Perceptual Image Patch Similarity (LPIPS) with vgg",
        extract.extract_lpips_vgg,
        Scale.LINEAR,
    ),
    "power": Extractor(
        "Average power consumption in Watts",
        "None if energy is None or powers is None else energy / float(powers[0].sum())",
        Scale.LINEAR,
    ),
    "powers": Extractor(
        "The interval duration and the average power in each intervals",
        extract.extract_powers,
        Scale.DISCRETE,
    ),
    "mode": Extractor(
        "Bitrate mode, constant (cbr) or variable (vbr)",
        extract.extract_mode,
        Scale.DISCRETE,
    ),
    "nbr_frames": Extractor(
        "The real number of frames of the video file",
        "None if frames is None else len(frames)",
        Scale.LOGARITHMIC,
    ),
    "profile": Extractor(
        "Profile of the video",
        (
            "None if height is None and width is None else "
            "best_profile(height or width, width or height)"
        ),
        Scale.DISCRETE,
    ),
    "psnr": Extractor(
        "Peak Signal to Noise Ratio (PSNR)",
        extract.extract_psnr,
        Scale.LINEAR,
    ),
    "quality": Extractor(
        "Quality level passed to the encoder",
        extract.extract_quality,
        Scale.LINEAR,
    ),
    "range": Extractor(
        "Video encoding color range, 'tv' or 'pc'",
        extract.extract_range,
        Scale.DISCRETE,
    ),
    "reference_video_stem": Extractor(
        "Input video compact stem",
        extract.extract_reference_video_stem,
        Scale.DISCRETE,
    ),
    "rms_sobel": Extractor(
        "Spatial root mean square sobel gradient complexity",
        extract.extract_rms_sobel,
        Scale.LINEAR,
    ),
    "rms_time_diff": Extractor(
        "Temporal root means square time difference complexity",
        extract.extract_rms_time_diff,
        Scale.LINEAR,
    ),
    "shape": Extractor(
        "The image shapes height x width in pixels",
        "(height, width)",
        Scale.DISCRETE,
    ),
    "spatial_dct": Extractor(
        "Spatial DCT complexity",
        extract.extract_spatial_dct,
        Scale.LINEAR,
    ),
    "ssim": Extractor("Structural Similarity (SSIM)", extract.extract_ssim, Scale.LINEAR),
    "ssim_comp": Extractor(
        "Complementary of Structural Similarity (1-SSIM)",
        "None if ssim is None else 1.0 - ssim",
        Scale.LOGARITHMIC,
    ),
    "temp": Extractor("Average temperature in C", extract.extract_temp, Scale.LINEAR),
    "temporal_dct": Extractor(
        "Temporal DCT complexity",
        extract.extract_temporal_dct,
        Scale.LINEAR,
    ),
    "threads": Extractor(
        "Number of threads provided as a parameter to the encoder",
        extract.extract_threads,
        Scale.LINEAR,
    ),
    "transfer": Extractor(
        "The non-linear transfer function name",
        extract.extract_transfer,
        Scale.DISCRETE,
    ),
    "vmaf": Extractor(
        "Video Multi-Method Assessment Fusion (VMAF)",
        extract.extract_vmaf,
        Scale.LINEAR,
    ),
    "video_duration": Extractor(
        "Video duration in seconds",
        extract.extract_video_duration,
        Scale.LINEAR,
    ),
    "video_hash": Extractor(
        "The hexadecimal md5 video file checksum",
        extract.extract_video_hash,
        Scale.DISCRETE,
    ),
    "video_name": Extractor(
        "Full video basename",
        extract.extract_video_name,
        Scale.DISCRETE,
    ),
    "video_size": Extractor(
        "The total video file size in bytes",
        extract.extract_video_size,
        Scale.LOGARITHMIC,
    ),
    "width": Extractor(
        "Width of images in pixels",
        extract.extract_width,  # as in your code
        Scale.LINEAR,
    ),
}

# Alternative spellings, each mapped to the canonical key of ALL_EXTRACTORS.
ALIAS = {
    "actdur": "act_duration",
    "color_primaries": "gamut",
    "color_transfer": "transfer",
    "comp_ssim": "ssim_comp",
    "dec_cmd": "decode_cmd",
    "dec_ram": "decode_ram",
    "dec_scenario": "decode_scenario",
    "decoder_type": "decoder_family",
    "enc_cmd": "encode_cmd",
    "enc_ram": "encode_ram",
    "enc_scenario": "encode_scenario",
    "encoder_type": "encoder_family",
    "eotf": "transfer",
    "name": "video_name",
    "nb_frames": "nbr_frames",
    "preset": "effort",
    "prim": "gamut",
    "primaries": "gamut",
    "rate": "bitrate",
    "ref_stem": "reference_video_stem",
    "ref_vid_stem": "reference_video_stem",
    "ref_video_stem": "reference_video_stem",
    "reference_stem": "reference_video_stem",
    "reference_vid_stem": "reference_video_stem",
    "rev_ssim": "ssim_comp",
    "size": "video_size",
    "ssim_rev": "ssim_comp",
    "temperature": "temp",
    "trans": "transfer",
    "vid_duration": "video_duration",
    "vid_hash": "video_hash",
    "vid_md5": "video_hash",
    "vid_name": "video_name",
    "vid_size": "video_size",
    "video_md5": "video_hash",
}


def _import_extractor(code: list[str]) -> typing.Callable:
    """Import the function line_extractor.

    Parameters
    ----------
    code : list[str]
        The source lines that define a function called ``line_extractor``.

    Returns
    -------
    line_extractor : callable
        The function defined by ``code``, once the module has been executed.

    Notes
    -----
    The source is written in a uniquely named file of the temporary directory,
    imported from there, and the file is removed even if the import fails.
    """
    source = "\n".join([
        "import hashlib",
        "import math",
        "import re",
        "",
        "from mendevi.exceptions import RejectError",
        "from mendevi.utils import best_profile, uniform",
        "import mendevi.database.extract as extract",
        "",
        *code,
    ])
    path = pathlib.Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.py"
    try:
        path.write_text(source, encoding="utf-8")
        spec = importlib.util.spec_from_file_location(path.stem, path)
        modulevar = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(modulevar)
    finally:
        # never leave the generated file behind, whatever happened above
        path.unlink(missing_ok=True)
    return modulevar.line_extractor


def _print_branch(leaf: str, skip: set[str], graph: nx.DiGraph) -> list[str]:
    """Return the code to extract a branch.

    Parameters
    ----------
    leaf : str
        The node to compute, all its ancestors are computed before it.
    skip : set[str]
        The nodes whose code has already been written.
        It is updated inplace with the nodes written by this call.
    graph : nx.DiGraph
        The dependency graph, an edge ``a -> b`` means that ``b`` needs ``a``.
        Each node carries the attributes ``legend`` and ``func``.

    Returns
    -------
    branch_code : list[str]
        One assignment line per new node, every dependency before its users.

    Raises
    ------
    ValueError
        If the graph contains a cycle reachable from ``leaf``.
    """
    done = frozenset(skip)  # frozen view: the nodes written by previous calls
    # depth[node] is the length of the LONGEST path from node to leaf. The shortest
    # one is not enough: if leaf needs (a, b) and b needs a, then a is reached at
    # distance 1 and 2, and it must be written before b, so with the others of level 2.
    depth: dict[str, int] = {}
    level, frontier = 0, {leaf} - done
    while frontier:
        if level > graph.number_of_nodes():  # a path longer than the graph is a cycle
            msg = f"the extraction graph has a cycle reachable from {leaf!r}"
            raise ValueError(msg)
        depth.update(dict.fromkeys(frontier, level))
        frontier = {pp for p in frontier for pp in graph.predecessors(p)} - done
        level += 1
    skip.update(depth)  # inplace, the caller shares this set between branches

    branch_code: list[str] = []
    # deepest first, sorted inside a level for reproducibility and readability
    for lbl in sorted(depth, key=lambda node: (depth[node], node), reverse=True):
        legend, func = graph.nodes[lbl]["legend"], graph.nodes[lbl]["func"]
        branch_code.append(
            f"    {lbl} = extract.{func.__name__}(raw)  # {legend}"
            if callable(func) else
            f"    {lbl} = {func}  # {legend}",
        )
    return branch_code
def extract_names(expr: str) -> set[str]:
    """Return all the symbols in the python expression.

    A symbol is a name that is read by the expression and that the expression
    does not define itself. The following names are therefore excluded:
    the targets of an assignment or of a comprehension, the parameters of a
    lambda, the names that are directly called like a function,
    and the modules ``re`` and ``math``.

    Parameters
    ----------
    expr : str
        The python source code to analyse.

    Returns
    -------
    names : set[str]
        The free variable names, it can be empty.

    Raises
    ------
    SyntaxError
        If ``expr`` can not be parsed.

    Examples
    --------
    >>> from mendevi.database.meta import extract_names
    >>> extract_names("foo")
    {'foo'}
    >>> extract_names("[i**2 for i in foo]")
    {'foo'}
    >>> extract_names("foo.bar")
    {'foo'}
    >>> extract_names("bar(foo)")
    {'foo'}
    >>> extract_names("foo.bar()")
    {'foo'}
    >>> extract_names("sorted(foo, key=lambda v: v[0])")
    {'foo'}
    >>>
    """
    modules: set[str] = {"re", "math"}
    try:
        nodes = list(ast.walk(ast.parse(expr, mode="exec")))
    except SyntaxError as err:
        msg = f"the argument {expr!r} is not a valid python expression"
        raise SyntaxError(msg) from err

    names = [n for n in nodes if isinstance(n, ast.Name)]
    # names written or deleted by the expression itself (comprehension target, walrus...)
    bound = {n.id for n in names if isinstance(n.ctx, (ast.Store, ast.Del))}
    # function parameters are ast.arg nodes, not ast.Name, they are local to the lambda
    params = {n.arg for n in nodes if isinstance(n, ast.arg)}
    # callee of a call that is not a method: "bar" in "bar(foo)", but not "foo" in "foo.bar()"
    called = {
        n_.id
        for n in nodes if isinstance(n, ast.Call) and not isinstance(n.func, ast.Attribute)
        for n_ in ast.walk(n.func) if isinstance(n_, ast.Name)
    }
    return {n.id for n in names} - bound - params - called - modules
def get_extractor(name: str, *, safe: bool = False) -> Extractor:
    """Find how a label has to be deserialized.

    Parameters
    ----------
    name : str
        The label name. It can be a canonical name, one of its aliases,
        or any of them prefixed by ``log_`` to get the decimal logarithm.
    safe : boolean, default=False
        If True, return a dummy linear extractor whose legend and expression
        are the name itself, instead of raising KeyError.

    Returns
    -------
    extractor : Extractor
        The label, func and scale.

    Raises
    ------
    KeyError
        If the name is unknown and ``safe`` is False.
    """
    assert isinstance(name, str), name.__class__.__name__
    assert isinstance(safe, bool), safe.__class__.__name__

    # logarithm of another variable
    prefix = "log_"
    if name.startswith(prefix):
        inner = name[len(prefix):]
        base = get_extractor(inner, safe=safe)
        legend = f"Log 10 of {base.legend.lower()}"
        expression = f"None if {inner} is None else math.log10({inner})"
        return Extractor(legend, expression, Scale.LINEAR)

    # known variable, after alias resolution
    canonical = ALIAS.get(name, name)
    found = ALL_EXTRACTORS.get(canonical)
    if found is not None:
        return found

    # unknown variable
    if not safe:
        msg = f"{name} is not recognised"
        raise KeyError(msg)
    return Extractor(name, name, Scale.LINEAR)
def merge_extractors(
    labels: set[str],
    alias: dict[str, str] | None = None,
    select: str | None = None,
    *,
    return_callable: bool = False,
) -> tuple[set[str], list[str] | typing.Callable]:
    r'''Return the source code of the function that extracts all variables.

    Parameters
    ----------
    labels : set[str]
        The returned variable names. These are the keys to the output dictionary.
    alias : dict[str, str], optional
        To each new name, associate an expression.
        By default, the label extraction method is defined
        by the function :py:func:`get_extractor`.
        This list of aliases allows any unknown key to define a customised access method.
    select : str, optional
        A Python Boolean expression that raises a RejectError exception if it evaluates to False.
    return_callable : boolean, default=False
        By default, returns the source code of the function.
        If this option is set to True, an executable function is returned.

    Returns
    -------
    lbls_atom : set[str]
        The name of the primary value to be extracted for the SQL query.
    func : list[str] or callable
        The function that consumes a line from the SQL query,
        and returns the dictionary of extracted values.

    Raises
    ------
    ValueError
        If the dependency graph has a cycle. This is in particular the case
        when an expression uses a name that is neither known nor aliased.

    Examples
    --------
    >>> from mendevi.database.meta import merge_extractors
    >>> labels = {"enc_scenario", "rate", "power", "foo"}
    >>> alias = {"foo": "(abaca, energy)", "abaca": "(hostname, nbr_frames, enc_scenario)"}
    >>> select = '"yeti" not in hostname'
    >>> print("\n".join(merge_extractors(labels, alias, select)[1]))  # doctest: +ELLIPSIS
    def line_extractor(raw: dict[str]) -> dict[str]:
        """Get the labels: enc_scenario, foo, power, rate.
    <BLANKLINE>
        Causality constraint:
            * abaca -> foo
            * enc_scenario -> abaca
            * encode_cmd -> enc_scenario
            * energy -> foo
            * energy -> power
            * frames -> nbr_frames
            * hostname -> abaca
            * hostname -> enc_scenario
            * hostname -> reject
            * nbr_frames -> abaca
            * powers -> energy
            * powers -> power
            * size -> rate
            * video_duration -> rate
            * video_name -> enc_scenario
        """
        # reject wrong line
        hostname = extract.extract_hostname(raw)  # The machine name
        reject = not ("yeti" not in hostname)  # not ("yeti" not in hostname)
        if reject:
            msg = "this line must be filtered"
            raise RejectError(msg)
    <BLANKLINE>
        # extract data
        video_name = extract.extract_video_name(raw)  # Full video basename
        encode_cmd = extract.extract_encode_cmd(raw)  # The ffmpeg command used for encoding
        enc_scenario = f"cmd: {encode_cmd}, video_name: {video_name}, hostname: {hostname}"  # Un...
        frames = extract.extract_frames(raw)  # The metadata of each frame
        powers = extract.extract_powers(raw)  # The interval duration and the average power in ea...
        nbr_frames = None if frames is None else len(frames)  # The real number of frames of the ...
        energy = None if powers is None else float((powers[0] * powers[1]).sum())  # Total energy...
        abaca = (hostname, nbr_frames, enc_scenario)  # alias
        foo = (abaca, energy)  # alias
        power = None if energy is None or powers is None else energy / float(powers[0].sum())  # ...
        video_duration = extract.extract_video_duration(raw)  # Video duration in seconds
        size = extract.extract_video_size(raw)  # The total video file size in bytes
        rate = None if size is None or video_duration is None else 8.0 * size / video_duration  #...
    <BLANKLINE>
        return {
            'enc_scenario': enc_scenario,
            'foo': foo,
            'power': power,
            'rate': rate,
        }
    >>>
    '''
    assert isinstance(labels, set), labels.__class__.__name__
    assert all(isinstance(lbl, str) for lbl in labels), labels
    alias = alias or {}
    assert isinstance(alias, dict), alias.__class__.__name__
    assert select is None or isinstance(select, str), select.__class__.__name__

    # dependency graph: an edge "a -> b" means that b is computed from a
    extractors = nx.DiGraph()

    def get_alias_extractor(alias: dict, lbl: str) -> Extractor:
        """Give the priority to the user alias over the predefined extractors."""
        return (
            Extractor("alias", alias[lbl], get_extractor(alias[lbl], safe=True).scale)
            if lbl in alias else
            get_extractor(lbl, safe=True)
        )

    # initialise the graph with final leaves
    for label in labels:
        extractors.add_node(label, **get_alias_extractor(alias, label))
    if select is not None:
        extractors.add_node(
            "reject", **get_extractor(f"not ({select})", safe=True),
        )

    # construct the full tree
    # A node defined by an expression has no predecessor until it has been expanded,
    # but it may still have none afterwards if the expression uses no symbol ("42").
    # Each node is therefore expanded only once, otherwise the loop would never end.
    expanded: set[str] = set()
    while nodes := [
        n for n, deg in extractors.in_degree()
        if deg == 0 and n not in expanded and not callable(extractors.nodes[n]["func"])
    ]:
        for node in nodes:
            expanded.add(node)
            for root in extract_names(alias.get(node, extractors.nodes[node]["func"])):
                extractors.add_node(root, **get_alias_extractor(alias, root))
                extractors.add_edge(root, node)

    # # draw graph for debug: sudo apt install graphviz graphviz-dev && uv pip install pygraphviz
    # with open("/tmp/extractors.dot", "w") as file:
    #     for node in extractors:
    #         extractors.nodes[node]["label"] = (
    #             f"{node}\\n{'\\n'.join(f'{k}:{v!r}' for k, v in extractors.nodes[node].items())}"
    #         )
    #     file.write(nx.nx_agraph.to_agraph(extractors).string())

    # verification of undefined variable or cycle
    # (an unknown name gets the expression "name", so it depends on itself)
    if cycles := list(nx.simple_cycles(extractors)):
        msg = (
            f"The extraction graph has cycles: {cycles}, "
            "which means that variables are not defined. "
            "You must specify an alias to break all cycles."
        )
        raise ValueError(msg)

    # create the source code
    # 1) header
    code = [
        "def line_extractor(raw: dict[str]) -> dict[str]:",
        f'    """Get the labels: {", ".join(sorted(labels))}.',
        "",
        "    Causality constraint:",
        *(f"        * {prev} -> {node}" for prev, node in sorted(extractors.edges)),
        '    """',
    ]
    skip: set[str] = set()  # processed nodes, shared by all the branches

    # 2) reject bad line, first, to avoid computing the variables of a filtered line
    if select is not None:
        code.extend([
            "    # reject wrong line",
            *_print_branch("reject", skip, extractors),
            "    if reject:",
            '        msg = "this line must be filtered"',
            "        raise RejectError(msg)",
            "",
        ])

    # 3) extract the tree for all labels
    code.extend([
        "    # extract data",
        *(line for leaf in sorted(labels) for line in _print_branch(leaf, skip, extractors)),
        "",
    ])

    # 4) cast and return the values
    code.extend([
        "    return {",
        *(f"        {lbl!r}: {lbl}," for lbl in sorted(labels)),
        "    }",
    ])

    # import source code
    return (
        # the atoms are the nodes read straight from the raw line
        {n for n in extractors if callable(extractors.nodes[n]["func"])},
        (_import_extractor(code) if return_callable else code),
    )