Source code for damona.common

#
#  This file is part of Damona software
#
#  Copyright (c) 2020-2021 - Damona Development Team
#
#  File author(s):
#      Thomas Cokelaer <thomas.cokelaer@pasteur.fr>
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/cokelaer/damona
#  documentation: http://damona.readthedocs.io
#
##############################################################################
"""Image and Binary handlers. Provide also a Damona manager"""
import functools
import os
import pathlib
import re
import sys

import colorlog
from easydev import cmd_exists, md5

# Module-level logger, created through colorlog and named after this module.
logger = colorlog.getLogger(__name__)


# Names exported by ``from damona.common import *``.
__all__ = ["Damona", "ImageReader", "BinaryReader", "DamonaInit", "get_container_cmd"]


def get_damona_path():
    """Resolve the Damona root directory from the environment.

    The location is read from the ``DAMONA_PATH`` environment variable. When
    the variable is missing, an error is logged and the process is terminated
    with exit status 1.

    :returns: The Damona root path.
    :rtype: pathlib.Path
    :raises SystemExit: When ``DAMONA_PATH`` is not defined.
    """
    root = os.environ.get("DAMONA_PATH")
    if root is None:
        logger.error(
            "DAMONA_PATH not found in your environment. You must define "
            "it. In this shell, type 'export DAMONA_PATH=PATH_WHERE_TO_PLACE_DAMONA'"
        )
        sys.exit(1)
    return pathlib.Path(root)

class DamonaInit:
    """Create the directories Damona needs under ``DAMONA_PATH``.

    This is called each time damona is started to make sure the required
    directories are present. On construction it:

    * exits (status 0) with a guidance message when **DAMONA_PATH** is not
      defined in the environment,
    * logs a warning when **DAMONA_SINGULARITY_OPTIONS** is not defined,
    * creates ``DAMONA_PATH``, ``DAMONA_PATH/envs/base/bin`` and
      ``DAMONA_PATH/images/damona_buffer`` if they do not exist yet.
    """

    # Shell configuration details: RC file, source line, and manual-add command
    SHELL_CONFIGS = {
        "bash": {
            "rc_file": "~/.bashrc",
            "source_line": "source ~/.config/damona/damona.sh",
            "manual_cmd": 'echo "source ~/.config/damona/damona.sh" >> ~/.bashrc',
        },
        "zsh": {
            "rc_file": "~/.zshrc",
            "source_line": "source ~/.config/damona/damona.zsh",
            "manual_cmd": 'echo "source ~/.config/damona/damona.zsh" >> ~/.zshrc',
        },
        "fish": {
            "rc_file": "~/.config/fish/config.fish",
            "source_line": "source ~/.config/damona/damona.fish",
            "manual_cmd": 'echo "source ~/.config/damona/damona.fish" >> ~/.config/fish/config.fish',
        },
    }

    def __init__(self):
        if "DAMONA_PATH" not in os.environ:
            self._report_missing_config()
            # This is not an error per se but damona cannot work without DAMONA_PATH
            # Yet, we do not want to raise an error especially for the CI
            sys.exit(0)

        if "DAMONA_SINGULARITY_OPTIONS" not in os.environ:
            logger.warning(
                "No DAMONA_SINGULARITY_OPTIONS variable found in your environment. "
                "To remove this message, set a DAMONA_SINGULARITY_OPTIONS variable in your shell. "
                "For explanation about this variable, please see "
                "https://damona.readthedocs.io/en/latest/userguide.html#DAMONA_SINGULARITY_OPTIONS "
            )

        self.damona_path = pathlib.Path(os.environ["DAMONA_PATH"])
        os.makedirs(self.damona_path, exist_ok=True)
        os.makedirs(self.damona_path / "envs" / "base" / "bin", exist_ok=True)
        os.makedirs(self.damona_path / "images" / "damona_buffer", exist_ok=True)

    def _get_shell_config_status(self):
        """Check which shell RC files contain the damona source line.

        An RC file that is missing or cannot be read (e.g. it is a directory,
        or permissions forbid reading it) is reported as not configured
        instead of raising, since this method only feeds a help message.

        :returns: a dict mapping shell name to a status dict with keys:

            - ``rc_file``: expanded path to the RC file
            - ``source_line``: the expected source line
            - ``manual_cmd``: command to add the source line manually
            - ``configured``: True if the source line is present in the RC file
        :rtype: dict
        """
        status = {}
        for shell, info in self.SHELL_CONFIGS.items():
            rc_path = pathlib.Path(info["rc_file"]).expanduser()
            try:
                # errors="replace" keeps stray non-decodable bytes in an RC
                # file from aborting the check.
                configured = info["source_line"] in rc_path.read_text(errors="replace")
            except OSError:
                configured = False
            status[shell] = {
                "rc_file": rc_path,
                "source_line": info["source_line"],
                "manual_cmd": info["manual_cmd"],
                "configured": configured,
            }
        return status

    def _report_missing_config(self):
        """Log a targeted message when DAMONA_PATH is not set.

        Inspects each supported shell's RC file to determine whether the
        damona source line is already present and tailors the guidance
        accordingly. The message is emitted at ``critical`` level.
        """
        status = self._get_shell_config_status()
        configured_shells = [s for s, v in status.items() if v["configured"]]
        unconfigured_shells = [s for s, v in status.items() if not v["configured"]]

        parts = ["DAMONA_PATH was not found in your environment.\n\n"]

        if configured_shells:
            # Source lines are present; user just needs to reload the shell
            shell_list = ", ".join(configured_shells)
            parts.append(
                f"Damona configuration was found in your {shell_list} shell "
                "configuration file(s).\n"
                "Please open a new terminal or source the appropriate file:\n\n"
            )
            parts.extend(f" source {status[shell]['rc_file']}\n" for shell in configured_shells)
        else:
            # No RC file has been configured yet
            parts.append(
                "Damona could not find its initialization line in any of your "
                "shell configuration files.\n"
                "Please add the relevant line for your shell:\n\n"
            )

        if unconfigured_shells:
            if configured_shells:
                parts.append("\nTo configure damona for additional shells, run:\n\n")
            else:
                parts.append("To configure damona, run:\n\n")
            parts.extend(f" # {shell}\n {status[shell]['manual_cmd']}\n\n" for shell in unconfigured_shells)
            parts.append(
                "After adding the line, open a new terminal (or source the file)\n"
                "for the changes to take effect.\n\n"
            )

        parts.append(
            "Once configured, DAMONA_PATH will point to ~/.config/damona/.\n"
            "You can redefine DAMONA_PATH later to use a different location."
        )
        logger.critical("".join(parts))
