Source code for bioconvert.core.decorators

###########################################################################
# Bioconvert is a project to facilitate the interconversion               #
# of life science data from one format to another.                        #
#                                                                         #
# Copyright © 2018-2022  Institut Pasteur, Paris and CNRS.                #
#                                                                         #
# bioconvert is free software: you can redistribute it and/or modify      #
# it under the terms of the GNU General Public License as published by    #
# the Free Software Foundation, either version 3 of the License, or       #
# (at your option) any later version.                                     #
#                                                                         #
# bioconvert is distributed in the hope that it will be useful,           #
# but WITHOUT ANY WARRANTY; without even the implied warranty of          #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
# GNU General Public License for more details.                            #
#                                                                         #
# You should have received a copy of the GNU General Public License       #
# along with this program (COPYING file).                                 #
# If not, see <http://www.gnu.org/licenses/>.                             #
#                                                                         #
# Repository: https://github.com/bioconvert/bioconvert                    #
# Documentation: http://bioconvert.readthedocs.io                         #
###########################################################################
"""Provides a general tool to perform pre/post compression"""
from distutils.spawn import find_executable
from functools import wraps
from os.path import splitext

import colorlog
import pkg_resources
from tempfile import NamedTemporaryFile

_log = colorlog.getLogger(__name__)


