__version__ = "v1.0"
__copyright__ = "Copyright 2022"
__license__ = "MIT"
__author__ = "Adam Cribbs lab"
import time
import numpy as np
import pandas as pd
from resimpy.util.random.Number import number as rannum
from resimpy.util.sequence.symbol.Single import single as dnasgl
[docs]class error(object):
def __init__(self, method):
self.method = method
def __call__(self, deal):
from functools import wraps
if self.method == 'default':
func = self.postable
@wraps(deal)
def indexing(ph, *args, **kwargs):
print('===>sequencing...')
res2p = deal(ph, **kwargs)
# print(func(res2p))
return func(res2p)
return indexing
[docs] def postable(self, res2p, kind='index_by_same_len'):
# print(res2p.keys())
seq_stime = time.time()
data_seq = pd.DataFrame(res2p['data_spl'], columns=['read', 'sam_id', 'source'])
del res2p['data']
del res2p['data_spl']
print('======>{} reads to be sequenced'.format(data_seq.shape[0]))
print('======>constructing the position table starts...')
pcr_postable_stime = time.time()
if kind == 'index_by_same_len':
seq_pos_ids, seq_ids = self.postableIndexBySameLen(seq_len=len(data_seq['read'][0]), num_seq=data_seq.shape[0])
elif kind == 'index_by_lambda':
seq_ids = []
seq_pos_ids = []
data_seq.apply(lambda x: self.postableLambda(x, seq_ids, seq_pos_ids), axis=1)
else:
seq_pos_ids, seq_ids = self.postableIndexBySameLen(seq_len=len(data_seq['read'][0]), num_seq=data_seq.shape[0])
pos_table = {'seq_ids': seq_ids, 'seq_pos_ids': seq_pos_ids}
pcr_postable_etime = time.time()
print('======>time for constructing the position table {time:.3f}s'.format(time=pcr_postable_etime - pcr_postable_stime))
seq_nt_num = len(seq_ids)
print('======>{} nucleotides to be sequenced'.format(seq_nt_num))
print('======>determining PCR error numbers starts...')
seq_err_num_simu_stime = time.time()
if res2p['err_num_met'] == 'binomial':
seq_err_num = rannum().binomial(n=seq_nt_num, p=res2p['seq_error'], use_seed=True, seed=1)
elif res2p['err_num_met'] == 'nbinomial':
seq_err_num = rannum().nbinomial(
n=seq_nt_num * (1 - res2p['seq_error']),
p=1 - res2p['seq_error'],
use_seed=True,
seed=1
)
else:
seq_err_num = rannum().binomial(n=seq_nt_num, p=res2p['seq_error'], use_seed=True, seed=1)
print('======>{} nucleotides to be erroneous in sequencing'.format(seq_err_num))
err_lin_ids = rannum().uniform(low=0, high=seq_nt_num, num=seq_err_num, use_seed=True, seed=1)
# print(err_lin_ids)
arr_err_pos = []# [[row1, col1], [row2, col2], ...]
for i in err_lin_ids:
arr_err_pos.append([pos_table['seq_ids'][i], pos_table['seq_pos_ids'][i]])
pseudo_nums = rannum().uniform(low=0, high=3, num=seq_err_num, use_seed=False)
# print(pseudo_nums)
seq_err_num_simu_etime = time.time()
print('======>time for determining sequencing error numbers {time:.3f}s'.format(time=seq_err_num_simu_etime - seq_err_num_simu_stime))
print('======>assigning sequencing errors starts...')
seq_err_assign_stime = time.time()
data_seq['read'] = data_seq.apply(lambda x: list(x['read']), axis=1)
for pos_err, pseudo_num in zip(arr_err_pos, pseudo_nums):
pcr_err_base = data_seq.loc[pos_err[0], 'read'][pos_err[1]]
dna_map = dnasgl().todict(
nucleotides=dnasgl().getEleTrimmed(
ele_loo=pcr_err_base,
universal=True,
),
reverse=True,
)
# print('before', data_seq.loc[pos_err[0], 'read'][pos_err[1]])
data_seq.loc[pos_err[0], 'read'][pos_err[1]] = dna_map[pseudo_num]
# print('after', data_seq.loc[pos_err[0], 'read'][pos_err[1]])
del arr_err_pos
del pseudo_nums
seq_err_assign_etime = time.time()
print('======>time for assigning sequencing errors {time:.2f}s'.format(time=seq_err_assign_etime - seq_err_assign_stime))
data_seq['read'] = data_seq.apply(lambda x: ''.join(x['read']), axis=1)
res2p['data'] = data_seq.values
del data_seq
seq_etime = time.time()
print('======>sequencing time: {time:.2f}s'.format(time=seq_etime - seq_stime))
return res2p
[docs] def postableIndexBySameLen(self, seq_len, num_seq):
nt_ids = [i for i in range(seq_len)]
seq_pos_ids = nt_ids * num_seq
seq_ids = np.array([[i] * seq_len for i in range(num_seq)]).ravel().tolist()
return seq_pos_ids, seq_ids
[docs] def postableLambda(self, x, ids, pos_ids):
l = len(x['read'])
for i in range(l):
pos_ids.append(i)
for i in [x.name] * l:
ids.append(i)
return ids, pos_ids
[docs] def epos(self, err_lin_ids, t):
pos_err = []
for i in err_lin_ids:
cc = 0
for id, j in enumerate(t):
cc += j
if cc >= i:
# print(cc, i)
pos_err.append([id, (j - 1) - (cc % i)])
break
return pos_err
[docs] def tactic1(self, arr_2d):
result = {}
len_arr = len(arr_2d[0])
if len_arr == 2:
for item in arr_2d:
result[item[0]] = item[1]
else:
for item in arr_2d:
result[item[0]] = item[1:]
return result
[docs] def todict(self, bases, reverse=False):
aa_dict = {}
for k, v in enumerate(bases):
aa_dict[v] = k
if reverse:
aa_dict = {v: k for k, v in aa_dict.items()}
return aa_dict