Source code for bioconvert.core.registry

# Bioconvert is a project to facilitate the interconversion               #
# of life science data from one format to another.                        #
#                                                                         #
# Copyright © 2018  Institut Pasteur, Paris and CNRS.                     #
#                                                                         #
# bioconvert is free software: you can redistribute it and/or modify      #
# it under the terms of the GNU General Public License as published by    #
# the Free Software Foundation, either version 3 of the License, or       #
# (at your option) any later version.                                     #
#                                                                         #
# bioconvert is distributed in the hope that it will be useful,           #
# but WITHOUT ANY WARRANTY; without even the implied warranty of          #
# GNU General Public License for more details.                            #
#                                                                         #
# You should have received a copy of the GNU General Public License       #
# along with this program (COPYING file).                                 #
# If not, see <>.                             #
"""Main bioconvert registry that fetches automatically the relevant converter"""
import importlib
import inspect
import itertools
import pkgutil

import bioconvert
import colorlog

_log = colorlog.getLogger(__name__)

__all__ = ["Registry"]

[docs]class Registry(object): """class to centralise information about available conversions :: from bioconvert.core.registry import Registry r = Registry() r.conversion_exists("BAM", "BED") # returns number of available methods for each converter conv_class = r[(".bam", ".bed")] converter = conv_class(input_file, output_file) converter.convert() """ def __init__(self): self._ext_registry = {} self._fmt_registry = {} self._fill_registry(bioconvert.__path__) self._build_path_dict() def _fill_registry(self, path, target=None, including_not_available_converter=False): """ Explore the directory converters to discover all converter classes (a concrete class which inherits from :class:`ConvBase`) and fill the register with the input format and output format associated to this converter. This is called in the constructor once with including_not_available_converter set to False and called at any time to :meth:`get_all_conversions` with including_not_available_converter set to True. :param str path: the path of a directory to explore (not recursive) :param str target: :param bool including_not_available_converter: """ target = self if target is None else target def is_converter(item): """Check if a module is a converter""" obj_name, obj = item if not inspect.isclass(obj): return False # Note that on some Python version, the isabstract is buggy. # Therefore, the isabstract does not return False for ConvBase # hence the additional check (obj_name in ["ConvBase"]) return issubclass(obj, bioconvert.ConvBase) and obj_name not in ["ConvBase"] modules = pkgutil.iter_modules(path=path) for _, module_name, *_ in modules: if module_name != "__init__": try: module = importlib.import_module("bioconvert." + module_name) except (ImportError, TypeError) as err: _log.warning("skip module '{}': {}".format(module_name, err)) continue converters = inspect.getmembers(module) converters = [c for c in converters if is_converter(c)] for converter_name, converter in converters: if converter is not None: format_pair = (converter.input_fmt, converter.output_fmt) target[(format_pair)] = converter # have all the combinaisons between the extensions of # output formats of the convertes combo_input_ext = tuple(itertools.product(*converter.input_ext)) # have all the combinaisons between the extensions of output # formats of the convertes combo_output_ext = tuple(itertools.product(*converter.output_ext)) all_ext_pair = tuple(itertools.product(combo_input_ext, (combo_output_ext))) for ext_pair in all_ext_pair: if len(converter.available_methods) == 0 and not including_not_available_converter: _log.debug( "converter '{}' for {} -> {} was not added as no method is available".format( converter_name, *ext_pair ) ) else: self.set_ext(ext_pair, converter) def _build_path_dict(self): """ Construct dictionaries of dictionaries containing shortest paths from one format to another. """ from networkx import DiGraph, all_pairs_shortest_path # all_pairs_shortest_path yields pairs (n, d) where # * n is a node # * d is a dict where # * keys are other nodes # * values are shortest paths (i.e. lists of nodes) # from n to these nodes # Converting this to dict results in a dict of dicts where # * the first key is the source node # * the second key is the destination node # * the value is a shortest path between these nodes. # self._path_dict[in_fmt][out_fmt] = [in_fmt, ..., out_fmt] self._path_dict = dict( all_pairs_shortest_path( # Directed graph of available in_fmt -> out_fmt conversions DiGraph(self.get_conversions()) ) ) # self._path_dict_ext[in_ext][out_ext] = [in_ext, ..., out_ext] self._path_dict_ext = dict( all_pairs_shortest_path( # Directed graph of available in_ext -> out_ext conversions DiGraph(self.get_conversions_from_ext()) ) )
[docs] def conversion_path(self, input_fmt, output_fmt): """ Return a list of conversion steps to get from input and output formats :param tuple input_fmt: :param tuple output_fmt: Each step in the list is a pair of formats. """ try: fmt_steps = self._path_dict[input_fmt][output_fmt] except KeyError: fmt_steps = [] return list(zip(fmt_steps, fmt_steps[1:]))
def __setitem__(self, format_pair, convertor): """ Register new convertor from input format to output format. :param format_pair: the input format, the output format :type format_pair: tuple of 2 strings :param convertor: the convertor which handle the conversion from input_fmt -> output_fmt :type convertor: :class:`ConvBase` object """ if format_pair in self._fmt_registry: raise KeyError( "an other converter already exists for {} -> {}".format( "_".join(format_pair[0]), "_".join(format_pair[1]) ) ) self._fmt_registry[format_pair] = convertor def _check_input_ext(self, ext_pair): assert len(ext_pair) == 2, "parameter must be a tuple with 2 items" assert isinstance(ext_pair[0], tuple), "first item must be a tuple" assert isinstance(ext_pair[1], tuple), "second item must be a tuple"
[docs] def set_ext(self, ext_pair, convertor): """ Register new convertor from input extension and output extension in a list. We can have a list of multiple convertors for one ext_pair. :param tuple ext_pair: tuple containing the input extensions and the output extensions e.g. ( ("fastq",) , ("fasta") ) :param convertor: the convertor which handle the conversion from input_ext -> output_ext :type convertor: list of :class:`ConvBase` object """ self._check_input_ext(ext_pair) if ext_pair in self._ext_registry: self._ext_registry[ext_pair].append(convertor) else: self._ext_registry[ext_pair] = [convertor]
def __getitem__(self, format_pair): """ :param format_pair: the input format, the output format :type format_pair: tuple of 2 strings :return: an object of subclass o :class:`ConvBase` """ format_pair = (format_pair[0], format_pair[1]) return self._fmt_registry[format_pair]
[docs] def get_ext(self, ext_pair): """ Copy the registry into a dict that behaves like a list to be able to have multiple values for a single key and from a key have all converter able to do the conversion from the input extension to the output extension. :param ext_pair: the input extension, the output extension :type ext_pair: tuple of 2 strings :return: list of objects of subclass o :class:`ConvBase` """ self._check_input_ext(ext_pair) return self._ext_registry[ext_pair]
def __contains__(self, format_pair): """ Can use membership operation on registry to test if a converter to go form input format to output format exists. :param format_pair: the input format, the output format :type format_pair: tuple (or list) of 2 items. The items must be a string or a tuple/list of strings. :return: True if format_pair is in registry otherwise False. :: r = Registry() ('VCF', 'BCF') in r """ # make sure input is tuple of 2 items if isinstance(format_pair, (tuple, list)) is False: raise ValueError("input argument must be a tuple or list of 2 items") if isinstance(format_pair, list): format_pair = tuple(format_pair) # make sure we have a pair if len(format_pair) != 2: raise ValueError("input must have 2 items") # make sure each item is a string or tuple/list and convert into tuples # first item if isinstance(format_pair[0], str): format_pair = ((format_pair[0],), format_pair[1]) elif isinstance(format_pair[0], list): format_pair = (tuple(format_pair[0]), format_pair[1]) # second item if isinstance(format_pair[1], str): format_pair = (format_pair[0], (format_pair[1],)) elif isinstance(format_pair[1], list): format_pair = (format_pair[0], tuple(format_pair[1])) # make sure it is upper case for item in format_pair[1]: item = item.upper() for item in format_pair[0]: item = item.upper() return format_pair in self._fmt_registry def __iter__(self): """ Make registry iterable through format_pair (str input format, str output format) """ for format_pair in self._fmt_registry: yield format_pair
[docs] def get_conversions(self): """ :return: a generator which allow to iterate on all available conversions a conversion is encoded by a tuple of 2 strings (input format, output format) :retype: generator """ for conv in self._fmt_registry: yield conv
[docs] def get_converters_names(self): """ :return: a generator that allows to get the name of the converter from the subclass (ConvBase object) :rtype: generator """ for converter in self._fmt_registry.values(): yield converter.__name__.lower()
[docs] def get_conversions_from_ext(self): """ :return: a generator which allow to iterate on all available conversions a conversion is encoded by a tuple of 2 strings (input extension, output extension) :rtype: generator """ for conv in self._ext_registry: yield conv
[docs] def get_all_conversions(self): """ :return: a generator which allow to iterate on all available conversions and their availability; a conversion is encoded by a tuple of 2 strings (input format, output format) :retype: generator (input format, output format, status) """ all_converter = {} self._fill_registry( bioconvert.__path__, target=all_converter, including_not_available_converter=True, ) for i, o in all_converter: yield i, o, (i, o) in self._fmt_registry and len(self._fmt_registry[(i, o)].available_methods) > 0
[docs] def conversion_exists(self, input_fmt, output_fmt, allow_indirect=False): """ :param str input_fmt: the input format :param str output_fmt: the output format :param boolean allow_indirect: whether to count indirect conversions :return: True if a converter which transform input_fmt into output_fmt exists :rtype: boolean """ input_fmt = tuple([x.upper() for x in input_fmt]) output_fmt = tuple([x.upper() for x in output_fmt]) return (input_fmt, output_fmt) in self._fmt_registry or ( allow_indirect and len(self.conversion_path(input_fmt, output_fmt)) )
def get_info(self): converters = set([self[this] for this in self._fmt_registry]) data = {} for converter in converters: data[converter] = len(converter.available_methods) return data
[docs] def iter_converters(self, allow_indirect: bool = False): """ :param bool allow_indirect: also return indirect conversion :return: a generator to iterate over (in_fmt, out_fmt, converter class when direct, path when indirect) :rtype: a generator """ # if allow_indirect: for start, stops in self._path_dict.items(): for stop, path in stops.items(): if len(path) == 1: pass elif len(path) == 2: yield start, stop, self._fmt_registry[(start, stop)], None elif allow_indirect: yield start, stop, None, path
# return # for conv, converter in self._fmt_registry.items(): # in_fmt, out_fmt = conv # yield in_fmt, out_fmt, converter, None def __str__(self): data = C = data["converters"] F = data["formats"] M = data["methods"] txt = "Number of formats: {}".format(F) txt += "\n" + "Number of converters: {}".format(C) txt += "\n" + "Number of methods : {}".format(M) return txt def info(self): info = self.get_info() _converters = [x for x in info.items()] _data = [info[k] for k, v in info.items()] C = len(_converters) M = sum(_data) F = len(set([x for items in self.get_all_conversions() for x in items])) return { "formats": F, "converters": C, "methods": M, "methods_per_converter": round(float(M) / C, 2), }