Skip to content

Commit

Permalink
NCBI Compress - parallel split by taxid (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenixAja authored Jan 18, 2024
1 parent 12bb2d6 commit e411f50
Show file tree
Hide file tree
Showing 21 changed files with 12,459 additions and 8,240 deletions.
60 changes: 10 additions & 50 deletions workflows/index-generation/index-generation.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,11 @@ workflow index_generation {
}

if (!skip_protein_compression) {
call SortFasta as SortFastaNR {
input:
fasta = DownloadNR.nr,
combined_sorted_path = "combined_sorted_nr.fa",
cpu = 64,
docker_image_id = docker_image_id
}

call CompressDatabase as CompressNR {
input:
database_type = "nr",
sorted_fasta = SortFastaNR.sorted,
fasta = DownloadNR.nr,
accession2taxid_files = [DownloadAccession2Taxid.pdb, DownloadAccession2Taxid.prot],
k = nr_compression_k,
scaled = nr_compression_scaled,
Expand All @@ -77,18 +70,10 @@ workflow index_generation {
File nr_or_compressed = select_first([CompressNR.compressed, DownloadNR.nr])

if (!skip_nuc_compression) {
call SortFasta as SortFastaNT {
input:
fasta = DownloadNT.nt,
combined_sorted_path = "combined_sorted_nt.fa",
cpu = 64,
docker_image_id = docker_image_id
}

call CompressDatabase as CompressNT {
input:
database_type = "nt",
sorted_fasta = SortFastaNT.sorted,
fasta = DownloadNT.nt,
accession2taxid_files = [DownloadAccession2Taxid.nucl_wgs, DownloadAccession2Taxid.nucl_gb],
k = nt_compression_k,
scaled = nt_compression_scaled,
Expand Down Expand Up @@ -617,40 +602,10 @@ task GenerateIndexMinimap2 {
}
}

# Sorts an entire NT/NR fasta so that longer sequences come first, using the
# `ncbi-compress sort-fasta-by-sequence-length` subcommand.
task SortFasta {
input {
# The combined NT or NR fasta to sort.
File fasta
# Output filename for the sorted fasta (also the task's output file).
String combined_sorted_path
Int cpu
# NOTE(review): `threads` is computed here but never referenced in the
# command block below — confirm whether it should be passed to the sort
# tool or removed.
Int threads = if cpu * 0.6 < 1 then 1 else floor(cpu * 0.6)
String docker_image_id
}

command <<<
# Sort NT/NR by length with the longer sequences first
# This is needed because the downstream compression algorithm iterates through NT/NR in order only emitting
# sequences if they are not contained by what it has already seen. If a shorter sequence is
# contained by a longer sequence, and the shorter sequence were to come first, it would be emitted
# even though it is redundant to the longer sequence.
set -euxo pipefail

ncbi-compress sort-fasta-by-sequence-length \
--input-fasta ~{fasta} \
--output ~{combined_sorted_path}
>>>
output {
# The length-sorted fasta, longest sequences first.
File sorted = combined_sorted_path
}
runtime {
docker: docker_image_id
cpu: cpu
}
}

task CompressDatabase {
input {
String database_type = "nt" # nt or nr
File sorted_fasta
File fasta
Array[File] accession2taxid_files
Int k
Int scaled
Expand All @@ -664,19 +619,24 @@ task CompressDatabase {

READS_BY_TAXID_PATH=reads_by_taxid
SPLIT_APART_TAXID_DIR_NAME=split_apart_taxid
SORTED_TAXID_DIR_NAME=sorted_taxid

# It is critical that this split happens in the same step as the compression
# If the directory is a step output it will be uploaded, which takes an enormous amount of time
ncbi-compress break-into-individual-taxids-only \
--input-fasta ~{sorted_fasta} \
--input-fasta ~{fasta} \
--accession-mapping-files ~{sep=" " accession2taxid_files} \
--output-dir $READS_BY_TAXID_PATH

ncbi-compress sort-taxid-dir-by-sequence-length \
--input-taxid-dir $READS_BY_TAXID_PATH \
--output-taxid-dir $SORTED_TAXID_DIR_NAME

mkdir $SPLIT_APART_TAXID_DIR_NAME

if [ "~{logging_enabled}" == "true" ]; then
ncbi-compress fasta-compress-from-taxid-dir ~{if database_type == "nr" then "--is-protein-fasta" else ""} \
--input-fasta-dir $READS_BY_TAXID_PATH \
--input-fasta-dir $SORTED_TAXID_DIR_NAME \
--output-fasta ~{database_type}_compressed.fa \
--k ~{k} \
--scaled ~{scaled} \
Expand Down
27 changes: 14 additions & 13 deletions workflows/index-generation/ncbi-compress/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod commands {
use tempdir::TempDir;

use crate::ncbi_compress::ncbi_compress;
use crate::fasta_tools::fasta_tools;

fn fasta_compress_w_logging_option(
input_fasta_path: &str,
Expand Down Expand Up @@ -254,19 +255,21 @@ pub mod commands {
fs::create_dir_all(&temp_file_output_dir).expect("Error creating output directory");
let temp_taxid_dir = TempDir::new_in(temp_file_output_dir, "accessions_by_taxid")
.expect("Error creating tempdir");
let temp_taxid_dir_str = temp_taxid_dir.path().to_str().unwrap();

// let taxid_dir_str = format!("{}/accessions_by_taxid", temp_file_output_dir);
let taxid_dir = ncbi_compress::split_accessions_by_taxid(
let temp_taxid_dir_str = format!("{}/accessions_by_taxid", temp_file_output_dir);
;
ncbi_compress::split_accessions_by_taxid(
input_fasta_path,
accession_mapping_files,
&temp_taxid_dir_str,
);

let sorted_taxid_dir = format!("{}/sorted_taxid_dir", temp_file_output_dir);
fasta_tools::sort_taxid_dir_by_sequence_length(&temp_taxid_dir_str, &sorted_taxid_dir);

let mut accession_count = 0;
let mut unique_accession_count = 0;
let mut writer = fasta::Writer::to_file(output_fasta_path).unwrap();
for (_i, entry) in fs::read_dir(taxid_dir).unwrap().enumerate() {
for (_i, entry) in fs::read_dir(sorted_taxid_dir).unwrap().enumerate() {
let entry = entry.unwrap();
let path = entry.path();
let input_fasta_path = path.to_str().unwrap();
Expand Down Expand Up @@ -300,8 +303,7 @@ mod tests {
#[test]
fn test_fasta_compress_from_fasta_skip_split_by_taxid() {
let input_fasta_path = "test_data/fasta_tools/inputs/nt";
let truth_fasta_path =
"test_data/commands/fasta_compress_from_fasta_skip_split_by_taxid/truth-ouputs/nt_out.fa";
let truth_fasta_path = "test_data/commands/common_truth_output/nt_out.fa";
let test_directory = tempdir().unwrap();
let temp_dir_path_str = test_directory.path().to_str().unwrap();
let test_fasta_path = format!("{}/nt.fa", temp_dir_path_str);
Expand Down Expand Up @@ -337,6 +339,7 @@ mod tests {

#[test]
fn test_fasta_compress_from_fasta_end_to_end() {

let input_fasta_path = "test_data/fasta_tools/inputs/nt";
let truth_fasta_path = "test_data/commands/common_truth_output/nt_out.fa";
let test_directory = tempdir().unwrap();
Expand Down Expand Up @@ -365,7 +368,6 @@ mod tests {
let enable_sequence_retention_logging = false;
let logging_contained_in_tree_fn = "";
let logging_contained_in_chunk_fn = "";

commands::fasta_compress_end_to_end(
input_fasta_path,
input_mapping_file_paths,
Expand All @@ -382,8 +384,8 @@ mod tests {
logging_contained_in_tree_fn,
logging_contained_in_chunk_fn,
);
util::compare_fasta_records_from_files(&truth_fasta_path, &test_fasta_path);

assert!(util::are_files_equal(truth_fasta_path, &test_fasta_path));
}

#[test]
Expand Down Expand Up @@ -422,12 +424,11 @@ mod tests {
logging_contained_in_chunk_fn,
);

assert!(util::are_files_equal(truth_fasta_path, &output_fasta_path));
util::compare_fasta_records_from_files(truth_fasta_path, &output_fasta_path);
}

#[test]
fn test_split_fasta_into_chunks() {
use crate::util::util::create_fasta_records_from_file;
use bio::io::fasta;

use crate::commands::commands::count_fasta_reads;
Expand Down Expand Up @@ -469,9 +470,9 @@ mod tests {
.unwrap();

let mut chunk_1_reader =
create_fasta_records_from_file(format!("{}/123_1.fa", temp_dir_path_str).as_str());
util::create_fasta_records_from_file(format!("{}/123_1.fa", temp_dir_path_str).as_str());
let mut chunk_2_reader =
create_fasta_records_from_file(format!("{}/123_2.fa", temp_dir_path_str).as_str());
util::create_fasta_records_from_file(format!("{}/123_2.fa", temp_dir_path_str).as_str());

let mut expected_chunk_1_records = vec![
fasta::Record::with_attrs("1", None, b"ACGT"),
Expand Down
62 changes: 48 additions & 14 deletions workflows/index-generation/ncbi-compress/src/fasta_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub mod fasta_tools {

use bio::io::fasta;
use rayon::slice::ParallelSliceMut;
use rayon::prelude::*;


struct OffsetWriter<'a> {
file: &'a mut fs::File,
Expand Down Expand Up @@ -83,6 +85,8 @@ pub mod fasta_tools {
let mut records = fasta::Reader::from_file(&input_fasta_path)
.unwrap() // unwrap left here because this is an anyhow error
.records();


while let Some(Ok(record)) = records.next() {
fasta::Writer::new(&mut scratch).write_record(&record)?;
let sequence_length = record.seq().len();
Expand Down Expand Up @@ -142,6 +146,27 @@ pub mod fasta_tools {
}
Ok(())
}

/// Sorts every per-taxid fasta file in `input_taxid_dir` by sequence length
/// (delegating to `sort_fasta_by_sequence_length`), writing each sorted copy
/// under the same basename into `output_taxid_dir`.
///
/// Files are processed in parallel with rayon's `par_iter`, since each taxid
/// file can be sorted independently.
///
/// # Panics
/// Panics if the output directory cannot be created, the input directory
/// cannot be read, or any individual file fails to sort.
pub fn sort_taxid_dir_by_sequence_length(input_taxid_dir: &str, output_taxid_dir: &str) {
    fs::create_dir_all(&output_taxid_dir).expect("Error creating output directory");

    // Read the directory and collect entries up front so the file list can be
    // split across rayon worker threads.
    let entries: Vec<_> = fs::read_dir(input_taxid_dir)
        .expect("Failed to read directory")
        .filter_map(Result::ok)
        .collect();

    entries.par_iter().for_each(|entry| {
        let path = entry.path();
        let input_fasta_path = path.to_str().unwrap();
        let input_fasta_basename = path.file_name().unwrap().to_str().unwrap();
        let output_fasta_path = format!("{}/{}", output_taxid_dir, input_fasta_basename);
        // Fail loudly on a sort error: silently discarding it (the previous
        // `let _ =`) would leave this taxid's file missing from the output
        // directory, making the downstream compressed database incomplete
        // without any visible error.
        sort_fasta_by_sequence_length(input_fasta_path, &output_fasta_path)
            .unwrap_or_else(|e| panic!("Error sorting {}: {:?}", input_fasta_path, e));
    });
}
}

#[cfg(test)]
Expand All @@ -151,6 +176,7 @@ mod tests {
use tempfile::tempdir;

use crate::fasta_tools::fasta_tools;
use crate::util::util;

// Define a struct to hold both ID and sequence
#[derive(Eq, PartialEq)]
Expand All @@ -174,29 +200,19 @@ mod tests {

#[test]
fn test_sort_fasta_by_sequence_length() {
use crate::util::util::create_fasta_records_from_file;
let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("sorted.fa");
let output_truth_fasta_file = "test_data/fasta_tools/truth_outputs/nt.sorted";
let input_fasta_file = "test_data/fasta_tools/inputs/nt";
let output_truth_fasta_file = "test_data/fasta_tools/truth_outputs/sorted_taxids/9771.fasta";
let input_fasta_file = "test_data/fasta_tools/inputs/unordered_taxids/9771.fasta";

let temp_file_path_str = temp_file.to_str().unwrap();
fasta_tools::sort_fasta_by_sequence_length(input_fasta_file, temp_file_path_str).unwrap();

let mut test_data_records = create_fasta_records_from_file(&temp_file_path_str);
let mut truth_data_records = create_fasta_records_from_file(&output_truth_fasta_file);

assert_eq!(
truth_data_records.sort(),
test_data_records.sort(),
"Files do not match: {:?}",
temp_file_path_str
);
util::compare_fasta_records_from_files(output_truth_fasta_file, temp_file_path_str);
}

#[test]
fn test_count_accessions_by_taxid() {
use crate::util::util::are_files_equal;
let output_truth_tsv_file = "test_data/fasta_tools/truth_outputs/count_accessions_by_taxid/output_counts.tsv";

let test_truth_tsv_file = tempfile::NamedTempFile::new().unwrap();
Expand All @@ -207,7 +223,25 @@ mod tests {
test_truth_tsv_file_path_str,
);

are_files_equal(output_truth_tsv_file, test_truth_tsv_file_path_str);
assert!(util::are_files_equal(output_truth_tsv_file, test_truth_tsv_file_path_str))
}

#[test]
fn test_sort_taxid_dir_by_sequence_length() {
    // Sort a directory of per-taxid fastas into a temp dir, then compare every
    // produced file against its same-named counterpart in the truth directory.
    // (Removed an unused local import of `compare_fasta_records_from_files`;
    // the comparison below uses `util::are_files_equal`.)
    let temp_dir = tempdir().unwrap();
    let temp_dir_path_str = temp_dir.path().to_str().unwrap();
    let output_truth_taxid_dir = "test_data/fasta_tools/truth_outputs/sorted_taxids";
    let input_taxid_dir = "test_data/fasta_tools/inputs/unordered_taxids";

    fasta_tools::sort_taxid_dir_by_sequence_length(input_taxid_dir, temp_dir_path_str);

    // Count how many outputs were checked so an empty output directory cannot
    // make the test pass vacuously (the loop body would simply never run).
    let mut checked = 0;
    for entry in std::fs::read_dir(temp_dir_path_str).unwrap() {
        let entry = entry.unwrap();
        let path = entry.path();
        let output_fasta_file = path.to_str().unwrap();
        let truth_fasta_file = format!("{}/{}", output_truth_taxid_dir, path.file_name().unwrap().to_str().unwrap());
        assert!(util::are_files_equal(output_fasta_file, &truth_fasta_file));
        checked += 1;
    }
    assert!(checked > 0, "sort_taxid_dir_by_sequence_length produced no output files");
}
}
27 changes: 26 additions & 1 deletion workflows/index-generation/ncbi-compress/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use ncbi_compress::commands::commands::{
fasta_compress_end_to_end, fasta_compress_from_fasta_skip_split_by_taxid,
fasta_compress_from_taxid_dir,
};
use ncbi_compress::fasta_tools::fasta_tools::{sort_fasta_by_sequence_length, count_accessions_by_taxid};
use ncbi_compress::fasta_tools::fasta_tools::{
sort_fasta_by_sequence_length,
count_accessions_by_taxid,
sort_taxid_dir_by_sequence_length
};
use ncbi_compress::ncbi_compress::ncbi_compress::split_accessions_by_taxid;

use ncbi_compress::logging::logging;
Expand All @@ -31,6 +35,22 @@ pub fn main() {
.required(true),
),
)
.subcommand(
Command::new("sort-taxid-dir-by-sequence-length")
.about("sort_taxid_dir_by_sequence_length")
.arg(
Arg::new("input_taxid_dir")
.help("Input directory of taxid fasta files")
.long("input-taxid-dir")
.required(true),
)
.arg(
Arg::new("output_taxid_dir")
.help("Output directory for the sorted taxid fasta files")
.long("output-taxid-dir")
.required(true),
),
)
.subcommand(
Command::new("break-into-individual-taxids-only")
.about("break_into_individual_taxids_only")
Expand Down Expand Up @@ -337,6 +357,11 @@ pub fn main() {
let output_fasta = sub_m.get_one::<String>("output_fasta").unwrap();
sort_fasta_by_sequence_length(input_fasta, output_fasta).unwrap();
}
Some(("sort-taxid-dir-by-sequence-length", sub_m)) => {
let input_taxid_dir = sub_m.get_one::<String>("input_taxid_dir").unwrap();
let output_taxid_dir = sub_m.get_one::<String>("output_taxid_dir").unwrap();
sort_taxid_dir_by_sequence_length(&input_taxid_dir, &output_taxid_dir);
}
Some(("break-into-individual-taxids-only", sub_m)) => {
let input_fasta = sub_m.get_one::<String>("input_fasta").unwrap();
let accession_mapping_files: Vec<String> = sub_m
Expand Down
Loading

0 comments on commit e411f50

Please sign in to comment.