def _check_and_update_parent_state(self) -> None:
    """
    Advance ``self._parent_state`` past every parent state whose partitions are all done.

    ``_partition_parent_state_map`` and ``_semaphore_per_partition`` are both walked in
    insertion order. The leftmost parent-state entry is popped only when every semaphore
    from the left, up to and including the one stored under that entry's key, belongs to
    a partition listed in ``_finished_partitions`` and has a counter of 0. The semaphores
    that were verified on the way are deleted to free memory. The walk stops at the first
    semaphore that does not satisfy the condition; nothing belonging to that round is
    popped or deleted.

    If at least one entry was popped, ``self._parent_state`` is set to the last popped
    value (unless that value is ``None``).

    The semaphore map is snapshotted once per call and consumed by a single resumable
    pass, instead of being copied and re-scanned twice for every popped entry.

    NOTE(review): this method mutates shared maps without taking ``self._lock`` itself;
    presumably the caller already holds it -- confirm.
    NOTE(review): ``sem._value`` is a private attribute of ``threading.Semaphore``.
    """
    last_closed_state = None

    # One snapshot per call. The iterator is shared by all rounds of the while-loop, so
    # each round resumes right after the semaphores consumed by the previous round --
    # which are exactly the ones that round deleted from the live map.
    pending = iter(list(self._semaphore_per_partition.items()))

    while self._partition_parent_state_map:
        # Earliest parent-state entry in creation order
        earliest_key = next(iter(self._partition_parent_state_map))

        verified_keys = []
        blocked = False
        for p_key, sem in pending:
            # Any unfinished partition on the left blocks the parent state from advancing
            if p_key not in self._finished_partitions or sem._value != 0:
                blocked = True
                break
            verified_keys.append(p_key)
            # Nothing to the right of earliest_key matters for this entry
            if p_key == earliest_key:
                break

        if blocked:
            break

        # Everything up to earliest_key is finished: this parent state is safe to commit
        _, last_closed_state = self._partition_parent_state_map.popitem(last=False)

        # The verified semaphores are no longer needed for completion tracking
        for p_key in verified_keys:
            del self._semaphore_per_partition[p_key]
            logger.debug("Deleted finished semaphore for partition %s with value 0", p_key)

    # Update _parent_state only if at least one entry was popped
    if last_closed_state is not None:
        self._parent_state = last_closed_state
