Source code for mendevi.cli.g5k

"""Help to excecute a script on a node of grid'5000."""

import getpass
import hashlib
import logging
import pathlib
import platform
import re
import subprocess
import time

import click
import orjson
import requests
from context_verbose import Printer

# HTTP status code compared against the response of the request made in guess_storage.
GOOD_REQ_STATUS = 200
# Delays handed to time.sleep (hence in seconds) by the polling loop of main:
# the delay starts at SLEEP_INIT and is multiplied at each step until it is capped at SLEEP_FINAL.
SLEEP_FINAL = 600.0  # final sleeping time
SLEEP_INIT = 10.0  # initial sleeping time
SLEEP_TRANSITION = 3600.0  # the transition time between the init and final sleep


def _parse_args(prt: Printer, kwargs: dict) -> None:
    """Check and complete, in place, the options given to the command line.

    Parameters
    ----------
    prt : Printer
        The printer used to display each retained value.
    kwargs : dict
        The options to verify, this dictionary is updated in place.
        The keys ``env_name``, ``walltime``, ``server`` and ``nodes`` are optional
        and filled with a default value, the key ``cluster`` is mandatory.

    Raises
    ------
    OSError
        If ``env_name`` is not provided and no environment image is found.
    AssertionError
        If a value has not the expected type or if ``cluster`` is missing.
    """
    # env_name
    kwargs["env_name"] = kwargs.get("env_name")
    if kwargs["env_name"] is None:
        candidates = guess_envname()
        if not candidates:
            msg = (
                "No environment .yaml images were found.\n"
                "You must therefore specify it with the argument --env-name.\n"
                'See: "https://www.grid5000.fr/w/Getting_Started'
                '#Deploying_your_nodes_to_get_root_access_and_'
                'create_your_own_experimental_environment".'
            )
            raise OSError(msg)
        if len(candidates) > 1:
            # the choice is ambiguous, so the user is told which one is kept
            logging.getLogger(__name__).warning(
                "several env names were found %s, the first one is used, "
                "please specify --env-name to select another one",
                candidates,
            )
        # never leave env_name to None, otherwise the deployment is asked without image
        kwargs["env_name"] = candidates[0]
    else:
        assert isinstance(kwargs["env_name"], str), kwargs["env_name"].__class__.__name__
    prt.print(f"env_name: {kwargs['env_name']}")

    # walltime
    kwargs["walltime"] = kwargs.get("walltime") or "24:00:00"
    assert isinstance(kwargs["walltime"], str), kwargs["walltime"].__class__.__name__
    prt.print(f"walltime: {kwargs['walltime']}")

    # server
    kwargs["server"] = kwargs.get("server", ())
    assert isinstance(kwargs["server"], tuple), kwargs["server"].__class__.__name__
    assert all(isinstance(s, str) for s in kwargs["server"]), kwargs["server"]
    if kwargs["server"]:  # displayed only if specific nodes are requested
        prt.print(f"servers : {', '.join(kwargs['server'])}")

    # cluster
    assert "cluster" in kwargs, kwargs
    assert isinstance(kwargs["cluster"], str), kwargs["cluster"].__class__.__name__
    prt.print(f"cluster : {kwargs['cluster']}")

    # nodes
    kwargs["nodes"] = kwargs.get("nodes", 1)
    assert isinstance(kwargs["nodes"], int), kwargs["nodes"].__class__.__name__
    prt.print(f"nodes   : {kwargs['nodes']}")


def guess_envname() -> list[str]:
    """Look for the environment descriptions stored in the public folder.

    All the ``*.yaml`` files under ``~/public`` are scanned recursively.
    A file is retained when it contains a ``file:`` field followed by
    an http(s) url of the ``grid5000.fr`` domain.

    Returns
    -------
    envnames : list[str]
        The candidate envnames, as a list of urls formated like:
        https://api.grid5000.fr/stable/sites/{{site}}/public/{{username}}/{{file}}
    """
    public = pathlib.Path.home() / "public"
    image_field = re.compile(r"file:\s*https?://[\w.]+\.grid5000\.fr")
    found: list[str] = []
    for yaml_file in public.rglob("*.yaml"):
        text = yaml_file.read_text(encoding="utf-8")
        if image_field.search(text) is None:
            continue  # this yaml file does not refer to a grid5000 image
        site = platform.node()[1:]  # "frennes", "fgrenoble", "ftoulouse" ...
        user = getpass.getuser()  # os.getlogin failed sometimes
        found.append(
            "https://api.grid5000.fr/stable"
            f"/sites/{site}/public/{user}/{yaml_file.relative_to(public)}",
        )
    return found
