# Source code for damona.common

#
#  This file is part of Damona software
#
#  Copyright (c) 2020-2021 - Damona Development Team
#
#  File author(s):
#      Thomas Cokelaer <thomas.cokelaer@pasteur.fr>
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/cokelaer/damona
#  documentation: http://damona.readthedocs.io
#
##############################################################################
"""Image and Binary handlers. Provide also a Damona manager"""
import functools
import os
import pathlib
import re
import sys

import colorlog
from easydev import cmd_exists, md5

logger = colorlog.getLogger(__name__)


__all__ = ["Damona", "ImageReader", "BinaryReader", "DamonaInit"]


def get_damona_path():
    """Return the Damona root directory as a :class:`pathlib.Path`.

    The location is read from the **DAMONA_PATH** environment variable.
    If the variable is not defined, an error is logged and the interpreter
    exits with status 1.
    """
    damona_path = os.environ.get("DAMONA_PATH")
    if damona_path is None:
        logger.error(
            "DAMONA_PATH not found in your environment. You must define "
            "it. In this shell, type 'export DAMONA_PATH=PATH_WHERE_TO_PLACE_DAMONA'"
        )
        sys.exit(1)
    return pathlib.Path(damona_path)


class DamonaInit:
    """Class to create images/bin directory for DAMONA

    This is called each time damona is started to make sure the required
    directories are present.

    This class simply creates, under the directory given by **DAMONA_PATH**,
    the *envs/base/bin* and *images/damona_buffer* directories. It also checks
    whether **DAMONA_PATH** and **DAMONA_SINGULARITY_OPTIONS** variables
    are defined in the environment.

    """

    def __init__(self):
        """.. rubric:: **Constructor**

        If **DAMONA_PATH** is not set, a critical message explaining how to
        configure the shell is logged and the interpreter exits with status 0.
        If **DAMONA_SINGULARITY_OPTIONS** is not set, only a warning is logged.
        """
        if "DAMONA_PATH" not in os.environ:
            # The snippet must test that the file EXISTS (-f, not ! -f) and must
            # use ${HOME}: a tilde inside double quotes is not expanded by bash.
            logger.critical(
                "DAMONA_PATH was not found in your environment. Before using Damona, "
                "you have to copy/paste the following code in your ~/.bashrc file "
                "once for all (start a new shell afterwards):\n"
                "\n"
                '    if [ -f "${HOME}/.config/damona/damona.sh" ] ; then\n'
                '        source "${HOME}/.config/damona/damona.sh"\n'
                "    fi\n"
                "\n"
                "This will create DAMONA_PATH variable that points to your "
                "home/.config/damona/ directory. You can redefine the DAMONA_PATH "
                "later to point towards another path if needed."
            )
            # This is not an error per se but damona cannot work without DAMONA_PATH
            # Yet, we do not want to raise an error especially for the CI
            sys.exit(0)

        if "DAMONA_SINGULARITY_OPTIONS" not in os.environ:
            logger.warning(
                "No DAMONA_SINGULARITY_OPTIONS variable found in your environment. "
                "To remove this message, set a DAMONA_SINGULARITY_OPTIONS variable "
                "in your shell. For explanation about this variable, please see "
                "https://damona.readthedocs.io/en/latest/userguide.html#DAMONA_SINGULARITY_OPTIONS "
            )

        #: Root directory where images and environments are stored
        self.damona_path = pathlib.Path(os.environ["DAMONA_PATH"])

        # exist_ok=True makes the initialisation safe to run at every start-up
        os.makedirs(self.damona_path, exist_ok=True)
        os.makedirs(self.damona_path / "envs" / "base" / "bin", exist_ok=True)
        os.makedirs(self.damona_path / "images" / "damona_buffer", exist_ok=True)
