From e40835f2aaf42004e8c7a8b8981e2db4b1ec4a05 Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Tue, 18 Feb 2025 20:57:18 -0300 Subject: [PATCH] Python's `Repository.ancestry` now returns an iterator Closes: #750 --- .../icechunk-python/cheatsheets/git-users.md | 4 +-- docs/docs/icechunk-python/version-control.md | 4 +-- .../python/icechunk/_icechunk_python.pyi | 4 +-- icechunk-python/python/icechunk/repository.py | 2 +- icechunk-python/src/repository.rs | 29 +----------------- icechunk-python/src/streams.rs | 30 ++++++++++++++++++- .../tests/test_commit_properties.py | 2 +- 7 files changed, 38 insertions(+), 37 deletions(-) diff --git a/docs/docs/icechunk-python/cheatsheets/git-users.md b/docs/docs/icechunk-python/cheatsheets/git-users.md index f7f02ac2..52008f45 100644 --- a/docs/docs/icechunk-python/cheatsheets/git-users.md +++ b/docs/docs/icechunk-python/cheatsheets/git-users.md @@ -81,7 +81,7 @@ At this point, the tip of the branch is now the snapshot `198273178639187` and a In Icechunk, you can view the history of a branch by using the [`repo.ancestry()`](../reference/#icechunk.Repository.ancestry) command, similar to the `git log` command. ```python -repo.ancestry(branch="my-new-branch") +[ancestor for ancestor in repo.ancestry(branch="my-new-branch")] #[Snapshot(id='198273178639187', ...), ...] ``` @@ -156,7 +156,7 @@ We can also view the history of a tag by using the [`repo.ancestry()`](../refere repo.ancestry(tag="my-new-tag") ``` -This will return a list of snapshots that are ancestors of the tag. Similar to branches we can lookup the snapshot that a tag is based on by using the [`repo.lookup_tag()`](../reference/#icechunk.Repository.lookup_tag) command. +This will return an iterator of snapshots that are ancestors of the tag. Similar to branches we can lookup the snapshot that a tag is based on by using the [`repo.lookup_tag()`](../reference/#icechunk.Repository.lookup_tag) command. ```python repo.lookup_tag("my-new-tag") diff --git a/docs/docs/icechunk-python/version-control.md b/docs/docs/icechunk-python/version-control.md index 347dd084..ca3f1e00 100644 --- a/docs/docs/icechunk-python/version-control.md +++ b/docs/docs/icechunk-python/version-control.md @@ -27,7 +27,7 @@ repo = icechunk.Repository.create(icechunk.in_memory_storage()) On creating a new [`Repository`](../reference/#icechunk.Repository), it will automatically create a `main` branch with an initial snapshot. We can take a look at the ancestry of the `main` branch to confirm this. ```python -repo.ancestry(branch="main") +[ancestor for ancestor in repo.ancestry(branch="main")] # [SnapshotInfo(id="A840RMN5CF807CM66RY0", parent_id=None, written_at=datetime.datetime(2025,1,30,19,52,41,592998, tzinfo=datetime.timezone.utc), message="Repository...")] ``` @@ -36,7 +36,7 @@ repo.ancestry(branch="main") The [`ancestry`](./reference/#icechunk.Repository.ancestry) method can be used to inspect the ancestry of any branch, snapshot, or tag. -We get back a list of [`SnapshotInfo`](../reference/#icechunk.SnapshotInfo) objects, which contain information about the snapshot, including its ID, the ID of its parent snapshot, and the time it was written. +We get back an iterator of [`SnapshotInfo`](../reference/#icechunk.SnapshotInfo) objects, which contain information about the snapshot, including its ID, the ID of its parent snapshot, and the time it was written. ## Creating a snapshot diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index 6bdfc57c..b1b8806d 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -1,6 +1,6 @@ import abc import datetime -from collections.abc import AsyncGenerator, AsyncIterator +from collections.abc import AsyncGenerator, AsyncIterator, Iterator from enum import Enum from typing import Any @@ -907,7 +907,7 @@ class PyRepository: branch: str | None = None, tag: str | None = None, snapshot: str | None = None, - ) -> list[SnapshotInfo]: ... + ) -> Iterator[SnapshotInfo]: ... def async_ancestry( self, *, diff --git a/icechunk-python/python/icechunk/repository.py b/icechunk-python/python/icechunk/repository.py index 8e83a369..b6052e1f 100644 --- a/icechunk-python/python/icechunk/repository.py +++ b/icechunk-python/python/icechunk/repository.py @@ -219,7 +219,7 @@ def ancestry( ----- Only one of the arguments can be specified. """ - return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot) + return self._repository.async_ancestry(branch=branch, tag=tag, snapshot=snapshot) def async_ancestry( self, diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index f1841424..e8b83434 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -511,34 +511,7 @@ impl PyRepository { PyStorage(Arc::clone(self.0.storage())) } - #[pyo3(signature = (*, branch = None, tag = None, snapshot = None))] - pub fn ancestry( - &self, - py: Python<'_>, - branch: Option, - tag: Option, - snapshot: Option, - ) -> PyResult> { - // This function calls block_on, so we need to allow other thread python to make progress - py.allow_threads(move || { - let version = args_to_version_info(branch, tag, snapshot)?; - - // TODO: this holds everything in memory - pyo3_async_runtimes::tokio::get_runtime().block_on(async move { - let ancestry = self - .0 - .ancestry(&version) - .await - .map_err(PyIcechunkStoreError::RepositoryError)? - .map_ok(Into::::into) - .try_collect::>() - .await - .map_err(PyIcechunkStoreError::RepositoryError)?; - Ok(ancestry) - }) - }) - } - + /// Returns an object that is both a sync and an async iterator #[pyo3(signature = (*, branch = None, tag = None, snapshot = None))] pub fn async_ancestry( &self, diff --git a/icechunk-python/src/streams.rs b/icechunk-python/src/streams.rs index f4b06408..2deae06a 100644 --- a/icechunk-python/src/streams.rs +++ b/icechunk-python/src/streams.rs @@ -1,7 +1,10 @@ use std::{pin::Pin, sync::Arc}; use futures::{Stream, StreamExt}; -use pyo3::{exceptions::PyStopAsyncIteration, prelude::*}; +use pyo3::{ + exceptions::{PyStopAsyncIteration, PyStopIteration}, + prelude::*, +}; use tokio::sync::Mutex; type PyObjectStream = Arc>> + Send>>>>; @@ -31,6 +34,10 @@ impl PyAsyncGenerator { slf } + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + /// This is an anext implementation. /// /// Notable thing here is that we return PyResult>. @@ -62,4 +69,25 @@ impl PyAsyncGenerator { // of pyo3? pyo3_async_runtimes::tokio::future_into_py(py, future) } + + fn __next__<'py>( + slf: PyRefMut<'py, Self>, + py: Python<'py>, + ) -> PyResult> { + // Arc::clone is cheap, so we can clone the Arc here because we move into the + // future block + let stream = slf.stream.clone(); + + py.allow_threads(move || { + let next = pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let mut unlocked = stream.lock().await; + unlocked.next().await + }); + match next { + Some(Ok(val)) => Ok(Some(val)), + Some(Err(err)) => Err(err), + None => Err(PyStopIteration::new_err("The iterator is exhausted")), + } + }) + } } diff --git a/icechunk-python/tests/test_commit_properties.py b/icechunk-python/tests/test_commit_properties.py index 18bc59e7..71c97ea3 100644 --- a/icechunk-python/tests/test_commit_properties.py +++ b/icechunk-python/tests/test_commit_properties.py @@ -27,7 +27,7 @@ def test_property_types() -> None: } snapshot_id = session.commit("some commit", props) - info = repo.ancestry(branch="main")[0] + info = [p for p in repo.ancestry(branch="main")][0] assert info.message == "some commit" assert info.id == snapshot_id assert info.parent_id == parent_id