diff --git a/src/diverse_seq/cli.py b/src/diverse_seq/cli.py index 9d041e7..4446cf5 100644 --- a/src/diverse_seq/cli.py +++ b/src/diverse_seq/cli.py @@ -478,20 +478,13 @@ def ctree( ) sys.exit(1) - seqids = dvs_data_store.get_seqids_from_store(seqfile)[:limit] - records = dvs_data_store.get_ordered_records( - dvs_data_store.HDF5DataStore(seqfile), - seqids, - ) - - arr2str_app = dvs_util.arr2str(moltype) - - seqs = {} - for name, record in zip(seqids, records, strict=True): - seqs[name] = arr2str_app(record.read()) # pylint: disable=not-callable + seqids = dvs_data_store.get_seqids_from_store(seqfile) + if limit is not None: + seqids = seqids[:limit] - seqs = make_unaligned_seqs(seqs, moltype=moltype) - app = dvs_cluster.dvs_par_ctree( + app = dvs_cluster.dvs_cli_par_ctree( + seq_store=seqfile, + limit=limit, k=k, sketch_size=sketch_size, moltype=moltype, @@ -501,10 +494,11 @@ def ctree( parallel=numprocs > 1, show_progress=not hide_progress, ) - tree = app(seqs) # pylint: disable=not-callable + tree = app(seqids) # pylint: disable=not-callable if not tree: dvs_util.print_colour(tree, "red") sys.exit(1) + tree.write(outpath) diff --git a/src/diverse_seq/cluster.py b/src/diverse_seq/cluster.py index 0938155..8140e0d 100644 --- a/src/diverse_seq/cluster.py +++ b/src/diverse_seq/cluster.py @@ -3,6 +3,7 @@ import multiprocessing from collections.abc import Sequence from contextlib import nullcontext +from pathlib import Path from typing import Literal import cogent3.app.typing as c3_types @@ -14,6 +15,7 @@ from scipy.sparse import dok_matrix from sklearn.cluster import AgglomerativeClustering +from diverse_seq import data_store as dvs_data_store from diverse_seq.distance import ( BottomSketch, euclidean_distance, @@ -240,102 +242,8 @@ def make_cluster_tree( return tree -@define_app(app_type=AppType.NON_COMPOSABLE) -class dvs_par_ctree(ClusterTreeBase): - """Create a cluster tree from kmer distances in parallel.""" - - def __init__( - self, - *, - k: int = 12, - sketch_size: int | None = 3000, - moltype: str = "dna", - distance_mode: Literal["mash", "euclidean"] = "mash", - mash_canonical_kmers: bool | None = None, - show_progress: bool = False, - max_workers: int | None = None, - parallel: bool = True, - ) -> None: - """Initialise parameters for generating a kmer cluster tree. - - Parameters - ---------- - k - kmer size - sketch_size - size of sketches, only applies to mash distance - moltype - seq collection molecular type - distance_mode - mash distance or euclidean distance between kmer freqs - mash_canonical_kmers - whether to use mash canonical kmers for mash distance - show_progress - whether to show progress bars - numprocs - number of workers, defaults to running serial - - Notes - ----- - This app is not composable but can run in parallel. It is - best suited to a single large sequence collection. - - If mash_canonical_kmers is enabled when using the mash distance, - kmers are considered identical to their reverse complement. - - References - ---------- - .. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B., - Bergman, N. H., Koren, S., & Phillippy, A. M. (2016). - Mash: fast genome and metagenome distance estimation using MinHash. - Genome biology, 17, 1-14. - """ - super().__init__( - k=k, - sketch_size=sketch_size, - moltype=moltype, - distance_mode=distance_mode, - mash_canonical_kmers=mash_canonical_kmers, - show_progress=show_progress, - ) - if parallel: - if max_workers is None: - max_workers = multiprocessing.cpu_count() - self._numprocs = min(max_workers, multiprocessing.cpu_count()) - else: - self._numprocs = 1 - - self._calc_dist = ( - self._mash_dist if distance_mode == "mash" else self._euclidean_dist - ) - - def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode: - """Construct a cluster tree for a collection of sequences. - - Parameters - ---------- - seqs - Sequence collection to form cluster tree for. - - Returns - ------- - PhyloNode - a cluster tree. - """ - seqs = seqs.degap() - self._executor = ( - get_reusable_executor(max_workers=self._numprocs) - if self._numprocs != 1 - else nullcontext() - ) - - seq_names = seqs.names - seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable - - with self._progress, self._executor: - distances = self._calc_dist(seq_arrays) - return make_cluster_tree(seq_names, distances, progress=self._progress) +class DvsParCtreeMixin: def _mash_dist(self, seq_arrays: Sequence[SeqArray]) -> numpy.ndarray: """Calculates pairwise mash distances between sequences in parallel. @@ -513,6 +421,219 @@ def mash_sketches_parallel( return bottom_sketches +@define_app(app_type=AppType.NON_COMPOSABLE) +class dvs_par_ctree(ClusterTreeBase, DvsParCtreeMixin): + """Create a cluster tree from kmer distances in parallel.""" + + def __init__( + self, + *, + k: int = 12, + sketch_size: int | None = 3000, + moltype: str = "dna", + distance_mode: Literal["mash", "euclidean"] = "mash", + mash_canonical_kmers: bool | None = None, + show_progress: bool = False, + max_workers: int | None = None, + parallel: bool = True, + ) -> None: + """Initialise parameters for generating a kmer cluster tree. + + Parameters + ---------- + k + kmer size + sketch_size + size of sketches, only applies to mash distance + moltype + seq collection molecular type + distance_mode + mash distance or euclidean distance between kmer freqs + mash_canonical_kmers + whether to use mash canonical kmers for mash distance + show_progress + whether to show progress bars + numprocs + number of workers, defaults to running serial + + Notes + ----- + This is app is not composable but can run in parallel. It is + best suited to a single large sequence collection. + + If mash_canonical_kmers is enabled when using the mash distance, + kmers are considered identical to their reverse complement. + + References + ---------- + .. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B., + Bergman, N. H., Koren, S., & Phillippy, A. M. (2016). + Mash: fast genome and metagenome distance estimation using MinHash. + Genome biology, 17, 1-14. + """ + super().__init__( + k=k, + sketch_size=sketch_size, + moltype=moltype, + distance_mode=distance_mode, + mash_canonical_kmers=mash_canonical_kmers, + show_progress=show_progress, + ) + + if parallel: + if max_workers is None: + max_workers = multiprocessing.cpu_count() + self._numprocs = min(max_workers, multiprocessing.cpu_count()) + else: + self._numprocs = 1 + + self._calc_dist = ( + self._mash_dist if distance_mode == "mash" else self._euclidean_dist + ) + + def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode: + """Construct a cluster tree for a collection of sequences. + + Parameters + ---------- + seqs + Sequence collection to form cluster tree for. + + Returns + ------- + PhyloNode + a cluster tree. + """ + seqs = seqs.degap() + self._executor = ( + get_reusable_executor(max_workers=self._numprocs) + if self._numprocs != 1 + else nullcontext() + ) + + seq_names = seqs.names + seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable + + with self._progress, self._executor: + distances = self._calc_dist(seq_arrays) + return make_cluster_tree(seq_names, distances, progress=self._progress) + + +@define_app(app_type=AppType.NON_COMPOSABLE) +class dvs_cli_par_ctree(ClusterTreeBase, DvsParCtreeMixin): + """Create a cluster tree from kmer distances in parallel.""" + + def __init__( + self, + *, + seq_store: str | Path, + limit: int | None = None, + k: int = 12, + sketch_size: int | None = 3000, + moltype: str = "dna", + distance_mode: Literal["mash", "euclidean"] = "mash", + mash_canonical_kmers: bool | None = None, + show_progress: bool = False, + max_workers: int | None = None, + parallel: bool = True, + ) -> None: + """Initialise parameters for generating a kmer cluster tree. + + Parameters + ---------- + k + kmer size + sketch_size + size of sketches, only applies to mash distance + moltype + seq collection molecular type + distance_mode + mash distance or euclidean distance between kmer freqs + mash_canonical_kmers + whether to use mash canonical kmers for mash distance + show_progress + whether to show progress bars + numprocs + number of workers, defaults to running serial + + Notes + ----- + This is app is not composable but can run in parallel. It is + best suited to a single large sequence collection. + + If mash_canonical_kmers is enabled when using the mash distance, + kmers are considered identical to their reverse complement. + + References + ---------- + .. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B., + Bergman, N. H., Koren, S., & Phillippy, A. M. (2016). + Mash: fast genome and metagenome distance estimation using MinHash. + Genome biology, 17, 1-14. + """ + super().__init__( + k=k, + sketch_size=sketch_size, + moltype=moltype, + distance_mode=distance_mode, + mash_canonical_kmers=mash_canonical_kmers, + show_progress=show_progress, + ) + + self._seq_store = seq_store + self._limit = limit + + if parallel: + if max_workers is None: + max_workers = multiprocessing.cpu_count() + self._numprocs = min(max_workers, multiprocessing.cpu_count()) + else: + self._numprocs = 1 + + self._calc_dist = ( + self._mash_dist if distance_mode == "mash" else self._euclidean_dist + ) + + def main(self, seq_names: list[str]) -> PhyloNode: + """Construct a cluster tree for a collection of sequences. + + Parameters + ---------- + seqs + Sequence collection to form cluster tree for. + + Returns + ------- + PhyloNode + a cluster tree. + """ + dstore = dvs_data_store.HDF5DataStore(self._seq_store, mode="r") + + self._executor = ( + get_reusable_executor(max_workers=self._numprocs) + if self._numprocs != 1 + else nullcontext() + ) + + seq_arrays = { + m.unique_id: m # pylint: disable=not-callable + for m in dstore.completed + if m.unique_id in seq_names + } + seq_arrays = [seq_arrays[name] for name in seq_names] + seq_arrays = seq_arrays[: self._limit] if self._limit else seq_arrays + seq_arrays = [ + SeqArray( + member.unique_id, member.read(), self._moltype, member.data_store.source + ) + for member in seq_arrays + ] + + with self._progress, self._executor: + distances = self._calc_dist(seq_arrays) + return make_cluster_tree(seq_names, distances, progress=self._progress) + + def compute_mash_chunk_distances( start_idx: int, stride: int, diff --git a/src/diverse_seq/distance.py b/src/diverse_seq/distance.py index 7a0a40b..7d8c5d7 100644 --- a/src/diverse_seq/distance.py +++ b/src/diverse_seq/distance.py @@ -243,7 +243,7 @@ def mash_sketches( k, sketch_size, num_states, - mash_canonical=mash_canonical, + mash_canonical, ) progress.update(sketch_task, advance=1) @@ -251,12 +251,12 @@ def mash_sketches( return bottom_sketches +@numba.njit def mash_sketch( seq_array: np.ndarray, k: int, sketch_size: int, num_states: int, - *, mash_canonical: bool, ) -> BottomSketch: """Find the mash sketch for a sequence array. @@ -279,22 +279,19 @@ def mash_sketch( BottomSketch The bottom sketch for the given sequence seq_array. """ - kmer_hashes = set( - get_kmer_hashes( - seq_array, - k, - num_states, - mash_canonical=mash_canonical, - ), - ) + kmer_hashes = np.unique(get_kmer_hashes(seq_array, k, num_states, mash_canonical)) + + heap = [0] + heap.pop() - heap = [] for kmer_hash in kmer_hashes: if len(heap) < sketch_size: heapq.heappush(heap, -kmer_hash) else: heapq.heappushpop(heap, -kmer_hash) - return sorted(-kmer_hash for kmer_hash in heap) + kmer_hashes = [-kmer_hash for kmer_hash in heap] + kmer_hashes.sort() + return kmer_hashes @numba.njit @@ -302,9 +299,8 @@ def get_kmer_hashes( seq: np.ndarray, k: int, num_states: int, - *, mash_canonical: bool, -) -> list[int]: # pragma: no cover +) -> np.ndarray: # pragma: no cover """Get the kmer hashes comprising a sequence. Parameters @@ -325,22 +321,23 @@ def get_kmer_hashes( """ seq = seq.astype(np.int64) - kmer_hashes = [0] - kmer_hashes.pop() # numba requires list to be pre-populated to infer type + kmer_hashes = np.zeros(len(seq) - k + 1, dtype=np.int32) skip_until = 0 for i in range(k): if seq[i] >= num_states: skip_until = i + 1 + kmer_index = 0 for i in range(len(seq) - k + 1): if seq[i + k - 1] >= num_states: skip_until = i + k if i < skip_until: continue - kmer_hashes.append(hash_kmer(seq[i : i + k], mash_canonical)) - return kmer_hashes + kmer_hashes[kmer_index] = hash_kmer(seq[i : i + k], mash_canonical) + kmer_index += 1 + return kmer_hashes[:kmer_index] @numba.njit