class Damona:
    """Global manager to get information about environments, binaries, images."""

    def __init__(self):
        #: Root path under which images and environments are stored
        self.damona_path = get_damona_path()

    def _get_config_path(self):
        return self.damona_path.joinpath("damona.cfg")

    config_path = property(_get_config_path, doc="Get the Damona config file location")

    def _get_image_directory(self):
        return self.damona_path.joinpath("images")

    images_directory = property(_get_image_directory, doc="Get the Damona images directory location")

    def _get_environments_path(self):
        return self.damona_path.joinpath("envs")

    environments_path = property(_get_environments_path, doc="Get the Damona environments directory location")

    def find_orphan_binaries(self):
        """Return the binaries, across all environments, whose image is missing

        A binary is reported when :meth:`BinaryReader.is_image_available`
        returns False for it (e.g., the image was deleted manually).
        A warning is logged for each of them.
        """
        orphans = []
        for binary in self.get_all_binaries():
            reader = BinaryReader(binary)
            if reader.is_image_available() is not False:
                continue
            # pragma: no cover
            logger.warning(f"{binary} image is not available. This binary is an orphan")
            orphans.append(binary)
        return orphans

    def get_environments(self):
        """Return the names of the environments"""
        from damona.environ import Environ

        return Environ().environment_names

    def get_all_binaries(self):
        """Return the set of binaries installed in all environments"""
        from damona.environ import Environ

        per_environment = [environment.get_installed_binaries() for environment in Environ().environments]
        collected = set()
        for installed in per_environment:
            collected.update(installed)
        return collected

    def find_orphan_images(self):
        """Return the images that are not referenced by any binary

        Two summary lines are printed (not logged) on purpose so that they
        are always visible.
        """
        binaries = self.get_all_binaries()
        images = self.get_all_images()

        # keep print to make sure it is seen
        print(f"Found {len(images)} images and {len(binaries)} binaries. Checking consistencies")

        used_images = {pathlib.Path(BinaryReader(binary).image) for binary in binaries}

        # keep print to make sure it is seen
        print(f"{len(used_images)} images is/are used. ")

        orphans = []
        for image in sorted(images):
            if image in used_images:
                continue
            # pragma: no cover
            logger.info(f"{image} image not used.")
            orphans.append(image)
        return orphans

    def get_all_images(self):
        """Return the list of all image files"""
        from damona.environ import Images

        return list(Images().files)

    def is_image_used(self, name):
        """Tell whether at least one installed binary relies on this image

        The image name has no ".img" extension and uses _ instead of : character

        ::

            # get images used
            from damona import Damona
            d = Damona()
            d.get_all_images()

            # Note that names are encoded as NAME_X.Y.Z
            d.is_image_used("fastqc_0.11.9")

        """
        matching = [
            binary
            for binary in self.get_all_binaries()
            if name == BinaryReader(binary).get_image().replace(":", "_")
        ]
        return len(matching) > 0
class ImageReader:
    """Manage a single Singularity image"""

    # Version part of a file name: _X.Y.Z or _vX.Y.Z, an optional free suffix,
    # then the .img or .sif extension.
    _VERSION_REGEX = re.compile(r"_(v|)\d+\.\d+\.\d+(.+|)\.(img|sif)")
    # Full file name: a non-empty NAME followed by the version part.
    _NAME_REGEX = re.compile(r".+_(v|)\d+\.\d+\.\d+(.+|)\.(img|sif)")

    def __init__(self, name):
        """.. rubric:: **Constructor**

        :param name: the input name of the image (fullpath)

        If the file name is not valid (see :meth:`is_valid_name`), an error
        is logged and the interpreter exits with status 1.

        ::

            >>> from damona.common import ImageReader
            >>> ir = ImageReader("~/.config/damona/images/fastqc_0.11.9.img")
            >>> ir.md5
            >>> ir.is_orphan()
            >>> ir.name
            >>> print(ir.shortname)
            'fastqc_0.11.9.img'
            >>> print(ir.version)
            '0.11.9'

        """
        self.filename = pathlib.Path(name)
        if not self.is_valid_name():
            logger.error("Invalid image name. Your input image must end in .img or .sif ; version must be X.Y.Z")
            sys.exit(1)

    def delete(self):
        """Remove the image file if :meth:`is_orphan` reports it as unused

        Otherwise the file is kept and a warning is logged.
        """
        if self.is_orphan():
            logger.warning(f"deleting {self.filename} since it is not used anymore by any environments")
            self.filename.unlink()
        else:
            logger.warning(
                f"{self.filename} not deleted because it is still used. Removing an image that is used is not yet implemented"
            )

    def _get_short_name(self):
        return self.filename.name

    shortname = property(_get_short_name, doc="Get the filename (NAME_X.Y.Z.img)")

    def is_valid_name(self):
        """Check whether the name is valid.

        Must be in the form NAME_X.Y.Z.img (or .sif); an optional leading
        *v* before the version and a free suffix after it are accepted.
        The whole file name must match, so trailing characters after the
        extension (e.g. ``.img.bak``) are rejected.

        :return: True if the file name is valid, False otherwise
        """
        return self._NAME_REGEX.fullmatch(self.shortname) is not None

    def _get_executable_name(self):
        # everything located before the version part is taken as the executable
        found = self._VERSION_REGEX.search(self.shortname)
        return self.shortname[: found.start()]

    guessed_executable = property(_get_executable_name, doc="Guess the executable from the filename")

    def _get_version(self):
        found = self._VERSION_REGEX.search(self.shortname)
        version = found.group().replace(".sif", "").replace(".img", "")
        # drop the leading separator and the optional 'v' prefix
        if version[0] == "_":
            version = version[1:]
        if version[0] == "v":
            version = version[1:]
        return version

    version = property(_get_version, doc="Get the version")

    def _get_md5sum(self):
        return md5(self.filename)

    md5 = property(_get_md5sum, doc="compute and return the md5 of the file")

    def is_orphan(self):
        """Return True if no installed binary refers to this image

        Binaries from all environments are inspected; the image is considered
        as used as soon as one binary points to an image file having the same
        file name as this one.
        """
        binaries = Damona().get_all_binaries()
        # Compare on file names: the path stored in a binary may or may not
        # have its ${DAMONA_PATH} prefix expanded.
        return not any(pathlib.Path(BinaryReader(binary).image).name == self.shortname for binary in binaries)

    def is_installed(self):
        """Return True is the file exists in the DAMONA_PATH"""
        damona_path = pathlib.Path(os.environ["DAMONA_PATH"])
        return (damona_path / "images" / self.filename.name).exists()

    def __repr__(self):
        # NOTE: this computes the md5 of the file, hence reads it entirely
        txt = f"name: {self.filename.absolute()}\n"
        txt += f"shortname: {self.shortname}\n"
        txt += f"md5: {self.md5}\n"
        txt += f"version: {self.version}\n"
        txt += f"guessed executable: {self.guessed_executable}"
        return txt
