check in chunk before tree
phoenixAja committed Jan 18, 2024
1 parent e411f50 commit 7489460
Showing 3 changed files with 81 additions and 72 deletions.
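
The core change: previously each record's sketch was searched against all retained sketches first and only then deduplicated within its chunk; this commit deduplicates each chunk against itself first, so fewer candidates reach the expensive search against the accumulated sketches. A minimal sketch of the new control flow, using HashSet<u64> and a toy containment measure in place of sourmash's KmerMinHash (dedup_chunked, containment, and the parameters are illustrative, not this repository's API):

    use std::collections::HashSet;

    // Toy containment: |A intersect B| / |A|.
    fn containment(a: &HashSet<u64>, b: &HashSet<u64>) -> f64 {
        if a.is_empty() {
            return 0.0;
        }
        a.intersection(b).count() as f64 / a.len() as f64
    }

    // Two-phase dedup: within-chunk first, then against everything retained.
    fn dedup_chunked(
        records: &[HashSet<u64>],
        chunk_size: usize,
        threshold: f64,
    ) -> Vec<HashSet<u64>> {
        let mut retained: Vec<HashSet<u64>> = Vec::new();
        for chunk in records.chunks(chunk_size) {
            // Phase 1: dedup the chunk against itself (bounded by chunk_size).
            let mut unique_in_chunk: Vec<&HashSet<u64>> = Vec::new();
            for sketch in chunk {
                let similar = unique_in_chunk
                    .iter()
                    .any(|kept| containment(sketch, *kept) >= threshold);
                if !similar {
                    unique_in_chunk.push(sketch);
                }
            }
            // Phase 2: only within-chunk survivors face the expensive search
            // against everything retained so far.
            for sketch in unique_in_chunk {
                let similar = retained
                    .iter()
                    .any(|kept| containment(sketch, kept) >= threshold);
                if !similar {
                    retained.push(sketch.clone());
                }
            }
        }
        retained
    }

With phase 1 bounded by chunk_size, the costly phase-2 scan over everything retained runs only on within-chunk survivors, which is where the parallelism in the real code pays off.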
7 changes: 1 addition & 6 deletions workflows/index-generation/ncbi-compress/src/commands.rs
@@ -2,7 +2,6 @@ pub mod commands {
use std::fs;

use bio::io::fasta;
use tempdir::TempDir;

use crate::ncbi_compress::ncbi_compress;
use crate::fasta_tools::fasta_tools;
@@ -253,10 +252,7 @@ pub mod commands {

// create a temp dir containing one file per taxid that input fasta accessions are sorted into
fs::create_dir_all(&temp_file_output_dir).expect("Error creating output directory");
let temp_taxid_dir = TempDir::new_in(temp_file_output_dir, "accessions_by_taxid")
.expect("Error creating tempdir");
let temp_taxid_dir_str = format!("{}/accessions_by_taxid", temp_file_output_dir);
;
ncbi_compress::split_accessions_by_taxid(
input_fasta_path,
accession_mapping_files,
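
The hunk above swaps a tempdir::TempDir, which removes its directory as soon as the guard value is dropped, for a plain accessions_by_taxid subdirectory that persists under the output directory. A minimal sketch of the resulting pattern (the helper name and signature are hypothetical, not from this repository):

    use std::fs;

    // Sketch: a persistent scratch directory replaces tempdir::TempDir, which
    // deletes its directory as soon as the guard value goes out of scope.
    fn make_taxid_dir(temp_file_output_dir: &str) -> String {
        let dir = format!("{}/accessions_by_taxid", temp_file_output_dir);
        fs::create_dir_all(&dir).expect("Error creating taxid directory");
        dir
    }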
@@ -333,8 +329,7 @@ mod tests {
logging_contained_in_tree_fn,
logging_contained_in_chunk_fn,
);

assert!(util::are_files_equal(truth_fasta_path, &test_fasta_path));
util::compare_fasta_records_from_files(&truth_fasta_path, &test_fasta_path);
}

#[test]
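
The test hunk above replaces byte-for-byte file equality with record-level comparison; the util implementation is not shown in this diff. A hypothetical sketch of such a helper, parsing both files and comparing sorted (id, sequence) pairs so that formatting and record order stop mattering:

    use bio::io::fasta;

    // Hypothetical helper in the spirit of util::compare_fasta_records_from_files:
    // parse both files, sort (id, sequence) pairs, and assert they match.
    fn compare_fasta_records_from_files(a: &str, b: &str) {
        let read = |path: &str| {
            let mut records: Vec<(String, Vec<u8>)> = fasta::Reader::from_file(path)
                .unwrap()
                .records()
                .map(|r| {
                    let r = r.unwrap();
                    (r.id().to_string(), r.seq().to_vec())
                })
                .collect();
            records.sort();
            records
        };
        assert_eq!(read(a), read(b));
    }

One plausible motivation, not stated in the commit: chunked, parallel processing no longer guarantees byte-identical output order, so record-level equality is the safer assertion.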
@@ -228,7 +228,6 @@ mod tests {

#[test]
fn test_sort_taxid_dir_by_sequence_length() {
use crate::util::util::compare_fasta_records_from_files;
let temp_dir = tempdir().unwrap();
let temp_dir_path_str = temp_dir.path().to_str().unwrap();
let output_truth_taxid_dir = "test_data/fasta_tools/truth_outputs/sorted_taxids";
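
This hunk only drops an unused import from the test for sorting per-taxid FASTA files by sequence length. For orientation, a minimal sketch of that operation on a single file, assuming ascending length order (the real fasta_tools function's signature and ordering are not shown in this diff):

    use bio::io::fasta;

    // Sketch: read all records, sort by sequence length, rewrite the file.
    fn sort_fasta_by_sequence_length(input: &str, output: &str) {
        let mut records: Vec<fasta::Record> = fasta::Reader::from_file(input)
            .unwrap()
            .records()
            .map(|r| r.unwrap())
            .collect();
        records.sort_by_key(|r| r.seq().len());
        let mut writer = fasta::Writer::to_file(output).unwrap();
        for record in &records {
            writer.write_record(record).unwrap();
        }
    }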
145 changes: 80 additions & 65 deletions workflows/index-generation/ncbi-compress/src/ncbi_compress.rs
@@ -15,7 +15,6 @@ pub mod ncbi_compress {
use sourmash::signature::SigsTrait;
use sourmash::sketch::minhash::KmerMinHash;

use crate::fasta_tools::fasta_tools::sort_fasta_by_sequence_length;
use crate::logging::logging;
use crate::minhashtree_w_logging::minhashtree_w_logging::{
MinHashTreeWithLogging, MinHashTreeWithLoggingFunctionality,
@@ -346,45 +345,87 @@ pub mod ncbi_compress {
) {
let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
// let mut tree: BloomSetTree<_, 4096> = BloomSetTree::new(branch_factor);
let mut sketches = Vec::new();
let mut sketches: Vec<KmerMinHash> = Vec::new();

// Initialize a temporary vector to store the unique items from each chunk
let mut tmp: Vec<KmerMinHash> = Vec::with_capacity(chunk_size);
let writer = Mutex::new(writer);

let mut records_iter = reader.records();
let mut chunk = records_iter
.borrow_mut()
.take(chunk_size)
.collect::<Vec<_>>();
accession_count.add_assign(chunk.len() as u64);
while chunk.len() > 0 {
// Take a chunk of accessions and return the ones that are not already in sketches


loop {
// Initialize a temporary vector to store the unique items from each chunk
let mut unique_in_chunk: Vec<(KmerMinHash, &fasta::Record)> = Vec::with_capacity(chunk_size);
let mut unique_in_tree_and_chunk: Vec<KmerMinHash> = Vec::with_capacity(chunk_size);

let chunk = records_iter
.borrow_mut()
.take(chunk_size)
.collect::<Vec<_>>();

accession_count.add_assign(chunk.len() as u64);

if chunk.len() == 0 {
break;
}

// create signatures for each record in the chunk
let chunk_signatures = chunk
                    .par_iter()
                    .filter_map(|r| {
                        let record = r.as_ref().unwrap();
                        let mut hash;
                        if is_protein_fasta {
                            hash = KmerMinHash::new(
                                scaled,
                                k,
                                HashFunctions::murmur64_protein,
                                seed,
                                false,
                                0,
                            );
                            hash.add_protein(record.seq()).unwrap();
                        } else {
                            hash = KmerMinHash::new(
                                scaled,
                                k,
                                HashFunctions::murmur64_DNA,
                                seed,
                                false,
                                0,
                            );
                            hash.add_sequence(record.seq(), true).unwrap();
                        }
                    .par_iter()
                    .filter_map(|r| {
                        let record = r.as_ref().unwrap();
                        let mut hash;
                        if is_protein_fasta {
                            hash = KmerMinHash::new(
                                scaled,
                                k,
                                HashFunctions::murmur64_protein,
                                seed,
                                false,
                                0,
                            );
                            hash.add_protein(record.seq()).unwrap();
                            Some((hash, record))
                        } else {
                            hash = KmerMinHash::new(
                                scaled,
                                k,
                                HashFunctions::murmur64_DNA,
                                seed,
                                false,
                                0,
                            );
                            hash.add_sequence(record.seq(), true).unwrap();
                            Some((hash, record))
                        }
                    }).collect::<Vec<_>>();

                // we need to make sure records within the chunk aren't similar to each other before
                // we check them against the larger tree
                for (hash, record) in chunk_signatures {
                    let similar = unique_in_chunk
                        .par_iter()
                        .any(|(other, _record)| containment(&hash, &other).unwrap() >= similarity_threshold);

                    if !similar {
                        unique_accession_count.add_assign(1);
                        unique_in_chunk.push((hash, record));

                        if *unique_accession_count % 1_000_000 == 0 {
                            log::info!(
                                "Processed {} accessions, {} unique",
                                accession_count,
                                unique_accession_count
                            );
                        }
                    }
                }

// Check if any of the unique hashes in the chunk have similarity to the ones already in the tree
unique_in_tree_and_chunk = unique_in_chunk
.par_iter()
.filter_map(|(hash, record)| {
// Take a chunk of accessions and return the ones that are not already in sketches

// Do a parallel search over the sketches to see if any are similar; don't
// return the item if you find a similar item. We are implementing this
// manually instead of using the linear index built into sourmash because
@@ -399,44 +440,18 @@ pub mod ncbi_compress {
{
None
} else {
Some((record, hash))
let mut writer = writer.lock().unwrap();
writer.write_record(record).unwrap();
Some(hash.clone())
}
})
.collect::<Vec<_>>();

// Now we have a chunk of signatures that aren't in sketches; we just need to make sure
// they don't duplicate each other, then we can add them. Breaking it up this way
// enables a bit more parallelism on the big search; it makes more sense if we have a
// more efficient search structure for all of the sketches.
for (record, hash) in chunk_signatures {
// Perform a faster similarity check over just this chunk because we may have similarities within a chunk
let similar = tmp
.par_iter()
.any(|other| containment(&hash, &other).unwrap() >= similarity_threshold);

if !similar {
unique_accession_count.add_assign(1);
tmp.push(hash);
writer.write_record(record).unwrap();

if *unique_accession_count % 1_000_000 == 0 {
log::info!(
"Processed {} accessions, {} unique",
accession_count,
unique_accession_count
);
}
}
}
// for hash in tmp {
// tree.insert(hash);
// }
sketches.extend_from_slice(&tmp);
tmp.clear();
chunk = records_iter
.borrow_mut()
.take(chunk_size)
.collect::<Vec<_>>();

sketches.extend_from_slice(unique_in_tree_and_chunk.as_slice());
}
}
}
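
A consequence of moving the write into the parallel filter over unique_in_chunk is that the FASTA writer is shared across rayon worker threads, which is why it now sits behind a Mutex and is locked once per written record. A minimal sketch of the pattern with a stand-in sink (keep_and_write and its types are illustrative):

    use rayon::prelude::*;
    use std::sync::Mutex;

    // Sketch: share one output sink across rayon worker threads via a Mutex,
    // locking only for the duration of a single write, as the new filter_map
    // does around writer.write_record(record).
    fn keep_and_write(items: Vec<String>) -> Vec<String> {
        let sink = Mutex::new(Vec::new()); // stands in for the FASTA writer
        let kept: Vec<String> = items
            .par_iter()
            .filter_map(|item| {
                let mut out = sink.lock().unwrap();
                out.push(item.clone()); // the real code writes the record here
                Some(item.clone())
            })
            .collect();
        kept
    }

With par_iter the order of writes across threads is nondeterministic; each individual write stays intact because the lock is held for its duration.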