Source code for dsw.spiderweb

from collections import Counter
from itertools import product
from networkx import DiGraph, find_cycle
from numpy import zeros, ones, array, random, log, sum, max, argmax, argsort, unique, intersect1d, where

from dsw.operation import Monitor, calculus_addition, calculus_multiplication, calculus_division
from dsw.operation import bit_to_number, number_to_bit, number_to_dna, dna_to_number
from dsw.graphized import obtain_vertices, obtain_formers, obtain_latters, path_matching, calculate_intersection_score


[docs]def encode(binary_message, accessor, start_index, is_faster=False, vt_length=0, shuffles=None, need_path=False, verbose=False): """ Encode a bit array by the specific accessor. :param binary_message: binary message. :type binary_message: numpy.ndarray :param accessor: accessor of the coding algorithm. :type accessor: numpy.ndarray :param start_index: virtual vertex to start encoding. :type start_index: int :param is_faster: encode in a faster way. :type is_faster: bool :param vt_length: length of Varshamov-Tenengolts code for error-correction. :type vt_length: int or None :param shuffles: shuffle relationships for bit-nucleotide mapping. :type: numpy.ndarray :param need_path: need to record the restricted state transition path. :type need_path: bool :param verbose: need to print log. :type verbose: bool :return: DNA sequence encoded by this graph (and VT check sequence if required). :rtype: str or (str, str) .. note:: If the parameter "is_faster" is set as True, the out-degree of coding digraph cannot contains 3. Example >>> from numpy import array >>> from dsw import encode >>> # accessor with GC-balanced >>> accessor = array([[-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1]]) >>> binary_message = array([0, 1, 0, 1, 0, 1, 0, 1]) >>> encode(accessor=accessor, binary_message=binary_message, start_index=1) 'TCTCTCT' >>> encode(accessor=accessor, binary_message=binary_message, start_index=1, vt_length=5) ('TCTCTCT', 'TAAGC') >>> shuffles = array([[3, 2, 1, 0], [2, 3, 1, 0], [3, 1, 0, 2], [0, 3, 1, 2], \ [3, 2, 0, 1], [1, 0, 3, 2], [0, 3, 1, 2], [2, 0, 1, 3], \ [2, 3, 0, 1], [1, 0, 3, 2], [2, 0, 1, 3], [0, 1, 3, 2], \ [2, 3, 1, 0], [2, 0, 3, 1], [0, 1, 3, 2], [0, 3, 2, 1]]) >>> encode(accessor=accessor, binary_message=binary_message, shuffles=shuffles, start_index=1) 'AGAGAGA' """ monitor, record_path, vertex_index, dna_sequence, nucleotides = Monitor(), [], start_index, "", "ACGT" if not is_faster: quotient = bit_to_number(binary_message, verbose=verbose) total_state = len(quotient) # number of symbol. while quotient != "0": used_indices = where(accessor[vertex_index] >= 0)[0] if len(used_indices) > 1: # current vertex contains information. quotient, remainder = calculus_division(number=quotient, base=str(len(used_indices))) remainder = int(remainder) if shuffles is not None: # shuffle remainder based on the inputted shuffles. remainder = argsort(shuffles[vertex_index, used_indices])[remainder] value = used_indices[remainder] if need_path: record_path.append([vertex_index, 1]) elif len(used_indices) == 1: # current vertex does not contain information. value = used_indices[0] if need_path: record_path.append([vertex_index, 0]) else: # current vertex is wrong. raise ValueError("Current vertex doesn't have an out-degree, " + "the accessor or the start vertex is wrong!") nucleotide, vertex_index = nucleotides[value], accessor[vertex_index][value] dna_sequence += nucleotide if verbose: if quotient != "0": monitor(total_state - len(quotient), total_state) else: monitor(total_state, total_state) else: location = 0 while location < len(binary_message): used_indices = where(accessor[vertex_index] >= 0)[0] radix = len(used_indices) if radix == 4: # current vertex contains information. remainder = binary_message[location] * 2 + binary_message[location + 1] if shuffles is not None: # shuffle remainder based on the inputted shuffles. remainder = argsort(shuffles[vertex_index, used_indices])[remainder] value = used_indices[remainder] location += 2 elif radix == 2: remainder = binary_message[location] if shuffles is not None: # shuffle remainder based on the inputted shuffles. remainder = argsort(shuffles[vertex_index, used_indices])[remainder] value = used_indices[remainder] location += 1 elif radix == 1: # current vertex does not contain information. value = used_indices[0] elif radix == 3: raise ValueError("Not implementation!") else: # current vertex is wrong. raise ValueError("Current vertex doesn't have an out-degree, " + "the accessor or the start vertex is wrong!") nucleotide, vertex_index = nucleotides[value], accessor[vertex_index][value] dna_sequence += nucleotides[value] if need_path: record_path.append([vertex_index, int(radix > 1)]) if verbose: monitor(location + 1, len(binary_message)) if need_path: record_path = array(record_path) if vt_length > 0: vt_check = set_vt(dna_sequence=dna_sequence, vt_length=vt_length) if need_path: return dna_sequence, set_vt(dna_sequence=dna_sequence, vt_length=vt_length), record_path else: return dna_sequence, vt_check else: if need_path: return dna_sequence, record_path else: return dna_sequence
[docs]def decode(dna_sequence, bit_length, accessor, start_index, is_faster=False, vt_check=None, shuffles=None, verbose=False): """ Decode a DNA sequence by the specific accessor. :param dna_sequence: DNA sequence encoded by this graph. :type dna_sequence: str :param bit_length: length of the bit array. :type bit_length: int :param accessor: accessor of the coding algorithm (consistent with the encoding process). :type accessor: numpy.ndarray :param start_index: virtual vertex to start decoding (consistent with the encoding process). :type start_index: int :param is_faster: encode in a faster way. :type is_faster: bool :param vt_check: check sequence of Varshamov-Tenengolts code. :type vt_check: str or None :param shuffles: shuffle relationships for bit-nucleotide mapping. :type shuffles: numpy.ndarray :param verbose: need to print log. :type verbose: bool :return: binary message decoded by this graph. :rtype: numpy.ndarray :raise ValueError: if one or more errors are found. .. note:: If the parameter "is_faster" is set as True, the out-degree of coding digraph cannot contains 3. Example >>> from numpy import array >>> from dsw import decode >>> # accessor with GC-balanced >>> accessor = array([[-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1]]) >>> dna_sequence = "TCTCTCT" >>> decode(accessor=accessor, dna_sequence=dna_sequence, start_index=1, bit_length=8) array([0, 1, 0, 1, 0, 1, 0, 1]) >>> vt_check = "TAAGC" >>> decode(accessor=accessor, dna_sequence=dna_sequence, start_index=1, vt_check=vt_check, bit_length=8) array([0, 1, 0, 1, 0, 1, 0, 1]) >>> shuffles = array([[3, 2, 1, 0], [2, 3, 1, 0], [3, 1, 0, 2], [0, 3, 1, 2], \ [3, 2, 0, 1], [1, 0, 3, 2], [0, 3, 1, 2], [2, 0, 1, 3], \ [2, 3, 0, 1], [1, 0, 3, 2], [2, 0, 1, 3], [0, 1, 3, 2], \ [2, 3, 1, 0], [2, 0, 3, 1], [0, 1, 3, 2], [0, 3, 2, 1]]) >>> decode(accessor=accessor, dna_sequence="TCTCTCT", start_index=1, shuffles=shuffles, bit_length=8) array([0, 0, 0, 0, 0, 0, 0, 0]) """ vertex_index, nucleotides, monitor = start_index, "ACGT", Monitor() if vt_check is not None: if vt_check != set_vt(dna_sequence=dna_sequence, vt_length=len(vt_check)): raise ValueError("At least one error is found in this DNA sequence!") if not is_faster: quotient, saved_values = "0", [] for location, nucleotide in enumerate(dna_sequence): used_indices = where(accessor[vertex_index] >= 0)[0] if len(used_indices) > 1: # current vertex contains information. used_nucleotides = [nucleotides[used_index] for used_index in used_indices] if nucleotide in used_nucleotides: # check whether the DNA sequence is right currently. remainder = used_nucleotides.index(nucleotide) else: raise ValueError("At least one error is found in this DNA sequence!") if shuffles is not None: # shuffle remainder based on the inputted shuffles. remainder = where(argsort(shuffles[vertex_index, used_indices]) == remainder)[0][0] saved_values.append((len(used_indices), remainder)) vertex_index = accessor[vertex_index][nucleotides.index(nucleotide)] elif len(used_indices) == 1: # current vertex does not contain information. used_nucleotide = nucleotides[used_indices[0]] if nucleotide == used_nucleotide: vertex_index = accessor[vertex_index][nucleotides.index(nucleotide)] else: raise ValueError("At least one error is found in this DNA sequence!") else: # current vertex is wrong. raise ValueError("Current vertex doesn't have an out-degree, " + "the accessor, the start vertex, or DNA sequence is wrong!") if verbose: monitor(location + 1, len(dna_sequence)) for location, (out_degree, number) in enumerate(saved_values[::-1]): quotient = calculus_multiplication(number=quotient, base=str(out_degree)) quotient = calculus_addition(number=quotient, base=str(number)) binary_message = array(number_to_bit(decimal_number=quotient, bit_length=bit_length), dtype=int) else: message_location, binary_message = 0, zeros(shape=(bit_length,), dtype=int) for location, nucleotide in enumerate(dna_sequence): used_indices = where(accessor[vertex_index] >= 0)[0] radix = len(used_indices) used_nucleotides = [nucleotides[used_index] for used_index in used_indices] if nucleotide in used_nucleotides: # check whether the DNA sequence is right currently. remainder = used_nucleotides.index(nucleotide) else: raise ValueError("At least one error is found in this DNA sequence!") if shuffles is not None: # shuffle remainder based on the inputted shuffles. remainder = where(argsort(shuffles[vertex_index, used_indices]) == remainder)[0][0] vertex_index = accessor[vertex_index][nucleotides.index(nucleotide)] if radix == 4: binary_message[message_location] = remainder // 2 binary_message[message_location + 1] = remainder % 2 message_location += 2 elif radix == 2: binary_message[message_location] = remainder % 2 message_location += 1 elif radix == 1: pass # do nothing. elif radix == 3: raise ValueError("Not implementation!") else: raise ValueError("Current vertex doesn't have an out-degree, " + "the accessor, the start vertex, or DNA sequence is wrong!") return binary_message
[docs]def set_vt(dna_sequence, vt_length): """ Set Varshamov-Tenengolts-based path check string ('salt-protected') from DNA (payload) sequence. :param dna_sequence: DNA sequence encoded through SPIDER-WEB. :type dna_sequence: str :param vt_length: length of DNA sequence (path check). :type vt_length: int or None :return: path check DNA sequence with required length. Example >>> from dsw import set_vt >>> dna_sequence = "TCTCTCT" >>> vt_length = 5 >>> set_vt(dna_sequence=dna_sequence, vt_length=vt_length) 'TAAGC' .. note:: Reference [1] Rom R. Varshamov and Grigory M. Tenengolts (1965) Avtomat. i Telemekh Reference [2] Grigory Tenengolts (1984) IEEE Transactions on Information Theory Reference [3] William H. Press et al. (2020) Proceedings of the National Academy of Sciences Reference [4] A. Xavier Kohll et al. (2020) Chemical Communications """ nucleotides = "ACGT" values = array([nucleotides.index(nucleotide) for nucleotide in dna_sequence]) vt_value = sum(where((values[1:] - values[:-1]) > 0)[0]) % (len(nucleotides) ** (vt_length - 1)) vt_flag = sum(values) % len(nucleotides) return nucleotides[vt_flag] + number_to_dna(decimal_number=int(vt_value), dna_length=vt_length - 1)
[docs]def repair_dna(dna_sequence, accessor, start_index, observed_length, vt_check=None, has_indel=False, heap_size=1e3): """ Repair the DNA sequence containing one (or more) errors. :param dna_sequence: DNA sequence waiting for recovery. :type dna_sequence: str :param accessor: accessor of the coding algorithm. :type accessor: numpy.ndarray :param start_index: virtual vertex to start encoding. :type start_index: int :param observed_length: length of the DNA sequence in a vertex. :type observed_length: int :param vt_check: check sequence of Varshamov-Tenengolts code. :type vt_check: str or None :param has_indel: consider insertion and/or deletion error. :type has_indel: bool :param heap_size: maximum heap size. :type heap_size: int :return: repaired DNA sequence set and additional information. :rtype: (list, (bool, bool, int)) Example >>> from numpy import array >>> from dsw import repair_dna >>> # accessor with GC-balanced >>> accessor = array([[-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1]]) >>> dna_sequence = "TCTCTATCTCTC" # "TCTCTCTCTCTC" is original DNA sequence >>> repair_dna(dna_sequence=dna_sequence, accessor=accessor, start_index=1, observed_length=2, has_indel=True) (['TCTCTCTCTCTC', 'TCTCTGTCTCTC'], (1, False, 2, 14)) >>> vt_check = "AACGC" # check list of Varshamov-Tenengolts code. >>> repair_dna(dna_sequence=dna_sequence, accessor=accessor, start_index=1, observed_length=2, \ vt_check=vt_check, has_indel=True) (['TCTCTCTCTCTC'], (1, True, 2, 14)) """ nucleotides = "ACGT" location, vertex_index, index_queue = 0, start_index, -ones(shape=(len(dna_sequence),), dtype=int) split_sequences, chuck_sequences, index_markers = [""], [], [] detected_count, chuck_flag, visited_times = 0, False, 0 while location < len(dna_sequence): used_indices, nucleotide = where(accessor[vertex_index] >= 0)[0], dna_sequence[location] if dna_sequence[location] in [nucleotides[used_index] for used_index in used_indices]: split_sequences[-1] += nucleotide vertex_index = accessor[vertex_index][nucleotides.index(nucleotide)] index_queue[location] = vertex_index visited_times += 1 location += 1 elif len(split_sequences[-1]) > 0: detected_count += 1 split_sequences[-1] = split_sequences[-1][: - observed_length + 1] vertex_index = dna_to_number(dna_sequence[location + 1: location + observed_length + 1], is_string=False) split_sequences.append(nucleotides[vertex_index % 4]) index_markers.append(index_queue[location - observed_length: location]) chuck_sequences.append(dna_sequence[location - observed_length + 1: location + observed_length]) location += observed_length + 1 repaired_fragment_set = [set() for _ in range(len(index_markers))] for index, (chuck_sequence, index_marker) in enumerate(zip(chuck_sequences, index_markers)): for recall, vertex_index in enumerate(index_marker[::-1]): record, times = path_matching(dna_sequence=chuck_sequence, accessor=accessor, has_indel=has_indel, previous_index=vertex_index, occur_location=observed_length - recall - 1) visited_times += times for _, fragment in record: if dna_sequence not in repaired_fragment_set[index]: repaired_fragment_set[index].add(fragment) repaired_fragment_set[index] = list(repaired_fragment_set[index]) repaired_results, count = set(), 1 for fragments in repaired_fragment_set: count *= len(fragments) if count == 0 or count > heap_size: if vt_check is not None: if vt_check == set_vt(dna_sequence=dna_sequence, vt_length=len(vt_check)): return [dna_sequence], (0, False, 0, visited_times) else: return [], (0, True, 0, visited_times) else: return [dna_sequence], (0, False, 0, visited_times) for fragments in product(*repaired_fragment_set): repaired_dna_sequence = "" for index in range(len(split_sequences) - 1): repaired_dna_sequence += split_sequences[index] + fragments[index] repaired_dna_sequence += split_sequences[-1] if vt_check is not None: if vt_check == set_vt(dna_sequence=repaired_dna_sequence, vt_length=len(vt_check)): repaired_results.add(repaired_dna_sequence) else: chuck_flag = True else: repaired_results.add(repaired_dna_sequence) return sorted(list(repaired_results)), (detected_count, chuck_flag, count, visited_times)
[docs]def find_vertices(observed_length, bio_filter, verbose=False): """ Find valid vertices based on the given the biochemical constraints. :param observed_length: length of the DNA sequence in a vertex. :type observed_length: int :param bio_filter: screening operation for identifying the valid DNA sequence (required the given constraints). :type bio_filter: dsw.biofilter.DefaultBioFilter :param verbose: need to print log. :type verbose: bool :return: available vertices. :rtype: numpy.ndarray :raise ValueError: if no valid vertices are available under the required biochemical constraints. Example >>> from dsw import LocalBioFilter, find_vertices >>> bio_filter = LocalBioFilter(observed_length=2, max_homopolymer_runs=2, gc_range=[0.5, 0.5]) >>> # "1" refers to the available index of vertex, otherwise "0" refers to unavailable index of vertex. >>> find_vertices(observed_length=2, bio_filter=bio_filter).astype(int) array([0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0]) .. note:: Reference [1] Florent Capelli and Yann Strozecki (2019) Discrete Applied Mathematics """ nucleotides = "ACGT" vertices, monitor = zeros(shape=(int(len(nucleotides) ** observed_length),), dtype=bool), Monitor() if verbose: print("Find valid vertices in this observed length of DNA sequence.") for vertex_index in range(len(vertices)): dna_sequence = number_to_dna(decimal_number=vertex_index, dna_length=observed_length) vertices[vertex_index] = bio_filter.valid(dna_sequence=dna_sequence) if verbose: monitor(vertex_index + 1, len(vertices), extra={"valid": sum(vertices[: vertex_index + 1])}) valid_rate = sum(vertices) / len(vertices) if valid_rate == 0: raise ValueError("No vertex is collected!") if verbose: print(str(round(valid_rate * 100, 2)) + "% (" + str(sum(vertices)) + ") valid vertices are collected.") return vertices
[docs]def connect_valid_graph(observed_length, vertices, verbose=False): """ Connect a valid graph by valid vertices. :param observed_length: length of the DNA sequence in a vertex. :type observed_length: int :param vertices: vertex accessor, in each cell, True is valid vertex and False is invalid vertex. :type vertices: numpy.ndarray :param verbose: need to print log. :type verbose: bool :return: accessor of the valid graph. :rtype: numpy.ndarray :raise ValueError: if no valid vertices are available. Example >>> from numpy import array >>> from dsw import connect_valid_graph >>> vertices = array([0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0]) >>> connect_valid_graph(observed_length=2, vertices=vertices) array([[-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1], [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], [-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1]]) .. note:: Reference [1] Nicolaas Govert de Bruijn (1946) Indagationes Mathematicae """ nucleotides = "ACGT" if vertices is None: raise ValueError("No collected vertex!") if verbose: print("Connect valid graph with valid vertices.") valid_rate, monitor = sum(vertices) / len(vertices), Monitor() if valid_rate > 0: accessor = -ones(shape=(int(len(nucleotides) ** observed_length), len(nucleotides)), dtype=int) for vertex_index in range(int(len(nucleotides) ** observed_length)): if vertices[vertex_index]: latters = obtain_latters(current=vertex_index, observed_length=observed_length) for position, latter_vertex_index in enumerate(latters): if vertices[latter_vertex_index]: accessor[vertex_index][position] = latter_vertex_index if verbose: monitor(vertex_index + 1, len(vertices)) if verbose: print("Valid graph is created.") return accessor else: raise ValueError("No collected vertex!")
[docs]def connect_coding_graph(observed_length, vertices, threshold, verbose=False): """ Connect a coding algorithm by valid vertices and the threshold for minimum out-degree. :param observed_length: length of the DNA sequence in a vertex. :type observed_length: int :param vertices: vertex accessor, in each cell, True is valid vertex and False is invalid vertex. :type vertices: numpy.ndarray :param threshold: threshold for minimum out-degree. :type threshold: int :param verbose: need to print log. :type verbose: bool :return: coding vertices and coding accessor. :rtype: (numpy.ndarray, numpy.ndarray) :raise ValueError: if no coding graph are created because of the vertex set and/or the trimming requirement. Example >>> from numpy import array >>> from dsw import connect_coding_graph >>> vertices = array([0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0]) >>> vertices, accessor = connect_coding_graph(observed_length=2, vertices=vertices, threshold=2) >>> accessor array([[-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1], [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], [-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1]]) >>> vertices.astype(int) array([0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0]) .. note:: Reference [1] Nicolaas Govert de Bruijn (1946) Indagationes Mathematicae """ times, nucleotides = 1, "ACGT" while True: if verbose: print("Check the vertex collection requirement in round " + str(times) + ".") new_vertices, monitor = zeros(shape=(int(len(nucleotides) ** observed_length),), dtype=bool), Monitor() saved_indices = where(vertices != 0)[0] for current, vertex_index in enumerate(saved_indices): latter_indices = obtain_latters(current=vertex_index, observed_length=observed_length) new_vertices[vertex_index] = sum(vertices[latter_indices]) >= threshold if verbose: monitor(current + 1, len(saved_indices)) changed = sum(vertices) - sum(new_vertices) if verbose: print(str(round(sum(new_vertices) / len(vertices) * 100, 2)) + "% (" + str(sum(new_vertices)) + ") " + "valid vertices are saved.") if sum(new_vertices) < 1: raise ValueError("No coding graph is created!") if not changed: break vertices = new_vertices times += 1 valid_rate = sum(vertices) / len(vertices) if valid_rate > 0: accessor = -ones(shape=(int(len(nucleotides) ** observed_length), len(nucleotides)), dtype=int) for vertex_index in range(int(len(nucleotides) ** observed_length)): if vertices[vertex_index]: latters = obtain_latters(current=vertex_index, observed_length=observed_length) for position, latter_vertex_index in enumerate(latters): if vertices[latter_vertex_index]: accessor[vertex_index][position] = latter_vertex_index if verbose: monitor(vertex_index + 1, len(vertices)) if threshold == 1: while True: vertices = obtain_vertices(accessor) graph = DiGraph() for former_index, latter_indices in enumerate(accessor): for latter_index in latter_indices: if latter_index >= 0: graph.add_edge(u_of_edge=former_index, v_of_edge=latter_index) useless_vertices, cycle = [], find_cycle(graph) for former_index, latter_index in cycle: if len(where(accessor[former_index] >= 0)[0]) == 1: useless_vertices.append(former_index) if len(useless_vertices) == len(cycle): for useless_vertex in useless_vertices: accessor[useless_vertex] = -1 pairs = [(i, useless_vertex) for i in obtain_formers(useless_vertex, 10)] while len(pairs) > 0: new_pairs = [] for former_index, latter_index in pairs: previous = len(where(accessor[former_index] >= 0)[0]) accessor[former_index, latter_index % 4] = -1 current = len(where(accessor[former_index] >= 0)[0]) if previous > current == 0: new_pairs += [(i, former_index) for i in obtain_formers(former_index, 10)] pairs = new_pairs else: break if verbose: print("The coding graph is created.") return vertices, accessor else: raise ValueError("The coding graph cannot be created!")
[docs]def remove_nasty_arc(accessor, latter_map, iteration=0, has_insertion=True, has_deletion=True, verbose=False): """ Remove the nasty arc. :param accessor: accessor of the coding algorithm. :type accessor: numpy.ndarray :param latter_map: latter map of the coding algorithm. :type latter_map: dict :param iteration: current round if required. :type iteration: int :param has_insertion: need to repair insertion error. :type has_insertion: bool :param has_deletion: need to repair deletion error. :type has_deletion: bool :param verbose: need to print log. :type verbose: bool :return: adjusted accessor, adjusted latter map, removed arc, and maximum intersection score. :rtype: (numpy.ndarray, dict, tuple, int) Example >>> from numpy import array >>> from dsw import accessor_to_latter_map, remove_nasty_arc >>> # accessor with GC-balanced >>> accessor = array([[-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], \ [-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1]]) >>> latter_map = accessor_to_latter_map(accessor) >>> result = remove_nasty_arc(accessor=accessor, latter_map=latter_map, iteration=0, verbose=False, \ has_insertion=True, has_deletion=True) >>> result[0] array([[-1, -1, -1, -1], [-1, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1], [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], [-1, 1, 2, -1], [-1, -1, -1, -1], [-1, -1, -1, -1], [-1, 13, 14, -1], [-1, -1, -1, -1], [ 4, -1, -1, 7], [ 8, -1, -1, 11], [-1, -1, -1, -1]]) >>> result[1] {1: [7], 2: [8, 11], 4: [1, 2], 7: [13, 14], 8: [1, 2], 11: [13, 14], 13: [4, 7], 14: [8, 11]} >>> result[2] (1, 4) >>> result[3] [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16] .. note:: The graph information contained in "accessor" and "latter_map" is consistent. It is a gift for the follow-up investigation. That is, removing arc to improve the capability of the probabilistic error correction. """ nucleotides = "ACGT" observed_length = int(log(len(accessor)) / log(len(nucleotides))) if verbose: if iteration > 0: print("Calculate the intersection score for each remained arc in " + str(iteration) + " round(s).") else: print("Calculate the intersection score for each remained arc.") scores = calculate_intersection_score(latter_map=latter_map, has_insertion=has_insertion, has_deletion=has_deletion, observed_length=observed_length, verbose=verbose) vertex_indices = unique(where(scores == max(scores))[0]) former = intersect1d(obtain_vertices(accessor=accessor), vertex_indices)[0] former_value, latter_value = former % len(nucleotides), argmax(scores[former]) latter = int((former * len(nucleotides) + latter_value) % (len(nucleotides) ** observed_length)) accessor[former, latter_value] = -1 del latter_map[former][latter_map[former].index(latter)] if len(latter_map[former]) == 0: del latter_map[former] scores = scores.reshape(-1) scores = scores[scores > 0].tolist() score_record = array(list(Counter(scores).items())).T score_record = score_record[:, argsort(score_record[0])[::-1]] if verbose: print("Current scores are:") print(score_record) print("Remove arc " + number_to_dna(int(former), dna_length=observed_length) + " -> " + number_to_dna(int(latter), dna_length=observed_length) + " with the maximum intersection score " + str(max(scores)) + ".") return accessor, latter_map, (former, latter), scores
[docs]def create_random_shuffles(observed_length, random_seed=None, verbose=False): """ Create the shuffles for accessor through the random mechanism. :param observed_length: length of the DNA sequence in a vertex. :type observed_length: int :param random_seed: random seed for creating shuffles. :type random_seed: int :param verbose: need to print log. :type verbose: bool :return: shuffles for accessor. :rtype: numpy.ndarray Example >>> from dsw import create_random_shuffles >>> create_random_shuffles(observed_length=2, random_seed=2021) array([[3, 2, 1, 0], [2, 3, 1, 0], [3, 1, 0, 2], [0, 3, 1, 2], [3, 2, 0, 1], [1, 0, 3, 2], [0, 3, 1, 2], [2, 0, 1, 3], [2, 3, 0, 1], [1, 0, 3, 2], [2, 0, 1, 3], [0, 1, 3, 2], [2, 3, 1, 0], [2, 0, 3, 1], [0, 1, 3, 2], [0, 3, 2, 1]]) .. note:: The mapping shuffle strategy disrupts the digit-to-nucleotide mapping order, so it can be used as an privacy protection mechanism. Value 0 ~ 3 in each line shuffle only describes the relationship between progressive order and position. The original position is [0, 1, 2, 3]. If the follow-up nucleotides in a vertex are ["A", "G"] when the line shuffle is [3, 2, 1, 0], This line shuffle can be regarded as [1, -, 0, -] for ["A", -, "G", -]. Ths shuffle will not disclose the topology information of accessor. """ nucleotides = "ACGT" shuffles = zeros(shape=(4 ** observed_length, len(nucleotides)), dtype=int) shuffles[:, 1] = 1 shuffles[:, 2] = 2 shuffles[:, 3] = 3 random.seed(random_seed) monitor = Monitor() for index in range(4 ** observed_length): card = shuffles[index] random.shuffle(card) shuffles[index] = card if verbose: monitor(index + 1, 4 ** observed_length) random.seed(None) return shuffles