estuary-cdk: expand capabilities for incremental json processing
A prior implementation of incremental JSON processing could yield records from
a JSON stream by prefix, discarding everything else in the stream. That suits
cases where the entire dataset is represented in a single large response, but
there are cases where the dataset spans multiple, potentially very large,
responses, and each of those responses carries some extra information we need
in order to proceed, such as a paging token. Parsing and processing those
responses while keeping the connector's memory usage low can be a challenge.

This adds the ability to read a JSON stream incrementally and yield records by
prefix, while also returning whatever is left over at the end. This "remainder"
is effectively built up as an in-memory object while the document is processed.
As long as the remainder stays relatively small, its memory overhead should be
negligible.
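
For a sense of how a connector might use this for pagination, here is a minimal
sketch. Only `IncrementalJsonProcessor` and `request_stream` come from this
change; the `Record` and `PageInfo` models, the URL, the "results.item" prefix,
and the `next_page` field are hypothetical:

```python
# Hypothetical pagination sketch: assumes an API whose responses look like
# {"results": [...], "next_page": "..."}.
from logging import Logger

from pydantic import BaseModel

from estuary_cdk.incremental_json_processor import IncrementalJsonProcessor


class Record(BaseModel):
    id: int


class PageInfo(BaseModel):
    next_page: str | None = None  # paging token kept out of the record stream


async def fetch_records(http, log: Logger):
    params: dict | None = None
    while True:
        processor = IncrementalJsonProcessor(
            await http.request_stream(
                log, "https://api.example.com/records", params=params
            ),
            "results.item",  # yield each element of the "results" array
            Record,
            PageInfo,
        )
        async for record in processor:
            yield record

        # Everything outside "results" accumulates as the small remainder,
        # including the paging token needed for the next request.
        token = processor.get_remainder().next_page
        if not token:
            return
        params = {"page": token}
```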

I had assumed CPU performance for this mechanism would be worse than the
standard approach of parsing a response in its entirety into a Pydantic model,
but it turns out to be about 5-10% faster. I believe this is because the JSON
parser used by ijson is faster than the one used by Pydantic. We're still
building Pydantic models from the Python objects yielded by the ijson parser,
but doing so with `model_validate` instead of `model_validate_json`.
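
Concretely, the two validation paths look roughly like this (illustration only,
not code from this commit):

```python
from pydantic import BaseModel


class Item(BaseModel):
    id: int


Item.model_validate_json(b'{"id": 1}')  # parse raw JSON bytes, then validate
Item.model_validate({"id": 1})          # validate an already-parsed Python object
```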

I suspect CPU performance could be improved even more by not using Pydantic
models at all and instead using msgspec structs, but this does not seem
necessary right now.
williamhbaker committed Jan 31, 2025
1 parent 85638f0 commit 6e8e31a
Showing 6 changed files with 462 additions and 63 deletions.
45 changes: 6 additions & 39 deletions estuary-cdk/estuary_cdk/http.py
@@ -1,8 +1,8 @@
from dataclasses import dataclass
from logging import Logger
import ijson
from estuary_cdk.incremental_json_processor import Remainder
from pydantic import BaseModel
from typing import AsyncGenerator, Any, AsyncIterator, TypeVar
from typing import AsyncGenerator, Any, TypeVar
import abc
import aiohttp
import asyncio
@@ -24,28 +24,6 @@

StreamedObject = TypeVar("StreamedObject", bound=BaseModel)

class _AsyncStreamWrapper:
    """
    Used to adapt an AsyncGenerator of bytes into a file-like object that can be
    incrementally read by ijson.
    """
    def __init__(self, gen: AsyncGenerator[bytes, None]):
        self.gen: AsyncIterator[bytes] = gen
        self.buf = b""

    async def read(self, size: int = -1) -> bytes:
        if size == -1:
            return self.buf + b"".join([chunk async for chunk in self.gen])

        while len(self.buf) < size:
            try:
                self.buf += await anext(self.gen)
            except StopAsyncIteration:
                break

        data, self.buf = self.buf[:size], self.buf[size:]
        return data

class HTTPError(RuntimeError):
    """
    HTTPError is a custom error class that provides the HTTP status code
@@ -125,31 +103,20 @@ async def request_lines(

        return

    async def request_object_stream(
    async def request_stream(
        self,
        log: Logger,
        cls: type[StreamedObject],
        prefix: str,
        url: str,
        method: str = "GET",
        params: dict[str, Any] | None = None,
        json: dict[str, Any] | None = None,
        form: dict[str, Any] | None = None,
    ) -> AsyncGenerator[StreamedObject, None]:
        """
        Request a url and incrementally decode a stream of JSON objects as
        instances of `cls`.
        Prefix is a path within the JSON document where objects to parse reside.
        Usually it will end with the ".item" suffix, which allows iteration
        through objects in an array. Example: "some.path.to.array.item".
        """
    ) -> AsyncGenerator[bytes, None]:
"""Request a url and and return the raw response as a stream of bytes"""

        strm = self._request_stream(
        return self._request_stream(
            log, url, method, params, json, form, True
        )
        async for obj in ijson.items_async(_AsyncStreamWrapper(strm), prefix):
            yield cls.model_validate(obj)

    @abc.abstractmethod
    def _request_stream(
174 changes: 174 additions & 0 deletions estuary-cdk/estuary_cdk/incremental_json_processor.py
@@ -0,0 +1,174 @@
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Callable, Generic, List, TypeVar

import ijson
from pydantic import BaseModel

StreamedItem = TypeVar("StreamedItem", bound=BaseModel)

Remainder = TypeVar("Remainder", bound=BaseModel)


class _NoopRemainder(BaseModel):
    pass


class IncrementalJsonProcessor(Generic[StreamedItem, Remainder]):
"""
Processes a stream of JSON bytes incrementally, yielding objects of type
`streamed_item_cls` along the way. Once iteration is complete, the
"remainder" of the input that was not present under the specific prefix can
be obtained using the `get_remainder` method.
The prefix is a path within the JSON document where objects to parse reside.
Usually it will end with the ".item" suffix, which allows iteration through
objects in an array. More nuanced scenarios are also possible.
Does not currently support parsing NDJSON since that can be more efficiently
processed via other means.
Example usage if you only want the items and don't care about the remainder:
```python
async for item in IncrementalJsonProcessor(
input,
prefix="data.items",
streamed_item_cls=MyItem,
):
do_something_with(item)
```
For using the remainder after iteration is complete, keep a reference to the
processor:
```python
processor = IncrementalJsonProcessor(
input,
prefix="data.items",
streamed_item_cls=MyItem,
remainder_cls=MyRemainder,
)
async for item processor:
do_something_with(item)
do_something_with(processor.get_remainder())
```
See the tests file `test_incremental_json_processor.py` for more examples,
including various formulations of the prefix to support more complex data
structures.
"""

    def __init__(
        self,
        input: AsyncGenerator[bytes, None],
        prefix: str,
        streamed_item_cls: type[StreamedItem],
        remainder_cls: type[Remainder] = _NoopRemainder,
    ):
        self.input = input
        self.prefix = prefix
        self.streamed_item_cls = streamed_item_cls
        self.remainder_cls = remainder_cls
        self.parser = ijson.parse_async(_AsyncStreamWrapper(input))
        self.remainder_builder = _ObjectBuilder()
        self.done = False

    def __aiter__(self):
        return self

    async def __anext__(self) -> StreamedItem:
        while True:
            try:
                current, event, value = await anext(self.parser)
            except StopAsyncIteration:
                self.done = True
                raise

            if current == self.prefix:
                break

            self.remainder_builder.event(event, value)

        if event not in ("start_map", "start_array"):
            return self.streamed_item_cls.model_validate(value)

        depth = 1
        obj = _ObjectBuilder()
        while depth:
            obj.event(event, value)
            try:
                current, event, value = await anext(self.parser)
            except StopAsyncIteration:
                raise ValueError("Incomplete JSON structure")

            if event in ("start_map", "start_array"):
                depth += 1
            elif event in ("end_map", "end_array"):
                depth -= 1

        return self.streamed_item_cls.model_validate(obj.get_value())

    def get_remainder(self) -> Remainder:
        if not self.done:
            raise Exception(
                "attempted to access remainder before fully processing input"
            )

        return self.remainder_cls.model_validate(self.remainder_builder.get_value())


class _AsyncStreamWrapper:
    def __init__(self, gen: AsyncGenerator[bytes, None]):
        self.gen = gen
        self.buf = b""

    async def read(self, size: int = -1) -> bytes:
        if size == -1:
            return self.buf + b"".join([chunk async for chunk in self.gen])

        while len(self.buf) < size:
            try:
                self.buf += await anext(self.gen)
            except StopAsyncIteration:
                break

        data, self.buf = self.buf[:size], self.buf[size:]
        return data


class _ObjectBuilder:
    def __init__(self) -> None:
        self.value: Any = None
        self.key: str | None = None

        def initial_set(value: Any) -> None:
            self.value = value

        self.stack: List[Callable[[Any], None]] = [initial_set]

    def event(self, event: str, value: Any) -> None:
        match event:
            case "map_key":
                self.key = value
            case "start_map":
                mappable = dict()
                self.stack[-1](mappable)

                def setter(value: Any) -> None:
                    assert self.key is not None
                    mappable[self.key] = value

                self.stack.append(setter)
            case "start_array":
                array = []
                self.stack[-1](array)

                self.stack.append(array.append)
            case "end_array" | "end_map":
                self.stack.pop()
            case _:
                self.stack[-1](value)

    def get_value(self) -> Any:
        del self.stack[:]  # This seems to improve memory usage significantly.
        return self.value
1 change: 1 addition & 0 deletions estuary-cdk/pyproject.toml
@@ -20,6 +20,7 @@ debugpy = "^1.8.0"
mypy = "^1.8.0"
pytest = "^7.4.3"
pytest-insta = "^0.3.0"
pytest-asyncio = "0.23.8"

[build-system]
requires = ["poetry-core"]