Skip to content

Commit

Permalink
[docs] document diamond scatter
Browse files Browse the repository at this point in the history
  • Loading branch information
morsecodist committed Mar 8, 2024
1 parent a459b4c commit 14046ad
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
27 changes: 27 additions & 0 deletions lib/idseq_utils/idseq_utils/diamond_scatter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Diamond Scatter

`diamond_scatter.py` works with a [modified version of diamond](https://github.com/morsecodist/diamond) to use diamond's parallelization to work in separate jobs that can be run at any time instead of on an HPC cluster with a shared file system. This allows us to run alignment on batch with spot instances while dynamically scaling to meet uneven demand more easily.


The approach was to change the diamond code base as little as possible. That is why the `diamond_scatter.py` script exists, to control some aspects of the diamond parallelization management without adding it to diamond. If we want to fold this into diamond we would want to add what is in this script in addition to the fork but I am not sure this sort of parallelization is a goal of the diamond project so they may not be receptive.

# How Diamond Parallelization Works

Refer to [the docs](https://github.com/bbuchfink/diamond/wiki/6.-Distributed-computing) for a detailed explaination.

Diamond expects to run in different processes with a file system accessible by all processes. The directory contains a record of what work needs to be done that the different processes can update to communicate with one another. The diretory also allows the sharing of files between the processes. A parallel run starts with running an initialization command that creates the list of work needed for a particular alignment. Then, you can run as many processes as you want. Each processes will pick up a chunk of alignment work based on the work in the list, and will update the list when it begins. Each alignment chunk produces some intermediate files that they store in the directory. The final chunk, is able to detect it is the final chunk based on the work list and it has access to all of the intermediate files so it joins them.

# How Diamond was Modified

To make diamond run in parallel without a shared directory you need two main elements:

1. We need to split the joining from the alignment because the alignment chunk won't have all of the intermediate files. We want to be able to run the join later after we've gathered up all the intermediate files.
2. We need diamond to be able to run a single, specific, chunk and output intermediate files without deleting them.

To achieve 1 diamond was modified slightly to split the alignment command in two by adding flags: `--single-chunk` and `--join-chunks`. The `single-chunk` skips the checking for joining and cleaning up intermediate files and `join-chunks` skips alignment and goes right to that check.

Now all you need is to make sure the directory is in the correct state at each step of the processes. That is where the `diamond_scatter.py` script comes in. The script fills the role of initializing the parallel directory.

To do a single chunk of alignment work, instead of specifying all of the work in the work list the script just specifies that one chunk needs to be completed, but a specific chunk with a particular number. This is necessary because the intermediate file naming scheme uses chunk numbers so they will be needed for the final join. The modified diamond won't clean these up so you can copy them into remote storage and join at any time.

To join the chunks you just need to copy all of the intermediate files onto the node you are using to join. Thie script initializes the directory with the correct format and runs with the `join-chunks` flag.
37 changes: 36 additions & 1 deletion lib/idseq_utils/idseq_utils/diamond_scatter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
See diamond_scatter.md for a detailed description
"""

import os
import shutil
import sys
Expand Down Expand Up @@ -107,6 +111,9 @@ def zero_pad(n: int, m: int):


def make_par_dir(cwd: str, par_tmpdir: str):
"""
This creates a parallization coordination directory in the format diamond expects
"""
os.mkdir(join(cwd, par_tmpdir))
p_dir = join(cwd, par_tmpdir, "parallelizer")
os.mkdir(p_dir)
Expand All @@ -119,6 +126,11 @@ def make_par_dir(cwd: str, par_tmpdir: str):


def make_db(reference_fasta: str, output_dir: str, chunks: int):
"""
This creates a chunked diamond index. The chunks are named with a naming scheme such
that they have their chunk number, number of sequences, and number of letters. This
allows us to align these chunks without needing to store these parameteres separately.
"""
os.mkdir(output_dir)
chunk_size = (sum(1 for _ in SeqIO.parse(reference_fasta, "fasta")) // chunks) + 1
seqs = SeqIO.parse(reference_fasta, "fasta")
Expand All @@ -138,16 +150,22 @@ def make_db(reference_fasta: str, output_dir: str, chunks: int):


def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str):
"""
Align one index chunk. Index chunks as created by `make_db`
"""
try:
os.mkdir(output_dir)
except OSError as e:
if e.errno != errno.EEXIST:
raise
# unpack the chunk number, n_seqs, and n_letters from the chunk naming scheme
chunk, n_seqs, n_letters = basename(db_chunk)[:-5].split("-")
with TemporaryDirectory() as tmp_dir:
make_par_dir(tmp_dir, "par-tmp")
# specify in the parallelization directory that we need to align this one chunk
with open(join(tmp_dir, "par-tmp", f"align_todo_{zero_pad(0, 6)}"), "w") as f:
f.writelines([align_chunk(int(chunk), 0, int(n_seqs), 0)])
# run the alignment
diamond_blastx(
cwd=tmp_dir,
par_tmpdir="par-tmp",
Expand All @@ -158,6 +176,8 @@ def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str)
diamond_args=diamond_args,
queries=(abspath(q) for q in query),
)
# the intermediate output files will be named with this scheme
# construct their names so we can copy them as outputs
ref_block_name = f"ref_block_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}"
ref_dict_name = f"ref_dict_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}"
shutil.copy(
Expand All @@ -169,6 +189,12 @@ def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str)


def mock_reference_fasta(chunks: int, chunk_size: int):
"""
This creates a dummy valid fasta which is required to build a dummy alignment index to feed
into the join command.
With the join flag the command doesn't actually use the index but the blastx command always expects a valid one
"""
letters = chunk = i = 0
while chunk < chunks:
n = 100
Expand All @@ -182,17 +208,25 @@ def mock_reference_fasta(chunks: int, chunk_size: int):
def blastx_join(chunk_dir: str, out: str, diamond_args: str, *query: str):
with TemporaryDirectory() as tmp_dir:
make_par_dir(tmp_dir, "par-tmp")
# specify in the parallelization directory that we need to join all the chunks
with open(join(tmp_dir, "par-tmp", f"join_todo_{zero_pad(0, 6)}"), "w") as f:
f.write("TOKEN\n")

# copy all of the intermediate files into the right spot
for f in os.listdir(chunk_dir):
shutil.copy(join(chunk_dir, f), join(tmp_dir, "par-tmp", f))

chunks = len(os.listdir(chunk_dir)) // 2
with NamedTemporaryFile() as ref_fasta, NamedTemporaryFile() as db:
# make fake db to appease diamond
# With the join flag the command doesn't actually use the index but the blastx command
# always expects a valid one

# make a dummy fasta
SeqIO.write(SeqRecord(Seq("M"), "ID"), ref_fasta.name, "fasta")
# turn it into a dummy index
diamond_makedb(tmp_dir, ref_fasta.name, db.name)
# join the chunks with diamond
diamond_blastx(
cwd=tmp_dir,
par_tmpdir="par-tmp",
Expand All @@ -203,7 +237,8 @@ def blastx_join(chunk_dir: str, out: str, diamond_args: str, *query: str):
diamond_args=diamond_args,
queries=(abspath(q) for q in query),
)


# the join outputs the final results in different files, concatenate them
with open(out, "w") as out_f:
for out_chunk in glob(join(tmp_dir, f"{out}_*")):
with open(out_chunk) as out_chunk_f:
Expand Down

0 comments on commit 14046ad

Please sign in to comment.