Source code for bioconvert.io.maf

###########################################################################
# Bioconvert is a project to facilitate the interconversion               #
# of life science data from one format to another.                        #
#                                                                         #
# Authors: see CONTRIBUTORS.rst                                           #
# Copyright © 2018-2019  Institut Pasteur, Paris and CNRS.                #
# See the COPYRIGHT file for details                                      #
#                                                                         #
# bioconvert is free software: you can redistribute it and/or modify      #
# it under the terms of the GNU General Public License as published by    #
# the Free Software Foundation, either version 3 of the License, or       #
# (at your option) any later version.                                     #
#                                                                         #
# bioconvert is distributed in the hope that it will be useful,           #
# but WITHOUT ANY WARRANTY; without even the implied warranty of          #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
# GNU General Public License for more details.                            #
#                                                                         #
# You should have received a copy of the GNU General Public License       #
# along with this program (COPYING file).                                 #
# If not, see <http://www.gnu.org/licenses/>.                             #
###########################################################################
import math
from itertools import groupby


[docs]class MAFLine(object): """A reader for :term:`MAF` format. mode refname start algsize strand refsize alignment :: a s ref 100 10 + 100000 ---AGC-CAT-CATT s contig 0 10 + 10 ---AGC-CAT-CATT a s ref 100 12 + 100000 ---AGC-CAT-CATTTT s contig 0 12 + 12 ---AGC-CAT-CATTTT The alignments are stored by pair, one item for the reference, one for the query. The query (second line) starts at zero. """ def __init__(self, line): self._items = line.split() assert self._items[0] in "aspq", self._items @property def name(self): return self._items[1] @property def sequence_size(self): return int(self._items[5]) @property def mode(self): return self._items[0] @property def strand(self): return self._items[4] @property def alignment_start(self): # alignment start return int(self._items[2]) @property def alignment_size(self): return int(self._items[3]) @property def alignment(self): return self._items[6] def get_alignment_length(self): return len(self._items[6].replace("-", ""))
[docs]class MAF(object): """A reader for :term:`MAF` format.""" def __init__(self, filename, outfile=None): self.filename = filename self.outfile = outfile
[docs] def count_insertions(self, alnString): """return length without insertion, forward and reverse shift""" gaps = alnString.count("-") forwardFrameshifts = alnString.count("\\") reverseFrameshifts = alnString.count("/") letters = len(alnString) - gaps - forwardFrameshifts - reverseFrameshifts return letters, forwardFrameshifts, reverseFrameshifts
# @SQ SN:NC_002929 LN:4086189 # @PG ID:bioconvert VN:?? CL:bioconvert input.maf output.sam def to_sam(self): # identifier flag ref start qual cigar * 0 0 sequence_ref qual NM: MD: # AS XS RG .... fout = open(self.outfile, "w") fout.write("@HD\tVN:1.3\tSO:unsorted\n") msg = "maf2paf found q starting a new line." with open(self.filename, "r") as fin: # scan once to get sequence name and length. # only reference is of interest names = set() top = True for line in fin: if len(line.strip()) == 0: top = True continue m = MAFLine(line) if m.mode == "s" and top is True: if m.name not in names: names.add(m.name) fout.write("@SQ\tSN:{}\tLN:{}\n".format(m.name, m.sequence_size)) top = False from bioconvert import version fout.write( "@PG\tID:{0}\tPN:{0}\tVN:{1}\tCL:{0} {2} {3}\n".format("bioconvert", version, self.filename, self.outfile) ) with open(self.filename, "r") as fin: for line in fin: # skipping empty lines if line.strip() == "": continue if line[0] == "a": s = [] if "=" in line: tags = dict(i.split("=") for i in line[1].split()) else: tags = {} elif line[0] == "s": s.append(line) elif line[0] == "q": # quality ? raise NotImplementedError(msg) elif line[0] == "p": raise NotImplementedError(msg) elif this[0] == "#": print(this) # now that we have the two lines, save into SAM file if len(s) > 2: raise NotImplementedError("mutliple alignment not implemented yet") if len(s) == 2: ref = MAFLine(s[0]) query = MAFLine(s[1]) if ref.strand != "+": raise Exception("for SAM, the 1st strand in each alignment must be +") flag = self.get_flag(ref.alignment, query.strand) # This is the slowest part of the code cigar = get_cigar(ref, query) qual = "*" if "mismap" in tags: mapq = tags["mismap"] else: mapq = "255" # missing 254 is maximum pos = int(ref.alignment_start) + 1 # convert to 1-based coordinate data = [ query.name, flag, ref.name, pos, mapq, cigar, "*", 0, 0, query.alignment.replace("-", "").upper(), qual, ] # MD # XS # RG:Z:1 if "score" in tags: data.append("AS:i:{}".format(tags["score"])) if "expect" in tags: data.append("EZ:Z:{}".format(tags["expect"])) # try: mapq = mapqFromProb(maf.namesAndValues["mismap"]) # except KeyError: mapq = mapqMissing editDistance = sum(1 for x, y in zip(ref.alignment, query.alignment) if x != y) # no special treatment of ambiguous bases: might be a minor bug editDistance = "NM:i:" + str(editDistance) data.append(editDistance) fout.write("\t".join([str(x) for x in data]) + "\n") s = [] if len(s): raise ValueError("Your MAf file seems truncated. ") fout.close() def get_flag(self, qName, query_strand): if qName.endswith("/1"): qName = qName[:-2] if query_strand == "+": flag = "99" # 1 + 2 + 32 + 64 else: flag = "83" # 1 + 2 + 16 + 64 elif qName.endswith("/2"): qName = qName[:-2] if query_strand == "+": flag = "163" # 1 + 2 + 32 + 128 else: flag = "147" # 1 + 2 + 16 + 128 else: if query_strand == "+": flag = "0" else: flag = "16" return flag
def mapqFromProb(probString): mapqMaximum = 100 try: p = float(probString) except ValueError: raise Exception("bad probability: " + probString) if p < 0 or p > 1: raise Exception("bad probability: " + probString) if p == 0: return mapqMaximum phred = -10 * math.log(p, 10) if phred >= mapqMaximum: return str(mapqMaximum) return str(int(round(phred))) def cigarCategory(alignmentColumn): x, y = alignmentColumn if x == "-": if y == "-": return "P" else: return "I" else: if y == "-": return "D" elif x != y: return "X" else: return "M" def cigarParts(beg, alignmentColumns, end): if beg: yield str(beg) + "H" # (doesn't handle translated alignments) for k, v in groupby(alignmentColumns, cigarCategory): yield str(sum(1 for _ in v)) + k if end: yield str(end) + "H" def get_cigar(m1, m2): qRevStart = m2.sequence_size - m2.alignment_start - m2.alignment_size cigar = "".join(cigarParts(m2.alignment_start, zip(m1.alignment, m2.alignment), qRevStart)) return cigar