#!/usr/bin/env python3
"""Perform the metrics and video properties measures."""
import datetime
import json
import logging
import pathlib
import sqlite3
from context_verbose import Printer
from flufl.lock import Lock
import cutcutcodec
import numpy as np
from mendevi.database.serialize import list_to_binary
from mendevi.utils import compute_video_hash
def _fill_vid_table(database: pathlib.Path, vid_id: bytes, video: pathlib.Path, **kwargs):
"""Complete the table t_vid_video."""
# get actual informations
with sqlite3.connect(database) as sql_database:
sql_database.row_factory = sqlite3.Row
cursor = sql_database.cursor()
prop = cursor.execute(
"SELECT * FROM t_vid_video WHERE vid_id=?", (vid_id,)
).fetchone()
prop = {} if prop is None else dict(prop)
# missing fields
fields = {
"vid_codec",
"vid_duration",
"vid_eotf",
"vid_fps",
"vid_frames",
"vid_gamut",
"vid_height",
"vid_name",
"vid_pix_fmt",
"vid_size",
"vid_uvq",
"vid_width",
}
fields -= {k for k, v in prop.items() if v is not None}
if kwargs["uvq"] is False:
fields -= {"vid_uvq"}
if not fields:
return
fields = sorted(fields) # to frozen order in sql request and for printing repetability
# fill missing fields
pad = max(map(len, fields)) - 4
with Printer(f"Get properties of {video.name}...", color="green") as prt:
for field in fields:
match field:
case "vid_duration":
prop["vid_duration"] = float(cutcutcodec.get_duration_video(video))
prt.print(f"{'duration':<{pad}}: ", end="")
prt.print_time(prop["vid_duration"], print_headers=False)
case "vid_fps":
prop["vid_fps"] = float(cutcutcodec.get_rate_video(video))
prt.print(f"{'fps':<{pad}}: {prop['vid_fps']:.2f} Hz")
case "vid_frames":
header, info = cutcutcodec.core.analysis.ffprobe.get_slices_metadata(video)
header, info = header[0], info[0]
frames = [dict(zip(header, l)) for l in info.tolist()]
prop["vid_frames"] = json.dumps(frames)
prt.print(f"{'frames':<{pad}}: {len(frames)} frames")
case "vid_height":
prop["vid_height"] = cutcutcodec.get_resolution(video)[0]
prt.print(f"{'height':<{pad}}: {prop['vid_height']} pixels")
case "vid_width":
prop["vid_width"] = cutcutcodec.get_resolution(video)[1]
prt.print(f"{'width':<{pad}}: {prop['vid_width']} pixels")
case "vid_pix_fmt":
prop["vid_pix_fmt"] = cutcutcodec.get_pix_fmt(video)
prt.print(f"{'pix fmt':<{pad}}: {prop['vid_pix_fmt']}")
case "vid_uvq":
uvq = cutcutcodec.compare(None, video, uvq=True)["uvq"]
prop["vid_uvq"] = list_to_binary(uvq)
prt.print(f"{'uvq':<{pad}}: {np.mean(uvq):.2f} +/- {np.std(uvq):.2f}")
case "vid_size":
prop["vid_size"] = video.stat().st_size
prt.print(f"{'size':<{pad}}: {prop['vid_size']*1e-6:.2f} MB")
case "vid_name":
prop["vid_name"] = video.name
prt.print(f"{'name':<{pad}}: {prop['vid_name']}")
case "vid_eotf":
prop["vid_eotf"] = cutcutcodec.get_colorspace(video).transfer
prt.print(f"{'eotf':<{pad}}: {prop['vid_eotf']}")
case "vid_gamut":
prop["vid_gamut"] = cutcutcodec.get_colorspace(video).primaries
prt.print(f"{'gamut':<{pad}}: {prop['vid_gamut']}")
case "vid_codec":
prop["vid_codec"] = cutcutcodec.get_codec_video(video)
prt.print(f"{'codec':<{pad}}: {prop['vid_codec']}")
# update result
with (
sqlite3.connect(database) as sql_database,
Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=60)),
):
cursor = sql_database.cursor()
cursor.execute(
"INSERT OR IGNORE INTO t_vid_video (vid_id) VALUES (?)", (vid_id,)
)
cursor.execute(
(
f"UPDATE t_vid_video SET {', '.join(f'{k}=?' for k in fields)} "
"WHERE vid_id=?"
),
[prop[k] for k in fields] + [vid_id],
)
def _fill_met_table(
database: pathlib.Path,
ref_id: bytes,
ref_path: pathlib.Path,
dis_id: bytes,
dis_path: pathlib.Path,
**kwargs,
):
"""Complete the table t_met_metric."""
# get actual informations
with sqlite3.connect(database) as sql_database:
sql_database.row_factory = sqlite3.Row
cursor = sql_database.cursor()
metrics = cursor.execute(
"SELECT * FROM t_met_metric WHERE met_ref_vid_id=? AND met_dis_vid_id=?",
(ref_id, dis_id)
).fetchone()
metrics = {} if metrics is None else dict(metrics)
# missing fields
fields = [
*(("met_lpips_alex",) if kwargs["lpips_alex"] else ()),
*(("met_lpips_vgg",) if kwargs["lpips_vgg"] else ()),
*(("met_psnr",) if kwargs["psnr"] else ()),
*(("met_ssim",) if kwargs["ssim"] else ()),
*(("met_vmaf",) if kwargs["vmaf"] else ()),
]
fields = set(fields)
fields -= {k for k, v in metrics.items() if v is not None}
if not fields:
return
fields = sorted(fields) # to frozen order in sql request and for printing repetability
# fill missing fields
pad = max(map(len, fields)) - 4
with Printer(
f"Compute metrics between {ref_path.name} and {dis_path.name}...", color="green"
) as prt:
new_metrics = cutcutcodec.compare(
ref_path, dis_path,
lpips_alex=kwargs["lpips_alex"],
lpips_vgg=kwargs["lpips_vgg"],
psnr=kwargs["psnr"],
ssim=kwargs["ssim"],
vmaf=kwargs["vmaf"],
)
for metric in sorted(new_metrics):
values = new_metrics[metric]
prt.print(f"{metric:<{pad}}: {np.mean(values):.2f} +/- {np.std(values):.2f}")
metrics = {f"met_{m}": list_to_binary(v) for m, v in new_metrics.items()}
# update result
with (
sqlite3.connect(database) as sql_database,
Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=60)),
):
cursor = sql_database.cursor()
cursor.execute(
"INSERT OR IGNORE INTO t_met_metric (met_ref_vid_id, met_dis_vid_id) VALUES (?, ?)",
(ref_id, dis_id)
)
cursor.execute(
(
f"UPDATE t_met_metric SET {', '.join(f'{k}=?' for k in fields)} "
"WHERE met_ref_vid_id=? AND met_dis_vid_id=?"
),
[metrics[k] for k in fields] + [ref_id, dis_id],
)
[docs]
def probe_and_store(database: pathlib.Path, video: pathlib.Path, **kwargs):
"""Measure the properties of the video.
Parameters
----------
database : pathlike
The path of the existing database to be updated.
video : pathlib.Path
The source video file to be annalysed.
**kwargs : dict
All the metrics
"""
assert isinstance(video, pathlib.Path), video.__class__.__name__
assert video.is_file(), video
vid_id: bytes = compute_video_hash(video)
_fill_vid_table(database, vid_id, video, **kwargs)
# get the references videos for comparative comparisons
references: dict[pathlib.Path, bytes] = kwargs["ref"].copy()
with sqlite3.connect(database) as sql_database:
cursor = sql_database.cursor()
for ref_name, ref_id in cursor.execute("""
SELECT vid_name, enc_src_vid_id FROM t_enc_encode
JOIN t_vid_video ON enc_src_vid_id=vid_id
WHERE enc_dst_vid_id=?""",
(vid_id,),
):
# try to find video full path
if (ref := database.with_name(ref_name)).is_file():
references[ref] = ref_id
elif (ref := video.with_name(ref_name)).is_file():
references[ref] = ref_id
else:
logging.info("failed to find the reference video %s", ref)
references = {ref: ref_id for ref, ref_id in references.items() if ref_id != vid_id}
# perform the comparative comparisons
for ref, ref_id in references.items():
_fill_met_table(database, ref_id, ref, vid_id, video, **kwargs)