Source code for resimpy.pcr.ErrorTransloc

__version__ = "v1.0"
__copyright__ = "Copyright 2022"
__license__ = "MIT"
__author__ = "Adam Cribbs lab"

import time
import numpy as np
import pandas as pd
from resimpy.util.random.Number import number as rannum
from resimpy.util.sequence.symbol.Single import single as dnasgl


[docs]class errorTransloc(object): def __init__(self, method): self.method = method def __call__(self, deal): from functools import wraps if self.method == 'default': func = self.postable @wraps(deal) def indexing(ph, *args, **kwargs): print('======>error making...') res2p = deal(ph, **kwargs) # print(func(res2p)) return func(res2p) return indexing
[docs] def postable(self, res2p, kind='index_by_same_len'): """ .. test: ----- # dna = ['A', 'T', 'G', 'C'] # dna.remove(pcr_err_base) # dna_map = self.todict(dna, reverse=True) # data_pcr.loc[pos_err[0], 'read'][pos_err[1]] = dna_map[pseudo_num] # pcr_err_base = data_pcr[pos_err[0]][0][pos_err[1]] # d = [ # [i, j] for i in ampl_read_len_df.index for j in np.arange(ampl_read_len_df[i]) # ] # print(len(d)) # ampl_read_len_list = np.arange(len(d))[:, np.newaxis] # map_tab = np.concatenate((ampl_read_len_list, d), axis=1) # print(self.tactic1(map_tab)) .. Method: ------- =>string concat for pos_err, pseudo_num in zip(arr_err_pos, pseudo_nums): tar_read = data_pcr.loc[pos_err[0], 'read'] pcr_err_base = tar_read[pos_err[1]] dna_map = dnasgl().todict( nucleotides=dnasgl().getEleTrimmed( ele_loo=pcr_err_base, universal=True, ), reverse=True, ) # print('before', data_pcr.loc[pos_err[0], 'read'][pos_err[1]]) data_pcr.loc[pos_err[0], 'read'] = tar_read[:pos_err[1]] + dna_map[pseudo_num] + tar_read[pos_err[1]+1:] :param res2p: :return: """ # print(res2p.keys()) pcr_stime = time.time() data_pcr = pd.DataFrame(res2p['data_spl'], columns=['read', 'r1_id', 'r2_id', 'transloc_stat', 'transloc_side', 'sam_id', 'source']) data_pcr['transloc_stat'] = 'fake_no' transloc_num = rannum().binomial(n=data_pcr.shape[0], p=res2p['transloc_rate'], use_seed=True, seed=res2p['ipcr'] + 1) # print('++++++++> {}'.format(transloc_num)) transloc_rid = rannum().choice(high=data_pcr.shape[0], num=transloc_num*2, replace=False) transloc_rids = np.reshape(transloc_rid, (transloc_num, 2)) # print('++++++++> {}'.format(transloc_rid)) # print('++++++++> {}'.format(transloc_rids.shape)) for pos_read in transloc_rids: transloc_read_before_1st = data_pcr.loc[pos_read[0]] transloc_read_before_2nd = data_pcr.loc[pos_read[1]] # print('1st before {}'.format(transloc_read_before_1st['read'])) # print(transloc_read_before_1st['r1_id']) # print(transloc_read_before_1st['r2_id']) # print(transloc_read_before_1st['transloc_stat']) # print(transloc_read_before_1st['transloc_side']) # print('2nd before {}'.format(transloc_read_before_2nd['read'])) # print(transloc_read_before_2nd['r1_id']) # print(transloc_read_before_2nd['r2_id']) # print(transloc_read_before_2nd['transloc_stat']) # print(transloc_read_before_2nd['transloc_side']) transloc_read_after_1st = transloc_read_before_1st['read'][:36] + transloc_read_before_2nd['read'][36:] transloc_read_after_2nd = transloc_read_before_2nd['read'][:36] + transloc_read_before_1st['read'][36:] # print('1st after {}'.format(transloc_read_after_1st)) # print('2nd after {}'.format(transloc_read_after_2nd)) data_pcr.loc[pos_read[0], 'read'] = transloc_read_after_1st data_pcr.loc[pos_read[1], 'read'] = transloc_read_after_2nd data_pcr.loc[pos_read[0], 'transloc_stat'] = 'fake_yes' data_pcr.loc[pos_read[1], 'transloc_stat'] = 'fake_yes' data_pcr.loc[pos_read[0], 'transloc_side'] = 'r' data_pcr.loc[pos_read[1], 'transloc_side'] = 'r' tmp_r2_id_1st = data_pcr.loc[pos_read[0], 'r2_id'] tmp_r2_id_2nd = data_pcr.loc[pos_read[1], 'r2_id'] data_pcr.loc[pos_read[0], 'r2_id'] = tmp_r2_id_2nd data_pcr.loc[pos_read[1], 'r2_id'] = tmp_r2_id_1st # print(transloc_read_before_1st['transloc_stat']) # print(transloc_read_before_2nd['transloc_stat']) # print(data_pcr.loc[pos_read[0], 'read']) # print(data_pcr.loc[pos_read[1], 'read']) # print(data_pcr.loc[pos_read[0], 'r2_id']) # print(data_pcr.loc[pos_read[1], 'r2_id']) print(data_pcr['transloc_stat']) del res2p['data_spl'] # print(len(data_pcr['read'][0])) # print(data_pcr.shape[0]) res2p['recorder_pcr_read_num'].append(data_pcr.shape[0]) print('======>{} reads to be amplified'.format(data_pcr.shape[0])) print('======>constructing the position table starts...') # print(data_pcr) pcr_postable_stime = time.time() if kind == 'index_by_same_len': seq_pos_ids, seq_ids = self.postableIndexBySameLen(seq_len=len(data_pcr['read'][0]), num_seq=data_pcr.shape[0]) elif kind == 'index_by_lambda': seq_ids = [] seq_pos_ids = [] data_pcr.apply(lambda x: self.postableLambda(x, seq_ids, seq_pos_ids), axis=1) else: seq_pos_ids, seq_ids = self.postableIndexBySameLen(seq_len=len(data_pcr['read'][0]), num_seq=data_pcr.shape[0]) # print(seq_ids) pos_table = {'seq_ids': seq_ids, 'seq_pos_ids': seq_pos_ids} print('======>time for constructing the position table {time:.3f}s'.format(time=time.time() - pcr_postable_stime)) ampl_nt_num = len(seq_ids) print('======>{} nucleotides to be amplified'.format(ampl_nt_num)) res2p['recorder_nucleotide_num'].append(ampl_nt_num) print('======>determining PCR error numbers starts...') pcr_err_num_simu_stime = time.time() if res2p['err_num_met'] == 'binomial': pcr_err_num = rannum().binomial(n=ampl_nt_num, p=res2p['pcr_error'], use_seed=True, seed=res2p['ipcr'] + 1) elif res2p['err_num_met'] == 'nbinomial': pcr_err_num = rannum().nbinomial( n=ampl_nt_num*(1-res2p['pcr_error']), p=1-res2p['pcr_error'], use_seed=True, seed=res2p['ipcr'] + 1 ) # print('++++++++> {}'.format(pcr_err_num)) else: pcr_err_num = rannum().binomial(n=ampl_nt_num, p=res2p['pcr_error'], use_seed=True, seed=res2p['ipcr'] + 1) print('======>{} nucleotides to be erroneous at this PCR'.format(pcr_err_num)) res2p['recorder_pcr_err_num'].append(pcr_err_num) spl_nt_ids = rannum().uniform(low=0, high=ampl_nt_num, num=pcr_err_num, use_seed=True, seed=res2p['ipcr'] + 1) arr_err_pos = [] for i in spl_nt_ids: arr_err_pos.append([pos_table['seq_ids'][i], pos_table['seq_pos_ids'][i]]) pseudo_nums = rannum().uniform(low=0, high=3, num=pcr_err_num, use_seed=False) # print(pseudo_nums) print('======>time for determining PCR error numbers {time:.3f}s'.format(time=time.time() - pcr_err_num_simu_stime)) print('======>assigning PCR errors starts...') pcr_err_assign_stime = time.time() data_pcr['read'] = data_pcr.apply(lambda x: list(x['read']), axis=1) # caser = self.todict5(arr_err_pos) # data_pcr['read'] = data_pcr.apply(lambda x: self.worktable(x, caser), axis=1) for pos_err, pseudo_num in zip(arr_err_pos, pseudo_nums): pcr_err_base = data_pcr.loc[pos_err[0], 'read'][pos_err[1]] dna_map = dnasgl().todict( nucleotides=dnasgl().getEleTrimmed( ele_loo=pcr_err_base, universal=True, ), reverse=True, ) # print('before', data_pcr.loc[pos_err[0], 'read'][pos_err[1]]) data_pcr.loc[pos_err[0], 'read'][pos_err[1]] = dna_map[pseudo_num] # print('after', data_pcr.loc[pos_err[0], 'read'][pos_err[1]]) print('======>time for assigning PCR errors {time:.2f}s'.format(time=time.time() - pcr_err_assign_stime)) print('======>merging the PCR duplicates and all previous sequences starts...') del arr_err_pos del spl_nt_ids pcr_merge_stime = time.time() data_pcr['read'] = data_pcr.apply(lambda x: ''.join(x['read']), axis=1) data_pcr['source'] = 'pcr' + str(res2p['ipcr'] + 1) # print(data_pcr.values) data_pcr = np.array(data_pcr) res2p['data'] = np.concatenate((res2p['data'], data_pcr), axis=0) del data_pcr print('======>time for merging sequences {time:.2f}s'.format(time=time.time() - pcr_merge_stime)) print('======>Summary report:') print('=========>PCR time: {time:.2f}s'.format(time=time.time() - pcr_stime)) print('=========>the dimensions of the data: number of reads: {}'.format(res2p['data'].shape)) print('=========>the number of reads at this PCR: {}, '.format(res2p['recorder_pcr_read_num'])) print('=========>the number of nucleotides at this PCR: {}, '.format(res2p['recorder_nucleotide_num'])) print('=========>the number of errors at this PCR: {}, '.format(res2p['recorder_pcr_err_num'])) return res2p
[docs] def postableIndexBySameLen(self, seq_len, num_seq): nt_ids = [i for i in range(seq_len)] seq_pos_ids = nt_ids * num_seq seq_ids = np.array([[i] * seq_len for i in range(num_seq)]).ravel().tolist() return seq_pos_ids, seq_ids
[docs] def postableLambda(self, x, ids, pos_ids): for i in range(x['read_len']): pos_ids.append(i) ids = ids + [x.name] * x['read_len'] return ids, pos_ids
[docs] def worktable(self, x, ids): if x.name not in ids: return x['read'] else: tt = x['read'] for i in ids[x.name]: pcr_err_base = tt[i] dna_map = dnasgl().todict( nucleotides=dnasgl().getEleTrimmed( ele_loo=pcr_err_base, universal=True, ), reverse=True, ) tt = tt[:i] + dna_map[2] + tt[i+1:] return tt
[docs] def todict(self, nucleotides, reverse=False): aa_dict = {} for k, v in enumerate(nucleotides): aa_dict[v] = k if reverse: aa_dict = {v: k for k, v in aa_dict.items()} return aa_dict
[docs] def todict5(self, arr_2d): result = {} for item in arr_2d: result[item[0]] = [] for item in arr_2d: if item[0] in result.keys(): result[item[0]].append(item[1]) # print(result[item[0]]) return result