class Damona:
    """Central access point for information on environments, binaries and images."""

    def __init__(self):
        #: Root path under which images and environments are stored
        self.damona_path = get_damona_path()

    def _get_config_path(self):
        return self.damona_path / "damona.cfg"

    config_path = property(_get_config_path, doc="Get the Damona config file location")

    def _get_image_directory(self):
        return self.damona_path / "images"

    images_directory = property(_get_image_directory, doc="Get the Damona images directory location")

    def _get_environments_path(self):
        return self.damona_path / "envs"

    environments_path = property(_get_environments_path, doc="Get the Damona environments directory location")

    def find_orphan_binaries(self):
        """Return the binaries, across all environments, whose image is gone.

        A binary is an orphan when the image it points to is no longer
        available (for instance because a user deleted it manually). A
        warning is logged for each orphan found.
        """
        orphans = []
        for binary in self.get_all_binaries():
            reader = BinaryReader(binary)
            if reader.is_image_available() is not False:
                continue
            # pragma: no cover
            logger.warning(f"{binary} image is not available. This binary is an orphan")
            orphans.append(binary)
        return orphans

    def get_environments(self):
        """Return the list of environment names."""
        from damona.environ import Environ

        return Environ().environment_names

    def get_all_binaries(self):
        """Return the set of all binaries installed in all environments."""
        from damona.environ import Environ

        found = set()
        for environment in Environ().environments:
            found.update(environment.get_installed_binaries())
        return found

    def find_orphan_images(self):
        """Return the images that no binary in any environment refers to."""
        binaries = self.get_all_binaries()
        images = self.get_all_images()

        # keep print to make sure it is seen
        print(f"Found {len(images)} images and {len(binaries)} binaries. Checking consistencies")

        used_images = {pathlib.Path(BinaryReader(binary).image) for binary in binaries}

        # keep print to make sure it is seen
        print(f"{len(used_images)} images is/are used. ")

        orphans = []
        for image in sorted(images):
            if image in used_images:
                continue
            # pragma: no cover
            logger.info(f"{image} image not used.")
            orphans.append(image)
        return orphans

    def get_all_images(self):
        """Return the list of all images."""
        from damona.environ import Images

        return list(Images().files)

    def is_image_used(self, name):
        """Return True if at least one binary uses the image called *name*.

        The image name has no ".img" extension and uses _ instead of : character

        ::

            # get images used
            from damona import Damona
            d = Damona()
            d.get_all_images()

            # Note that names are encoded as NAME_X.Y.Z
            d.is_image_used("fastqc_0.11.9")

        """
        users = {
            binary
            for binary in self.get_all_binaries()
            if name == BinaryReader(binary).get_image().replace(":", "_")
        }
        return bool(users)
