Source code for mendevi.database.meta
#!/usr/bin/env python3
"""Help to get the good extractor."""
import ast
import collections
import typing
from mendevi.database import extract
# Lightweight record returned by ``get_extractor``: the axis label, the way to
# obtain the value (a callable or a formula string), and the axis scale flag.
ExtractContext = collections.namedtuple(
    "ExtractContext",
    ("label", "func", "is_log"),
)
[docs]
def extract_names(expr: str) -> set[str]:
    """Return all the symbols in the python expression.

    A symbol is a name that is read by the expression.
    Two families of names are discarded:

    * the names that are assigned or deleted somewhere in the source
      (the target of a comprehension for instance),
    * the names that appear in the callee of a call,
      unless the callee is an attribute access (``foo.bar()`` keeps ``foo``).

    Parameters
    ----------
    expr : str
        The python source code to analyse.

    Returns
    -------
    names : set[str]
        The remaining identifiers, the set is empty if there are none.

    Raises
    ------
    SyntaxError
        If ``expr`` can not be parsed by ``ast.parse``.

    Examples
    --------
    >>> from mendevi.database.meta import extract_names
    >>> extract_names("foo")
    {'foo'}
    >>> extract_names("[i**2 for i in foo]")
    {'foo'}
    >>> extract_names("foo.bar")
    {'foo'}
    >>> extract_names("bar(foo)")
    {'foo'}
    >>> extract_names("foo.bar()")
    {'foo'}
    """
    try:
        tree = ast.parse(expr, mode="exec")
    except SyntaxError as err:
        raise SyntaxError(
            f"the argument {expr!r} is not a valid python expression"
        ) from err

    candidates: set[str] = set()  # every identifier met in the tree
    reject: set[str] = set()  # identifiers that are not external symbols
    for node in ast.walk(tree):
        if isinstance(node, ast.Name):
            candidates.add(node.id)
            if isinstance(node.ctx, (ast.Store, ast.Del)):
                reject.add(node.id)  # locally bound or deleted name
        elif isinstance(node, ast.Call) and not isinstance(node.func, ast.Attribute):
            # the callee is a function name, not a piece of data
            reject.update(
                sub.id for sub in ast.walk(node.func) if isinstance(sub, ast.Name)
            )
    return candidates - reject
