Skip to content

Commit

Permalink
Merge pull request #71 from rmcar17/main
Browse files Browse the repository at this point in the history
ENH: Optimise CLI Usage of `ctree`
  • Loading branch information
GavinHuttley authored Nov 8, 2024
2 parents 64ec9d7 + 1de8a93 commit 87b87ce
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 127 deletions.
22 changes: 8 additions & 14 deletions src/diverse_seq/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,20 +478,13 @@ def ctree(
)
sys.exit(1)

seqids = dvs_data_store.get_seqids_from_store(seqfile)[:limit]
records = dvs_data_store.get_ordered_records(
dvs_data_store.HDF5DataStore(seqfile),
seqids,
)

arr2str_app = dvs_util.arr2str(moltype)

seqs = {}
for name, record in zip(seqids, records, strict=True):
seqs[name] = arr2str_app(record.read()) # pylint: disable=not-callable
seqids = dvs_data_store.get_seqids_from_store(seqfile)
if limit is not None:
seqids = seqids[:limit]

seqs = make_unaligned_seqs(seqs, moltype=moltype)
app = dvs_cluster.dvs_par_ctree(
app = dvs_cluster.dvs_cli_par_ctree(
seq_store=seqfile,
limit=limit,
k=k,
sketch_size=sketch_size,
moltype=moltype,
Expand All @@ -501,10 +494,11 @@ def ctree(
parallel=numprocs > 1,
show_progress=not hide_progress,
)
tree = app(seqs) # pylint: disable=not-callable
tree = app(seqids) # pylint: disable=not-callable
if not tree:
dvs_util.print_colour(tree, "red")
sys.exit(1)

tree.write(outpath)


Expand Down
311 changes: 216 additions & 95 deletions src/diverse_seq/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import multiprocessing
from collections.abc import Sequence
from contextlib import nullcontext
from pathlib import Path
from typing import Literal

import cogent3.app.typing as c3_types
Expand All @@ -14,6 +15,7 @@
from scipy.sparse import dok_matrix
from sklearn.cluster import AgglomerativeClustering

from diverse_seq import data_store as dvs_data_store
from diverse_seq.distance import (
BottomSketch,
euclidean_distance,
Expand Down Expand Up @@ -240,102 +242,8 @@ def make_cluster_tree(
return tree


@define_app(app_type=AppType.NON_COMPOSABLE)
class dvs_par_ctree(ClusterTreeBase):
"""Create a cluster tree from kmer distances in parallel."""

def __init__(
self,
*,
k: int = 12,
sketch_size: int | None = 3000,
moltype: str = "dna",
distance_mode: Literal["mash", "euclidean"] = "mash",
mash_canonical_kmers: bool | None = None,
show_progress: bool = False,
max_workers: int | None = None,
parallel: bool = True,
) -> None:
"""Initialise parameters for generating a kmer cluster tree.
Parameters
----------
k
kmer size
sketch_size
size of sketches, only applies to mash distance
moltype
seq collection molecular type
distance_mode
mash distance or euclidean distance between kmer freqs
mash_canonical_kmers
whether to use mash canonical kmers for mash distance
show_progress
whether to show progress bars
numprocs
number of workers, defaults to running serial
Notes
-----
This app is not composable but can run in parallel. It is
best suited to a single large sequence collection.
If mash_canonical_kmers is enabled when using the mash distance,
kmers are considered identical to their reverse complement.
References
----------
.. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B.,
Bergman, N. H., Koren, S., & Phillippy, A. M. (2016).
Mash: fast genome and metagenome distance estimation using MinHash.
Genome biology, 17, 1-14.
"""
super().__init__(
k=k,
sketch_size=sketch_size,
moltype=moltype,
distance_mode=distance_mode,
mash_canonical_kmers=mash_canonical_kmers,
show_progress=show_progress,
)
if parallel:
if max_workers is None:
max_workers = multiprocessing.cpu_count()
self._numprocs = min(max_workers, multiprocessing.cpu_count())
else:
self._numprocs = 1

self._calc_dist = (
self._mash_dist if distance_mode == "mash" else self._euclidean_dist
)

def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode:
"""Construct a cluster tree for a collection of sequences.
Parameters
----------
seqs
Sequence collection to form cluster tree for.
Returns
-------
PhyloNode
a cluster tree.
"""
seqs = seqs.degap()
self._executor = (
get_reusable_executor(max_workers=self._numprocs)
if self._numprocs != 1
else nullcontext()
)

seq_names = seqs.names
seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable

with self._progress, self._executor:
distances = self._calc_dist(seq_arrays)
return make_cluster_tree(seq_names, distances, progress=self._progress)

class DvsParCtreeMixin:
def _mash_dist(self, seq_arrays: Sequence[SeqArray]) -> numpy.ndarray:
"""Calculates pairwise mash distances between sequences in parallel.
Expand Down Expand Up @@ -513,6 +421,219 @@ def mash_sketches_parallel(
return bottom_sketches


@define_app(app_type=AppType.NON_COMPOSABLE)
class dvs_par_ctree(ClusterTreeBase, DvsParCtreeMixin):
"""Create a cluster tree from kmer distances in parallel."""

def __init__(
self,
*,
k: int = 12,
sketch_size: int | None = 3000,
moltype: str = "dna",
distance_mode: Literal["mash", "euclidean"] = "mash",
mash_canonical_kmers: bool | None = None,
show_progress: bool = False,
max_workers: int | None = None,
parallel: bool = True,
) -> None:
"""Initialise parameters for generating a kmer cluster tree.
Parameters
----------
k
kmer size
sketch_size
size of sketches, only applies to mash distance
moltype
seq collection molecular type
distance_mode
mash distance or euclidean distance between kmer freqs
mash_canonical_kmers
whether to use mash canonical kmers for mash distance
show_progress
whether to show progress bars
numprocs
number of workers, defaults to running serial
Notes
-----
This is app is not composable but can run in parallel. It is
best suited to a single large sequence collection.
If mash_canonical_kmers is enabled when using the mash distance,
kmers are considered identical to their reverse complement.
References
----------
.. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B.,
Bergman, N. H., Koren, S., & Phillippy, A. M. (2016).
Mash: fast genome and metagenome distance estimation using MinHash.
Genome biology, 17, 1-14.
"""
super().__init__(
k=k,
sketch_size=sketch_size,
moltype=moltype,
distance_mode=distance_mode,
mash_canonical_kmers=mash_canonical_kmers,
show_progress=show_progress,
)

if parallel:
if max_workers is None:
max_workers = multiprocessing.cpu_count()
self._numprocs = min(max_workers, multiprocessing.cpu_count())
else:
self._numprocs = 1

self._calc_dist = (
self._mash_dist if distance_mode == "mash" else self._euclidean_dist
)

def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode:
"""Construct a cluster tree for a collection of sequences.
Parameters
----------
seqs
Sequence collection to form cluster tree for.
Returns
-------
PhyloNode
a cluster tree.
"""
seqs = seqs.degap()
self._executor = (
get_reusable_executor(max_workers=self._numprocs)
if self._numprocs != 1
else nullcontext()
)

seq_names = seqs.names
seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable

with self._progress, self._executor:
distances = self._calc_dist(seq_arrays)
return make_cluster_tree(seq_names, distances, progress=self._progress)


@define_app(app_type=AppType.NON_COMPOSABLE)
class dvs_cli_par_ctree(ClusterTreeBase, DvsParCtreeMixin):
"""Create a cluster tree from kmer distances in parallel."""

def __init__(
self,
*,
seq_store: str | Path,
limit: int | None = None,
k: int = 12,
sketch_size: int | None = 3000,
moltype: str = "dna",
distance_mode: Literal["mash", "euclidean"] = "mash",
mash_canonical_kmers: bool | None = None,
show_progress: bool = False,
max_workers: int | None = None,
parallel: bool = True,
) -> None:
"""Initialise parameters for generating a kmer cluster tree.
Parameters
----------
k
kmer size
sketch_size
size of sketches, only applies to mash distance
moltype
seq collection molecular type
distance_mode
mash distance or euclidean distance between kmer freqs
mash_canonical_kmers
whether to use mash canonical kmers for mash distance
show_progress
whether to show progress bars
numprocs
number of workers, defaults to running serial
Notes
-----
This is app is not composable but can run in parallel. It is
best suited to a single large sequence collection.
If mash_canonical_kmers is enabled when using the mash distance,
kmers are considered identical to their reverse complement.
References
----------
.. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B.,
Bergman, N. H., Koren, S., & Phillippy, A. M. (2016).
Mash: fast genome and metagenome distance estimation using MinHash.
Genome biology, 17, 1-14.
"""
super().__init__(
k=k,
sketch_size=sketch_size,
moltype=moltype,
distance_mode=distance_mode,
mash_canonical_kmers=mash_canonical_kmers,
show_progress=show_progress,
)

self._seq_store = seq_store
self._limit = limit

if parallel:
if max_workers is None:
max_workers = multiprocessing.cpu_count()
self._numprocs = min(max_workers, multiprocessing.cpu_count())
else:
self._numprocs = 1

self._calc_dist = (
self._mash_dist if distance_mode == "mash" else self._euclidean_dist
)

def main(self, seq_names: list[str]) -> PhyloNode:
"""Construct a cluster tree for a collection of sequences.
Parameters
----------
seqs
Sequence collection to form cluster tree for.
Returns
-------
PhyloNode
a cluster tree.
"""
dstore = dvs_data_store.HDF5DataStore(self._seq_store, mode="r")

self._executor = (
get_reusable_executor(max_workers=self._numprocs)
if self._numprocs != 1
else nullcontext()
)

seq_arrays = {
m.unique_id: m # pylint: disable=not-callable
for m in dstore.completed
if m.unique_id in seq_names
}
seq_arrays = [seq_arrays[name] for name in seq_names]
seq_arrays = seq_arrays[: self._limit] if self._limit else seq_arrays
seq_arrays = [
SeqArray(
member.unique_id, member.read(), self._moltype, member.data_store.source
)
for member in seq_arrays
]

with self._progress, self._executor:
distances = self._calc_dist(seq_arrays)
return make_cluster_tree(seq_names, distances, progress=self._progress)


def compute_mash_chunk_distances(
start_idx: int,
stride: int,
Expand Down
Loading

0 comments on commit 87b87ce

Please sign in to comment.