Skip to content

Latest commit

 

History

History
345 lines (267 loc) · 13.3 KB

处理序列文件.md

File metadata and controls

345 lines (267 loc) · 13.3 KB

处理序列文件

翻译整理自 Biopython Tutorial and Cookbook 第20章

根据 ID 列表从 FASTA/FATSQ 文件中取出序列

当你有一个大的序列文件比如(FASTA , FASTQ 或 SFF),和一个包含你感兴趣的 ID 列表(这个列表的第一列为 ID),而你希望根据这个列表从序列文件中取出想要的序列。试试下面的代码:

from Bio import SeqIO

input_file = "big_file.sff"
id_file = "short_list.txt"
output_file = "short_list.sff"

with open(id_file) as id_handle:
    wanted = set(line.rstrip("\n").split(None,1)[0] for line in id_handle)
print("Found %i unique identifiers in %s" % (len(wanted), id_file))

records = (r for r in SeqIO.parse(input_file, "sff") if r.id in wanted)
count = SeqIO.write(records, output_file, "sff")
print("Saved %i records from %s to %s" % (count, input_file, output_file))
if count < len(wanted):
    print("Warning %i IDs not found in %s" % (len(wanted) - count, input_file))

对于处理较大的高通量测序文件时,最好不要直接使用SeqIO,而是直接使用字符串。这个例子,展示了如何对 FATSQ 文件执行此操作:

from Bio.SeqIO.QualityIO import FastqGeneralIterator

input_file = "big_file.fastq"
id_file = "short_list.txt"
output_file = "short_list.fastq"

with open(id_file) as id_handle:
    # Taking first word on each line as an identifer
    wanted = set(line.rstrip("\n").split(None,1)[0] for line in id_handle)
print("Found %i unique identifiers in %s" % (len(wanted), id_file))

with open(input_file) as in_handle:
    with open(output_file, "w") as out_handle:
        for title, seq, qual in FastqGeneralIterator(in_handle):
            # The ID is the first word in the title line (after the @ sign):
            if title.split(None, 1)[0] in wanted:
                # This produces a standard 4-line FASTQ entry:
                out_handle.write("@%s\n%s\n+\n%s\n" % (title, seq, qual))
                count += 1
print("Saved %i records from %s to %s" % (count, input_file, output_file))
if count < len(wanted):
    print("Warning %i IDs not found in %s" % (len(wanted) - count, input_file))

生成随机基因组

当你正在检查基因组序列,寻找某些序列特征(也许有一些极端的局部GC%偏差,或一些可能的限制性酶切位点)。在使用你的 Python代码 之前,最好在根据相同基因组生成的随机序列上运行一下(毕竟,你发现的任何“特征”都可能是随机的)。

从 NCBI 获取鼠疫耶尔森氏菌pPCP1质粒序列的 FASTA 格式文件(NC_005816) 此文件只有一条序列,因此我们可以使用Bio.SeqIO.read()函数将其作为SeqRecord对象读取:

from Bio import SeqIO
original_rec = SeqIO.read("NC_005816.fasta", "fasta")

这里我们会用到 Python 内置的random模块来随机化序列,特别是random.shuffle方法。但是这个方法只能用于 Python 列表,而我们的序列是Seq对象,所以需要转换为列表:

import random
nuc_list = list(original_rec.seq)
random.shuffle(nuc_list)

为了输出重排的序列,需要将这个随机序列的列表转化为字符串,进而生成Seq对象:

from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
shuffled_rec = SeqRecord(Seq("".join(nuc_list), original_rec.seq.alphabet),id="Shuffled", description="Based on %s" % original_rec.id)

更进一步,我们可以生成30个根据原始序列生成的随机序列:

import random
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO

original_rec = SeqIO.read("NC_005816.fasta", "fasta")

with open("shuffled.fasta", "w") as output_handle:
    for i in range(30):
        nuc_list = list(original_rec.seq)
        random.shuffle(nuc_list)
        shuffled_rec = SeqRecord(Seq("".join(nuc_list), original_rec.seq.alphabet),id="Shuffled%i" % (i+1),description="Based on %s" % original_rec.id)
        output_handle.write(shuffled_rec.format("fasta"))

另一种更好的方式就是把这个功能包装成函数:

import random
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO

def make_shuffle_record(record, new_id):
    nuc_list = list(record.seq)
    random.shuffle(nuc_list)
    return SeqRecord(Seq("".join(nuc_list), record.seq.alphabet),id=new_id, description="Based on %s" % original_rec.id)

