estuary-cdk: expand capabilities for incremental json processing #2318
Conversation
Force-pushed from 456a5c6 to b550c8b
@@ -125,31 +103,20 @@ async def request_lines(
        return

-    async def request_object_stream(
+    async def request_stream(
Having this functionality tied to `http` felt a little cumbersome, and I guess you could end up with a byte stream in some other way, so I separated the JSON processing from the byte stream requesting.
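Roughly, the decoupling means the JSON-processing side is written against a plain async byte stream rather than against the HTTP client. Here is a minimal sketch of that interface with hypothetical names (nothing below is the connector's actual API):

```python
import asyncio
from typing import AsyncGenerator


async def byte_source() -> AsyncGenerator[bytes, None]:
    # In the connector this would be the HTTP layer's streaming response;
    # here it is a fixture, which is exactly what the decoupling enables.
    for chunk in (b'{"results": ', b'[1, 2, 3]}'):
        yield chunk


async def consume(stream: AsyncGenerator[bytes, None]) -> bytes:
    # A JSON processor written against this interface never touches HTTP.
    return b"".join([chunk async for chunk in stream])


print(asyncio.run(consume(byte_source())))  # b'{"results": [1, 2, 3]}'
```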
Force-pushed from b550c8b to 6e8e31a
) -> tuple[str | None, bool]:
    # Instead of using Pydantic's model_validate_json that uses json.loads internally,
    # use json.JSONDecoder().raw_decode to reduce memory overhead when processing the response.
    raw_response_bytes = await http.request(log, url, params=params)
This previously required buffering the entire response's bytes and plucking a couple of values out of it, then making the same request again to process it incrementally. Now we only need to make the request a single time while still being able to process it incrementally.
@Alex-Bair I think you have a test for zendesk-native memory usage, so I'd be interested to know what that looks like with this change.
The steady-state memory usage looked about the same to me, hovering between 13% and 17%. But with this change, there were no spikes up to ~34%, since the connector doesn't need to make an additional request for pagination information.
LGTM! Really nice job!
Description:
A prior implementation of incremental JSON processing was able to yield records from a JSON stream by prefix, discarding everything else from the stream. That suits cases where the entire dataset is represented in a single large response, but there are cases where the dataset is represented by multiple potentially very large responses, and each of those responses contains some extra information we need to proceed, such as a paging token. Parsing and processing those responses while keeping memory usage efficient within the connector can be a challenge.
This adds the ability to read a JSON stream incrementally and yield records by prefix, while also returning anything that's left at the end. Effectively this "remainder" is built up while processing the document as an object in-memory. As long as this remainder is relatively small, memory usage should be negligible.
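To make the mechanism concrete, here is a minimal sketch of the prefix-yield-plus-remainder idea using ijson's event-level API. The function name, the `results` prefix, and the payload shape are illustrative assumptions (and it assumes the records under the prefix are JSON objects); the estuary-cdk implementation is async and may differ in detail:

```python
import io

import ijson
from ijson.common import ObjectBuilder


def stream_with_remainder(stream, prefix: str):
    """Yield each object in the array at `prefix`, then yield the remainder last."""
    remainder = ObjectBuilder()
    item_builder = None
    item_prefix = f"{prefix}.item"

    for path, event, value in ijson.parse(stream):
        if path == item_prefix and event == "start_map":
            # A record is starting: build it in isolation so it can be
            # yielded and discarded as soon as it completes.
            item_builder = ObjectBuilder()
            item_builder.event(event, value)
        elif item_builder is not None:
            item_builder.event(event, value)
            if path == item_prefix and event == "end_map":
                yield item_builder.value
                item_builder = None
        else:
            # Everything outside the records array accumulates into the
            # small in-memory "remainder" object.
            remainder.event(event, value)

    yield remainder.value


body = io.BytesIO(b'{"results": [{"id": 1}, {"id": 2}], "next_page": "tok"}')
*records, remainder = stream_with_remainder(body, "results")
print(records)    # [{'id': 1}, {'id': 2}]
print(remainder)  # {'results': [], 'next_page': 'tok'}
```

Only what lands in the remainder (here, an emptied `results` array and the paging token) stays resident, which is why memory usage hinges on that remainder staying small.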
I had assumed CPU performance for this mechanism would be worse than the standard processing of a response in its entirety through a Pydantic model, but it turns out it's actually about 5-10% faster. I believe this is because the JSON parser used by ijson is faster than the one used by Pydantic. We're still building Pydantic models from the Python objects yielded by the ijson parser, but doing so using `model_validate` instead of `model_validate_json`. I suspect CPU performance could be improved even more by not using Pydantic models at all and instead using msgspec structs, but this does not seem necessary right now.
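For reference, the distinction between the two Pydantic entry points looks like this; `Record` is a hypothetical model for illustration, not one from this PR:

```python
from pydantic import BaseModel


class Record(BaseModel):
    id: int


# Whole-response path: Pydantic's own JSON parser decodes the raw bytes.
rec_a = Record.model_validate_json(b'{"id": 1}')

# Incremental path: ijson has already produced a Python dict, so only
# validation and coercion remain.
rec_b = Record.model_validate({"id": 1})

assert rec_a == rec_b
```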
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers:
(anything that might help someone review this PR)