-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
source-google-sheets-native: stream spreadsheet rows #2310
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
from dataclasses import dataclass | ||
from logging import Logger | ||
import ijson | ||
from pydantic import BaseModel | ||
from typing import AsyncGenerator, Any | ||
from typing import AsyncGenerator, Any, AsyncIterator, TypeVar | ||
import abc | ||
import aiohttp | ||
import asyncio | ||
|
@@ -21,6 +22,30 @@ | |
|
||
DEFAULT_AUTHORIZATION_HEADER = "Authorization" | ||
|
||
StreamedObject = TypeVar("StreamedObject", bound=BaseModel) | ||
|
||
class _AsyncStreamWrapper: | ||
""" | ||
Used to adapt an AsyncGenerator of bytes into a file-like object that can be | ||
incrementally read by ijson. | ||
""" | ||
def __init__(self, gen: AsyncGenerator[bytes, None]): | ||
self.gen: AsyncIterator[bytes] = gen | ||
self.buf = b"" | ||
|
||
async def read(self, size: int = -1) -> bytes: | ||
if size == -1: | ||
return self.buf + b"".join([chunk async for chunk in self.gen]) | ||
|
||
while len(self.buf) < size: | ||
try: | ||
self.buf += await anext(self.gen) | ||
except StopAsyncIteration: | ||
break | ||
|
||
data, self.buf = self.buf[:size], self.buf[size:] | ||
return data | ||
|
||
class HTTPError(RuntimeError): | ||
""" | ||
HTTPError is an custom error class that provides the HTTP status code | ||
|
@@ -99,6 +124,32 @@ async def request_lines( | |
yield buffer | ||
|
||
return | ||
|
||
async def request_object_stream( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To make sure I understand the intended use of this method, it's only to parse JSON objects located at a single prefix in the response, right? For example, in this response body, this method could be used to iterate over the objects under {
"after_cursor": "cursor_to_next_page",
"total_count": 2,
"tickets": [
{"id": 1},
{"id": 2}
]
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep that's right. parser = ijson.parse(file)
after_cursor = None
total_count = None
for prefix, event, value in parser:
if prefix == "after_cursor" and event == "string":
after_cursor = value
elif prefix == "total_count" and event == "number":
total_count = value
elif prefix == "tickets.item":
yield value I'm not totally sure what the best generalization of this pattern is. I felt like it was probably worth having this higher level There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, I agree this will be useful even if it won't work for everything. Thanks for putting this together; it's definitely going to be useful when we're processing a huge response. |
||
self, | ||
log: Logger, | ||
cls: type[StreamedObject], | ||
prefix: str, | ||
url: str, | ||
method: str = "GET", | ||
params: dict[str, Any] | None = None, | ||
json: dict[str, Any] | None = None, | ||
form: dict[str, Any] | None = None, | ||
) -> AsyncGenerator[StreamedObject, None]: | ||
""" | ||
Request a url and incrementally decode a stream of JSON objects as | ||
instances of `cls`. | ||
|
||
Prefix is a path within the JSON document where objects to parse reside. | ||
Usually it will end with the ".item" suffix, which allows iteration | ||
through objects in an array. Example: "some.path.to.array.item". | ||
""" | ||
|
||
strm = self._request_stream( | ||
log, url, method, params, json, form, True | ||
) | ||
async for obj in ijson.items_async(_AsyncStreamWrapper(strm), prefix): | ||
yield cls.model_validate(obj) | ||
|
||
@abc.abstractmethod | ||
def _request_stream( | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like it was a lot more difficult than it needed to be.
ijson.items
needs something "file-like" with aread
method.aiohttp
doesn't give us anything quite like that (not even its rawcontent
seemed to work). So this adapter is the only way I could get things working with the byte stream coming fromaiohttp
.