original_rec = SeqIO.read("NC_005816.fasta","fasta")
shuffled_recs = (make_shuffle_record(original_rec, "Shuffled%i" % (i+1)) for i in range(30))
SeqIO.write(shuffled_recs, "shuffled.fasta", "fasta")

翻译 CDS 序列

现在你手头有一些CDS序列(FASTA格式),要想把他们翻译成氨基酸序列,你肯定会想到用Seq对象的translate方法:

from Bio.SeqRecord import SeqRecord
def make_protein_record(nuc_record):
    """Returns a new SeqRecord with the translated sequence (default table)."""
    return SeqRecord(seq = nuc_record.seq.translate(cds=True), \
                     id = "trans_" + nuc_record.id, \
                     description = "translation of CDS, using default table")

然后,我们可以使用这个函数将输入的核苷酸转换蛋白序列。比较优雅的方式就是使用生成表达式:

from Bio import SeqIO
proteins = (make_protein_record(nuc_rec) for nuc_rec in \
            SeqIO.parse("coding_sequences.fasta", "fasta"))
SeqIO.write(proteins, "translations.fasta", "fasta")

将 FASTA 文件中的序列变为大写

有时候 FASTA 文件中的序列包含了大写和小写字母(小写字母为低质量区域),你希望将他们都转变为大写,用SeqRecord对象的upper()方法可以很容易地实现:

from Bio import SeqIO
records = (rec.upper() for rec in SeqIO.parse("mixed.fasta", "fasta"))
count = SeqIO.write(records, "upper.fasta", "fasta")
print("Converted %i records to upper case" % count)

可以看到作者大量应用了生成表达式来处理文件,因为使用生成表达式的好处在于一次只有一条记录保存在内存中,而不是把所有文件都读入内存。

对序列文件进行排序

比如你想对一些序列按照序列的长度来排序,要是较小的序列文件你可以直接读入:

from Bio import SeqIO
records = list(SeqIO.parse("ls_orchid.fasta", "fasta"))
records.sort(key=lambda r: len(r))
SeqIO.write(records, "sorted_orchids.fasta", "fasta")

如果需要按降序排序,可以这样:

from Bio import SeqIO
records = list(SeqIO.parse("ls_orchid.fasta", "fasta"))
records.sort(key=lambda r: -len(r))
SeqIO.write(records, "sorted_orchids.fasta", "fasta")

但是面对非常大的文件,你不能直接全都读到内存里,这时可使用bio.seqio.index()

from Bio import SeqIO
# Get the lengths and ids, and sort on length
len_and_ids = sorted((len(rec), rec.id) for rec in
                     SeqIO.parse("ls_orchid.fasta", "fasta"))
ids = reversed([id for (length, id) in len_and_ids])
del len_and_ids  # free this memory
record_index = SeqIO.index("ls_orchid.fasta", "fasta")
records = (record_index[id] for id in ids)
SeqIO.write(records, "sorted.fasta", "fasta")

首先,使用Bio.SeqIO.parse()扫描整个文件,在元组中保存序列 ID 及其长度。然后我们对此列表进行排序使它们按长度顺序排列,并丢弃长度。用SeqIO.index()对所有序列建立索引,使我们可根据 ID 列表来检索序列,并将它们传递给Bio.SeqIO.write()进行输出。

对 FASTQ 文件进行简单的质控

对测序文件进行质控是生信分析中最基础的工作,下面的代码展示了如何对SeqRecord对象进行质控,选取最小质量值超过20的序列:

from Bio import SeqIO

good_reads = (rec for rec in \
              SeqIO.parse("SRR020192.fastq", "fastq") \
              if min(rec.letter_annotations["phred_quality"]) >= 20)
count = SeqIO.write(good_reads, "good_quality.fastq", "fastq")
print("Saved %i reads" % count)

可以发现rec.letter_annotations就是包含每个碱基质量值的字典。通常原始测序数据较大,请尽量使用生成器表达式来减少内存使用。

切除引物序列

在这个例子中,我们假设GATGACGGTGT是存在于原始序列 5' 端的引物,首先,我们可以使用Bio.SeqIO结合startswith方法将有引物的序列输出:

from Bio import SeqIO
primer_reads = (rec for rec in \
                SeqIO.parse("SRR020192.fastq", "fastq") \
                if rec.seq.startswith("GATGACGGTGT"))
