"""Attempt to complete the empty fields 'act_wattmeter*'."""
import datetime
import logging
import pathlib
import sqlite3
import time
import click
import tqdm
from context_verbose import Printer
from flufl.lock import Lock
from mendevi.database.serialize import list_to_binary
from mendevi.measures.g5kpower import g5kpower
from .parse import parse_videos_database
INTERVAL = 60 # time in seconds between 2 database flush
[docs]
def get_power(hostname: str, start: float, duration: float, **kwargs: dict) -> tuple[bytes, bytes]:
"""Exctract the wattmeters data for that specific activity."""
wattmeter_data = g5kpower(
hostname, start, duration, login=kwargs.get("login"), password=kwargs.get("password"),
)
wattmeter_dt, wattmeter_power = wattmeter_data["dt"], wattmeter_data["powers"]
return list_to_binary(wattmeter_dt), list_to_binary(wattmeter_power)
@click.command()
@click.argument("database", type=click.Path())
@click.option(
"-l", "--login",
type=str,
help="The grid'5000 login to request the api.",
)
@click.option(
"-p", "--password",
type=str,
help="The grid'5000 password to request the api.",
)
def main(database: str, **kwargs: dict) -> None:
"""Injects or updates power data into the database.
\b
Parameters
----------
database : pathlike
The source SQL database to be filled.
**kwargs: dict
Please refer to the detailed arguments below.
login, password : str, optional
The grid'5000 login and password to acces the api from the outside.
"""
# parse args
with Printer("Parse configuration...") as prt:
_, database = parse_videos_database(prt, (), database)
# get missing fields
with sqlite3.connect(f"file:{database}?mode=ro", uri=True) as conn:
fields = conn.execute(
"""
SELECT act_id, act_start, act_duration, env_hostname
FROM t_env_environment
JOIN t_act_activity ON t_act_activity.act_id = t_env_environment.env_idle_act_id
WHERE act_wattmeter_dt IS NULL or act_wattmeter_power IS NULL
UNION ALL
SELECT act_id, act_start, act_duration, env_hostname
FROM t_enc_encode
JOIN t_act_activity ON t_act_activity.act_id = t_enc_encode.enc_act_id
JOIN t_env_environment ON t_env_environment.env_id = t_enc_encode.enc_env_id
WHERE act_wattmeter_dt IS NULL or act_wattmeter_power IS NULL
UNION ALL
SELECT act_id, act_start, act_duration, env_hostname
FROM t_dec_decode
JOIN t_act_activity ON t_act_activity.act_id = t_dec_decode.dec_act_id
JOIN t_env_environment ON t_env_environment.env_id = t_dec_decode.dec_env_id
WHERE act_wattmeter_dt IS NULL or act_wattmeter_power IS NULL
""",
).fetchall()
def _flush_buff(buff: list, database: pathlib.Path) -> None:
"""Add the fields into the database."""
with (
Lock(str(database.with_name(".dblock")), lifetime=datetime.timedelta(seconds=600)),
sqlite3.connect(database) as conn,
):
cursor = conn.cursor()
for act_id, wattmeter_dt, wattmeter_power in buff:
cursor.execute(
"""
UPDATE t_act_activity SET act_wattmeter_dt=?, act_wattmeter_power=?
WHERE act_id=?
""",
(wattmeter_dt, wattmeter_power, act_id),
)
# fill missing data
timestamp, buff = time.time(), []
for act_id, start, duration, hostname in tqdm.tqdm(
fields,
dynamic_ncols=True,
leave=True,
smoothing=1e-8,
unit="measures",
):
try:
buff.append([act_id, *get_power(hostname, start, duration, **kwargs)])
except ValueError:
logging.getLogger(__name__).exception("skip act_id=%s", act_id)
continue
if time.time() - timestamp >= INTERVAL:
_flush_buff(buff, database)
timestamp, buff = time.time(), []
_flush_buff(buff, database)