Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Optimise CLI Usage of ctree #71

Merged
merged 8 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions src/diverse_seq/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,20 +480,12 @@ def ctree(
sys.exit(1)

seqids = dvs_data_store.get_seqids_from_store(seqfile)[:limit]
records = dvs_data_store.get_ordered_records(
dvs_data_store.HDF5DataStore(seqfile),
seqids,
)

arr2str_app = dvs_util.arr2str(moltype)

seqs = {}
for name, record in zip(seqids, records, strict=True):
seqs[name] = arr2str_app(record.read()) # pylint: disable=not-callable

seqs = make_unaligned_seqs(seqs, moltype=moltype)
if limit is not None:
seqids = seqids[:limit]

app = dvs_cluster.dvs_par_ctree(
app = dvs_cluster.dvs_cli_par_ctree(
seq_store=seqfile,
limit=limit,
k=k,
sketch_size=sketch_size,
moltype=moltype,
Expand All @@ -503,7 +495,7 @@ def ctree(
parallel=numprocs > 1,
show_progress=not hide_progress,
)
tree = app(seqs) # pylint: disable=not-callable
tree = app(seqids) # pylint: disable=not-callable
tree.write(outpath)


Expand Down
313 changes: 216 additions & 97 deletions src/diverse_seq/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import multiprocessing
from collections.abc import Sequence
from contextlib import nullcontext
from pathlib import Path
from typing import Literal

import cogent3.app.typing as c3_types
Expand All @@ -14,6 +15,7 @@
from scipy.sparse import dok_matrix
from sklearn.cluster import AgglomerativeClustering

from diverse_seq import data_store as dvs_data_store
from diverse_seq.distance import (
BottomSketch,
euclidean_distance,
Expand Down Expand Up @@ -239,103 +241,7 @@ def make_cluster_tree(
return tree


@define_app(app_type=AppType.NON_COMPOSABLE)
class dvs_par_ctree(ClusterTreeBase):
"""Create a cluster tree from kmer distances in parallel."""

def __init__(
self,
*,
k: int = 12,
sketch_size: int | None = 3000,
moltype: str = "dna",
distance_mode: Literal["mash", "euclidean"] = "mash",
mash_canonical_kmers: bool | None = None,
show_progress: bool = False,
max_workers: int | None = None,
parallel: bool = True,
) -> None:
"""Initialise parameters for generating a kmer cluster tree.

Parameters
----------
k
kmer size
sketch_size
size of sketches, only applies to mash distance
moltype
seq collection molecular type
distance_mode
mash distance or euclidean distance between kmer freqs
mash_canonical_kmers
whether to use mash canonical kmers for mash distance
show_progress
whether to show progress bars
numprocs
number of workers, defaults to running serial

Notes
-----
This is app is not composable but can run in parallel. It is
best suited to a single large sequence collection.

If mash_canonical_kmers is enabled when using the mash distance,
kmers are considered identical to their reverse complement.

References
----------
.. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B.,
Bergman, N. H., Koren, S., & Phillippy, A. M. (2016).
Mash: fast genome and metagenome distance estimation using MinHash.
Genome biology, 17, 1-14.
"""
super().__init__(
k=k,
sketch_size=sketch_size,
moltype=moltype,
distance_mode=distance_mode,
mash_canonical_kmers=mash_canonical_kmers,
show_progress=show_progress,
)

if parallel:
if max_workers is None:
max_workers = multiprocessing.cpu_count()
self._numprocs = min(max_workers, multiprocessing.cpu_count())
else:
self._numprocs = 1

self._calc_dist = (
self._mash_dist if distance_mode == "mash" else self._euclidean_dist
)

def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode:
"""Construct a cluster tree for a collection of sequences.

Parameters
----------
seqs
Sequence collection to form cluster tree for.

Returns
-------
PhyloNode
a cluster tree.
"""
seqs = seqs.degap()
self._executor = (
get_reusable_executor(max_workers=self._numprocs)
if self._numprocs != 1
else nullcontext()
)

seq_names = seqs.names
seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable

with self._progress, self._executor:
distances = self._calc_dist(seq_arrays)
return make_cluster_tree(seq_names, distances, progress=self._progress)

class DvsParCtreeMixin:
rmcar17 marked this conversation as resolved.
Show resolved Hide resolved
def _mash_dist(self, seq_arrays: Sequence[SeqArray]) -> numpy.ndarray:
"""Calculates pairwise mash distances between sequences in parallel.

Expand Down Expand Up @@ -513,6 +419,219 @@ def mash_sketches_parallel(
return bottom_sketches


@define_app(app_type=AppType.NON_COMPOSABLE)
class dvs_par_ctree(ClusterTreeBase, DvsParCtreeMixin):
"""Create a cluster tree from kmer distances in parallel."""

def __init__(
self,
*,
k: int = 12,
sketch_size: int | None = 3000,
moltype: str = "dna",
distance_mode: Literal["mash", "euclidean"] = "mash",
mash_canonical_kmers: bool | None = None,
show_progress: bool = False,
max_workers: int | None = None,
parallel: bool = True,
) -> None:
"""Initialise parameters for generating a kmer cluster tree.

Parameters
----------
k
kmer size
sketch_size
size of sketches, only applies to mash distance
moltype
seq collection molecular type
distance_mode
mash distance or euclidean distance between kmer freqs
mash_canonical_kmers
whether to use mash canonical kmers for mash distance
show_progress
whether to show progress bars
numprocs
number of workers, defaults to running serial

Notes
-----
This is app is not composable but can run in parallel. It is
best suited to a single large sequence collection.

If mash_canonical_kmers is enabled when using the mash distance,
kmers are considered identical to their reverse complement.

References
----------
.. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B.,
Bergman, N. H., Koren, S., & Phillippy, A. M. (2016).
Mash: fast genome and metagenome distance estimation using MinHash.
Genome biology, 17, 1-14.
"""
super().__init__(
k=k,
sketch_size=sketch_size,
moltype=moltype,
distance_mode=distance_mode,
mash_canonical_kmers=mash_canonical_kmers,
show_progress=show_progress,
)

if parallel:
if max_workers is None:
max_workers = multiprocessing.cpu_count()
self._numprocs = min(max_workers, multiprocessing.cpu_count())
else:
self._numprocs = 1

self._calc_dist = (
self._mash_dist if distance_mode == "mash" else self._euclidean_dist
)

def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode:
"""Construct a cluster tree for a collection of sequences.

Parameters
----------
seqs
Sequence collection to form cluster tree for.

Returns
-------
PhyloNode
a cluster tree.
"""
seqs = seqs.degap()
self._executor = (
get_reusable_executor(max_workers=self._numprocs)
if self._numprocs != 1
else nullcontext()
)

seq_names = seqs.names
seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable

with self._progress, self._executor:
distances = self._calc_dist(seq_arrays)
return make_cluster_tree(seq_names, distances, progress=self._progress)


@define_app(app_type=AppType.NON_COMPOSABLE)
class dvs_cli_par_ctree(ClusterTreeBase, DvsParCtreeMixin):
"""Create a cluster tree from kmer distances in parallel."""

def __init__(
self,
*,
seq_store: str | Path,
limit: int | None = None,
k: int = 12,
sketch_size: int | None = 3000,
moltype: str = "dna",
distance_mode: Literal["mash", "euclidean"] = "mash",
mash_canonical_kmers: bool | None = None,
show_progress: bool = False,
max_workers: int | None = None,
parallel: bool = True,
) -> None:
"""Initialise parameters for generating a kmer cluster tree.

Parameters
----------
k
kmer size
sketch_size
size of sketches, only applies to mash distance
moltype
seq collection molecular type
distance_mode
mash distance or euclidean distance between kmer freqs
mash_canonical_kmers
whether to use mash canonical kmers for mash distance
show_progress
whether to show progress bars
numprocs
number of workers, defaults to running serial

Notes
-----
This is app is not composable but can run in parallel. It is
best suited to a single large sequence collection.

If mash_canonical_kmers is enabled when using the mash distance,
kmers are considered identical to their reverse complement.

References
----------
.. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B.,
Bergman, N. H., Koren, S., & Phillippy, A. M. (2016).
Mash: fast genome and metagenome distance estimation using MinHash.
Genome biology, 17, 1-14.
"""
super().__init__(
k=k,
sketch_size=sketch_size,
moltype=moltype,
distance_mode=distance_mode,
mash_canonical_kmers=mash_canonical_kmers,
show_progress=show_progress,
)

self._seq_store = seq_store
self._limit = limit

if parallel:
if max_workers is None:
max_workers = multiprocessing.cpu_count()
self._numprocs = min(max_workers, multiprocessing.cpu_count())
else:
self._numprocs = 1

self._calc_dist = (
self._mash_dist if distance_mode == "mash" else self._euclidean_dist
)

def main(self, seq_names: list[str]) -> PhyloNode:
"""Construct a cluster tree for a collection of sequences.

Parameters
----------
seqs
Sequence collection to form cluster tree for.

Returns
-------
PhyloNode
a cluster tree.
"""
dstore = dvs_data_store.HDF5DataStore(self._seq_store, mode="r")

self._executor = (
get_reusable_executor(max_workers=self._numprocs)
if self._numprocs != 1
else nullcontext()
)

seq_arrays = {
m.unique_id: m # pylint: disable=not-callable
for m in dstore.completed
if m.unique_id in seq_names
}
seq_arrays = [seq_arrays[name] for name in seq_names]
seq_arrays = seq_arrays[: self._limit] if self._limit else seq_arrays
seq_arrays = [
SeqArray(
member.unique_id, member.read(), self._moltype, member.data_store.source
)
for member in seq_arrays
]

with self._progress, self._executor:
distances = self._calc_dist(seq_arrays)
return make_cluster_tree(seq_names, distances, progress=self._progress)


def compute_mash_chunk_distances(
start_idx: int,
stride: int,
Expand Down
Loading