Source code for bioconvert.core.benchmark

###########################################################################
# Bioconvert is a project to facilitate the interconversion               #
# of life science data from one format to another.                        #
#                                                                         #
# Copyright © 2018-2022 Institut Pasteur, Paris and CNRS.                 #
#                                                                         #
# bioconvert is free software: you can redistribute it and/or modify      #
# it under the terms of the GNU General Public License as published by    #
# the Free Software Foundation, either version 3 of the License, or       #
# (at your option) any later version.                                     #
#                                                                         #
# bioconvert is distributed in the hope that it will be useful,           #
# but WITHOUT ANY WARRANTY; without even the implied warranty of          #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
# GNU General Public License for more details.                            #
#                                                                         #
# You should have received a copy of the GNU General Public License       #
# along with this program (COPYING file).                                 #
# If not, see <http://www.gnu.org/licenses/>.                             #
#                                                                         #
# Repository: https://github.com/bioconvert/bioconvert                    #
# Documentation: http://bioconvert.readthedocs.io                         #
###########################################################################
"""Tools for benchmarking"""

import json
import statistics
import threading
import time

import colorlog
from bioconvert.core.utils import Timer
from tqdm import tqdm

_log = colorlog.getLogger(__name__)


__all__ = ["Benchmark", "plot_multi_benchmark_max"]