class ImageReader:
    """Manage a single Singularity image"""

    # Version/extension tail of an image file name: _X.Y.Z[suffix].img or .sif,
    # with an optional "v" before the version number.
    _VERSION_REGEX = re.compile(r"_(v|)\d+\.\d+\.\d+(.+|)\.(img|sif)")
    # Complete image file name: NAME followed by the tail described above.
    _NAME_REGEX = re.compile(r".+_(v|)\d+\.\d+\.\d+(.+|)\.(img|sif)")

    def __init__(self, name):
        """.. rubric:: **Constructor**

        :param name: the input name of the image (fullpath)

        The process exits with status 1 (after logging an error) when the
        file name is not a valid image name, see :meth:`is_valid_name`.

        ::

            >>> from damona.common import ImageReader
            >>> ir = ImageReader("~/.config/damona/images/fastqc_0.11.9.img")
            >>> ir.md5
            >>> ir.is_orphan()
            >>> ir.name
            >>> print(ir.shortname)
            'fastqc_0.11.9.img'
            >>> print(ir.version)
            '0.11.9'

        """
        self.filename = pathlib.Path(name)
        if self.is_valid_name() is False:
            logger.error(
                f"Invalid image name ({self.shortname}). Your input image must end in .img or .sif ; version must be X.Y.Z"
            )
            sys.exit(1)

    def delete(self):
        """Delete the image file from disk if it is no longer used by any environment.

        If the image is still referenced by at least one binary alias the
        deletion is skipped and a warning is logged instead.
        """
        if self.is_orphan():
            logger.warning(f"deleting {self.filename} since it is not used anymore by any environments")
            self.filename.unlink()
        else:
            logger.warning(
                f"{self.filename} not deleted because it is still used. Removing an image that is used is not yet implemented"
            )

    def _get_short_name(self):
        return self.filename.name

    shortname = property(_get_short_name, doc="Get the filename (NAME_X.Y.Z.img)")

    def is_valid_name(self):
        """Check whether the name is valid.

        Must be in the form NAME_X.Y.Z.img (or ``.sif``); an optional ``v``
        may precede the version and extra characters may follow ``X.Y.Z``.
        The whole file name must match, so anything after the extension
        (e.g. ``NAME_X.Y.Z.img.bak``) makes the name invalid.

        :rtype: bool
        """
        return self._NAME_REGEX.fullmatch(self.shortname) is not None

    def _get_executable_name(self):
        # Everything before the "_X.Y.Z..." tail is taken as the executable name.
        found = self._VERSION_REGEX.search(self.shortname)
        return self.shortname[: found.start()]

    guessed_executable = property(_get_executable_name, doc="Guess the executable from the filename")

    def _get_version(self):
        found = self._VERSION_REGEX.search(self.shortname)
        version = found.group().replace(".sif", "").replace(".img", "")
        # Drop the leading "_" separator, then the optional "v" prefix.
        if version[0] == "_":
            version = version[1:]
        if version[0] == "v":
            version = version[1:]
        return version

    version = property(_get_version, doc="Get the version")

    def _get_md5sum(self):
        return md5(self.filename)

    md5 = property(_get_md5sum, doc="compute and return the md5 of the file")

    def is_orphan(self):
        """Return ``True`` if no environment binary currently points to this image.

        An image is considered an *orphan* when no binary alias in any
        environment references it. Binaries are matched to this image by
        image file name (:attr:`shortname`).

        :returns: ``True`` when no binary uses this image, ``False`` otherwise.
        :rtype: bool
        """
        for binary in Damona().get_all_binaries():
            # Only a binary whose image is *this* image counts as a user;
            # binaries bound to other images must not protect this one.
            if pathlib.Path(BinaryReader(binary).image).name == self.shortname:
                return False
        return True

    def is_installed(self):
        """Return True if a file with this name exists in ``DAMONA_PATH/images``."""
        damona_path = pathlib.Path(os.environ["DAMONA_PATH"])
        return (damona_path / "images" / self.filename.name).exists()

    def __repr__(self):
        txt = f"name: {self.filename.absolute()}\n"
        txt += f"shortname: {self.shortname}\n"
        txt += f"md5: {self.md5}\n"
        txt += f"version: {self.version}\n"
        txt += f"guessed executable: {self.guessed_executable}"
        return txt