class BinaryReader:
    """Manage a single binary

    ::

        >>> from damona.common import BinaryReader
        >>> br = BinaryReader("~/.config/damona/envs/base/bin/fastqc")
        >>> br.get_image()
        'fastqc:0.11.9'
        >>> br.is_image_available()
        True

    """

    # extensions of the image files that may be referenced by a binary
    _IMAGE_EXTENSIONS = (".img", ".sif")

    def __init__(self, filename):
        """.. rubric:: constructor

        :param str filename: the input name of the binary file

        Can be use to check whether the binary is not orphan and its image
        is still available.

        The file is parsed to find the first line starting with *singularity*;
        the image path is the first word following ``exec`` (or ``run`` for
        older binaries). If **DAMONA_PATH** is defined, ``${DAMONA_PATH}`` is
        replaced by its value in the stored image path.

        :raises IndexError: if no line starting with *singularity* is found
            or if the image path cannot be extracted from it.
        """
        logger.debug(f"{filename}")
        if isinstance(filename, str):
            filename = pathlib.Path(filename)
        self.filename = filename

        with self.filename.open("r") as fin:
            commands = [line for line in fin if line.strip().startswith("singularity")]
        if not commands:
            # same exception type as before, but with an explicit message
            raise IndexError(f"no singularity command found in {self.filename}")

        data = commands[0].replace("${DAMONA_SINGULARITY_OPTIONS}", "")

        try:
            image_path = data.split("exec")[1].split()[0]
        except IndexError:  # pragma: no cover
            # no 'exec' keyword (or nothing after it): older binaries used 'run'
            image_path = data.split("run")[1].split()[0]
            logger.warning(f"command line in {self.filename} uses 'run'; should be reinstalled ")

        if "DAMONA_PATH" in os.environ:
            DAMONA_PATH = os.environ["DAMONA_PATH"]
            self.image = image_path.replace("${DAMONA_PATH}", DAMONA_PATH)
        else:
            self.image = image_path

    def is_image_available(self):
        """Return True if the image used by the binary does exist

        If **DAMONA_PATH** is not defined, an error is logged and the
        interpreter exits with status 1.
        """
        if "DAMONA_PATH" not in os.environ:
            logger.error("You must define DAMONA_PATH")
            sys.exit(1)

        damona_path = os.environ["DAMONA_PATH"]
        # the placeholder may still be present if DAMONA_PATH was not defined
        # when the instance was created
        return os.path.exists(self.image.replace("${DAMONA_PATH}", damona_path))

    def get_image(self):
        """Return the container used by the binary

        The container is returned as ``NAME:VERSION``: the image file name
        without its ``.img`` or ``.sif`` extension, where the last underscore
        is replaced by a colon.

        :raises IndexError: if the file has no uncommented line, or if that
            line has no word containing ``/images/``.
        """
        # we assume the user did not edit the binary file
        # so we expect one uncommented line
        with self.filename.open("r") as fin:
            command = [line for line in fin if line.strip() and line.strip()[0] != "#"]

        # where /images is to be followed by the container
        candidates = [word for word in command[0].split() if "/images/" in word]
        container = candidates[0].split("/")[-1]

        # drop the extension; both .img and .sif images are supported
        for extension in self._IMAGE_EXTENSIONS:
            if container.endswith(extension):
                container = container[: -len(extension)]
                break

        return ":".join(container.rsplit("_", 1))
def requires_singularity(func):
    """A decorator to check presence of singularity

    The decorated callable is executed only if the *singularity* command is
    found; otherwise an error is logged and None is returned.
    """

    @functools.wraps(func)
    def wrapper(ref, *args, **kwargs):
        if not cmd_exists("singularity"):
            logger.error("singularity command was not found. You must install 'singularity' to use Damona")
            return None
        return func(ref, *args, **kwargs)

    return wrapper