From 6b9ad1ac2a38df25bde5aae45a616fc2b848346c Mon Sep 17 00:00:00 2001 From: Robert McArthur Date: Mon, 4 Nov 2024 12:14:19 +1100 Subject: [PATCH 1/6] ENH: optimise ctree cli --- src/diverse_seq/cli.py | 20 +-- src/diverse_seq/cluster.py | 313 +++++++++++++++++++++++++----------- src/diverse_seq/distance.py | 32 ++-- 3 files changed, 237 insertions(+), 128 deletions(-) diff --git a/src/diverse_seq/cli.py b/src/diverse_seq/cli.py index fe4773f..a596eb5 100644 --- a/src/diverse_seq/cli.py +++ b/src/diverse_seq/cli.py @@ -480,20 +480,12 @@ def ctree( sys.exit(1) seqids = dvs_data_store.get_seqids_from_store(seqfile)[:limit] - records = dvs_data_store.get_ordered_records( - dvs_data_store.HDF5DataStore(seqfile), - seqids, - ) - - arr2str_app = dvs_util.arr2str(moltype) - - seqs = {} - for name, record in zip(seqids, records, strict=True): - seqs[name] = arr2str_app(record.read()) # pylint: disable=not-callable - - seqs = make_unaligned_seqs(seqs, moltype=moltype) + if limit is not None: + seqids = seqids[:limit] - app = dvs_cluster.dvs_par_ctree( + app = dvs_cluster.dvs_cli_par_ctree( + seq_store=seqfile, + limit=limit, k=k, sketch_size=sketch_size, moltype=moltype, @@ -503,7 +495,7 @@ def ctree( parallel=numprocs > 1, show_progress=not hide_progress, ) - tree = app(seqs) # pylint: disable=not-callable + tree = app(seqids) # pylint: disable=not-callable tree.write(outpath) diff --git a/src/diverse_seq/cluster.py b/src/diverse_seq/cluster.py index e8db8c2..c26d857 100644 --- a/src/diverse_seq/cluster.py +++ b/src/diverse_seq/cluster.py @@ -3,6 +3,7 @@ import multiprocessing from collections.abc import Sequence from contextlib import nullcontext +from pathlib import Path from typing import Literal import cogent3.app.typing as c3_types @@ -14,6 +15,7 @@ from scipy.sparse import dok_matrix from sklearn.cluster import AgglomerativeClustering +from diverse_seq import data_store as dvs_data_store from diverse_seq.distance import ( BottomSketch, euclidean_distance, @@ -239,103 +241,7 @@ def make_cluster_tree( return tree -@define_app(app_type=AppType.NON_COMPOSABLE) -class dvs_par_ctree(ClusterTreeBase): - """Create a cluster tree from kmer distances in parallel.""" - - def __init__( - self, - *, - k: int = 12, - sketch_size: int | None = 3000, - moltype: str = "dna", - distance_mode: Literal["mash", "euclidean"] = "mash", - mash_canonical_kmers: bool | None = None, - show_progress: bool = False, - max_workers: int | None = None, - parallel: bool = True, - ) -> None: - """Initialise parameters for generating a kmer cluster tree. - - Parameters - ---------- - k - kmer size - sketch_size - size of sketches, only applies to mash distance - moltype - seq collection molecular type - distance_mode - mash distance or euclidean distance between kmer freqs - mash_canonical_kmers - whether to use mash canonical kmers for mash distance - show_progress - whether to show progress bars - numprocs - number of workers, defaults to running serial - - Notes - ----- - This is app is not composable but can run in parallel. It is - best suited to a single large sequence collection. - - If mash_canonical_kmers is enabled when using the mash distance, - kmers are considered identical to their reverse complement. - - References - ---------- - .. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B., - Bergman, N. H., Koren, S., & Phillippy, A. M. (2016). - Mash: fast genome and metagenome distance estimation using MinHash. - Genome biology, 17, 1-14. - """ - super().__init__( - k=k, - sketch_size=sketch_size, - moltype=moltype, - distance_mode=distance_mode, - mash_canonical_kmers=mash_canonical_kmers, - show_progress=show_progress, - ) - - if parallel: - if max_workers is None: - max_workers = multiprocessing.cpu_count() - self._numprocs = min(max_workers, multiprocessing.cpu_count()) - else: - self._numprocs = 1 - - self._calc_dist = ( - self._mash_dist if distance_mode == "mash" else self._euclidean_dist - ) - - def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode: - """Construct a cluster tree for a collection of sequences. - - Parameters - ---------- - seqs - Sequence collection to form cluster tree for. - - Returns - ------- - PhyloNode - a cluster tree. - """ - seqs = seqs.degap() - self._executor = ( - get_reusable_executor(max_workers=self._numprocs) - if self._numprocs != 1 - else nullcontext() - ) - - seq_names = seqs.names - seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable - - with self._progress, self._executor: - distances = self._calc_dist(seq_arrays) - return make_cluster_tree(seq_names, distances, progress=self._progress) - +class DvsParCtreeMixin: def _mash_dist(self, seq_arrays: Sequence[SeqArray]) -> numpy.ndarray: """Calculates pairwise mash distances between sequences in parallel. @@ -513,6 +419,219 @@ def mash_sketches_parallel( return bottom_sketches +@define_app(app_type=AppType.NON_COMPOSABLE) +class dvs_par_ctree(ClusterTreeBase, DvsParCtreeMixin): + """Create a cluster tree from kmer distances in parallel.""" + + def __init__( + self, + *, + k: int = 12, + sketch_size: int | None = 3000, + moltype: str = "dna", + distance_mode: Literal["mash", "euclidean"] = "mash", + mash_canonical_kmers: bool | None = None, + show_progress: bool = False, + max_workers: int | None = None, + parallel: bool = True, + ) -> None: + """Initialise parameters for generating a kmer cluster tree. + + Parameters + ---------- + k + kmer size + sketch_size + size of sketches, only applies to mash distance + moltype + seq collection molecular type + distance_mode + mash distance or euclidean distance between kmer freqs + mash_canonical_kmers + whether to use mash canonical kmers for mash distance + show_progress + whether to show progress bars + numprocs + number of workers, defaults to running serial + + Notes + ----- + This is app is not composable but can run in parallel. It is + best suited to a single large sequence collection. + + If mash_canonical_kmers is enabled when using the mash distance, + kmers are considered identical to their reverse complement. + + References + ---------- + .. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B., + Bergman, N. H., Koren, S., & Phillippy, A. M. (2016). + Mash: fast genome and metagenome distance estimation using MinHash. + Genome biology, 17, 1-14. + """ + super().__init__( + k=k, + sketch_size=sketch_size, + moltype=moltype, + distance_mode=distance_mode, + mash_canonical_kmers=mash_canonical_kmers, + show_progress=show_progress, + ) + + if parallel: + if max_workers is None: + max_workers = multiprocessing.cpu_count() + self._numprocs = min(max_workers, multiprocessing.cpu_count()) + else: + self._numprocs = 1 + + self._calc_dist = ( + self._mash_dist if distance_mode == "mash" else self._euclidean_dist + ) + + def main(self, seqs: c3_types.SeqsCollectionType) -> PhyloNode: + """Construct a cluster tree for a collection of sequences. + + Parameters + ---------- + seqs + Sequence collection to form cluster tree for. + + Returns + ------- + PhyloNode + a cluster tree. + """ + seqs = seqs.degap() + self._executor = ( + get_reusable_executor(max_workers=self._numprocs) + if self._numprocs != 1 + else nullcontext() + ) + + seq_names = seqs.names + seq_arrays = [self._s2a(seqs.get_seq(name)) for name in seq_names] # pylint: disable=not-callable + + with self._progress, self._executor: + distances = self._calc_dist(seq_arrays) + return make_cluster_tree(seq_names, distances, progress=self._progress) + + +@define_app(app_type=AppType.NON_COMPOSABLE) +class dvs_cli_par_ctree(ClusterTreeBase, DvsParCtreeMixin): + """Create a cluster tree from kmer distances in parallel.""" + + def __init__( + self, + *, + seq_store: str | Path, + limit: int | None = None, + k: int = 12, + sketch_size: int | None = 3000, + moltype: str = "dna", + distance_mode: Literal["mash", "euclidean"] = "mash", + mash_canonical_kmers: bool | None = None, + show_progress: bool = False, + max_workers: int | None = None, + parallel: bool = True, + ) -> None: + """Initialise parameters for generating a kmer cluster tree. + + Parameters + ---------- + k + kmer size + sketch_size + size of sketches, only applies to mash distance + moltype + seq collection molecular type + distance_mode + mash distance or euclidean distance between kmer freqs + mash_canonical_kmers + whether to use mash canonical kmers for mash distance + show_progress + whether to show progress bars + numprocs + number of workers, defaults to running serial + + Notes + ----- + This is app is not composable but can run in parallel. It is + best suited to a single large sequence collection. + + If mash_canonical_kmers is enabled when using the mash distance, + kmers are considered identical to their reverse complement. + + References + ---------- + .. [1] Ondov, B. D., Treangen, T. J., Melsted, P., Mallonee, A. B., + Bergman, N. H., Koren, S., & Phillippy, A. M. (2016). + Mash: fast genome and metagenome distance estimation using MinHash. + Genome biology, 17, 1-14. + """ + super().__init__( + k=k, + sketch_size=sketch_size, + moltype=moltype, + distance_mode=distance_mode, + mash_canonical_kmers=mash_canonical_kmers, + show_progress=show_progress, + ) + + self._seq_store = seq_store + self._limit = limit + + if parallel: + if max_workers is None: + max_workers = multiprocessing.cpu_count() + self._numprocs = min(max_workers, multiprocessing.cpu_count()) + else: + self._numprocs = 1 + + self._calc_dist = ( + self._mash_dist if distance_mode == "mash" else self._euclidean_dist + ) + + def main(self, seq_names: Sequence[str]) -> PhyloNode: + """Construct a cluster tree for a collection of sequences. + + Parameters + ---------- + seqs + Sequence collection to form cluster tree for. + + Returns + ------- + PhyloNode + a cluster tree. + """ + dstore = dvs_data_store.HDF5DataStore(self._seq_store, mode="r") + + self._executor = ( + get_reusable_executor(max_workers=self._numprocs) + if self._numprocs != 1 + else nullcontext() + ) + + seq_arrays = { + m.unique_id: m # pylint: disable=not-callable + for m in dstore.completed + if m.unique_id in seq_names + } + seq_arrays = [seq_arrays[name] for name in seq_names] + seq_arrays = seq_arrays[: self._limit] if self._limit else seq_arrays + seq_arrays = [ + SeqArray( + member.unique_id, member.read(), self._moltype, member.data_store.source + ) + for member in seq_arrays + ] + + with self._progress, self._executor: + distances = self._calc_dist(seq_arrays) + return make_cluster_tree(seq_names, distances, progress=self._progress) + + def compute_mash_chunk_distances( start_idx: int, stride: int, diff --git a/src/diverse_seq/distance.py b/src/diverse_seq/distance.py index 7bbc807..ed718eb 100644 --- a/src/diverse_seq/distance.py +++ b/src/diverse_seq/distance.py @@ -252,7 +252,7 @@ def mash_sketches( k, sketch_size, num_states, - mash_canonical=mash_canonical, + mash_canonical, ) progress.update(sketch_task, advance=1) @@ -260,12 +260,12 @@ def mash_sketches( return bottom_sketches +@numba.njit def mash_sketch( seq_array: np.ndarray, k: int, sketch_size: int, num_states: int, - *, mash_canonical: bool, ) -> BottomSketch: """Find the mash sketch for a sequence array. @@ -288,22 +288,19 @@ def mash_sketch( BottomSketch The bottom sketch for the given sequence seq_array. """ - kmer_hashes = set( - get_kmer_hashes( - seq_array, - k, - num_states, - mash_canonical=mash_canonical, - ), - ) + kmer_hashes = np.unique(get_kmer_hashes(seq_array, k, num_states, mash_canonical)) + + heap = [0] + heap.pop() - heap = [] for kmer_hash in kmer_hashes: if len(heap) < sketch_size: heapq.heappush(heap, -kmer_hash) else: heapq.heappushpop(heap, -kmer_hash) - return sorted(-kmer_hash for kmer_hash in heap) + kmer_hashes = [-kmer_hash for kmer_hash in heap] + kmer_hashes.sort() + return kmer_hashes @numba.njit @@ -313,7 +310,7 @@ def get_kmer_hashes( num_states: int, *, mash_canonical: bool, -) -> list[int]: +) -> np.ndarray: """Get the kmer hashes comprising a sequence. Parameters @@ -334,22 +331,23 @@ def get_kmer_hashes( """ seq = seq.astype(np.int64) - kmer_hashes = [0] - kmer_hashes.pop() # numba requires list to be pre-populated to infer type + kmer_hashes = np.zeros(len(seq) - k + 1, dtype=np.int32) skip_until = 0 for i in range(k): if seq[i] >= num_states: skip_until = i + 1 + kmer_index = 0 for i in range(len(seq) - k + 1): if seq[i + k - 1] >= num_states: skip_until = i + k if i < skip_until: continue - kmer_hashes.append(hash_kmer(seq[i : i + k], mash_canonical)) - return kmer_hashes + kmer_hashes[kmer_index] = hash_kmer(seq[i : i + k], mash_canonical) + kmer_index += 1 + return kmer_hashes[:kmer_index] @numba.njit From 35be7c69da4af8d282267f4fa7633926ff1ef7a1 Mon Sep 17 00:00:00 2001 From: Robert McArthur Date: Mon, 4 Nov 2024 12:32:27 +1100 Subject: [PATCH 2/6] DEV: debug mac failure in ci --- .github/workflows/ci.yml | 1 + src/diverse_seq/cli.py | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6d70a44..2b68f01 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: + fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] python-version: ["3.10", "3.11", "3.12"] diff --git a/src/diverse_seq/cli.py b/src/diverse_seq/cli.py index a596eb5..f227f95 100644 --- a/src/diverse_seq/cli.py +++ b/src/diverse_seq/cli.py @@ -496,6 +496,7 @@ def ctree( show_progress=not hide_progress, ) tree = app(seqids) # pylint: disable=not-callable + print(tree) tree.write(outpath) From 8ab6143da7896980c1b28e6d77429a887de82d44 Mon Sep 17 00:00:00 2001 From: Robert McArthur Date: Mon, 4 Nov 2024 12:43:58 +1100 Subject: [PATCH 3/6] DEV: remove debugging, change input type to cli app --- .github/workflows/ci.yml | 1 - src/diverse_seq/cli.py | 1 - src/diverse_seq/cluster.py | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b68f01..6d70a44 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,6 @@ jobs: runs-on: ${{ matrix.os }} strategy: - fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] python-version: ["3.10", "3.11", "3.12"] diff --git a/src/diverse_seq/cli.py b/src/diverse_seq/cli.py index f227f95..a596eb5 100644 --- a/src/diverse_seq/cli.py +++ b/src/diverse_seq/cli.py @@ -496,7 +496,6 @@ def ctree( show_progress=not hide_progress, ) tree = app(seqids) # pylint: disable=not-callable - print(tree) tree.write(outpath) diff --git a/src/diverse_seq/cluster.py b/src/diverse_seq/cluster.py index c26d857..8111b04 100644 --- a/src/diverse_seq/cluster.py +++ b/src/diverse_seq/cluster.py @@ -592,7 +592,7 @@ def __init__( self._mash_dist if distance_mode == "mash" else self._euclidean_dist ) - def main(self, seq_names: Sequence[str]) -> PhyloNode: + def main(self, seq_names: list[str]) -> PhyloNode: """Construct a cluster tree for a collection of sequences. Parameters From e45f9d857f2e07fb4f259d3e05643bc0f7475f0a Mon Sep 17 00:00:00 2001 From: Robert McArthur Date: Mon, 4 Nov 2024 16:23:43 +1100 Subject: [PATCH 4/6] MAINT: add back pragma: no cover removed by merge --- src/diverse_seq/distance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/diverse_seq/distance.py b/src/diverse_seq/distance.py index 4e178c3..60f0a3e 100644 --- a/src/diverse_seq/distance.py +++ b/src/diverse_seq/distance.py @@ -302,7 +302,7 @@ def get_kmer_hashes( num_states: int, *, mash_canonical: bool, -) -> np.ndarray: +) -> np.ndarray: # pragma: no cover """Get the kmer hashes comprising a sequence. Parameters From b6f7524b163a2e3078686d941d9a943859970e85 Mon Sep 17 00:00:00 2001 From: Robert McArthur Date: Fri, 8 Nov 2024 11:51:11 +1100 Subject: [PATCH 5/6] MAINT: remove kwarg only restriction for `mash_canonical` --- src/diverse_seq/distance.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/diverse_seq/distance.py b/src/diverse_seq/distance.py index 60f0a3e..42fb451 100644 --- a/src/diverse_seq/distance.py +++ b/src/diverse_seq/distance.py @@ -300,7 +300,6 @@ def get_kmer_hashes( seq: np.ndarray, k: int, num_states: int, - *, mash_canonical: bool, ) -> np.ndarray: # pragma: no cover """Get the kmer hashes comprising a sequence. From 1de8a93567edcf8deb88058d58f833c248debdbf Mon Sep 17 00:00:00 2001 From: GavinHuttley Date: Fri, 8 Nov 2024 12:09:03 +1100 Subject: [PATCH 6/6] MAINT: fix indentation issue --- src/diverse_seq/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/diverse_seq/cli.py b/src/diverse_seq/cli.py index 95ad757..4446cf5 100644 --- a/src/diverse_seq/cli.py +++ b/src/diverse_seq/cli.py @@ -499,7 +499,7 @@ def ctree( dvs_util.print_colour(tree, "red") sys.exit(1) - tree.write(outpath) + tree.write(outpath) if __name__ == "__main__":