[docs] class Benchmark: """Convenient class to benchmark several methods for a given converter :: c = BAM2COV(infile, outfile) b = Benchmark(c, N=5) b.run_methods() b.plot() """ def __init__(self, obj, N=5, to_exclude=None, to_include=None): """.. rubric:: Constructor :param obj: can be an instance of a converter class or a class name :param int N: number of replicates :param list to_exclude: methods to exclude from the benchmark :param list to_include: methods to include ONLY Use one of *to_exclude* or *to_include*. If both are provided, only the *to_include* one is used. """ if isinstance(obj, str): raise NotImplementedError self.converter = obj self.N = N self.results = None if to_exclude is None: self.to_exclude = [] else: self.to_exclude = to_exclude if to_include is None: self.to_include = [] else: self.to_include = to_include # CPU and memory usage self.cpu_percent = [] self.memory_usage = []
[docs] def run_methods(self): """Runs the benchmarks, and stores the timings in *self.results*.""" results = {"time": {}, "CPU": {}, "memory": {}} methods = self.converter.available_methods[:] # a copy ! if self.to_include: methods = [x for x in methods if x in self.to_include] elif self.to_exclude: methods = [x for x in methods if x not in self.to_exclude] for method in methods: times = [] CPUs = [] MEMs = [] for i in tqdm(range(self.N), desc="Evaluating method {}".format(method)): with Timer(times): # Need to get all extra arguments for specify method e.g Bam2BIGWIG.uscs method kwargs = {"method": method} for k, v in self.converter.others.items(): kwargs[k] = v # start monitoring CPU and Memory usage self.stop_monitoring = False # reset CPU and memory usage before monitoring self.cpu_percent = [] self.memory_usage = [] # start monitoring monitor_thread = threading.Thread(target=self.monitor_usage) monitor_thread.start() # the actual computation self.converter(**kwargs) # stop monitoring self.stop_monitoring = True monitor_thread.join() # gather information L = len(self.cpu_percent) CPUs.append(sum(self.cpu_percent) / L) MEMs.append(sum(self.memory_usage) / L) results["CPU"][method] = CPUs results["memory"][method] = MEMs results["time"][method] = times self.results = results
def monitor_usage(self): import psutil while not self.stop_monitoring: # Retrieve CPU and memory usage cpu_percent = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent # Adjust the sleep time as per your requirement time.sleep(0.05) self.cpu_percent.append(cpu_percent) self.memory_usage.append(memory_usage)
[docs] def plot(self, rerun=False, ylabel=None, rot_xticks=0, boxplot_args={}, mode="time"): """Plots the benchmark results, running the benchmarks if needed or if *rerun* is True. :param rot_xlabel: rotation of the xticks function :param boxplot_args: dictionary with any of the pylab.boxplot arguments :param mode: either time, CPU or memory :return: dataframe with all results """ import pylab if self.results is None or rerun is True: self.run_methods() assert mode in ["time", "CPU", "memory"], f"mode must be time, CPU or memory; {mode} provided" data = self.results[mode].copy() if mode == "time" and ylabel is None: ylabel = "Time (seconds)" elif mode == "CPU" and ylabel is None: ylabel = "CPU usage (%)" elif mode == "memory" and ylabel is None: ylabel = "Memory usage (%)" methods = sorted(data, key=lambda x: pylab.mean(data[x])) pylab.boxplot([data[x] for x in methods], **boxplot_args) # pylab.xticks([1+this for this in range(len(methods))], methods) if "vert" in boxplot_args and boxplot_args["vert"] is False: pylab.yticks(*zip(*enumerate(methods, start=1)), rotation=rot_xticks) pylab.xlabel(ylabel) else: pylab.xticks(*zip(*enumerate(methods, start=1)), rotation=rot_xticks) pylab.ylabel(ylabel) pylab.xlim([0, len(methods) + 1]) pylab.grid(True) pylab.tight_layout() # make sure it does not change with a copy return self.results.copy()
[docs] def plot_multi_benchmark_max(path_json, output_filename="multi_benchmark.png", min_ylim=0, mode=None): """Plotting function for the Snakefile_benchmark to be found in the doc The json file looks like:: { "awk":{ "0":0.777020216, "1":0.9638044834, "2":1.7623617649, "3":0.8348755836 }, "seqtk":{ "0":1.0024843216, "1":0.6313509941, "2":1.4048073292, "3":1.0554351807 }, "Benchmark":{ "0":1, "1":1, "2":2, "3":2 } } Number of benchmark is infered from field 'Benchmark'. """ import matplotlib.pyplot as plt import pandas as pd import statsmodels.stats.weightstats import statsmodels.stats.multitest # for back compatibility with 1.0.0 if mode is None: # open and read JSON file df = pd.read_json(path_json) else: with open(path_json, "r") as fin: dd = json.loads(fin.read()) df = pd.DataFrame(dd[mode]) # how many runs per method ? N = len(df["Benchmark"]) / len(df["Benchmark"].unique()) N = len(df["Benchmark"]) / N # Retrieving method names methods = list(df) # Removed the entry from the list that matches benchmark methods.remove("Benchmark") # We rotate the JSON object in relation to the benchmark number to be able to group them by methods df2 = df.pivot(columns="Benchmark") # Display of the boxplots with font at 7 and the grid is removed to make it easier to read df2.boxplot(fontsize=7, grid=False) # Initializing the x-axis title placement list l = [] # Initialization of the variable which will be used to separate the different methods sep = 0 # Initialization of the variable that will be used to save the lowest median median_best = 0 # Initialization of the variable that will be used to save the name of the method with the lowest median best_method = None for i in methods: if sep != 0: # Creation of a separation between methods plt.axvline(sep + 0.5, ls="--", color="red") # Calculation of the median for a method median = statistics.median(df[i]) if median_best == 0 or median_best >= median: # If the calculated median is lower than the recorded one, the old median and the old method are placed by the new one median_best = median best_method = i # We plot the median of each method plt.hlines(y=median, xmin=0.5 + sep, xmax=N + 0.5 + sep, color="orange") l.append(sep + (N / 2 + 0.5)) sep += N # The name of each method is displayed on the x-axis plt.xticks(l, methods) # Creation of the path variable which will give the title of the output PNG image path = path_json.split("/")[1] _, max_ylim = plt.ylim() plt.ylim([min_ylim, max_ylim]) # Backup of the benchmark of the different conversion in the form of a PNG image plt.savefig(output_filename, dpi=200) ############################## T-TEST ############################## # We recover the different times of the best method value_best_method = df[best_method] # Initialization of the dictionary which will save the results of the t-test of each method t_test = dict() for i in methods: if i != best_method: # We recover the different times of the method value_method = df[i] # Application of the t-test between the best method and all the other methods and saving these results in the dictionnary t_test comp = statsmodels.stats.weightstats.CompareMeans.from_data(value_best_method, value_method) (T_stats, P_value, degrees_f) = comp.ttest_ind() T_dict = {"t-stats": T_stats} P_dict = {"p-value": P_value} D_dict = {"Degree of freedom": degrees_f} t_test[i] = (T_dict, P_dict, D_dict) ############################## MULTITEST ############################## # Initialization of the list which will store all the p-values of the previous t-tests list_p_value = [] # Initialization of the list which will store all the methods other than the best method list_method = [] for i in t_test: # Retrieval of the p-values and the name of the different methods list_p_value.append(t_test[i][1]["p-value"]) list_method.append(i) # Application of the multitest on the different conversion methods using the bonferroni method ( areSignificant, correctedPvalues, _, _, ) = statsmodels.stats.multitest.multipletests(list_p_value, alpha=0.05, method="bonferroni") for i in range(len(list_method)): print( f"- By comparing the {list_method[i]} method with the best one ({best_method}), we check H0: {areSignificant[i]} with corrected P-value: {correctedPvalues[i]}" )