Skip to content

Commit

Permalink
even older version
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenixAja committed Feb 7, 2024
1 parent db7ac08 commit a213124
Showing 1 changed file with 29 additions and 34 deletions.
63 changes: 29 additions & 34 deletions workflows/index-generation/ncbi-compress/src/ncbi_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,45 +75,40 @@ pub mod ncbi_compress {
pub fn split_accessions_by_taxid(
input_fasta_path: &str,
mapping_file_path: Vec<String>,
output_dir: &str,
output_dir: &str

) -> std::path::PathBuf {
// create a temp dir containing one file per taxid that input fasta accessions are sorted into
// based on taxid in the mapping files (input fasta does not have taxid in header)

log::info!("Splitting accessions by taxid");
// Use a random string for the rocksdb directoru name
// A tempdir can be removed by the operating system before we are done with it which
// break the rocksdb. We need a random name so we can call this function multiple
// times from different threads during testing.
let rng = rand::thread_rng();
let dir: String = rng
.sample_iter(&Alphanumeric)
.take(20)
.map(char::from)
.collect();
let accession_to_taxid = rocksdb::DB::open_default(&dir).unwrap();
fs::create_dir_all(&output_dir).expect("Error creating output directory");

log::info!("Creating accession to taxid db");
log::info!("Creating accession to taxid mapping");

//let taxid_dir = TempDir::new_in(temp_file_output_dir, "accessions_by_taxid").unwrap();
// let taxid_path_str = format!("{}/accessions_by_taxid", temp_file_output_dir);
let taxid_path = Path::new(output_dir);
log::info!("Creating taxid dir {:?}", taxid_path);
let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
// Build a trie of the accessions in the input fasta
let mut builder = TrieBuilder::new();
reader.records().enumerate().for_each(|(i, result)| {
let record = result.unwrap();
let accession_id = record.id().split_whitespace().next().unwrap();
let accession_no_version = remove_accession_version(accession_id);
accession_to_taxid.put(accession_no_version, b"").unwrap();
if i % 1_000_000 == 0 {
builder.push(accession_no_version);
if i % 10_000 == 0 {
log::info!(" Processed {} accessions", i);
}
});
log::info!(" Finished loading accessions");
log::info!(" Started building accession trie");
let accessions_trie = builder.build();
log::info!(" Finished building accession trie");

mapping_file_path.par_iter().for_each(|mapping_file_path| {
// Build a trie of the accessions in the mapping files
let mut builder = TrieStoreBuilder::new();
mapping_file_path.iter().for_each(|mapping_file_path| {
log::info!(" Processing mapping file {:?}", mapping_file_path);
let reader = csv::ReaderBuilder::new()
.delimiter(b'\t')
Expand All @@ -130,11 +125,7 @@ pub mod ncbi_compress {
let accession_no_version = remove_accession_version(accession);

// Only output mappings if the accession is in the source fasta file
if accession_to_taxid
.get(accession_no_version)
.unwrap()
.is_none()
{
if !accessions_trie.exact_match(accession_no_version) {
return;
}

Expand All @@ -147,18 +138,18 @@ pub mod ncbi_compress {
record[2].parse::<u64>().unwrap()
};
added += 1;
// convert taxid to u64_vec
accession_to_taxid
.put(accession_no_version, taxid.to_be_bytes())
.unwrap();
builder.push(accession_no_version, taxid);

});
log::info!(
" Finished Processing mapping file {:?}, added {} mappings",
mapping_file_path,
added
);
});
log::info!("Finished building accession to taxid db");
log::info!(" Started building accession to taxid trie");
let accession_to_taxid = builder.build();
log::info!("Finished building accession to taxid trie");

log::info!("Splitting accessions by taxid");
let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
Expand All @@ -170,16 +161,13 @@ pub mod ncbi_compress {
let record = record.unwrap();
let accession_id = record.id().split_whitespace().next().unwrap();
let accession_no_version = remove_accession_version(accession_id);
let taxid = if let Some(taxid) = accession_to_taxid.get(accession_no_version).unwrap() {
if taxid.len() == 0 {
0 // no taxid found
} else {
u64::from_be_bytes(taxid.as_slice().try_into().unwrap())
}
let taxid = if let Some(taxid) = accession_to_taxid.get(accession_no_version) {
taxid
} else {
0 // no taxid found
};
let file_path = taxid_path.join(format!("{}.fasta", taxid));
// let file_path = taxid_dir.path().join(format!("{}.fasta", taxid));
let file = fs::OpenOptions::new()
.create(true)
.append(true)
Expand All @@ -188,9 +176,16 @@ pub mod ncbi_compress {
let mut writer = fasta::Writer::new(file);
writer.write_record(&record).unwrap();
}
fs::remove_dir_all(dir).expect("Error removing db");
log::info!("Finished splitting accessions by taxid");
// remove input fasta file
// match fs::remove_file(input_fasta_path) {
// Ok(()) => println!("input fasta deleted successfully"),
// Err(e) => println!("Error deleting input fasta : {:?}", e),
// }
// taxid_dir
log::info!("Finished splitting accessions by taxid");
taxid_path.to_path_buf()

}

pub fn fasta_compress_taxid_w_logging(
Expand Down

0 comments on commit a213124

Please sign in to comment.