Skip to content

Commit

Permalink
source-hubspot-native: parallel fetching of batches with associations
Browse files Browse the repository at this point in the history
Adds the `buffer_ordered` module, which can be used for processing streams of
awaitables with a configurable degree of concurrency, and returns the results of
those awaitables in the same order they were generated.

This is used in `fetch_changes_with_associations`. This pre-existing function
builds a list of object IDs to fetch associations for, and would previously
request them one batch at a time. Using `buffer_ordered` here should increase
throughput by making several of these requests concurrently instead.

`buffer_ordered` is split out into a separate module like this, even though it
is only used in that one place so far. I thought it was easier to reason about
and test this way, and I suspect this kind of strategy will be useful for future
efforts as well, so a little encapsulation now shouldn't hurt.
  • Loading branch information
williamhbaker committed Jan 8, 2025
1 parent ff450e8 commit 29bee35
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 19 deletions.
40 changes: 29 additions & 11 deletions source-hubspot-native/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion source-hubspot-native/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ python = "^3.12"
[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
mypy = "^1.8.0"
pytest = "^7.4.3"
pytest = "^8.3.4"
pytest-asyncio = "^0.25.1"
pytest-insta = "^0.3.0"

[build-system]
Expand Down
20 changes: 13 additions & 7 deletions source-hubspot-native/source_hubspot_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import functools
import itertools
from datetime import UTC, datetime, timedelta
import json
from logging import Logger
from typing import (
Any,
Expand Down Expand Up @@ -51,6 +50,7 @@
)

import source_hubspot_native.emitted_changes_cache as cache
from source_hubspot_native.buffer_ordered import buffer_ordered

HUB = "https://api.hubapi.com"

Expand Down Expand Up @@ -474,18 +474,24 @@ async def fetch_changes_with_associations(

recent.sort() # Oldest updates first.

for batch_it in itertools.batched(recent, 50):
batch = list(batch_it)

async def _do_batch_fetch(batch: list[tuple[datetime, str]]) -> Iterable[tuple[datetime, str, CRMObject]]:
# Enable lookup of datetimes for IDs from the result batch.
dts = {id: dt for dt, id in batch}

documents: BatchResult[CRMObject] = await fetch_batch_with_associations(
log, cls, http, object_name, [id for _, id in batch]
)
for doc in documents.results:
id = str(doc.id)
yield dts[id], id, doc

return ((dts[str(doc.id)], str(doc.id), doc) for doc in documents.results)


async def _batches_gen() -> AsyncGenerator[Awaitable[Iterable[tuple[datetime, str, CRMObject]]], None]:
for batch_it in itertools.batched(recent, 50):
yield _do_batch_fetch(list(batch_it))

async for res in buffer_ordered(_batches_gen(), 3):
for ts, id, doc in res:
yield ts, id, doc


async def fetch_search_objects(
Expand Down
122 changes: 122 additions & 0 deletions source-hubspot-native/source_hubspot_native/buffer_ordered.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Awaitable, TypeVar

T = TypeVar("T")


@dataclass
class BufferWork[T]:
aw: Awaitable[T]
result: asyncio.Queue[T | None]

next: asyncio.Future["BufferWork[T] | None"]


async def buffer_ordered(
aws: AsyncGenerator[Awaitable[T], None],
concurrency: int,
) -> AsyncGenerator[T, None]:
"""
Run the stream of awaitables 'aws' concurrently and return the results of
each in order. There may be up to 'concurrency' results from the awaitables
held in memory at a time, so a lower concurrency may improve memory usage at
the expense of throughput.
Args:
aws: A stream of awaitables to run concurrently. concurrency: The
maximum number of concurrent awaitables to run.
Returns:
A stream of results from the awaitables, with the order matching the
order yielded by 'aws'.
"""

work: asyncio.Queue[BufferWork[T] | None] = asyncio.Queue(1)
next: asyncio.Future[BufferWork[T] | None] = asyncio.Future()

async def _producer():
current = next
try:
async for aw in aws:
this_next: asyncio.Future[BufferWork[T] | None] = asyncio.Future()
this_work = BufferWork(
aw=aw,
result=asyncio.Queue(1),
next=this_next,
)
current.set_result(this_work)
current = this_next
await work.put(this_work)

# Wait until the last item has been removed from the queue by a
# worker before requesting anything else from the awaitables
# generator. This prevents an awaitable from being held in limbo
# in this loop without being awaited if the producer or worker
# raises an exception and exits early.
await work.join()

# Send stopping signals to the output loop and workers.
current.set_result(None)
for _ in range(concurrency):
await work.put(None)
except Exception as e:
# Signal the output loop to stop.
current.set_result(None)
raise

async def _worker():
while True:
this_work = await work.get()
work.task_done() # Signal removal from the queue, per the note above in _producer.
if this_work is None:
break

try:
this_work.result.put_nowait(await this_work.aw)
except Exception as e:
# Signal the output loop to stop.
this_work.result.put_nowait(None)
raise
# Do not get another awaitable until this one has been fully handled
# by the output loop. This limits the number of pending work items
# to output, which is important their result may not necessarily be
# small and will be held in memory.
await this_work.result.join()

try:
async with asyncio.TaskGroup() as tg:
for coro in [_producer(), *[_worker() for _ in range(concurrency)]]:
tg.create_task(coro)

# Output loop.
while True:
finished_work = await next
if finished_work is None:
break

this_result = await finished_work.result.get()
if this_result is None:
break

yield this_result

# Signal the worker so it can start on another awaitable.
finished_work.result.task_done()
# Output the next result per the original ordering of input awaitables.
next = finished_work.next
except ExceptionGroup as eg:
# Raise the first error from the producer or any of the workers.
for e in eg.exceptions:
raise e
except Exception as e:
raise
finally:
# Await any queued awaitables, discarding further errors.
while not work.empty():
remaining_work = work.get_nowait()
if remaining_work is not None:
try:
await remaining_work.aw
except Exception as e:
pass
25 changes: 25 additions & 0 deletions source-hubspot-native/tests/test_buffer_ordered.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import asyncio
import random
from typing import AsyncGenerator, Awaitable

import pytest
import source_hubspot_native.buffer_ordered


@pytest.mark.asyncio
async def test_buffer_ordered():
    """Results are emitted in input order even when completion order varies."""
    expected = list(range(1007))

    async def _delayed_values() -> AsyncGenerator[Awaitable[int], None]:
        # Each awaitable resolves to its own value after a random 1-10 ms
        # pause, so awaitables running concurrently finish out of order.
        for value in expected:
            delay_ms = random.randint(1, 10)
            yield asyncio.sleep(delay_ms / 1000, result=value)

    stream = source_hubspot_native.buffer_ordered.buffer_ordered(
        _delayed_values(),
        20,
    )
    collected = [item async for item in stream]

    assert expected == collected

0 comments on commit 29bee35

Please sign in to comment.