Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concurrent perpartition cursor): Add parent state updates #343

Merged
merged 31 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
707a6c6
Add API Budget
tolik0 Feb 5, 2025
b6bcdd7
Refactor to move api_budget to root level
tolik0 Feb 6, 2025
040ff9e
Format
tolik0 Feb 6, 2025
824d2c6
Merge branch 'main' into tolik0/add-api-budget
tolik0 Feb 7, 2025
15f830c
Update for backward compatibility
tolik0 Feb 7, 2025
1285668
Add unit tests
tolik0 Feb 9, 2025
7be9842
Add FixedWindowCallRatePolicy unit test
tolik0 Feb 9, 2025
8d3bfce
Change the partitions limit to 1000
tolik0 Feb 10, 2025
509ea05
Refactored switching logic
tolik0 Feb 10, 2025
8d44150
Increase the limit for number of partitions in memory
tolik0 Feb 10, 2025
b3f9897
Merge branch 'tolik0/add-api-budget-limit-1000' into tolik0/refactor-…
tolik0 Feb 11, 2025
342375c
Refactor ConcurrentPerPartitionCursor to not use ConcurrentCursor wit…
tolik0 Feb 12, 2025
05f4db7
Delete code from another branch
tolik0 Feb 12, 2025
c0bc645
Fix cursor value from record
tolik0 Feb 12, 2025
52b95e3
Add throttling for state emitting in ConcurrentPerPartitionCursor
tolik0 Feb 13, 2025
1166a7a
Fix unit tests
tolik0 Feb 17, 2025
4a7d9ec
Move switching to global logic
tolik0 Feb 17, 2025
19ad269
Revert test limits
tolik0 Feb 17, 2025
667700f
Merge branch 'main' into tolik0/refactor-concurrent-global-cursor
tolik0 Feb 17, 2025
6498528
Fix format
tolik0 Feb 17, 2025
d3e7fe2
Add parent state updates
tolik0 Feb 17, 2025
7b4964e
Move acquiring the semaphore
tolik0 Feb 17, 2025
8617cc8
Merge branch 'tolik0/refactor-concurrent-global-cursor' into tolik0/c…
tolik0 Feb 17, 2025
a8db6b6
Merge branch 'main' into tolik0/concurrent-perpartition-add-parent-st…
tolik0 Feb 18, 2025
203c131
Refactor to store only unique states
tolik0 Feb 18, 2025
671fab4
Add intermediate states validation to unit tests
tolik0 Feb 18, 2025
a1d98fb
Fix format
tolik0 Feb 18, 2025
eff25ec
Add unit tests
tolik0 Feb 19, 2025
c51f840
Update unit tests
tolik0 Feb 21, 2025
4a18954
Add deleting finished semaphores
tolik0 Feb 21, 2025
a7ece97
Delete testing prints
tolik0 Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def __init__(
# the oldest partitions can be efficiently removed, maintaining the most recent partitions.
self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

# Parent-state tracking: store each partition’s parent state in creation order
self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict()

self._finished_partitions: set[str] = set()
self._lock = threading.Lock()
self._timer = Timer()
Expand Down Expand Up @@ -155,11 +159,62 @@ def close_partition(self, partition: Partition) -> None:
and self._semaphore_per_partition[partition_key]._value == 0
):
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
self._emit_state_message()

self._check_and_update_parent_state()

self._emit_state_message()

def _check_and_update_parent_state(self) -> None:
"""
Pop the leftmost partition state from _partition_parent_state_map only if
*all partitions* up to (and including) that partition key in _semaphore_per_partition
are fully finished (i.e. in _finished_partitions and semaphore._value == 0).
Additionally, delete finished semaphores with a value of 0 to free up memory,
as they are only needed to track errors and completion status.
"""
last_closed_state = None

while self._partition_parent_state_map:
# Look at the earliest partition key in creation order
earliest_key = next(iter(self._partition_parent_state_map))

