
estuary-cdk: expand capabilities for incremental json processing #2318

Merged · 1 commit into main · Feb 3, 2025

Conversation

@williamhbaker (Member) commented on Jan 31, 2025:

Description:

A prior implementation of incremental JSON processing was able to yield records from a JSON stream by prefix, discarding everything else from the stream. That suits cases where the entire dataset is represented in a single large response, but there are cases where the dataset is represented by multiple potentially very large responses, and each of those responses contains some extra information we need to proceed, such as a paging token. Parsing and processing those responses while keeping memory usage efficient within the connector can be a challenge.

This adds the ability to read a JSON stream incrementally and yield records by prefix, while also returning anything that's left at the end. This "remainder" is built up in memory as an object while the document is processed. As long as the remainder is relatively small, memory usage should be negligible.
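The splitting mechanism described above can be sketched with the standard library alone. Everything here is a hedged illustration, not the connector's actual code: `stream_with_remainder`, `_events`, and `_Builder` are hypothetical names, and a real connector would consume events from ijson's incremental parser rather than generating them from an already-parsed document.

```python
import json


class _Builder:
    """Minimal object builder in the spirit of ijson's ObjectBuilder:
    turns a stream of parse events back into Python values."""

    def __init__(self):
        self.value = None
        self._stack = []   # containers still being filled
        self._key = None   # pending map key

    def event(self, event, value):
        if event == "map_key":
            self._key = value
        elif event == "start_map":
            self._add({})
        elif event == "start_array":
            self._add([])
        elif event in ("end_map", "end_array"):
            self._stack.pop()
        else:  # scalar event: "string", "number", "boolean", "null"
            self._add(value)

    def _add(self, value):
        if self._stack:
            top = self._stack[-1]
            if isinstance(top, list):
                top.append(value)
            else:
                top[self._key] = value
        else:
            self.value = value
        if isinstance(value, (dict, list)):
            self._stack.append(value)


def _events(node, prefix=""):
    """Produce ijson-style (prefix, event, value) tuples from a parsed
    document. In a real connector these would come straight from ijson's
    streaming parser, so the full document is never materialized."""
    if isinstance(node, dict):
        yield (prefix, "start_map", None)
        for key, child in node.items():
            yield (prefix, "map_key", key)
            yield from _events(child, f"{prefix}.{key}" if prefix else key)
        yield (prefix, "end_map", None)
    elif isinstance(node, list):
        yield (prefix, "start_array", None)
        item_prefix = f"{prefix}.item" if prefix else "item"
        for child in node:
            yield from _events(child, item_prefix)
        yield (prefix, "end_array", None)
    else:
        kind = ("null" if node is None else
                "boolean" if isinstance(node, bool) else
                "number" if isinstance(node, (int, float)) else
                "string")
        yield (prefix, kind, node)


def stream_with_remainder(events, record_prefix):
    """Yield each complete object found at record_prefix; feed every other
    event into a 'remainder' builder and return its value when the stream
    ends (retrievable from StopIteration.value)."""
    remainder = _Builder()
    record = None
    for prefix, event, value in events:
        if record is not None:
            record.event(event, value)
            if not record._stack:  # the record's containers are all closed
                yield record.value
                record = None
        elif prefix == record_prefix and event in ("start_map", "start_array"):
            record = _Builder()
            record.event(event, value)
        else:
            remainder.event(event, value)
    return remainder.value


doc = json.loads('{"results": [{"id": 1}, {"id": 2}], "meta": {"next_page": "abc"}}')
gen = stream_with_remainder(_events(doc), "results.item")
records = []
try:
    while True:
        records.append(next(gen))
except StopIteration as stop:
    leftover = stop.value  # e.g. the paging token, minus the yielded records
```

Each record is yielded as soon as its closing event arrives, so peak memory tracks the largest single record plus the small remainder (here, the `meta` object holding the paging token) rather than the whole response.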

I had assumed CPU performance for this mechanism would be worse than the standard processing of a response in its entirety through a Pydantic model, but it turns out to be about 5-10% faster. I believe this is because the JSON parser used by ijson is faster than the one used by Pydantic. We're still building Pydantic models from the Python objects yielded by the ijson parser, but doing so using model_validate instead of model_validate_json.
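The `model_validate` point can be illustrated with a small sketch, assuming Pydantic v2 and a hypothetical `Ticket` model (not a model from this codebase):

```python
from pydantic import BaseModel


class Ticket(BaseModel):
    id: int
    subject: str


# ijson hands back plain Python objects it has already parsed, so
# model_validate only has to validate them, skipping Pydantic's own
# JSON parsing step.
from_obj = Ticket.model_validate({"id": 7, "subject": "printer on fire"})

# The traditional path parses and validates in a single call.
from_json = Ticket.model_validate_json('{"id": 7, "subject": "printer on fire"}')

assert from_obj == from_json
```

Both paths produce the same validated model; the difference is only which library does the byte-level JSON parsing.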

I suspect CPU performance could be improved even more by not using Pydantic models at all and instead using msgspec structs, but this does not seem necessary right now.

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)



@williamhbaker williamhbaker force-pushed the wb/object-stream-remainder branch from 456a5c6 to b550c8b Compare January 31, 2025 20:38
@@ -125,31 +103,20 @@ async def request_lines(

    return

- async def request_object_stream(
+ async def request_stream(
@williamhbaker (Member, Author) commented:

Having this functionality tied to http felt a little cumbersome, and I guess you could end up with a byte stream in some other way, so I separated the JSON processing from the byte stream requesting.

@williamhbaker williamhbaker force-pushed the wb/object-stream-remainder branch from b550c8b to 6e8e31a Compare January 31, 2025 20:42
) -> tuple[str | None, bool]:
# Instead of using Pydantic's model_validate_json that uses json.loads internally,
# use json.JSONDecoder().raw_decode to reduce memory overhead when processing the response.
raw_response_bytes = await http.request(log, url, params=params)
@williamhbaker (Member, Author) commented:

This would previously require buffering the entire response's bytes and plucking out a couple values from it, then making the same request again to incrementally process. Now we will only need to make the request a single time while still being able to incrementally process it.

@williamhbaker williamhbaker marked this pull request as ready for review January 31, 2025 20:45
@williamhbaker (Member, Author) commented:

@Alex-Bair I think you have a test for zendesk-native memory usage so I'd be interested to know what that looks like with this change.

@Alex-Bair (Member) replied:

> @Alex-Bair I think you have a test for zendesk-native memory usage so I'd be interested to know what that looks like with this change.

The steady-state memory usage looked about the same to me, hovering between 13% and 17%. But with this change there were no spikes up to ~34%, since the connector doesn't need to make an additional request for pagination information.

@Alex-Bair (Member) left a comment:

LGTM! Really nice job!

@williamhbaker williamhbaker merged commit 142c259 into main Feb 3, 2025
73 of 84 checks passed
@williamhbaker williamhbaker deleted the wb/object-stream-remainder branch February 3, 2025 15:56