source-google-sheets-native: stream spreadsheet rows #2310

Merged (3 commits) on Jan 30, 2025
53 changes: 52 additions & 1 deletion estuary-cdk/estuary_cdk/http.py
@@ -1,7 +1,8 @@
 from dataclasses import dataclass
 from logging import Logger
+import ijson
 from pydantic import BaseModel
-from typing import AsyncGenerator, Any
+from typing import AsyncGenerator, Any, AsyncIterator, TypeVar
 import abc
 import aiohttp
 import asyncio
@@ -21,6 +22,30 @@

DEFAULT_AUTHORIZATION_HEADER = "Authorization"

StreamedObject = TypeVar("StreamedObject", bound=BaseModel)

class _AsyncStreamWrapper:
Member Author commented:
This feels like it was a lot more difficult than it needed to be. ijson.items needs something "file-like" with a read method. aiohttp doesn't give us anything quite like that (not even its raw content seemed to work). So this adapter is the only way I could get things working with the byte stream coming from aiohttp.

"""
Used to adapt an AsyncGenerator of bytes into a file-like object that can be
incrementally read by ijson.
"""
def __init__(self, gen: AsyncGenerator[bytes, None]):
self.gen: AsyncIterator[bytes] = gen
self.buf = b""

async def read(self, size: int = -1) -> bytes:
if size == -1:
return self.buf + b"".join([chunk async for chunk in self.gen])

while len(self.buf) < size:
try:
self.buf += await anext(self.gen)
except StopAsyncIteration:
break

data, self.buf = self.buf[:size], self.buf[size:]
return data

class HTTPError(RuntimeError):
    """
    HTTPError is a custom error class that provides the HTTP status code
@@ -99,6 +124,32 @@ async def request_lines(
            yield buffer

        return

    async def request_object_stream(
Member commented:
To make sure I understand the intended use of this method, it's only to parse JSON objects located at a single prefix in the response, right? For example, in this response body, this method could be used to iterate over the objects under tickets, and a separate request will need to be made to get any of the top level fields (after_cursor, etc.)?

    {
        "after_cursor": "cursor_to_next_page",
        "total_count": 2,
        "tickets": [
            {"id": 1},
            {"id": 2}
        ]
    }

Member Author replied:

Yep that's right. ijson does have a lower-level interface where you could do it all in a single request, something like this:

    parser = ijson.parse(file)

    after_cursor = None
    total_count = None

    for prefix, event, value in parser:
        if prefix == "after_cursor" and event == "string":
            after_cursor = value
        elif prefix == "total_count" and event == "number":
            total_count = value
        elif prefix == "tickets.item": 
            yield value

I'm not totally sure what the best generalization of this pattern is. I felt like it was probably worth having this higher level request_object_stream capability, although it won't work for everything.
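Materializing complete objects from the event-level API takes a bit more plumbing than the sketch above shows, since events at "tickets.item" include start_map/end_map markers rather than assembled dicts. A hedged illustration using ijson's ObjectBuilder helper, reusing the names from the example:

    import ijson

    def iter_tickets(file):
        # Assemble each object under "tickets.item" from its event stream.
        builder = None
        for prefix, event, value in ijson.parse(file):
            if prefix == "tickets.item" and event == "start_map":
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if prefix == "tickets.item" and event == "end_map":
                yield builder.value
                builder = None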

Member replied:

Yep, I agree this will be useful even if it won't work for everything. Thanks for putting this together; it's definitely going to be useful when we're processing a huge response.

        self,
        log: Logger,
        cls: type[StreamedObject],
        prefix: str,
        url: str,
        method: str = "GET",
        params: dict[str, Any] | None = None,
        json: dict[str, Any] | None = None,
        form: dict[str, Any] | None = None,
    ) -> AsyncGenerator[StreamedObject, None]:
        """
        Request a url and incrementally decode a stream of JSON objects as
        instances of `cls`.

        Prefix is a path within the JSON document where objects to parse reside.
        Usually it will end with the ".item" suffix, which allows iteration
        through objects in an array. Example: "some.path.to.array.item".
        """

        strm = self._request_stream(
            log, url, method, params, json, form, True
        )
        async for obj in ijson.items_async(_AsyncStreamWrapper(strm), prefix):
            yield cls.model_validate(obj)
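A hedged usage sketch of this method (the `Ticket` model, URL, and the `http` session object are illustrative, echoing the review thread above):

    class Ticket(BaseModel):
        id: int

    async def fetch_tickets(http, log: Logger) -> list[Ticket]:
        # Objects are decoded one at a time from the "tickets" array, so the
        # full response body never needs to be buffered in memory.
        return [
            ticket
            async for ticket in http.request_object_stream(
                log, Ticket, "tickets.item", "https://example.com/api/tickets"
            )
        ]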

    @abc.abstractmethod
    def _request_stream(
1 change: 1 addition & 0 deletions estuary-cdk/pyproject.toml
@@ -13,6 +13,7 @@
aiohttp = "^3.9.3"
orjson = "^3.9.15"
pydantic = ">1.10,<3"
xxhash = "^3.4.1"
ijson = "^3.3.0"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
106 changes: 105 additions & 1 deletion source-braintree-native/poetry.lock

(Generated lockfile; diff not rendered.)