def guess_roles() -> list[str]:
    """Return the roles list.

    The groups are read from the output of the command ``id -Gn``.

    Returns
    -------
    roles : list[str]
        The name of the groups to which one belongs, the group "g5k-users" excluded.
    """
    completed = subprocess.run(["id", "-Gn"], capture_output=True, check=True)
    groups = re.findall(r"\S+", completed.stdout.decode("utf-8"))
    return [group for group in groups if group != "g5k-users"]
def guess_servers() -> list[str]:
    """Return the available nodes address with a wattmeter.

    The nodes are described by the json output of the command ``oarnodes --json``.
    An entry is retained when its field "wattmeter" is an affirmative value
    and when its field "deploy" is true.

    Returns
    -------
    servers : list[str]
        The keys of the retained entries, or an empty list if ``oarnodes`` is not installed.
    """
    # see oarnodes -l

    # get full node list
    try:
        completed = subprocess.run(["oarnodes", "--json"], capture_output=True, check=True)
    except FileNotFoundError:  # the command does not exist on this machine
        return []
    nodes = orjson.loads(completed.stdout)

    # select with wattmeter
    return [
        ref
        for ref, desc in nodes.items()
        if desc["wattmeter"] in {True, "yes", "YES", "ok", "OK"} and desc["deploy"]
    ]
def guess_storage() -> list[str]:
    """Return the accessible paths under "/srv/storage".

    If they have not been mounted for a while,
    NFS disks may be hidden with a simple ls command.
    This function attempts to predict the volume name in order to increase the chances of success.
    The rules are defined here: https://www.grid5000.fr/w/Group_Storage

    Returns
    -------
    storages : list[str]
        The sorted paths of the folders in which a file can be created and removed.
        The list is empty if "/srv/storage" does not exist.
    """
    logger = logging.getLogger(__name__)

    # guessed with ls
    root = pathlib.Path("/srv/storage")
    if not root.exists():
        return []
    candidates = set(root.iterdir())

    # try to get server_name
    url = "https://www.grid5000.fr/w/Group_Storage"
    servers: list[str] = []
    try:
        req = requests.get(url, timeout=60)
    except requests.RequestException:
        # the prediction is only a bonus, a network failure must not stop the command
        logger.warning("the request to %s failed", url, exc_info=True)
    else:
        if req.status_code != GOOD_REQ_STATUS:
            logger.warning("the request %s failed", req)
        elif not req.text:
            logger.warning("the request %s gives an empty result", req)
        else:
            servers = re.findall(r"storage\d+\.\w+\.grid5000\.fr", req.text)

    # try to get storage_name
    prefixes = guess_roles()
    prefixes.extend(n for s in prefixes.copy() for n in s.split("-"))

    # merge candidates together with a cartesian product
    candidates |= {root / f"{p}@{s}" for s in set(servers) for p in set(prefixes)}

    # test if exists and permision (.exists() doesn't work with this nfs)
    storages: list[str] = []
    for path in sorted(candidates):  # sorted to get a reproducible result
        probe = path / "tmp_empty_mendevi"
        try:
            with probe.open("wb"):
                pass
            probe.unlink()
        except OSError:
            # missing folder, permission denied, read-only volume, regular file, stale handle...
            continue
        storages.append(str(path))
    return storages
def is_g5k_frontend(*, fail: bool = True) -> bool:
    """Test if we are on the grid'5000 frontend.

    The detection relies on the existence of the path "/etc/grid5000".

    Parameters
    ----------
    fail : bool, default=True
        If True, an exception is raised rather than returning False.

    Returns
    -------
    bool
        True if the path exists, False if it does not exist and ``fail`` is False.

    Raises
    ------
    OSError
        If the path does not exist and ``fail`` is True.
    """
    assert isinstance(fail, bool), fail.__class__.__name__
    on_frontend = pathlib.Path("/etc/grid5000").exists()
    if on_frontend or not fail:
        return on_frontend
    raise OSError(
        "You are not on the Grid'5000 frontend!\n"
        "The command 'mendevi g5k ...' must be executed exclusively from the frontend.\n"
        'See: "https://www.grid5000.fr/w/Getting_Started#Connect_to_a_Grid\'5000_access_machine"',
    )
def set_ansible_cfg() -> None:
    """Create and fill the file ~/.ansible.cfg.

    The file declares the python interpreter in the section "defaults".
    If the file already exists, it is left untouched.
    """
    cfg = pathlib.Path.home() / ".ansible.cfg"
    if cfg.exists():
        return  # never overwrite an existing configuration
    cfg.write_text(
        "[defaults]\ninterpreter_python = /root/.pyenv/shims/python\n",
        encoding="utf-8",
    )