class BinaryReader:
    """Manage a single binary

    ::

        >>> from damona.common import BinaryReader
        >>> br = BinaryReader("~/.config/damona/envs/base/bin/fastqc")
        >>> br.get_image()
        'fastqc:0.11.9'
        >>> br.is_image_available()
        True

    """

    def __init__(self, filename):
        """.. rubric:: constructor

        :param str filename: the input name of the binary file
            (a :class:`pathlib.Path` is accepted as well)
        :raises IndexError: when the file holds no line starting with
            ``singularity`` or ``apptainer``, or when the image path cannot
            be found after ``exec`` (or ``run``) on that line.

        Can be use to check whether the binary is not orphan and
        its image is still available. The image path found in the file is
        stored in ``self.image`` with ``${DAMONA_PATH}`` expanded when that
        variable is defined in the environment.
        """
        logger.debug(f"{filename}")
        if isinstance(filename, str):
            filename = pathlib.Path(filename)
        self.filename = filename

        with self.filename.open("r") as fin:
            commands = [line for line in fin if line.strip().startswith(("singularity", "apptainer"))]
        if not commands:
            raise IndexError(f"no singularity/apptainer command found in {self.filename}")

        command = commands[0].replace("${DAMONA_SINGULARITY_OPTIONS}", "")

        try:
            image_path = command.split("exec")[1].split()[0]
        except IndexError:  # pragma: no cover
            # No usable "exec" on the line: fall back to the "run" form.
            image_path = command.split("run")[1].split()[0]
            logger.warning(f"command line in {self.filename} uses 'run'; should be reinstalled ")

        if "DAMONA_PATH" in os.environ:
            self.image = image_path.replace("${DAMONA_PATH}", os.environ["DAMONA_PATH"])
        else:
            self.image = image_path

    def is_image_available(self):
        """Return True if the image used by the binary does exist.

        Logs an error and exits with status 1 when ``DAMONA_PATH`` is not
        defined in the environment.

        :rtype: bool
        """
        if "DAMONA_PATH" not in os.environ:
            logger.error("You must define DAMONA_PATH")
            sys.exit(1)
        damona_path = os.environ["DAMONA_PATH"]
        return os.path.exists(self.image.replace("${DAMONA_PATH}", damona_path))

    def get_image(self):
        """Return the container used by the binary, as ``NAME:VERSION``."""
        # we assume the user did not edit the binary file
        # so we expect one uncommented line
        with self.filename.open("r") as fin:
            commands = [line for line in fin if line.strip() and line.strip()[0] != "#"]

        # where /images is to be followed by the container
        image_tokens = [token for token in commands[0].split() if "/images/" in token]
        container = image_tokens[0].split("/")[-1]
        # NOTE(review): only ".img" is removed here; a ".sif" image keeps its
        # extension in the returned tag. Confirm whether callers rely on that.
        container = container.replace(".img", "")
        # The last "_" separates the name from the version.
        return ":".join(container.rsplit("_", 1))
def get_container_cmd():
    """Return the name of the container command found on this system.

    Candidates are probed in order of preference: ``singularity`` first (for
    backward compatibility), then ``apptainer``. The first one that exists is
    returned.

    :returns: ``"singularity"`` or ``"apptainer"``, or ``None`` when neither is found.
    :rtype: str or None
    """
    for candidate in ("singularity", "apptainer"):
        if cmd_exists(candidate):
            return candidate
    return None
def requires_singularity(func):
    """Decorator running *func* only when singularity or apptainer is present.

    When neither command is found, an error is logged, the decorated function
    is not called and ``None`` is returned.
    """

    @functools.wraps(func)
    def wrapper(ref, *args, **kwargs):
        if get_container_cmd() is None:
            logger.error(
                "Neither 'singularity' nor 'apptainer' command was found. "
                "You must install one of them to use Damona"
            )
            return None
        return func(ref, *args, **kwargs)

    return wrapper