当你有一个大的序列文件比如(FASTA , FASTQ 或 SFF),和一个包含你感兴趣的 ID 列表(这个列表的第一列为 ID),而你希望根据这个列表从序列文件中取出想要的序列。试试下面的代码:
from Bio import SeqIO
input_file = "big_file.sff"
id_file = "short_list.txt"
output_file = "short_list.sff"
with open(id_file) as id_handle:
wanted = set(line.rstrip("\n").split(None,1)[0] for line in id_handle)
print("Found %i unique identifiers in %s" % (len(wanted), id_file))
records = (r for r in SeqIO.parse(input_file, "sff") if r.id in wanted)
count = SeqIO.write(records, output_file, "sff")
print("Saved %i records from %s to %s" % (count, input_file, output_file))
if count < len(wanted):
print("Warning %i IDs not found in %s" % (len(wanted) - count, input_file))
对于处理较大的高通量测序文件时,最好不要直接使用SeqIO
,而是直接使用字符串。这个例子,展示了如何对 FATSQ 文件执行此操作:
from Bio.SeqIO.QualityIO import FastqGeneralIterator
input_file = "big_file.fastq"
id_file = "short_list.txt"
output_file = "short_list.fastq"
with open(id_file) as id_handle:
# Taking first word on each line as an identifer
wanted = set(line.rstrip("\n").split(None,1)[0] for line in id_handle)
print("Found %i unique identifiers in %s" % (len(wanted), id_file))
with open(input_file) as in_handle:
with open(output_file, "w") as out_handle:
for title, seq, qual in FastqGeneralIterator(in_handle):
# The ID is the first word in the title line (after the @ sign):
if title.split(None, 1)[0] in wanted:
# This produces a standard 4-line FASTQ entry:
out_handle.write("@%s\n%s\n+\n%s\n" % (title, seq, qual))
count += 1
print("Saved %i records from %s to %s" % (count, input_file, output_file))
if count < len(wanted):
print("Warning %i IDs not found in %s" % (len(wanted) - count, input_file))
当你正在检查基因组序列,寻找某些序列特征(也许有一些极端的局部GC%偏差,或一些可能的限制性酶切位点)。在使用你的 Python代码 之前,最好在根据相同基因组生成的随机序列上运行一下(毕竟,你发现的任何“特征”都可能是随机的)。
从 NCBI 获取鼠疫耶尔森氏菌pPCP1质粒序列的 FASTA 格式文件(NC_005816)
此文件只有一条序列,因此我们可以使用Bio.SeqIO.read()
函数将其作为SeqRecord
对象读取:
from Bio import SeqIO
original_rec = SeqIO.read("NC_005816.fasta", "fasta")
这里我们会用到 Python 内置的random
模块来随机化序列,特别是random.shuffle
方法。但是这个方法只能用于 Python 列表,而我们的序列是Seq
对象,所以需要转换为列表:
import random
nuc_list = list(original_rec.seq)
random.shuffle(nuc_list)
为了输出重排的序列,需要将这个随机序列的列表转化为字符串,进而生成Seq
对象:
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
shuffled_rec = SeqRecord(Seq("".join(nuc_list), original_rec.seq.alphabet),id="Shuffled", description="Based on %s" % original_rec.id)
更进一步,我们可以生成30个根据原始序列生成的随机序列:
import random
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO
original_rec = SeqIO.read("NC_005816.fasta", "fasta")
with open("shuffled.fasta", "w") as output_handle:
for i in range(30):
nuc_list = list(original_rec.seq)
random.shuffle(nuc_list)
shuffled_rec = SeqRecord(Seq("".join(nuc_list), original_rec.seq.alphabet),id="Shuffled%i" % (i+1),description="Based on %s" % original_rec.id)
output_handle.write(shuffled_rec.format("fasta"))
另一种更好的方式就是把这个功能包装成函数:
import random
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO
def make_shuffle_record(record, new_id):
nuc_list = list(record.seq)
random.shuffle(nuc_list)
return SeqRecord(Seq("".join(nuc_list), record.seq.alphabet),id=new_id, description="Based on %s" % original_rec.id)
original_rec = SeqIO.read("NC_005816.fasta","fasta")
shuffled_recs = (make_shuffle_record(original_rec, "Shuffled%i" % (i+1)) for i in range(30))
SeqIO.write(shuffled_recs, "shuffled.fasta", "fasta")
现在你手头有一些CDS序列(FASTA格式),要想把他们翻译成氨基酸序列,你肯定会想到用Seq
对象的translate
方法:
from Bio.SeqRecord import SeqRecord
def make_protein_record(nuc_record):
"""Returns a new SeqRecord with the translated sequence (default table)."""
return SeqRecord(seq = nuc_record.seq.translate(cds=True), \
id = "trans_" + nuc_record.id, \
description = "translation of CDS, using default table")
然后,我们可以使用这个函数将输入的核苷酸转换蛋白序列。比较优雅的方式就是使用生成表达式:
from Bio import SeqIO
proteins = (make_protein_record(nuc_rec) for nuc_rec in \
SeqIO.parse("coding_sequences.fasta", "fasta"))
SeqIO.write(proteins, "translations.fasta", "fasta")
有时候 FASTA 文件中的序列包含了大写和小写字母(小写字母为低质量区域),你希望将他们都转变为大写,用SeqRecord
对象的upper()
方法可以很容易地实现:
from Bio import SeqIO
records = (rec.upper() for rec in SeqIO.parse("mixed.fasta", "fasta"))
count = SeqIO.write(records, "upper.fasta", "fasta")
print("Converted %i records to upper case" % count)
可以看到作者大量应用了生成表达式来处理文件,因为使用生成表达式的好处在于一次只有一条记录保存在内存中,而不是把所有文件都读入内存。
比如你想对一些序列按照序列的长度来排序,要是较小的序列文件你可以直接读入:
from Bio import SeqIO
records = list(SeqIO.parse("ls_orchid.fasta", "fasta"))
records.sort(key=lambda r: len(r))
SeqIO.write(records, "sorted_orchids.fasta", "fasta")
如果需要按降序排序,可以这样:
from Bio import SeqIO
records = list(SeqIO.parse("ls_orchid.fasta", "fasta"))
records.sort(key=lambda r: -len(r))
SeqIO.write(records, "sorted_orchids.fasta", "fasta")
但是面对非常大的文件,你不能直接全都读到内存里,这时可使用bio.seqio.index()
:
from Bio import SeqIO
# Get the lengths and ids, and sort on length
len_and_ids = sorted((len(rec), rec.id) for rec in
SeqIO.parse("ls_orchid.fasta", "fasta"))
ids = reversed([id for (length, id) in len_and_ids])
del len_and_ids # free this memory
record_index = SeqIO.index("ls_orchid.fasta", "fasta")
records = (record_index[id] for id in ids)
SeqIO.write(records, "sorted.fasta", "fasta")
首先,使用Bio.SeqIO.parse()
扫描整个文件,在元组中保存序列 ID 及其长度。然后我们对此列表进行排序使它们按长度顺序排列,并丢弃长度。用SeqIO.index()
对所有序列建立索引,使我们可根据 ID 列表来检索序列,并将它们传递给Bio.SeqIO.write()
进行输出。
对测序文件进行质控是生信分析中最基础的工作,下面的代码展示了如何对SeqRecord
对象进行质控,选取最小质量值超过20的序列:
from Bio import SeqIO
good_reads = (rec for rec in \
SeqIO.parse("SRR020192.fastq", "fastq") \
if min(rec.letter_annotations["phred_quality"]) >= 20)
count = SeqIO.write(good_reads, "good_quality.fastq", "fastq")
print("Saved %i reads" % count)
可以发现rec.letter_annotations
就是包含每个碱基质量值的字典。通常原始测序数据较大,请尽量使用生成器表达式来减少内存使用。
在这个例子中,我们假设GATGACGGTGT
是存在于原始序列 5' 端的引物,首先,我们可以使用Bio.SeqIO
结合startswith
方法将有引物的序列输出:
from Bio import SeqIO
primer_reads = (rec for rec in \
SeqIO.parse("SRR020192.fastq", "fastq") \
if rec.seq.startswith("GATGACGGTGT"))
count = SeqIO.write(primer_reads, "with_primer.fastq", "fastq")
print("Saved %i reads" % count)
要是想在输出的同时把引物也去除掉,只要稍作改动即可:
from Bio import SeqIO
trimmed_primer_reads = (rec[11:] for rec in \
SeqIO.parse("SRR020192.fastq", "fastq") \
if rec.seq.startswith("GATGACGGTGT"))
count = SeqIO.write(trimmed_primer_reads, "with_primer_trimmed.fastq", "fastq")
print("Saved %i reads" % count)
要是想把有引物的序列去除引物后输出,没引物的序列原样输出,这需要定义一个function
:
from Bio import SeqIO
def trim_primer(record, primer):
if record.seq.startswith(primer):
return record[len(primer):]
else:
return record
trimmed_reads = (trim_primer(record, "GATGACGGTGT") for record in \
SeqIO.parse("SRR020192.fastq", "fastq"))
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count
此外,你也可以使用一个生成器函数来替代生成器表达式:
from Bio import SeqIO
def trim_primers(records, primer):
"""Removes perfect primer sequences at start of reads.
This is a generator function, the records argument should
be a list or iterator returning SeqRecord objects.
"""
len_primer = len(primer) #cache this for later
for record in records:
if record.seq.startswith(primer):
yield record[len_primer:]
else:
yield record
original_reads = SeqIO.parse("SRR020192.fastq", "fastq")
trimmed_reads = trim_primers(original_reads, "GATGACGGTGT")
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count)
这次我们同样假设GATGACGGTGT
是接头序列,在原始序列的任何地方都有可能出现这个序列,而不仅仅是在开头:
from Bio import SeqIO
def trim_adaptors(records, adaptor):
"""Trims perfect adaptor sequences.
This is a generator function, the records argument should
be a list or iterator returning SeqRecord objects.
"""
len_adaptor = len(adaptor) #cache this for later
for record in records:
index = record.seq.find(adaptor)
if index == -1:
#adaptor not found, so won't trim
yield record
else:
#trim off the adaptor
yield record[index+len_adaptor:]
original_reads = SeqIO.parse("SRR020192.fastq", "fastq")
trimmed_reads = trim_adaptors(original_reads, "GATGACGGTGT")
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count)
有一些reads
在切除接头以后被剪切得非常短,所以我们还可以设置一个最短长度:
from Bio import SeqIO
def trim_adaptors(records, adaptor, min_len):
"""Trims perfect adaptor sequences, checks read length.
This is a generator function, the records argument should
be a list or iterator returning SeqRecord objects.
"""
len_adaptor = len(adaptor) #cache this for later
for record in records:
len_record = len(record) #cache this for later
if len_record < min_len:
#Too short to keep
continue
index = record.seq.find(adaptor)
if index == -1:
#adaptor not found, so won't trim
yield record
elif len_record - index - len_adaptor >= min_len:
#after trimming this will still be long enough
yield record[index+len_adaptor:]
original_reads = SeqIO.parse("SRR020192.fastq", "fastq")
trimmed_reads = trim_adaptors(original_reads, "GATGACGGTGT", 100)
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count)
当然对于上面的代码,你还可以做一些改进,比如模糊匹配,支持双端reads
,在去除接头的同时进行质控等等。
FASTQ 原始文件通常很大,导致你可能无法一次性地将数据导入内存中,所以前面我们常用生成器表达式来一次只读取一条记录。但也有时候你没必要使用一个大的循环或迭代器,你可能只需要通过每条reads
的名词从原始文件中取序列,使用Bio.SeqIO.index()
可以很轻易地完成:
>>> from Bio import SeqIO
>>> fq_dict = SeqIO.index("SRR020192.fastq", "fastq")
>>> len(fq_dict)
41892
>>> fq_dict.keys()[:4]
['SRR020192.38240', 'SRR020192.23181', 'SRR020192.40568', 'SRR020192.23186']
>>> fq_dict["SRR020192.23186"].seq
Seq('GTCCCAGTATTCGGATTTGTCTGCCAAAACAATGAAATTGACACAGTTTACAAC...CCG', SingleLetterAlphabet())