""" if not any( @@ -201,13 +256,19 @@ def stream_slices(self) -> Iterable[StreamSlice]: slices = self._partition_router.stream_slices() self._timer.start() - for partition in slices: - yield from self._generate_slices_from_partition(partition) + for partition, last, parent_state in iterate_with_last_flag_and_state( + slices, self._partition_router.get_stream_state + ): + yield from self._generate_slices_from_partition(partition, parent_state) - def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: + def _generate_slices_from_partition( + self, partition: StreamSlice, parent_state: Mapping[str, Any] + ) -> Iterable[StreamSlice]: # Ensure the maximum number of partitions is not exceeded self._ensure_partition_limit() + partition_key = self._to_partition_key(partition.partition) + cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) if not cursor: cursor = self._create_cursor( @@ -216,18 +277,26 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St ) with self._lock: self._number_of_partitions += 1 - self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor - self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( - threading.Semaphore(0) - ) + self._cursor_per_partition[partition_key] = cursor + self._semaphore_per_partition[partition_key] = threading.Semaphore(0) + + with self._lock: + if ( + len(self._partition_parent_state_map) == 0 + or self._partition_parent_state_map[ + next(reversed(self._partition_parent_state_map)) + ] + != parent_state + ): + self._partition_parent_state_map[partition_key] = deepcopy(parent_state) for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state( cursor.stream_slices(), lambda: None, ): - self._semaphore_per_partition[self._to_partition_key(partition.partition)].release() + self._semaphore_per_partition[partition_key].release() if is_last_slice: - 
self._finished_partitions.add(self._to_partition_key(partition.partition)) + self._finished_partitions.add(partition_key) yield StreamSlice( partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields ) @@ -257,9 +326,9 @@ def _ensure_partition_limit(self) -> None: while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: # Try removing finished partitions first for partition_key in list(self._cursor_per_partition.keys()): - if ( - partition_key in self._finished_partitions - and self._semaphore_per_partition[partition_key]._value == 0 + if partition_key in self._finished_partitions and ( + partition_key not in self._semaphore_per_partition + or self._semaphore_per_partition[partition_key]._value == 0 ): oldest_partition = self._cursor_per_partition.pop( partition_key @@ -338,9 +407,6 @@ def _set_initial_state(self, stream_state: StreamState) -> None: self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( self._create_cursor(state["cursor"]) ) - self._semaphore_per_partition[self._to_partition_key(state["partition"])] = ( - threading.Semaphore(0) - ) # set default state for missing partitions if it is per partition with fallback to global if self._GLOBAL_STATE_KEY in stream_state: diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 767d24874..3b4b4fe24 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -20,6 +20,14 @@ ConcurrentDeclarativeSource, ) from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor +from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( + DeclarativePartition, +) +from airbyte_cdk.sources.streams.concurrent.cursor import CursorField +from 
def _new_state_converter():
    """Build the datetime state converter shared by the parent-state tests below."""
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    return CustomFormatConcurrentStreamStateConverter(
        datetime_format=timestamp_format,
        input_datetime_formats=[timestamp_format],
        is_sequential_state=True,
        cursor_granularity=timedelta(0),
    )


def _new_per_partition_cursor(cursor_factory, stream_state, connector_state_converter=None):
    """
    Build a ConcurrentPerPartitionCursor for "test_stream" with mocked collaborators.

    The cursor factory must be fully configured before this call: the constructor is
    given ``stream_state`` and may already ask the factory for cursors.
    """
    if connector_state_converter is None:
        connector_state_converter = _new_state_converter()
    return ConcurrentPerPartitionCursor(
        cursor_factory=cursor_factory,
        partition_router=MagicMock(),
        stream_name="test_stream",
        stream_namespace=None,
        stream_state=stream_state,
        message_repository=MagicMock(),
        connector_state_manager=MagicMock(),
        connector_state_converter=connector_state_converter,
        cursor_field=CursorField(cursor_field_key="updated_at"),
    )


def _mock_cursor_with(cursor_slices, updated_at):
    """Return a mocked cursor that yields ``cursor_slices`` once and reports ``updated_at``."""
    mocked = MagicMock()
    mocked.stream_slices.return_value = iter(cursor_slices)
    mocked.state = {"updated_at": updated_at}
    return mocked


def _close_each(cursor, stream_slices):
    """Call ``cursor.close_partition`` once per slice, wrapped in a DeclarativePartition."""
    for stream_slice in stream_slices:
        cursor.close_partition(
            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), stream_slice)
        )


def test_given_no_partitions_processed_when_close_partition_then_no_state_update():
    """With a router that yields no partitions, the state and all tracking maps stay empty."""
    mock_cursor = MagicMock()
    # The router produces no partitions, so no cursor slices are ever requested
    mock_cursor.stream_slices.side_effect = [iter([])]
    mock_cursor.state = {}

    cursor_factory_mock = MagicMock()
    cursor_factory_mock.create.return_value = mock_cursor

    cursor = _new_per_partition_cursor(cursor_factory_mock, stream_state={})
    router = cursor._partition_router
    router.stream_slices.return_value = iter([])
    router.get_stream_state.return_value = {}

    generated = list(cursor.stream_slices())  # consumed exactly once
    _close_each(cursor, generated)

    expected_state = {
        "use_global_cursor": False,
        "lookback_window": 0,
        "states": [],
    }
    assert cursor.state == expected_state
    assert len(cursor._cursor_per_partition) == 0
    assert len(cursor._semaphore_per_partition) == 0
    assert len(cursor._partition_parent_state_map) == 0
    assert mock_cursor.stream_slices.call_count == 0  # never called: there were no partitions


def test_given_unfinished_first_parent_partition_no_parent_state_update():
    """If the first generated slice is never closed, the parent state keeps its initial value."""
    # One mocked cursor per partition, two slices each
    mock_cursor_1 = _mock_cursor_with(
        [{"slice1": "data1"}, {"slice2": "data1"}], "2024-01-01T00:00:00Z"
    )
    mock_cursor_2 = _mock_cursor_with(
        [{"slice2": "data2"}, {"slice2": "data2"}], "2024-01-02T00:00:00Z"
    )

    # The factory hands the mocked cursors out in this order
    cursor_factory_mock = MagicMock()
    cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2]

    initial_state = {
        "states": [
            {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}}
        ],
        "state": {"updated_at": "2024-01-01T00:00:00Z"},
        "lookback_window": 86400,
        "parent_state": {"posts": {"updated_at": "2024-01-01T00:00:00Z"}},
    }
    cursor = _new_per_partition_cursor(cursor_factory_mock, stream_state=initial_state)

    router = cursor._partition_router
    router.stream_slices.return_value = iter(
        [
            StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}),
            StreamSlice(partition={"id": "2"}, cursor_slice={}, extra_fields={}),  # new partition
        ]
    )
    router.get_stream_state.side_effect = [
        {"posts": {"updated_at": "2024-01-04T00:00:00Z"}},  # first parent state
        {"posts": {"updated_at": "2024-01-05T00:00:00Z"}},  # second parent state
    ]

    generated = list(cursor.stream_slices())
    # Close every slice except the first one
    _close_each(cursor, generated[1:])
    cursor.ensure_at_least_one_state_emitted()

    expected_state = {
        "use_global_cursor": False,
        "states": [
            {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}},
            {"partition": {"id": "2"}, "cursor": {"updated_at": "2024-01-02T00:00:00Z"}},
        ],
        "state": {"updated_at": "2024-01-01T00:00:00Z"},
        "lookback_window": 86400,
        "parent_state": {"posts": {"updated_at": "2024-01-01T00:00:00Z"}},
    }
    assert cursor.state == expected_state
    assert mock_cursor_1.stream_slices.call_count == 1  # sliced once
    assert mock_cursor_2.stream_slices.call_count == 1  # sliced once
    assert len(cursor._semaphore_per_partition) == 2


def test_given_unfinished_last_parent_partition_with_partial_parent_state_update():
    """If only the last generated slice stays open, the parent state advances to the first one."""
    # One mocked cursor per partition, two slices each
    mock_cursor_1 = _mock_cursor_with(
        [{"slice1": "data1"}, {"slice2": "data1"}], "2024-01-02T00:00:00Z"
    )
    mock_cursor_2 = _mock_cursor_with(
        [{"slice2": "data2"}, {"slice2": "data2"}], "2024-01-01T00:00:00Z"
    )

    # The factory hands the mocked cursors out in this order
    cursor_factory_mock = MagicMock()
    cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2]

    initial_state = {
        "states": [
            {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}}
        ],
        "state": {"updated_at": "2024-01-01T00:00:00Z"},
        "lookback_window": 86400,
        "parent_state": {"posts": {"updated_at": "2024-01-01T00:00:00Z"}},
    }
    cursor = _new_per_partition_cursor(cursor_factory_mock, stream_state=initial_state)

    router = cursor._partition_router
    router.stream_slices.return_value = iter(
        [
            StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}),
            StreamSlice(partition={"id": "2"}, cursor_slice={}, extra_fields={}),  # new partition
        ]
    )
    router.get_stream_state.side_effect = [
        {"posts": {"updated_at": "2024-01-04T00:00:00Z"}},  # first parent state
        {"posts": {"updated_at": "2024-01-05T00:00:00Z"}},  # second parent state
    ]

    generated = list(cursor.stream_slices())
    # Close every slice except the last one
    _close_each(cursor, generated[:-1])
    cursor.ensure_at_least_one_state_emitted()

    expected_state = {
        "use_global_cursor": False,
        "states": [
            {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-02T00:00:00Z"}},
            {"partition": {"id": "2"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}},
        ],
        "state": {"updated_at": "2024-01-01T00:00:00Z"},
        "lookback_window": 86400,
        "parent_state": {"posts": {"updated_at": "2024-01-04T00:00:00Z"}},
    }
    assert cursor.state == expected_state
    assert mock_cursor_1.stream_slices.call_count == 1  # sliced once
    assert mock_cursor_2.stream_slices.call_count == 1  # sliced once
    assert len(cursor._semaphore_per_partition) == 1


def test_given_all_partitions_finished_when_close_partition_then_final_state_emitted():
    """Closing every slice of every partition yields the latest parent state in the final state."""
    mock_cursor = MagicMock()
    # The shared mocked cursor yields a single slice per partition
    mock_cursor.stream_slices.side_effect = [
        iter([{"slice1": "data"}]),  # partition 1
        iter([{"slice2": "data"}]),  # partition 2
    ]
    mock_cursor.state = {"updated_at": "2024-01-02T00:00:00Z"}

    cursor_factory_mock = MagicMock()
    cursor_factory_mock.create.return_value = mock_cursor

    initial_state = {
        "states": [
            {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}},
            {"partition": {"id": "2"}, "cursor": {"updated_at": "2024-01-02T00:00:00Z"}},
        ],
        "state": {"updated_at": "2024-01-02T00:00:00Z"},
        "lookback_window": 86400,
        "parent_state": {"posts": {"updated_at": "2024-01-03T00:00:00Z"}},
    }
    cursor = _new_per_partition_cursor(cursor_factory_mock, stream_state=initial_state)

    router = cursor._partition_router
    router.stream_slices.return_value = iter(
        [
            StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}),
            StreamSlice(partition={"id": "2"}, cursor_slice={}, extra_fields={}),
        ]
    )
    router.get_stream_state.return_value = {"posts": {"updated_at": "2024-01-06T00:00:00Z"}}

    generated = list(cursor.stream_slices())
    _close_each(cursor, generated)

    cursor.ensure_at_least_one_state_emitted()

    final_state = cursor.state
    assert final_state["use_global_cursor"] is False
    assert len(final_state["states"]) == 2
    assert final_state["state"]["updated_at"] == "2024-01-02T00:00:00Z"
    assert final_state["parent_state"] == {"posts": {"updated_at": "2024-01-06T00:00:00Z"}}
    assert final_state["lookback_window"] == 1
    assert cursor._message_repository.emit_message.call_count == 2
    assert mock_cursor.stream_slices.call_count == 2  # once per partition
    assert len(cursor._semaphore_per_partition) == 1


def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_global_cursor():
    """With the limits lowered to 2 and 1, three partitions make the state use the global cursor."""
    mock_cursor = MagicMock()
    # The shared mocked cursor yields a single slice per partition
    mock_cursor.stream_slices.side_effect = [iter([{f"slice{i}": "data"}]) for i in range(3)]
    mock_cursor.state = {"updated_at": "2024-01-01T00:00:00Z"}

    cursor_factory_mock = MagicMock()
    cursor_factory_mock.create.return_value = mock_cursor

    cursor = _new_per_partition_cursor(cursor_factory_mock, stream_state={})
    # Lower the limits on this instance so three partitions are enough to exceed them
    cursor.DEFAULT_MAX_PARTITIONS_NUMBER = 2
    cursor.SWITCH_TO_GLOBAL_LIMIT = 1

    router = cursor._partition_router
    three_partitions = [
        StreamSlice(partition={"id": str(i)}, cursor_slice={}, extra_fields={}) for i in range(3)
    ]
    router.stream_slices.return_value = iter(three_partitions)
    router.get_stream_state.side_effect = [
        {"updated_at": "2024-01-02T00:00:00Z"},
        {"updated_at": "2024-01-03T00:00:00Z"},
        {"updated_at": "2024-01-04T00:00:00Z"},
        {"updated_at": "2024-01-04T00:00:00Z"},
    ]

    generated = list(cursor.stream_slices())
    _close_each(cursor, generated)
    cursor.ensure_at_least_one_state_emitted()

    final_state = cursor.state
    assert len(generated) == 3
    assert final_state["use_global_cursor"] is True
    assert len(final_state.get("states", [])) == 0  # no per-partition states
    assert final_state["parent_state"] == {"updated_at": "2024-01-04T00:00:00Z"}
    assert "lookback_window" in final_state
    assert len(cursor._cursor_per_partition) <= cursor.DEFAULT_MAX_PARTITIONS_NUMBER
    assert mock_cursor.stream_slices.call_count == 3  # once per partition


def test_semaphore_cleanup():
    """Once every slice is closed, all semaphores and parent-state entries have been removed."""
    # One mocked cursor per partition, two slices each
    mock_cursor_1 = _mock_cursor_with(
        [{"slice1": "data1"}, {"slice2": "data1"}], "2024-01-02T00:00:00Z"
    )
    mock_cursor_2 = _mock_cursor_with(
        [{"slice2": "data2"}, {"slice2": "data2"}], "2024-01-03T00:00:00Z"
    )

    # The factory hands the mocked cursors out in this order
    cursor_factory_mock = MagicMock()
    cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2]

    cursor = _new_per_partition_cursor(
        cursor_factory_mock,
        stream_state={},
        connector_state_converter=MagicMock(),
    )

    # Two partitions, each reported with its own parent state
    partitions = [
        StreamSlice(partition={"id": "1"}, cursor_slice={}),
        StreamSlice(partition={"id": "2"}, cursor_slice={}),
    ]
    cursor._partition_router.stream_slices.return_value = iter(partitions)
    cursor._partition_router.get_stream_state.side_effect = [
        {"parent": {"state": "state1"}},  # parent state for partition "1"
        {"parent": {"state": "state2"}},  # parent state for partition "2"
    ]

    # Generating the slices fills _semaphore_per_partition and _partition_parent_state_map
    generated_slices = list(cursor.stream_slices())

    # Before anything is closed, both maps hold one entry per partition
    assert len(cursor._semaphore_per_partition) == 2
    assert len(cursor._partition_parent_state_map) == 2
    assert cursor._partition_parent_state_map['{"id":"1"}'] == {"parent": {"state": "state1"}}
    assert cursor._partition_parent_state_map['{"id":"2"}'] == {"parent": {"state": "state2"}}

    # Close every generated slice
    _close_each(cursor, generated_slices)

    # After closing, all tracking entries are gone and the last parent state is committed
    assert len(cursor._finished_partitions) == 2
    assert len(cursor._semaphore_per_partition) == 0
    assert '{"id":"1"}' not in cursor._semaphore_per_partition
    assert '{"id":"2"}' not in cursor._semaphore_per_partition
    assert len(cursor._partition_parent_state_map) == 0  # every parent state was popped
    assert cursor._parent_state == {"parent": {"state": "state2"}}  # last parent state