# Verify ALL partitions from the left up to earliest_key are finished
all_left_finished = True
for p_key, sem in list(
self._semaphore_per_partition.items()
): # Use list to allow modification during iteration
# If any earlier partition is still not finished, we must stop
if p_key not in self._finished_partitions or sem._value != 0:
all_left_finished = False
break
# Once we've reached earliest_key in the semaphore order, we can stop checking
if p_key == earliest_key:
break

# If the partitions up to earliest_key are not all finished, break the while-loop
if not all_left_finished:
break

# Pop the leftmost entry from parent-state map
_, closed_parent_state = self._partition_parent_state_map.popitem(last=False)
last_closed_state = closed_parent_state

# Clean up finished semaphores with value 0 up to and including earliest_key
for p_key in list(self._semaphore_per_partition.keys()):
sem = self._semaphore_per_partition[p_key]
if p_key in self._finished_partitions and sem._value == 0:
del self._semaphore_per_partition[p_key]
logger.debug(f"Deleted finished semaphore for partition {p_key} with value 0")
if p_key == earliest_key:
break

# Update _parent_state if we popped at least one partition
if last_closed_state is not None:
self._parent_state = last_closed_state

def ensure_at_least_one_state_emitted(self) -> None:
"""
The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
called.
"""
if not any(
Expand Down Expand Up @@ -201,13 +256,19 @@ def stream_slices(self) -> Iterable[StreamSlice]:

slices = self._partition_router.stream_slices()
self._timer.start()
for partition in slices:
yield from self._generate_slices_from_partition(partition)
for partition, last, parent_state in iterate_with_last_flag_and_state(
slices, self._partition_router.get_stream_state
):
yield from self._generate_slices_from_partition(partition, parent_state)

def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
def _generate_slices_from_partition(
self, partition: StreamSlice, parent_state: Mapping[str, Any]
) -> Iterable[StreamSlice]:
# Ensure the maximum number of partitions is not exceeded
self._ensure_partition_limit()

partition_key = self._to_partition_key(partition.partition)

cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
cursor = self._create_cursor(
Expand All @@ -216,18 +277,26 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
)
with self._lock:
self._number_of_partitions += 1
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
self._semaphore_per_partition[self._to_partition_key(partition.partition)] = (
threading.Semaphore(0)
)
self._cursor_per_partition[partition_key] = cursor
self._semaphore_per_partition[partition_key] = threading.Semaphore(0)

with self._lock:
if (
len(self._partition_parent_state_map) == 0
or self._partition_parent_state_map[
next(reversed(self._partition_parent_state_map))
]
!= parent_state
):
self._partition_parent_state_map[partition_key] = deepcopy(parent_state)

for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
cursor.stream_slices(),
lambda: None,
):
self._semaphore_per_partition[self._to_partition_key(partition.partition)].release()
self._semaphore_per_partition[partition_key].release()
if is_last_slice:
self._finished_partitions.add(self._to_partition_key(partition.partition))
self._finished_partitions.add(partition_key)
yield StreamSlice(
partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
)
Expand Down Expand Up @@ -257,9 +326,9 @@ def _ensure_partition_limit(self) -> None:
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
if (
partition_key in self._finished_partitions
and self._semaphore_per_partition[partition_key]._value == 0
if partition_key in self._finished_partitions and (
partition_key not in self._semaphore_per_partition
or self._semaphore_per_partition[partition_key]._value == 0
):
oldest_partition = self._cursor_per_partition.pop(
partition_key
Expand Down Expand Up @@ -338,9 +407,6 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
self._create_cursor(state["cursor"])
)
self._semaphore_per_partition[self._to_partition_key(state["partition"])] = (
threading.Semaphore(0)
)

# set default state for missing partitions if it is per partition with fallback to global
if self._GLOBAL_STATE_KEY in stream_state:
Expand Down
Loading
Loading