Source code for mendevi.database.extract

"""Define the functions that enable values to be extracted from a select query."""

import functools
import numbers
import pathlib
import re
import typing

import numpy as np
import orjson

from mendevi.cst.profiles import PROFILES
from mendevi.database.serialize import binary_to_list, binary_to_tensor

JOIN: dict[str: dict[str, str]] = {  # join = JOIN[destination_table][source_table]
    "t_vid_video": {
        "t_dec_decode": "JOIN t_vid_video ON t_dec_decode.dec_vid_id = t_vid_video.vid_id",
        "t_enc_encode": "JOIN t_vid_video ON t_enc_encode.enc_dst_vid_id = t_vid_video.vid_id",
        "t_met_metric": "JOIN t_vid_video ON t_met_metric.met_dis_vid_id = t_vid_video.vid_id",
    },
    "t_enc_encode": {
        "t_dec_decode": (
            # rather "JOIN t_enc_encode ON t_dec_decode.dec_vid_id = t_enc_encode.enc_dst_vid_id"
            # this requets select only one line of t_enc_encode
            "JOIN (\n"
            "    SELECT t_enc_encode.enc_dst_vid_id, MIN(enc_id) AS enc_id_min\n"
            "    FROM t_enc_encode\n"
            "    GROUP BY enc_dst_vid_id\n"
            ") AS t_enc_encode_single_dst_vid_id "
            "ON t_dec_decode.dec_vid_id = t_enc_encode_single_dst_vid_id.enc_dst_vid_id\n"
            "JOIN t_enc_encode ON t_enc_encode.enc_id = t_enc_encode_single_dst_vid_id.enc_id_min"
        ),
        "t_met_metric": "JOIN t_enc_encode ON t_act_activity.act_id = t_enc_encode.enc_act_id",
    },
    "t_dec_decode": {
        "t_enc_encode": (
            "JOIN t_dec_decode ON t_enc_encode.enc_dst_vid_id = t_dec_decode.dec_vid_id"
        ),
    },
    "t_met_metric": {
        "t_vid_video": "LEFT JOIN t_met_metric ON t_vid_video.vid_id = t_met_metric.met_dis_vid_id",
        "t_enc_encode": (
            "LEFT JOIN t_met_metric ON t_enc_encode.enc_dst_vid_id = t_met_metric.met_dis_vid_id "
            "AND t_enc_encode.enc_src_vid_id = t_met_metric.met_ref_vid_id"
        ),
        "t_dec_decode": (
            "LEFT JOIN t_met_metric ON t_dec_decode.dec_vid_id = t_met_metric.met_dis_vid_id"
        ),
    },
    "t_env_environment": {
        "t_dec_decode": (
            "JOIN t_env_environment ON t_dec_decode.dec_env_id = t_env_environment.env_id"
        ),
        "t_enc_encode": (
            "JOIN t_env_environment ON t_enc_encode.enc_env_id = t_env_environment.env_id"
        ),
    },
    "t_act_activity": {
        "t_dec_decode": "JOIN t_act_activity ON t_dec_decode.dec_act_id = t_act_activity.act_id",
        "t_enc_encode": "JOIN t_act_activity ON t_enc_encode.enc_act_id = t_act_activity.act_id",
        # "t_met_metric": (  # via t_dec_decode
        #     "JOIN t_dec_decode ON t_act_activity.act_id = t_dec_decode.dec_act_id "
        #     "JOIN t_met_metric ON t_dec_decode.dec_vid_id = t_met_metric.met_dis_vid_id"
        # ),
        # "t_met_metric": (  # via t_enc_encode, link on the encoded video
        #     "JOIN t_enc_encode ON t_act_activity.act_id = t_enc_encode.enc_act_id "
        #     "JOIN t_met_metric ON t_enc_encode.enc_dst_vid_id = t_met_metric.met_dis_vid_id"
        # ),
        "t_met_metric": (
            "JOIN t_vid_video AS t_dis_video ON t_met_metric.met_dis_vid_id = t_dis_video.vid_id "
            "JOIN t_enc_encode AS t_met_encode ON t_dis_video.vid_id = t_met_encode.enc_dst_vid_id "
            "JOIN t_act_activity ON t_met_encode.enc_act_id = t_act_activity.act_id"
        ),
    },
    "t_idle": {
        "t_env_environment": (
            "JOIN t_act_activity AS t_idle "
            "ON t_env_environment.env_idle_act_id = t_idle.act_id"
        ),
    },
    "t_ref_video": {  # the reference video
        "t_enc_encode": (
            "JOIN t_vid_video AS t_ref_video "
            "ON t_enc_encode.enc_src_vid_id = t_ref_video.vid_id"
        ),
        "t_dec_decode": (  # to avoid redundency, we may define only atomic steps then link them
            "JOIN (\n"
            "    SELECT t_enc_encode.enc_dst_vid_id, MIN(enc_id) AS enc_id_min\n"
            "    FROM t_enc_encode\n"
            "    GROUP BY enc_dst_vid_id\n"
            ") AS t_enc_encode_single_dst_vid_id_from_dec "
            "ON t_dec_decode.dec_vid_id = t_enc_encode_single_dst_vid_id_from_dec.enc_dst_vid_id\n"
            "JOIN t_enc_encode AS t_enc_from_dec "
            "ON t_enc_from_dec.enc_id = t_enc_encode_single_dst_vid_id_from_dec.enc_id_min\n"
            "JOIN t_vid_video AS t_ref_video "
            "ON t_enc_from_dec.enc_src_vid_id = t_ref_video.vid_id"
        ),
        "t_met_metric": (
            "JOIN t_vid_video AS t_ref_video "
            "ON t_met_metric.met_ref_vid_id = t_ref_video.vid_id"
        ),
        "t_vid_video": (
            "JOIN t_vid_video AS t_ref_video ON t_vid_video.vid_id = t_ref_video.vid_id"
        ),
    },
    "t_dst_video": {  # the transcoded video
        "t_enc_encode": (
            "JOIN t_vid_video AS t_dst_video "
            "ON t_enc_encode.enc_dst_vid_id = t_dst_video.vid_id"
        ),
        "t_dec_decode": (
            "JOIN t_vid_video AS t_dst_video "
            "ON t_dec_decode.dec_vid_id = t_dst_video.vid_id"
        ),
        "t_met_metric": (
            "JOIN t_vid_video AS t_dst_video "
            "ON t_met_metric.met_dis_vid_id = t_dst_video.vid_id"
        ),
        "t_vid_video": (
            "JOIN t_vid_video AS t_dst_video ON t_vid_video.vid_id = t_dst_video.vid_id"
        ),
    },
}
# JOIN["t_act_activity"]["t_met_metric"] = (
#     f"{JOIN['t_enc_encode']['t_met_metric']}\n"
#     f"{JOIN['t_act_activity']['t_enc_encode']}"
# )
# JOIN["t_ref_video"]["t_dec_decode"] = (  # and example of path
#     f"{JOIN['t_enc_encode']['t_dec_decode']}\n"
#     f"{JOIN['t_ref_video']['t_enc_encode']}"
# )