[docs]def in_gz(func): """Marks a function as accepting gzipped input.""" func.in_gz = True return func
[docs]def make_in_gz_tester(converter): """Generates a function testing whether a conversion method of *converter* has the *in_gz* tag.""" def is_in_gz(method): """Accesses the function corresponding to *method* and tells whether it has the *in_gz* tag.""" return hasattr(getattr(converter, "_method_{}".format(method)), "in_gz") return is_in_gz
[docs]def compressor(func): """Decompress/compress input file without pipes Does not use pipe: we decompress and compress back the input file. The advantage is that it should work for any files (even very large). This decorator should be used by method that uses pure python code """ # https://stackoverflow.com/a/309000/1878788 @wraps(func) def wrapped(inst, *args, **kwargs): infile_name = inst.infile if type(inst.outfile) is not list: output_compressed = None if inst.outfile.endswith(".gz"): (inst.outfile, output_compressed) = splitext(inst.outfile) elif inst.outfile.endswith(".bz2"): (inst.outfile, output_compressed) = splitext(inst.outfile) elif inst.outfile.endswith(".dsrc"): # !!! only for fastq files (inst.outfile, output_compressed) = splitext(inst.outfile) # Now inst has the uncompressed output file name if infile_name.endswith(".gz"): # decompress input # TODO: https://stackoverflow.com/a/29371584/1878788 _log.info("Generating uncompressed version of {} ".format(infile_name)) (ungz_name, _) = splitext(infile_name) (_, base_suffix) = splitext(ungz_name) with NamedTemporaryFile(suffix=base_suffix) as ungz_infile: inst.infile = ungz_infile.name inst.shell("unpigz -c -p {} {} > {}".format(inst.threads, infile_name, inst.infile)) # computation results = func(inst, *args, **kwargs) inst.infile = infile_name else: results = func(inst, *args, **kwargs) # Compress output and restore inst output file name if output_compressed == ".gz": # TODO: this uses -f ; should be a _log.info("Compressing output into .gz") inst.shell("pigz -f -p {} {}".format(inst.threads, inst.outfile)) inst.outfile = inst.outfile + ".gz" elif output_compressed == ".bz2": _log.info("Compressing output into .bz2") inst.shell("pbzip2 -f -p{} {}".format(inst.threads, inst.outfile)) inst.outfile = inst.outfile + ".bz2" elif output_compressed == ".dsrc": # !!! only for FastQ files _log.info("Compressing output into .dsrc") inst.shell("dsrc c -t{} {} {}.dsrc".format(inst.threads, inst.outfile, inst.outfile)) inst.outfile = inst.outfile + ".dsrc" return results return in_gz(wrapped)
[docs]def out_compressor(func): """Compress output file without pipes This decorator should be used by method that uses pure python code """ # https://stackoverflow.com/a/309000/1878788 @wraps(func) def wrapped(inst, *args, **kwargs): output_compressed = None if inst.outfile.endswith(".gz"): (inst.outfile, output_compressed) = splitext(inst.outfile) elif inst.outfile.endswith(".bz2"): (inst.outfile, output_compressed) = splitext(inst.outfile) elif inst.outfile.endswith(".dsrc"): # !!! only for fastq files (inst.outfile, output_compressed) = splitext(inst.outfile) # Now inst has the uncompressed output file name # computation results = func(inst, *args, **kwargs) # Compress output and restore inst output file name if output_compressed == ".gz": # TODO: this uses -f ; should be a _log.info("Compressing output into .gz") inst.shell("pigz -f -p {} {}".format(inst.threads, inst.outfile)) inst.outfile = inst.outfile + ".gz" elif output_compressed == ".bz2": _log.info("Compressing output into .bz2") inst.shell("pbzip2 -f -p{} {}".format(inst.threads, inst.outfile)) inst.outfile = inst.outfile + ".bz2" elif output_compressed == ".dsrc": # !!! only for FastQ files _log.info("Compressing output into .dsrc") inst.shell("dsrc c -t{} {} {}.dsrc".format(inst.threads, inst.outfile, inst.outfile)) inst.outfile = inst.outfile + ".dsrc" return results return wrapped
[docs]def requires_nothing(func): """Marks a function as not needing dependencies.""" func.is_disabled = False return func
[docs]def requires( external_binary=None, python_library=None, external_binaries=None, python_libraries=None, ): """ :param external_binary: a system binary required for the method :param python_library: a python library required for the method :param external_binaries: an array of system binaries required for the method :param python_libraries: an array of python libraries required for the method :return: """ external_binaries = external_binaries or [] python_libraries = python_libraries or [] if external_binary: external_binaries.append(external_binary) if python_library: python_libraries.append(python_library) __missing_binaries = getattr(requires, "__missing_binaries", {}) requires.__missing_binaries = __missing_binaries __missing_libraries = getattr(requires, "__missing_libraries", {}) requires.__missing_libraries = __missing_libraries __pip_libraries = getattr(requires, "__pip_libraries", None) if __pip_libraries is None: __pip_libraries = [p.project_name for p in pkg_resources.working_set] requires.__pip_libraries = __pip_libraries def real_decorator(function): @wraps(function) def wrapped(inst, *args, **kwargs): return function(inst, *args, **kwargs) try: for bin in external_binaries: try: if __missing_binaries[bin]: raise Exception("{} has already be seen as missing".format(bin)) except KeyError: __missing_binaries[bin] = True # shell("which %s" % bin) if find_executable(bin) is None: raise Exception("{} was not found in path".format(bin)) __missing_binaries[bin] = False for lib in python_libraries: try: if __missing_libraries[lib]: raise Exception("{} has already be seen as missing".format(lib)) except KeyError: missing = lib not in __pip_libraries __missing_libraries[lib] = missing if missing: raise Exception("{} was not found by pip".format(lib)) wrapped.is_disabled = False except Exception as e: _log.debug(e) wrapped.is_disabled = True wrapped._required_binaries = external_binaries return wrapped return real_decorator
def get_known_dependencies_with_availability(as_dict=False): if as_dict: external_binaries = {} python_libraries = {} for binary, missing in getattr(requires, "__missing_binaries", {}).items(): external_binaries[binary] = dict( available=not missing, ) for library, missing in getattr(requires, "__missing_libraries", {}).items(): python_libraries[library] = dict( available=not missing, ) return dict( external_binaries=external_binaries, python_libraries=python_libraries, ) ret = [] for binary, status in sorted(getattr(requires, "__missing_binaries", {}).items()): ret.append( ( binary, not status, "binary", ) ) for library, status in sorted(getattr(requires, "__missing_libraries", {}).items()): ret.append( ( library, not status, "library", ) ) return ret