Source code for bioconvert.io.scf

import copy
import struct
from collections import defaultdict

import colorlog

_log = colorlog.getLogger(__name__)


def read_scf(infile):
    sequence = ""
    qualities = []

    with open(infile, "rb") as input_file:

        # Header
        """
        char[4] magic_number -> usually .scf
        uint samples -> Number of elements in Samples matrix
        uint samples_offset -> Byte offset from start of file
        uint bases -> Number of bases in Bases matrix
        uint bases_left_clip -> OBSOLETE: No. bases in left clip (vector)
        uint bases_right_clip -> OBSOLETE: No. bases in right clip (qual)
        uint bases_offset -> Byte offset from start of file
        uint comments_size -> Number of bytes in Comment section
        uint comments_offset -> Byte offset from start of file
        char[4] version -> "ver.rev", eg '2' '.' '0' '0' or '3' '.' '0' '0'
        uint sample_size -> Size of samples in bytes 1=8bits, 2=16bits*/
        uint code_set -> code set used (but ignored!)*/
        uint private_size -> No. of bytes of Private data, 0 if none
        uint private_offset -> Byte offset from start of file
        char[18] spare -> Unused
        """
        buff = input_file.read(56)
        (
            magic_number,
            samples,
            samples_offset,
            bases,
            bases_left_clip,
            bases_right_clip,
            bases_offset,
            comments_size,
            comments_offset,
            version,
            sample_size,
            code_set,
            private_size,
            private_offset,
        ) = struct.unpack("!4s 8I 4s 4I", buff)
        # Header is supposed to be 128B
        if samples_offset != 128:
            print("Warning: possible bad file")
        # End of header, unused data
        spare_size = samples_offset - 56  # Should be 72
        buff = input_file.read(spare_size)
        # spare = struct.unpack("!%dI" % (spare_size/4), buff)

        # Trying to replicate the next_seq perl function
        # V2
        if float(version) < 3:

            accuracies = defaultdict(list)
            qualities = []
            length = samples * sample_size * 4
            buff = read_from_buffer(input_file, length, samples_offset)

            # Get the base information
            length = bases * 12
            input_file.seek(bases_offset)
            buff = read_from_buffer(input_file, length, bases_offset)
            # Distill the information into its fractions
            for i in range(0, length, 12):
                # uint peak_index -> Index into Samples matrix for base posn
                # uchar prob_A -> Probability of it being an A
                # uchar prob_C -> Probability of it being an C
                # uchar prob_G -> Probability of it being an G
                # uchar prob_T -> Probability of it being an T
                # char base -> Called base character
                # uchar[3] spare -> Spare
                read = struct.unpack("!I B B B B s 3B", buff[i : i + 12])
                # indices = read[0]
                curbase = read[5].lower().decode("utf-8")
                if curbase == "a":
                    currqual = read[1]
                    accuracies[curbase].append(currqual)
                elif curbase == "c":
                    currqual = read[2]
                    accuracies[curbase].append(currqual)
                elif curbase == "g":
                    currqual = read[3]
                    accuracies[curbase].append(currqual)
                elif curbase == "t":
                    currqual = read[4]
                    accuracies[curbase].append(currqual)
                else:
                    currqual = -1

                sequence += str(curbase).upper()
                qualities.append(currqual)

        # V3
        else:
            length = samples * sample_size
            transformed_read = []
            # Just loop 4 times
            for _ in "acgt":
                buff = read_from_buffer(input_file, length, samples_offset)
                byte = "!%dH" % samples
                # Not tested
                if sample_size == 1:
                    byte = "%db" % samples

                read_tmp = struct.unpack(byte, buff)

                # this little spurt of nonsense is because
                # the trace values are given in the binary
                # file as unsigned shorts but they really
                # are signed deltas. 30000 is an arbitrary number
                # (will there be any traces with a given
                # point greater then 30000? I hope not.
                # once the read is read, it must be changed
                # from relative
                read = []
                for i in read_tmp:
                    if i > 30000:
                        i -= 65536
                    read.append(i)
                transformed_read += delta(read, "backward")

                # For 8-bit data we need to emulate a signed/unsigned
                # cast that is implicit in the C implementations
                # Not tested
                if sample_size == 1:
                    for i, _ in enumerate(transformed_read):
                        if transformed_read[i] < 0:
                            transformed_read[i] += 256

            # Get the peak index information
            offset = bases_offset
            length = bases * 4
            buff = read_from_buffer(input_file, length, offset)
            offset += length
            # Get the accuracy information
            buff = read_from_buffer(input_file, length, offset)
            accuracies = get_v3_base_accuracies(buff)
            # Get the base information
            offset += length
            length = bases
            buff = read_from_buffer(input_file, length, offset)
            seq = struct.unpack("%ds" % length, buff)
            for i in seq:
                sequence += i.decode("utf-8")
            # Extract the calls from the accuracy information.
            qualities = get_v3_quality(sequence, accuracies)
            # Bug in V3, 1 bytes is added for unknown reason
            comments_size = comments_size - 1

        # Comments
        comments = ""
        input_file.seek(comments_offset)
        # 1 bytes is added to comments for unknown reason (2 in V3)
        while input_file.tell() < comments_offset + comments_size - 1:
            tmp = input_file.read(1)
            comments += struct.unpack("c", tmp)[0].decode("utf-8")

    return sequence, qualities, comments


# Return 'length' bits of file 'f_file' starting at offset 'offset'
[docs]def read_from_buffer(f_file, length, offset): """Return 'length' bits of file 'f_file' starting at offset 'offset'""" f_file.seek(offset) buff = f_file.read(length) if len(buff) != length: print("The read was incomplete! Trying harder.") missing_length = length - len(buff) buff2 = f_file.read(missing_length) buff += buff2 if len(buff) != length: print( "Unexpected end of file while reading from SCF file. I \ should have read " + str(length) + " but instead got " + str(len(buff)) + "! Current file position is " + str(f_file.tell()) + "." ) return buff
# Unpack the base accuracies for v3 scf def get_v3_base_accuracies(buff): # 4 keys: a, c, g and t accuracies = defaultdict(list) length = len(buff) qlength = int(length / 4) offset = 0 # For all bases for curbase in "acgt": read = struct.unpack("%dB" % qlength, buff[offset : offset + qlength]) for i in read: accuracies[curbase].append(i) offset += qlength return accuracies # Get the quality for all bases in v3 scf def get_v3_quality(sequence, accuracies): qualities = [] for i, _ in enumerate(sequence): curbase = sequence[i].lower() if curbase == "a" or curbase == "c" or curbase == "g" or curbase == "t": currqual = accuracies[curbase][i] else: # N value most of the time currqual = -1 qualities.append(currqual) return qualities # If job == DELTA_IT: # Change a series of sample points to a series of delta delta values: # ie change them in two steps: # first: delta = current_value - previous_value # then: delta_delta = delta - previous_delta # else # do the reverse def delta(rsamples, direction): samples = copy.deepcopy(rsamples) if direction == "forward": for i in range(len(samples) - 1, 1, -1): samples[i] = samples[i] - 2 * samples[i - 1] + samples[i - 2] samples[1] = samples[1] - 2 * samples[0] elif direction == "backward": p_sample1 = 0 p_sample2 = 0 for i, _ in enumerate(samples): p_sample1 = p_sample1 + samples[i] samples[i] = p_sample1 + p_sample2 p_sample2 = samples[i] else: msg = 'Bad direction in \'delta\'. Use" forward" or" backward".' _log.critical(msg) raise Exception(msg) return samples