[docs] class SqlLinker: """Allow you to add an SQL query to an extractor.""" def __init__(self, *select: str) -> None: """Initialise the linker. Parameters ---------- select : args[str] The fields to be returned (juste after SELECT), with the optional alias. """ assert all(isinstance(s, str) for s in select), select self.select: list[str] = sorted(set(select)) @property def sql(self) -> str: """Write the sql request.""" # find all possible junctions dst_tables = {s.split(".")[0] for s in self.select} joins: dict[str] = {} for src_table in {t for j in JOIN.values() for t in j}: join: set[str] = set() for dst_table in dst_tables - {src_table}: if dst_table not in JOIN: break if src_table not in JOIN[dst_table]: break join.add(JOIN[dst_table][src_table]) else: joins[src_table] = join # put in form the queries queries: list[str] = [] for src_table in sorted( joins, key=lambda t: ( len(joins[t]), { "t_vid_video": 0, "t_act_activity": 1, "t_met_metric": 2, "t_enc_encode": 3, "t_dec_decode": 4, "t_env_environment": 5, }[t], ), ): # priority no join select_str = f"SELECT {', '.join(self.select)}" if len(select_str) >= 80: select_str = f"SELECT\n {',\n '.join(self.select)}" table_str = f"FROM {src_table}" if (join_str := "\n".join( re.sub(" ON ", "\n ON ", j) for j in sorted(joins[src_table]) )): sql = f"{select_str}\n{table_str}\n{join_str}" else: sql = f"{select_str}\n{table_str}" queries.append(sql) return queries def __call__(self, func: typing.Callable) -> typing.Callable: """Decorate a function. Returns ------- A decorated function with the select. The docstring of the decorated function is also modified to illustrate the minimal SQL query with an example. """ # set attributes func.select = self.select # set doctrsing doc: list[str] = (func.__doc__ or "").split("\n") example = "\nor, alternativaly\n".join( ( "\n" ".. code:: sql\n" "\n" f" {'\n '.join(sql.split('\n'))}" "\n" ) for sql in self.sql ) doc.insert(1, example) func.__doc__ = "\n".join(doc) return func
[docs] def verif(func: typing.Callable) -> typing.Callable: """Perform few verifications.""" @functools.wraps(func) def decorated(raw: dict[str]) -> float: assert isinstance(raw, dict), raw.__class__.__name__ for select in func.select: name = select.split(" ")[-1].split(".")[-1] assert name in raw, \ f"correct the SQL query because {select} is missing (in {func.__name__})" return func(raw) return decorated
[docs] @verif @SqlLinker("t_act_activity.act_duration") def extract_act_duration(raw: dict[str]) -> float: """Return the video processing activity duration in seconds. Parameters ---------- raw : dict[str] The result line of select request. """ if (act_duration := raw["act_duration"]) is None: return None assert isinstance(act_duration, numbers.Real), act_duration.__class__.__name__ assert act_duration > 0.0, act_duration.__class__.__name__ return float(act_duration)
[docs] @verif @SqlLinker("t_dst_video.vid_codec") def extract_codec(raw: dict[str]) -> str: """Return the codec name. Parameters ---------- raw : dict[str] The result line of select request. """ if (codec := raw["vid_codec"]) is None: return None assert isinstance(codec, str), codec.__class__.__name__ return str(codec)
[docs] @verif @SqlLinker("t_act_activity.act_ps_dt", "t_act_activity.act_ps_core") def extract_cores(raw: dict[str]) -> float: """Return the average cumulative utilisation rate of logical cores. Parameters ---------- raw : dict[str] The result line of select request. """ act_ps_dt, act_ps_core = raw["act_ps_dt"], raw["act_ps_core"] if act_ps_dt is None or act_ps_core is None: return None act_ps_dt = binary_to_list(act_ps_dt) act_ps_core = binary_to_tensor(act_ps_core).sum(axis=1) integral = (act_ps_core * act_ps_dt).sum() # act_ps_core is already the average on each dt average = integral / act_ps_dt.sum() return float(average) / 100.0 # normalisation
[docs] @verif @SqlLinker("t_dec_decode.dec_cmd", "t_vid_video.vid_name") def extract_decode_cmd(raw: dict[str]) -> str: """Return the ffmpeg command used for decoding. Parameters ---------- raw : dict[str] The result line of select request. """ if (cmd := raw["dec_cmd"]) is None: return None vid = raw["vid_name"] or "vid.mp4" return cmd.replace("vid.mp4", vid)
[docs] @verif @SqlLinker("t_dec_decode.dec_decoder") def extract_decoder(raw: dict[str]) -> str: """Return name of the decoder. Parameters ---------- raw : dict[str] The result line of select request. """ if (decoder := raw["dec_decoder"]) is None: return None assert isinstance(decoder, str), decoder.__class__.__name__ return str(decoder)
[docs] @verif @SqlLinker("t_enc_encode.enc_effort") def extract_effort(raw: dict[str]) -> str: """Return the effort provided as a parameter to the encoder. Parameters ---------- raw : dict[str] The result line of select request. """ if (enc_effort := raw["enc_effort"]) is None: return None assert isinstance(enc_effort, str), enc_effort.__class__.__name__ return str(enc_effort)
[docs] @verif @SqlLinker("t_enc_encode.enc_cmd") def extract_encode_cmd(raw: dict[str]) -> str: """Return the ffmpeg command used for encoding. Parameters ---------- raw : dict[str] The result line of select request. """ if (cmd := raw["enc_cmd"]) is None: return None return cmd
[docs] @verif @SqlLinker("t_enc_encode.enc_encoder") def extract_encoder(raw: dict[str]) -> str: """Return the name of the encoder. Parameters ---------- raw : dict[str] The result line of select request. """ if (encoder := raw["enc_encoder"]) is None: return None assert isinstance(encoder, str), encoder.__class__.__name__ return str(encoder)
[docs] @verif @SqlLinker("t_vid_video.vid_frames") def extract_frames(raw: dict[str]) -> list[dict]: """Return the metadata of each frame. Parameters ---------- raw : dict[str] The result line of select request. """ if (frames := raw["vid_frames"]) is None: return None return orjson.loads(frames)
[docs] @verif @SqlLinker("t_dst_video.vid_gamut") def extract_gamut(raw: dict[str]) -> str: """Return the tristimulus primaries colors name. Parameters ---------- raw : dict[str] The result line of select request. """ if (gamut := raw["vid_gamut"]) is None: return None assert isinstance(gamut, str), gamut.__class__.__name__ return gamut
[docs] @verif @SqlLinker("t_dst_video.vid_height") def extract_height(raw: dict[str]) -> int: """Return the height of images in pixels. Parameters ---------- raw : dict[str] The result line of select request. """ if (height := raw["vid_height"]) is None: return None return int(height)
[docs] @verif @SqlLinker("t_env_environment.env_hostname") def extract_hostname(raw: dict[str]) -> str: """Return the machine name. Parameters ---------- raw : dict[str] The result line of select request. """ if (hostname := raw["env_hostname"]) is None: return None return str(hostname)
[docs] @verif @SqlLinker("t_met_metric.met_lpips_alex", "t_met_metric.met_lpips_vgg") def extract_lpips(raw: dict[str]) -> float: """Return the Learned Perceptual Image Patch Similarity (LPIPS) with alex. Parameters ---------- raw : dict[str] The result line of select request. """ if (lpips := raw["met_lpips_vgg"] or raw["met_lpips_alex"]) is None: return None return float(np.nanmean(binary_to_list(lpips)))
[docs] @verif @SqlLinker("t_met_metric.met_lpips_alex") def extract_lpips_alex(raw: dict[str]) -> float: """Return the Learned Perceptual Image Patch Similarity (LPIPS) with alex. Parameters ---------- raw : dict[str] The result line of select request. """ if (lpips := raw["met_lpips_alex"]) is None: return None return float(np.nanmean(binary_to_list(lpips)))
[docs] @verif @SqlLinker("t_met_metric.met_lpips_vgg") def extract_lpips_vgg(raw: dict[str]) -> float: """Return the Learned Perceptual Image Patch Similarity (LPIPS) with vgg. Parameters ---------- raw : dict[str] The result line of select request. """ if (lpips := raw["met_lpips_vgg"]) is None: return None return float(np.nanmean(binary_to_list(lpips)))
[docs] @verif @SqlLinker("t_enc_encode.enc_mode") def extract_mode(raw: dict[str]) -> str: """Return the bitrate mode, constant (cbr) or variable (vbr). Parameters ---------- raw : dict[str] The result line of select request. """ if (mode := raw["enc_mode"]) is None: return None assert isinstance(mode, str), mode.__class__.__name__ assert mode in {"cbr", "vbr"}, mode return mode
[docs] @SqlLinker( "t_act_activity.act_rapl_dt", "t_act_activity.act_rapl_power", "t_act_activity.act_wattmeter_dt", "t_act_activity.act_wattmeter_power", ) def extract_powers(raw: dict[str]) -> tuple: """Return the interval duration and the average power in each intervals. Parameters ---------- raw : dict[str] The result line of select request. """ assert isinstance(raw, dict), raw.__class__.__name__ assert ( ("act_wattmeter_dt" in raw and "act_wattmeter_power" in raw) or ("act_rapl_dt" in raw and "act_rapl_power" in raw) ), "Please correct the SQL query." act_dt, act_power = raw["act_wattmeter_dt"], raw["act_wattmeter_power"] if act_dt is not None and act_power is not None: # if it comes from wattmeter act_dt, act_power = binary_to_list(act_dt), binary_to_list(act_power) act_power = 0.5 * (act_power[:-1] + act_power[1:]) # trapez method else: # if it comes from rapl act_dt, act_power = raw["act_rapl_dt"], raw["act_rapl_power"] if act_dt is None or act_power is None: return None act_dt, act_power = binary_to_list(act_dt), binary_to_list(act_power) return act_dt, act_power
[docs] @verif @SqlLinker("t_met_metric.met_psnr") def extract_psnr(raw: dict[str]) -> float: """Return the Peak Signal to Noise Ratio (PSNR). Parameters ---------- raw : dict[str] The result line of select request. """ if (psnr := raw["met_psnr"]) is None: return None return float(np.nanmean(binary_to_list(psnr)))
[docs] @verif @SqlLinker("t_enc_encode.enc_quality") def extract_quality(raw: dict[str]) -> float: """Return the quality level passed to the encoder. Parameters ---------- raw : dict[str] The result line of select request. """ if (enc_quality := raw["enc_quality"]) is None: return None assert isinstance(enc_quality, numbers.Real), enc_quality.__class__.__name__ assert 0.0 <= enc_quality <= 1.0, enc_quality return float(enc_quality)
[docs] @verif @SqlLinker("t_dst_video.vid_range") def extract_range(raw: dict[str]) -> str: """Return the video encoding color range, 'tv' or 'pc'. Parameters ---------- raw : dict[str] The result line of select request. """ if (range_ := raw["vid_range"]) is None: return None assert isinstance(range_, str), range_.__class__.__name__ assert range_ in {"pc", "tv"}, range_ return range_
[docs] @verif @SqlLinker("t_ref_video.vid_name AS ref_vid_name") def extract_reference_video_stem(raw: dict[str]) -> str: """Return the input video compact stem. Parameters ---------- raw : dict[str] The result line of select request. """ if (vid_name := raw["ref_vid_name"]) is None: return None assert isinstance(vid_name, str), vid_name.__class__.__name__ return re.sub(fr"^reference_([\w.]+?)_(?:{'|'.join(PROFILES)})\.\w+$", r"\1", vid_name)
[docs] @verif @SqlLinker("t_vid_video.vid_rms_sobel") def extract_rms_sobel(raw: dict[str]) -> str: """Return the spatial root mean square sobel gradient complexity. Parameters ---------- raw : dict[str] The result line of select request. """ if (comp := raw["vid_rms_sobel"]) is None: return None return float(np.nanmean(binary_to_list(comp)))
[docs] @verif @SqlLinker("t_vid_video.vid_rms_time_diff") def extract_rms_time_diff(raw: dict[str]) -> str: """Return the temporal root means square time difference complexity. Parameters ---------- raw : dict[str] The result line of select request. """ if (comp := raw["vid_rms_time_diff"]) is None: return None return float(np.nanmean(binary_to_list(comp)))
[docs] @verif @SqlLinker("t_vid_video.vid_spatial_dct") def extract_spatial_dct(raw: dict[str]) -> str: """Return the spatial dct complexity. Parameters ---------- raw : dict[str] The result line of select request. """ if (comp := raw["vid_spatial_dct"]) is None: return None return float(np.nanmean(binary_to_list(comp)))
[docs] @verif @SqlLinker("t_met_metric.met_ssim") def extract_ssim(raw: dict[str]) -> float: """Return the Structural Similarity (SSIM). Parameters ---------- raw : dict[str] The result line of select request. """ if (ssim := raw["met_ssim"]) is None: return None return float(np.nanmean(binary_to_list(ssim)))
[docs] @verif @SqlLinker("t_act_activity.act_ps_dt", "t_act_activity.act_ps_temp") def extract_temp(raw: dict[str]) -> float: """Return the average temperature in C. Parameters ---------- raw : dict[str] The result line of select request. """ act_ps_dt, act_ps_temp = raw["act_ps_dt"], raw["act_ps_temp"] if act_ps_dt is None or act_ps_temp is None: return None act_ps_dt = binary_to_list(act_ps_dt) act_ps_temp = orjson.loads(act_ps_temp) avg_temp = sum(np.asarray(tps, act_ps_dt.dtype) for tps in act_ps_temp.values()) integral = (avg_temp * act_ps_dt).sum() average = integral / (act_ps_dt.sum() * len(act_ps_temp)) return float(average)
[docs] @verif @SqlLinker("t_vid_video.vid_temporal_dct") def extract_temporal_dct(raw: dict[str]) -> str: """Return the temporal dct complexity. Parameters ---------- raw : dict[str] The result line of select request. """ if (comp := raw["vid_temporal_dct"]) is None: return None return float(np.nanmean(binary_to_list(comp)))
[docs] @verif @SqlLinker("t_enc_encode.enc_threads") def extract_threads(raw: dict[str]) -> int: """Return the number of threads provided as a parameter to the encoder. Parameters ---------- raw : dict[str] The result line of select request. """ if (threads := raw["enc_threads"]) is None: return None assert isinstance(threads, numbers.Integral), threads.__class__.__name__ assert threads >= 1, threads.__class__.__name__ return int(threads)
[docs] @verif @SqlLinker("t_dst_video.vid_eotf") def extract_transfer(raw: dict[str]) -> str: """Return the non-linear transfer function name. Parameters ---------- raw : dict[str] The result line of select request. """ if (transfer := raw["vid_eotf"]) is None: return None assert isinstance(transfer, str), transfer.__class__.__name__ return transfer
[docs] @verif @SqlLinker("t_dst_video.vid_duration") def extract_video_duration(raw: dict[str]) -> float: """Return the video duration in seconds. Parameters ---------- raw : dict[str] The result line of select request. """ if (duration := raw["vid_duration"]) is None: return None assert isinstance(duration, numbers.Real), duration.__class__.__name__ return float(duration)
[docs] @verif @SqlLinker("t_vid_video.vid_id") def extract_video_hash(raw: dict[str]) -> str: """Return the hexadecimal md5 video file checksum. Parameters ---------- raw : dict[str] The result line of select request. """ return raw["vid_id"].hex()
[docs] @verif @SqlLinker("t_vid_video.vid_name") def extract_video_name(raw: dict[str]) -> str: """Return the full video basename. Parameters ---------- raw : dict[str] The result line of select request. """ if (vid_name := raw["vid_name"]) is None: return None assert isinstance(vid_name, str), vid_name.__class__.__name__ return pathlib.Path(vid_name).name
[docs] @verif @SqlLinker("t_dst_video.vid_size") def extract_video_size(raw: dict[str]) -> int: """Return the total video file size in bytes. Parameters ---------- raw : dict[str] The result line of select request. """ if (size := raw["vid_size"]) is None: return None assert isinstance(size, numbers.Integral), size.__class__.__name__ return int(size)
[docs] @verif @SqlLinker("t_met_metric.met_vmaf") def extract_vmaf(raw: dict[str]) -> float: """Return the Video Multi-Method Assessment Fusion (VMAF). Parameters ---------- raw : dict[str] The result line of select request. """ if (vmaf := raw["met_vmaf"]) is None: return None return float(np.nanmean(binary_to_list(vmaf)))
[docs] @verif @SqlLinker("t_dst_video.vid_width") def extract_width(raw: dict[str]) -> int: """Return the width of images in pixels. Parameters ---------- raw : dict[str] The result line of select request. """ if (width := raw["vid_width"]) is None: return None return int(width)