@click.command()
@click.argument("cmd", type=str)
@click.option(
    "-e", "--env-name",
    type=str,
    help="The kadeploy3 environment to use.",
)
@click.option(
    "-w", "--walltime",
    type=str,
    help="Job duration.",
)
@click.option(
    "-s", "--server", "server",
    type=str,
    multiple=True,
    help="The specific node name.",
)
@click.option(
    "-c", "--cluster",
    type=str,
    required=True,  # the cluster is mandatory, it is better to fail with a clear usage message
    help="The generique cluster name to use.",
)
@click.option(
    "-n", "--nodes",
    type=int,
    default=1,
    help="The number of nodes to be deployed.",
)
def main(cmd: str, **kwargs: dict) -> None:
    """Deploy a grid'5000 node and run a mendevi script inside it.

    \b
    Parameters
    ----------
    **kwargs: dict
        Please refer to the detailed arguments below.
    cmd : str
        The mendevi command to be executed on the deployed node.
        When a path begins with ./, this part is automatically replaced by the shared folder file.
    env_name : str, optional
        The kadeploy3 yaml path file environment to use.
        By default, the first environment found is used.
    walltime : str, default=24:00:00
        The maximum reservation time for the node.
        If the script finishes before this time,
        the resources are released before the expiry date.
    server : tuple[str], optional
        The specific node names (ex: ("paradoxe-1.rennes.grid5000.fr",)).
    cluster : str
        The name of the cluster to used (ex: "paradoxe").
    nodes : int, default=1
        The number of nodes to deploy (incompatible with ``server``).
    """
    is_g5k_frontend()
    set_ansible_cfg()
    logger = logging.getLogger(__name__)

    with Printer("Parse configuration...") as prt:
        assert isinstance(cmd, str), cmd.__class__.__name__
        # the relative paths are redirected to the storage only if it is not ambiguous
        if " ./" in cmd and len(data := guess_storage()) == 1:
            head, *tails = re.split(r"\s+\./", cmd)
            cmd = f"{head}{''.join(f' {data[0]}/{tail}' for tail in tails)}"
        prt.print(f"cmd : {cmd}")
        _parse_args(prt, kwargs)

    # run on the grid
    import enoslib  # noqa: PLC0415

    enoslib.init_logging()

    conf = (
        # doc: https://discovery.gitlabpages.inria.fr/enoslib/apidoc/infra.html#g5k-schema
        enoslib.G5kConf.from_settings(
            env_name=kwargs["env_name"],
            job_name=f"mendevi {hashlib.md5(cmd.encode('utf-8')).hexdigest()}",
            job_type=["deploy", "exotic"],
            walltime=kwargs["walltime"],
            monitor="wattmetre_power_watt",
            # queue="production",  # "default", "testing", "besteffort"
        )
        .add_machine(
            roles=guess_roles(),
            cluster=kwargs["cluster"],
            nodes=kwargs["nodes"],
            **({"servers": list(kwargs["server"])} if kwargs["server"] else {}),
        )
    )

    # at the beginning, we wait SLEEP_INIT, and at the end, we wait SLEEP_FINAL
    # it takes n steps, with n = math.log(SLEEP_FINAL/SLEEP_INIT) / math.log(alpha)
    # with alpha = 1.0 + (SLEEP_FINAL - SLEEP_INIT) / SLEEP_TRANSITION
    # the full transition time take SLEEP_TRANSITION
    alpha = 1.0 + (SLEEP_FINAL - SLEEP_INIT) / SLEEP_TRANSITION
    sleep = SLEEP_INIT
    warnings: set[str] = set()  # the displayed warnings

    with enoslib.G5k(conf) as (roles, _):  # get and release all grid5000 resources
        main_results = enoslib.run_command(cmd, roles=roles, background=True)
        log_files = {result.results_file for result in main_results}
        while log_files:
            # waiting a moment
            time.sleep(sleep)
            sleep = min(sleep * alpha, SLEEP_FINAL)

            for log_file in tuple(log_files):  # iterate on a copy, the set is updated inside
                results = enoslib.run_command(
                    (
                        f'if [ -f "{log_file}" ]; '
                        f'then cat "{log_file}"; '
                        'else echo "<no-log-yet>"; fi'
                    ),
                    roles=roles,
                    on_error_continue=True,
                )
                for res in results:
                    # display warnings
                    for msg in res.payload.get("warnings", []):
                        if msg not in warnings:
                            logger.warning(msg)
                            warnings.add(msg)

                    # skip when file log is not yet created
                    try:
                        stdout = res.payload["stdout"]
                    except KeyError:
                        logger.exception("stop communication with %s", log_files)
                        # discard and not remove: several results can refer to the same file
                        log_files.discard(log_file)
                        continue
                    if stdout == "<no-log-yet>":
                        continue

                    # get job result
                    try:
                        job = orjson.loads(stdout)
                    except orjson.JSONDecodeError:
                        logger.exception(res.payload)
                        raise
                    if job.get("changed", False) or "rc" in job:
                        print(job.get("stdout", ""))
                        logger.error(job.get("stderr", ""))
                        # discard and not remove: several results can refer to the same file
                        log_files.discard(log_file)