[docs]
def get_extractor(name: str, safe: bool = False) -> ExtractContext:
"""Get the way to deserialize a raw value.
Parameters
----------
name : str
The label name.
safe : boolean, default=False
If True, retrun a stupid value instead of raising KeyError.
Returns
-------
label : str
The description of the physical quantity.
This description can be used to label the axes of a graph.
func : callable | str
The function that performs the verification and deserialisation task,
or the formula that allows you to find this quantity.
is_log : boolean or None
True to display in log space, False for linear.
The value None means the axis is not continuous.
"""
assert isinstance(name, str), name.__class__.__name__
assert isinstance(safe, bool), safe.__class__.__name__
extractor = None
match name: # catched by mendevi.cst.labels.extract_labels
case "act_duration":
return ExtractContext(
"Video processing activity duration in seconds",
extract.extract_act_duration,
False,
)
case "bitrate" | "rate":
return ExtractContext(
r"Video bitrate in $bit.s^{-1}$",
"8.0 * size / video_duration",
True,
)
case "codec":
return ExtractContext(
"Codec name",
extract.extract_codec,
None,
)
case "cores":
return ExtractContext(
"Average cumulative utilisation rate of logical cores",
extract.extract_cores,
False,
)
case "effort" | "preset":
return ExtractContext(
"Effort provided as a parameter to the encoder",
extract.extract_effort,
None,
)
case "enc_scenario":
return ExtractContext(
"Unique string specific to the encoding scenario",
extract.extract_enc_scenario,
None,
)
case "encoder":
return ExtractContext(
"Name of the encoder",
extract.extract_encoder,
None,
)
case "energy":
return ExtractContext(
"Total energy consumption in Joules",
"float((powers[0] * powers[1]).sum())",
True,
)
case "frames":
extractor = ExtractContext(
"The metadata of each frame",
extract.extract_frames,
None,
)
case "height":
extractor = ExtractContext(
"The height of images in pixels",
extract.extract_height,
False,
)
case "lpips":
extractor = ExtractContext(
"Learned Perceptual Image Patch Similarity (LPIPS)",
extract.extract_lpips,
False,
)
case "lpips_alex":
extractor = ExtractContext(
"Learned Perceptual Image Patch Similarity (LPIPS) with alex",
extract.extract_lpips_alex,
False,
)
case "lpips_vgg":
extractor = ExtractContext(
"Learned Perceptual Image Patch Similarity (LPIPS) with vgg",
extract.extract_lpips_vgg,
False,
)
case "power":
extractor = ExtractContext(
"Average power consumption in Watts",
"energy / float(powers[0].sum())",
False,
)
case "powers":
extractor = ExtractContext(
"The interval duration and the average power in each intervals",
extract.extract_powers,
None
)
case "mode":
extractor = ExtractContext(
"Bitrate mode, constant (cbr) or variable (vbr)",
extract.extract_mode,
None,
)
case "nb_frames" | "nbr_frames":
extractor = ExtractContext(
"The real number of frames of the video file",
"len(frames)",
True,
)
case "profile":
extractor = ExtractContext(
"Profile of the video",
"best_profile(height, width)",
None,
)
case "psnr":
extractor = ExtractContext(
"Peak Signal to Noise Ratio (PSNR)",
extract.extract_psnr,
False,
)
case "quality":
extractor = ExtractContext(
"Quality level passed to the encoder",
extract.extract_quality,
False,
)
case "shape":
extractor = ExtractContext(
"The image shapes height x width in pixels",
"(height, width)",
None,
)
case "ssim":
extractor = ExtractContext(
"Structural Similarity (SSIM)",
extract.extract_ssim,
False,
)
case "ssim_comp" | "comp_ssim" | "ssim_rev" | "rev_ssim":
extractor = ExtractContext(
"Complementary of Structural Similarity (1-SSIM)",
extract.extract_ssim_comp,
True,
)
case "threads":
extractor = ExtractContext(
"Number of threads provided as a parameter to the encoder",
extract.extract_threads,
False,
)
case "vmaf":
extractor = ExtractContext(
"Video Multi-Method Assessment Fusion (VMAF)",
extract.extract_vmaf,
False,
)
case "video_duration" | "vid_duration":
extractor = ExtractContext(
"Video duration in seconds",
extract.extract_video_duration,
False,
)
case "video_name" | "vid_name" | "name":
extractor = ExtractContext(
"Input video name",
extract.extract_video_name,
None,
)
case "video_size" | "vid_size" | "size":
extractor = ExtractContext(
"The total video file size in bytes",
extract.extract_video_size,
True,
)
case "width":
extractor = ExtractContext(
"The width of images in pixels",
extract.extract_height,
False,
)
if extractor is not None:
return extractor
if safe:
return ExtractContext(name, name, False)
raise KeyError(f"{name} is not recognised")
[docs]
def merge_extractors(
    labels: set[str], select: typing.Optional[str] = None
) -> tuple[set[str], list[str]]:
    r'''Return the source code of the function that extracts all variables.

    Parameters
    ----------
    labels : set[str]
        The python expressions to be evaluated for each line.
        Each symbol they contain must be known by ``get_extractor``.
    select : str, optional
        A python expression used as a filter. If provided, the generated
        function raises ``RejectError`` when this expression is falsy.

    Returns
    -------
    atoms : set[str]
        The names of the basic labels (those with a callable extractor)
        required to evaluate ``labels`` and ``select``.
    code : list[str]
        The lines of the source code of the function ``line_extractor``.
        This code refers to the name ``extract``, to ``RejectError`` when
        ``select`` is given, and to any function called by the formulas.
        These names have to be provided where the code is executed.

    Examples
    --------
    >>> from mendevi.database.meta import merge_extractors
    >>> print("\n".join(merge_extractors({"rate", "energy"})[1]))
    def line_extractor(raw: dict[str]) -> dict[str]:
        """Get the labels: energy, rate."""
        # extract revelant values
        powers = extract.extract_powers(raw)
        size = extract.extract_video_size(raw)
        video_duration = extract.extract_video_duration(raw)
        energy = float((powers[0] * powers[1]).sum())
        rate = 8.0 * size / video_duration
    <BLANKLINE>
        # packaging
        return {
            'energy': energy,
            'rate': rate,
        }
    '''
    assert isinstance(labels, set), labels.__class__.__name__
    assert all(isinstance(lbl, str) for lbl in labels), labels.__class__.__name__
    if select is not None:
        assert isinstance(select, str), select.__class__.__name__

    def get_atom_tree(labels: set[str]) -> tuple[set[str], list[str]]:
        """Return the minimalist labels name and the way to associate them."""
        lbl_atom: set[str] = set()  # all atomic symbols
        tree: list[str] = []  # intermediate symbols, in the correct order
        lbl_func = {lbl: get_extractor(lbl).func for ls in labels for lbl in extract_names(ls)}
        while lbl_func:
            lbl_atom |= {lbl for lbl, f in lbl_func.items() if callable(f)}
            # the deepest dependencies are prepended, so they are defined first
            tree = sorted(lbl for lbl in lbl_func if lbl not in lbl_atom) + tree
            lbl_func = {
                lbl: get_extractor(lbl).func
                for ls in lbl_func.values() if isinstance(ls, str)
                for lbl in extract_names(ls)
            }
        # A label required at several depths appears several times. Only the
        # first (deepest) occurrence is kept: it precedes all the formulas
        # that need it, so the later assignments would be redundant.
        return lbl_atom, list(dict.fromkeys(tree))

    # selector
    check_lines: list[str] = []
    select_lbl_atom: set[str] = set()
    select_done: set[str] = set()  # intermediate labels defined by the selector
    if select is not None:
        select_lbl_atom, select_tree = get_atom_tree(extract_names(select))
        check_lines = [
            "    # exit if data are undesirable",
            *(
                f"    {lbl} = extract.{get_extractor(lbl).func.__name__}(raw)"
                for lbl in sorted(select_lbl_atom)
            ),
            *(f"    {lbl} = {get_extractor(lbl).func}" for lbl in select_tree),
            f"    if not ({select}):",
            '        raise RejectError("this line must be filtered")',
            "",
        ]
        select_done = set(select_tree)

    # final code, all together
    lbl_atom, tree = get_atom_tree(labels)
    code = [
        "def line_extractor(raw: dict[str]) -> dict[str]:",
        f'    """Get the labels: {", ".join(sorted(labels))}."""',
        *check_lines,
        "    # extract revelant values",
        *(
            f"    {lbl} = extract.{get_extractor(lbl).func.__name__}(raw)"
            for lbl in sorted(lbl_atom - select_lbl_atom)  # limit redundancy
        ),
        *(f"    {lbl} = {get_extractor(lbl).func}" for lbl in tree if lbl not in select_done),
        "",
        "    # packaging",
        "    return {",
        *(f"        {lbl!r}: {lbl}," for lbl in sorted(labels)),
        "    }",
    ]
    return lbl_atom | select_lbl_atom, code