Source code for mendevi.probe

"""Perform the metrics and video properties measures."""

import datetime
import logging
import pathlib
import sqlite3

import cutcutcodec
import numpy as np
import orjson
from context_verbose import Printer
from flufl.lock import Lock

from mendevi.database.serialize import list_to_binary
from mendevi.utils import compute_video_hash


[docs] def open_ro(database: pathlib.Path, conn: sqlite3.Connection | None) -> sqlite3.Connection: """Open a new conn if the previous one is not already opened.""" if conn is None: conn = sqlite3.connect(f"file:{database}?mode=ro", uri=True, timeout=30) conn.row_factory = sqlite3.Row return conn
def _fill_vid_table( database: pathlib.Path, vid_id: bytes, video: pathlib.Path, conn: sqlite3.Connection | None = None, **kwargs: dict, ) -> sqlite3.Connection | None: """Complete the table t_vid_video.""" # get actual informations conn = open_ro(database, conn) prop = conn.execute( "SELECT * FROM t_vid_video WHERE vid_id=?", (vid_id,), ).fetchone() prop = {} if prop is None else dict(prop) # missing fields fields = { "vid_codec", "vid_duration", "vid_eotf", "vid_fps", "vid_frames", "vid_gamut", "vid_height", "vid_name", "vid_pix_fmt", "vid_range", "vid_size", "vid_width", *(("vid_rms_sobel",) if kwargs["rms_sobel"] else ()), *(("vid_rms_time_diff",) if kwargs["rms_time_diff"] else ()), *(("vid_spatial_dct",) if kwargs["spatial_dct"] else ()), *(("vid_temporal_dct",) if kwargs["temporal_dct"] else ()), *(("vid_uvq",) if kwargs["uvq"] else ()), } fields -= {k for k, v in prop.items() if v is not None} if not fields: return conn conn.close() # fill missing fields pad = max(map(len, fields)) - 4 with Printer(f"Get properties of {video.name}...", color="green") as prt: # basic fields for field in fields - { "vid_rms_sobel", "vid_rms_time_diff", "vid_spatial_dct", "vid_temporal_dct", "vid_uvq", }: try: prop[field] = _fill_vid_table_prop(field, prt, pad, video) except cutcutcodec.core.exceptions.MissingInformation as err: logging.getLogger(__name__).warning(err) prop[field] = None # metric fields new_metrics = cutcutcodec.video_metrics( video, rms_sobel="vid_rms_sobel" in fields, rms_time_diff="vid_rms_time_diff" in fields, spatial_dct="vid_spatial_dct" in fields, temporal_dct="vid_temporal_dct" in fields, uvq="vid_uvq" in fields, ) for metric in sorted(new_metrics): values = new_metrics[metric] prt.print(f"{metric:<{pad}}: {np.nanmean(values):.2f} +/- {np.nanstd(values):.2f}") prop |= {f"vid_{m}": list_to_binary(v) for m, v in new_metrics.items()} # update result fields = list(fields) # to frozen order for sql request with ( Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=600)), sqlite3.connect(database) as conn, ): cursor = conn.cursor() cursor.execute( "INSERT OR IGNORE INTO t_vid_video (vid_id) VALUES (?)", (vid_id,), ) cursor.execute( ( f"UPDATE t_vid_video SET {', '.join(f'{k}=?' for k in fields)} " "WHERE vid_id=?" ), [prop[k] for k in fields] + [vid_id], ) return None def _fill_vid_table_prop(field: str, prt: Printer, pad: int, video: pathlib.Path) -> object: """Help _fill_vid_table.""" prop = None match field: case "vid_duration": prop = float(cutcutcodec.get_duration_video(video)) prt.print(f"{'duration':<{pad}}: ", end="") prt.print_time(prop, print_headers=False) case "vid_fps": prop = float(cutcutcodec.get_rate_video(video)) prt.print(f"{'fps':<{pad}}: {prop:.2f} Hz") case "vid_frames": header, info = cutcutcodec.core.analysis.ffprobe.get_slices_metadata(video) header, info = header[0], info[0] frames = [dict(zip(header, line, strict=False)) for line in info.tolist()] prop = orjson.dumps( frames, option=orjson.OPT_INDENT_2|orjson.OPT_SORT_KEYS, ).decode() prt.print(f"{'frames':<{pad}}: {len(frames)} frames") case "vid_height": prop = cutcutcodec.get_resolution(video)[0] prt.print(f"{'height':<{pad}}: {prop} pixels") case "vid_width": prop = cutcutcodec.get_resolution(video)[1] prt.print(f"{'width':<{pad}}: {prop} pixels") case "vid_size": prop = video.stat().st_size prt.print(f"{'size':<{pad}}: {prop*1e-6:.2f} MB") case "vid_pix_fmt" | "vid_range" | "vid_name" | "vid_eotf" | "vid_gamut" | "vid_codec": prop = { "vid_pix_fmt": cutcutcodec.get_pix_fmt, "vid_range": cutcutcodec.get_range, "vid_name": lambda p: p.name, "vid_eotf": lambda p: cutcutcodec.get_colorspace(p).transfer, "vid_gamut": lambda p: cutcutcodec.get_colorspace(p).primaries, "vid_codec": cutcutcodec.get_codec_video, }[field](video) prt.print(f"{' '.join(field.split('_')[1:]):<{pad}}: {prop}") case _: logging.getLogger(__name__).warning("field %s not recognised, skipped", field) return prop def _fill_met_table( database: pathlib.Path, ref_id: bytes, ref_path: pathlib.Path, dis_id: bytes, dis_path: pathlib.Path, **kwargs: dict, ) -> sqlite3.Connection | None: """Complete the table t_met_metric.""" # get actual informations conn = open_ro(database, kwargs.pop("conn")) metrics = conn.execute( "SELECT * FROM t_met_metric WHERE met_ref_vid_id=? AND met_dis_vid_id=?", (ref_id, dis_id), ).fetchone() metrics = {} if metrics is None else dict(metrics) # missing fields fields = { *(("met_lpips_alex",) if kwargs["lpips_alex"] else ()), *(("met_lpips_vgg",) if kwargs["lpips_vgg"] else ()), *(("met_psnr",) if kwargs["psnr"] else ()), *(("met_ssim",) if kwargs["ssim"] else ()), *(("met_vif",) if kwargs["vif"] else ()), *(("met_vmaf",) if kwargs["vmaf"] else ()), } fields -= {k for k, v in metrics.items() if v is not None} if not fields: return conn conn.close() fields = sorted(fields) # to frozen order in sql request and for printing repetability # fill missing fields pad = max(map(len, fields)) - 4 with Printer( f"Compute metrics between {ref_path.name} and {dis_path.name}...", color="green", ) as prt: new_metrics = cutcutcodec.video_metrics( dis_path, ref_path, lpips_alex=kwargs["lpips_alex"], lpips_vgg=kwargs["lpips_vgg"], psnr=kwargs["psnr"], ssim=kwargs["ssim"], vif=kwargs["vif"], vmaf=kwargs["vmaf"], ) for metric in sorted(new_metrics): values = new_metrics[metric] prt.print(f"{metric:<{pad}}: {np.nanmean(values):.2f} +/- {np.nanstd(values):.2f}") metrics = {f"met_{m}": list_to_binary(v) for m, v in new_metrics.items()} # update result with ( Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=600)), sqlite3.connect(database) as conn, ): cursor = conn.cursor() cursor.execute( "INSERT OR IGNORE INTO t_met_metric (met_ref_vid_id, met_dis_vid_id) VALUES (?, ?)", (ref_id, dis_id), ) cursor.execute( ( f"UPDATE t_met_metric SET {', '.join(f'{k}=?' for k in fields)} " "WHERE met_ref_vid_id=? AND met_dis_vid_id=?" ), [metrics[k] for k in fields] + [ref_id, dis_id], ) return None
[docs] def probe_and_store( database: pathlib.Path, video: pathlib.Path, conn: sqlite3.Connection | None = None, **kwargs: dict, ) -> sqlite3.Connection | None: """Measure the properties of the video. Parameters ---------- database : pathlike The path of the existing database to be updated. video : pathlib.Path The source video file to be annalysed. conn: sqlite3.Connection, optional A read only database connection. **kwargs : dict All the metrics """ assert isinstance(video, pathlib.Path), video.__class__.__name__ assert video.is_file(), video vid_id: bytes = compute_video_hash(video) conn = _fill_vid_table(database, vid_id, video, conn=conn, **kwargs) conn = open_ro(database, conn) # get the references videos for comparative comparisons references: dict[pathlib.Path, bytes] = kwargs["ref"].copy() for res in conn.execute( """ SELECT vid_name, enc_src_vid_id FROM t_enc_encode JOIN t_vid_video ON enc_src_vid_id=vid_id WHERE enc_dst_vid_id=? """, (vid_id,), ): ref_stem, ref_id = res["vid_name"], res["enc_src_vid_id"] # try to find video full path if ( ref := database.with_name(ref_stem)).is_file() or (ref := video.with_name(ref_stem) ).is_file(): references[ref] = ref_id else: logging.getLogger(__name__).info("failed to find the reference video %s", ref) references = {ref: ref_id for ref, ref_id in references.items() if ref_id != vid_id} # perform the comparative metrics for ref, ref_id in references.items(): conn = _fill_met_table(database, ref_id, ref, vid_id, video, conn=conn, **kwargs) return conn