Source code for bioconvert.core.benchmark

###########################################################################
# Bioconvert is a project to facilitate the interconversion               #
# of life science data from one format to another.                        #
#                                                                         #
# Copyright © 2018-2022 Institut Pasteur, Paris and CNRS.                 #
#                                                                         #
# bioconvert is free software: you can redistribute it and/or modify      #
# it under the terms of the GNU General Public License as published by    #
# the Free Software Foundation, either version 3 of the License, or       #
# (at your option) any later version.                                     #
#                                                                         #
# bioconvert is distributed in the hope that it will be useful,           #
# but WITHOUT ANY WARRANTY; without even the implied warranty of          #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
# GNU General Public License for more details.                            #
#                                                                         #
# You should have received a copy of the GNU General Public License       #
# along with this program (COPYING file).                                 #
# If not, see <http://www.gnu.org/licenses/>.                             #
#                                                                         #
# Repository: https://github.com/bioconvert/bioconvert                    #
# Documentation: http://bioconvert.readthedocs.io                         #
###########################################################################
"""Tools for benchmarking"""

import json
import statistics
import threading
import time

import colorlog
import matplotlib.pyplot as plt
import pandas as pd
import psutil
import pylab
import statsmodels.api
import statsmodels.formula.api
import statsmodels.stats.multitest
import statsmodels.stats.weightstats
from tqdm import tqdm

from bioconvert.core.utils import Timer

_log = colorlog.getLogger(__name__)


__all__ = ["Benchmark", "plot_multi_benchmark_max"]


class Benchmark:
    """Convenient class to benchmark several methods for a given converter

    ::

        c = BAM2COV(infile, outfile)
        b = Benchmark(c, N=5)
        b.run_methods()
        b.plot()

    """

    def __init__(self, obj, N=5, to_exclude=None, to_include=None):
        """.. rubric:: Constructor

        :param obj: can be an instance of a converter class or a class name
        :param int N: number of replicates
        :param list to_exclude: methods to exclude from the benchmark
        :param list to_include: methods to include ONLY

        Use one of *to_exclude* or *to_include*. If both are provided,
        only the *to_include* one is used.

        """
        if isinstance(obj, str):
            raise NotImplementedError

        self.converter = obj
        self.N = N
        self.results = None

        if to_exclude is None:
            self.to_exclude = []
        else:
            self.to_exclude = to_exclude

        if to_include is None:
            self.to_include = []
        else:
            self.to_include = to_include

        # CPU and memory usage
        self.cpu_percent = []
        self.memory_usage = []
    def run_methods(self):
        """Run the benchmarks and store the timings in *self.results*."""
        results = {"time": {}, "CPU": {}, "memory": {}}

        methods = self.converter.available_methods[:]  # a copy!

        if self.to_include:
            methods = [x for x in methods if x in self.to_include]
        elif self.to_exclude:
            methods = [x for x in methods if x not in self.to_exclude]

        for method in methods:
            times = []
            CPUs = []
            MEMs = []
            for i in tqdm(range(self.N), desc="Evaluating method {}".format(method)):
                with Timer(times):
                    # Pass all extra arguments required by a specific method,
                    # e.g. the Bam2BIGWIG ucsc method
                    kwargs = {"method": method}
                    for k, v in self.converter.others.items():
                        kwargs[k] = v

                    # reset CPU and memory usage before monitoring
                    self.stop_monitoring = False
                    self.cpu_percent = []
                    self.memory_usage = []

                    # start monitoring CPU and memory usage
                    monitor_thread = threading.Thread(target=self.monitor_usage)
                    monitor_thread.start()

                    # the actual computation
                    self.converter(**kwargs)

                    # stop monitoring
                    self.stop_monitoring = True
                    monitor_thread.join()

                    # gather information; guard against conversions that finish
                    # before the first sample is recorded
                    L = max(len(self.cpu_percent), 1)
                    CPUs.append(sum(self.cpu_percent) / L)
                    MEMs.append(sum(self.memory_usage) / L)

            results["CPU"][method] = CPUs
            results["memory"][method] = MEMs
            results["time"][method] = times
        self.results = results
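    # Illustrative note, not in the original source: after run_methods() the
    # *results* attribute maps each metric to a dictionary of N measurements per
    # method. The method names below are hypothetical:
    #
    #   {"time":   {"awk": [1.2, 1.1, 1.3],    "seqtk": [0.9, 1.0, 0.8]},
    #    "CPU":    {"awk": [55.0, 52.1, 57.3], "seqtk": [40.2, 41.0, 39.5]},
    #    "memory": {"awk": [12.3, 12.4, 12.1], "seqtk": [11.8, 11.9, 12.0]}}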
    def monitor_usage(self):
        """Record CPU and memory usage until *self.stop_monitoring* is set."""
        while not self.stop_monitoring:
            # Retrieve CPU and memory usage
            cpu_percent = psutil.cpu_percent()
            memory_usage = psutil.virtual_memory().percent

            # Adjust the sleep time as per your requirement
            time.sleep(0.05)

            self.cpu_percent.append(cpu_percent)
            self.memory_usage.append(memory_usage)
    def plot(self, rerun=False, ylabel=None, rot_xticks=0, boxplot_args={}, mode="time"):
        """Plot the benchmark results, running the benchmarks first if needed
        or if *rerun* is True.

        :param rot_xticks: rotation of the xtick labels
        :param boxplot_args: dictionary with any of the pylab.boxplot arguments
        :param mode: either time, CPU or memory
        :return: dictionary with all results
        """
        if self.results is None or rerun is True:
            self.run_methods()

        assert mode in ["time", "CPU", "memory"], f"mode must be time, CPU or memory; {mode} provided"

        data = self.results[mode].copy()

        if mode == "time" and ylabel is None:
            ylabel = "Time (seconds)"
        elif mode == "CPU" and ylabel is None:
            ylabel = "CPU usage (%)"
        elif mode == "memory" and ylabel is None:
            ylabel = "Memory usage (%)"

        methods = sorted(data, key=lambda x: pylab.mean(data[x]))
        pylab.boxplot([data[x] for x in methods], **boxplot_args)
        # pylab.xticks([1+this for this in range(len(methods))], methods)

        if "vert" in boxplot_args and boxplot_args["vert"] is False:
            pylab.yticks(*zip(*enumerate(methods, start=1)), rotation=rot_xticks)
            pylab.xlabel(ylabel)
        else:
            pylab.xticks(*zip(*enumerate(methods, start=1)), rotation=rot_xticks)
            pylab.ylabel(ylabel)

        pylab.xlim([0, len(methods) + 1])
        pylab.grid(True)
        pylab.tight_layout()

        # return a copy so that callers cannot alter self.results
        return self.results.copy()
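# Illustrative sketch, not part of the original module: a minimal driver for the
# Benchmark class above. The BAM2COV converter and the file names follow the
# example given in the class docstring and are assumptions; any bioconvert
# converter instance can be benchmarked the same way.
def _example_benchmark(infile="example.bam", outfile="example.cov"):
    from bioconvert.bam2cov import BAM2COV

    converter = BAM2COV(infile, outfile)
    # run_methods() forwards any extra method-specific arguments stored in the
    # converter's *others* attribute; an empty dict means "no extra arguments"
    converter.others = {}

    bench = Benchmark(converter, N=3)
    bench.run_methods()

    # one boxplot per metric recorded during run_methods()
    bench.plot(mode="time")
    bench.plot(mode="CPU")
    bench.plot(mode="memory")
    return bench.results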
def plot_multi_benchmark_max(path_json, output_filename="multi_benchmark.png", min_ylim=0, mode=None):
    """Plotting function for the Snakefile_benchmark to be found in the doc

    The json file looks like::

        {
          "awk":{
            "0":0.777020216,
            "1":0.9638044834,
            "2":1.7623617649,
            "3":0.8348755836
          },
          "seqtk":{
            "0":1.0024843216,
            "1":0.6313509941,
            "2":1.4048073292,
            "3":1.0554351807
          },
          "Benchmark":{
            "0":1,
            "1":1,
            "2":2,
            "3":2
          }
        }

    The number of benchmarks is inferred from the 'Benchmark' field.
    """
    # for backward compatibility with 1.0.0
    if mode is None:
        # open and read the JSON file
        df = pd.read_json(path_json)
    else:
        with open(path_json, "r") as fin:
            dd = json.loads(fin.read())
        df = pd.DataFrame(dd[mode])

    # how many runs per method?
    N = len(df["Benchmark"]) / len(df["Benchmark"].unique())
    N = len(df["Benchmark"]) / N

    # Retrieve the method names
    methods = list(df)
    # Remove the 'Benchmark' entry from the list of methods
    methods.remove("Benchmark")

    # Pivot on the benchmark number so that runs can be grouped by method
    df2 = df.pivot(columns="Benchmark")

    # Display the boxplots with a small font; the grid is removed for readability
    df2.boxplot(fontsize=7, grid=False)

    # Placement of the x-axis labels
    l = []
    # Position of the separator between consecutive methods
    sep = 0
    # Lowest median observed so far
    median_best = 0
    # Name of the method with the lowest median
    best_method = None

    for i in methods:
        if sep != 0:
            # Draw a separator between methods
            plt.axvline(sep + 0.5, ls="--", color="red")

        # Median of the current method
        median = statistics.median(df[i])
        if median_best == 0 or median_best >= median:
            # The current method has a lower median; it becomes the best one
            median_best = median
            best_method = i

        # Plot the median of each method
        plt.hlines(y=median, xmin=0.5 + sep, xmax=N + 0.5 + sep, color="orange")
        l.append(sep + (N / 2 + 0.5))
        sep += N

    # Display the name of each method on the x-axis
    plt.xticks(l, methods)

    # Path component used as the title of the output PNG image
    path = path_json.split("/")[1]

    _, max_ylim = plt.ylim()
    plt.ylim([min_ylim, max_ylim])

    # Save the benchmark of the different conversions as a PNG image
    plt.savefig(output_filename, dpi=200)

    ############################## T-TEST ##############################

    # Timings of the best method
    value_best_method = df[best_method]

    # Dictionary storing the t-test results of each method
    t_test = dict()

    for i in methods:
        if i != best_method:
            # Timings of the current method
            value_method = df[i]

            # t-test between the best method and the current method; the
            # results are stored in the t_test dictionary
            comp = statsmodels.stats.weightstats.CompareMeans.from_data(value_best_method, value_method)
            (T_stats, P_value, degrees_f) = comp.ttest_ind()
            T_dict = {"t-stats": T_stats}
            P_dict = {"p-value": P_value}
            D_dict = {"Degree of freedom": degrees_f}
            t_test[i] = (T_dict, P_dict, D_dict)

    ############################## MULTITEST ##############################

    # p-values of the previous t-tests
    list_p_value = []
    # methods other than the best one
    list_method = []
    for i in t_test:
        # Retrieve the p-value and the name of each method
        list_p_value.append(t_test[i][1]["p-value"])
        list_method.append(i)

    # Multiple-testing correction across the conversion methods (Bonferroni)
    (
        areSignificant,
        correctedPvalues,
        _,
        _,
    ) = statsmodels.stats.multitest.multipletests(list_p_value, alpha=0.05, method="bonferroni")

    for i in range(len(list_method)):
        print(
            f"- Comparing the {list_method[i]} method with the best one ({best_method}): "
            f"H0 rejected: {areSignificant[i]}, corrected P-value: {correctedPvalues[i]}"
        )
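
# Illustrative sketch, not part of the original module: exercising
# plot_multi_benchmark_max() on a small JSON file with the layout shown in its
# docstring. The method names ("awk", "seqtk") and the timings are made up for
# the example.
def _example_plot_multi_benchmark(output="multi_benchmark.png"):
    import tempfile

    data = {
        "awk": {"0": 0.78, "1": 0.96, "2": 1.76, "3": 0.83},
        "seqtk": {"0": 1.00, "1": 0.63, "2": 1.40, "3": 1.06},
        "Benchmark": {"0": 1, "1": 1, "2": 2, "3": 2},
    }
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
        json.dump(data, fh)
    # plots the boxplots, saves the PNG and prints the corrected p-values
    plot_multi_benchmark_max(fh.name, output_filename=output)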