count = SeqIO.write(primer_reads, "with_primer.fastq", "fastq")
print("Saved %i reads" % count)

要是想在输出的同时把引物也去除掉,只要稍作改动即可:

from Bio import SeqIO
trimmed_primer_reads = (rec[11:] for rec in \
                        SeqIO.parse("SRR020192.fastq", "fastq") \
                        if rec.seq.startswith("GATGACGGTGT"))
count = SeqIO.write(trimmed_primer_reads, "with_primer_trimmed.fastq", "fastq")
print("Saved %i reads" % count)

要是想把有引物的序列去除引物后输出,没引物的序列原样输出,这需要定义一个function

from Bio import SeqIO
def trim_primer(record, primer):
    if record.seq.startswith(primer):
        return record[len(primer):]
    else:
        return record

trimmed_reads = (trim_primer(record, "GATGACGGTGT") for record in \
                 SeqIO.parse("SRR020192.fastq", "fastq"))
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count

此外,你也可以使用一个生成器函数来替代生成器表达式:

from Bio import SeqIO
def trim_primers(records, primer):
    """Removes perfect primer sequences at start of reads.

    This is a generator function, the records argument should
    be a list or iterator returning SeqRecord objects.
    """
    len_primer = len(primer) #cache this for later
    for record in records:
        if record.seq.startswith(primer):
            yield record[len_primer:]
        else:
            yield record

original_reads = SeqIO.parse("SRR020192.fastq", "fastq")
trimmed_reads = trim_primers(original_reads, "GATGACGGTGT")
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count)

切除接头序列

这次我们同样假设GATGACGGTGT是接头序列,在原始序列的任何地方都有可能出现这个序列,而不仅仅是在开头:

from Bio import SeqIO

def trim_adaptors(records, adaptor):
    """Trims perfect adaptor sequences.

    This is a generator function, the records argument should
    be a list or iterator returning SeqRecord objects.
    """
    len_adaptor = len(adaptor) #cache this for later
    for record in records:
        index = record.seq.find(adaptor)
        if index == -1:
            #adaptor not found, so won't trim
            yield record
        else:
            #trim off the adaptor
            yield record[index+len_adaptor:]

original_reads = SeqIO.parse("SRR020192.fastq", "fastq")
trimmed_reads = trim_adaptors(original_reads, "GATGACGGTGT")
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count)

有一些reads在切除接头以后被剪切得非常短,所以我们还可以设置一个最短长度:

from Bio import SeqIO

def trim_adaptors(records, adaptor, min_len):
    """Trims perfect adaptor sequences, checks read length.

    This is a generator function, the records argument should
    be a list or iterator returning SeqRecord objects.
    """
    len_adaptor = len(adaptor) #cache this for later
    for record in records:
        len_record = len(record) #cache this for later
        if len_record < min_len:
           #Too short to keep
           continue
        index = record.seq.find(adaptor)
        if index == -1:
            #adaptor not found, so won't trim
            yield record
        elif len_record - index - len_adaptor >= min_len:
            #after trimming this will still be long enough
            yield record[index+len_adaptor:]

original_reads = SeqIO.parse("SRR020192.fastq", "fastq")
trimmed_reads = trim_adaptors(original_reads, "GATGACGGTGT", 100)
count = SeqIO.write(trimmed_reads, "trimmed.fastq", "fastq")
print("Saved %i reads" % count)

当然对于上面的代码,你还可以做一些改进,比如模糊匹配,支持双端reads,在去除接头的同时进行质控等等。

对 FASTQ 文件建立索引

FASTQ 原始文件通常很大,导致你可能无法一次性地将数据导入内存中,所以前面我们常用生成器表达式来一次只读取一条记录。但也有时候你没必要使用一个大的循环或迭代器,你可能只需要通过每条reads的名词从原始文件中取序列,使用Bio.SeqIO.index()可以很轻易地完成:

>>> from Bio import SeqIO
>>> fq_dict = SeqIO.index("SRR020192.fastq", "fastq")
>>> len(fq_dict)
41892
>>> fq_dict.keys()[:4]
['SRR020192.38240', 'SRR020192.23181', 'SRR020192.40568', 'SRR020192.23186']
>>> fq_dict["SRR020192.23186"].seq
Seq('GTCCCAGTATTCGGATTTGTCTGCCAAAACAATGAAATTGACACAGTTTACAAC...CCG', SingleLetterAlphabet())