From 853e5235a23d943611cb9c847d728bbf8520ab15 Mon Sep 17 00:00:00 2001 From: Pedro Antonio Date: Sat, 2 Mar 2024 19:20:19 -0300 Subject: [PATCH 1/2] Handle sync tokens --- .../source_asana/source_asana/streams.py | 60 +++++++++---------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/source-asana/source_asana/source_asana/streams.py b/source-asana/source_asana/source_asana/streams.py index 625c432324..c87abe9c78 100644 --- a/source-asana/source_asana/source_asana/streams.py +++ b/source-asana/source_asana/source_asana/streams.py @@ -235,7 +235,8 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: class Events(AsanaStream): primary_key = "created_at" - sync_token = None + sync_token: Optional[str] = None + raise_on_http_errors = False def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return "events" @@ -248,49 +249,46 @@ def read_records(self, *args, **kwargs): yield from super().read_records(*args, **kwargs) - # After reading records, update the sync token - self.sync_token = self.get_latest_sync_token() - - def get_latest_sync_token(self) -> str: - latest_sync_token = self.state.get("last_sync_token") # Get the previous sync token - - if latest_sync_token is None: - return None - - return latest_sync_token["sync"] # Extract the sync token value - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - if response.status_code == 412: # Check if response is a 412 error - response_json = response.json() - if "sync" in response_json: # Check if new sync token is available - self.sync_token = response_json["sync"] - else: - self.sync_token = None - self.logger.warning("Sync token expired. Fetch the full dataset for this query now.") - else: - response_json = response.json() + def parse_response( + self, response: requests.Response, **kwargs + ) -> Iterable[Mapping]: + response_json: dict = response.json() + self.sync_token = response_json.get("sync") - # Check if response has new sync token - if "sync" in response_json: - self.sync_token = response_json["sync"] + if ( # Check if response is a 412 error + response.status_code == HTTPStatus.PRECONDITION_FAILED + or not self.sync_token + ): + self.logger.warning( + "Sync token expired. Fetch the full dataset for this query now." + ) + data = response_json.get("data", []) - yield from response_json.get("data", []) + yield from data - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + def next_page_token( + self, response: requests.Response + ) -> Optional[Mapping[str, Any]]: decoded_response = response.json() last_sync = decoded_response.get("sync") if last_sync: return {"sync": last_sync} - def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: + def request_params( + self, stream_slice: Mapping[str, Any] = None, **kwargs + ) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) params["resource"] = stream_slice["resource_gid"] + params["sync"] = self.sync_token return params def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - yield from self.read_slices_from_records(stream_class=Projects, slice_field="resource_gid") - yield from self.read_slices_from_records(stream_class=Tasks, slice_field="resource_gid") - + yield from self.read_slices_from_records( + stream_class=Projects, slice_field="resource_gid" + ) + yield from self.read_slices_from_records( + stream_class=Tasks, slice_field="resource_gid" + ) class OrganizationExports(AsanaStream): From 951b0d78e23f3496a040a754449765696806e947 Mon Sep 17 00:00:00 2001 From: Pedro Antonio Date: Mon, 4 Mar 2024 14:29:48 -0300 Subject: [PATCH 2/2] Fix Events stream --- .../source_asana/source_asana/streams.py | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/source-asana/source_asana/source_asana/streams.py b/source-asana/source_asana/source_asana/streams.py index c87abe9c78..104bca6ce6 100644 --- a/source-asana/source_asana/source_asana/streams.py +++ b/source-asana/source_asana/source_asana/streams.py @@ -236,43 +236,42 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: class Events(AsanaStream): primary_key = "created_at" sync_token: Optional[str] = None + has_more: bool = False raise_on_http_errors = False def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return "events" - def read_records(self, *args, **kwargs): - # Check if sync token is available - if self.sync_token is not None: - # Pass the sync token as a request parameter - kwargs["next_page_token"] = {"sync": self.sync_token} - - yield from super().read_records(*args, **kwargs) - def parse_response( self, response: requests.Response, **kwargs ) -> Iterable[Mapping]: - response_json: dict = response.json() - self.sync_token = response_json.get("sync") + payload: dict = response.json() + data = payload.get("data", []) if ( # Check if response is a 412 error response.status_code == HTTPStatus.PRECONDITION_FAILED - or not self.sync_token + and not self.sync_token ): self.logger.warning( "Sync token expired. Fetch the full dataset for this query now." ) - data = response_json.get("data", []) - yield from data + self.sync_token = payload.get("sync") + + return data def next_page_token( self, response: requests.Response ) -> Optional[Mapping[str, Any]]: - decoded_response = response.json() - last_sync = decoded_response.get("sync") - if last_sync: - return {"sync": last_sync} + payload = response.json() + + has_more = bool(payload.get("has_more")) + # self.sync_token = payload.get("sync") + + if not has_more: + self.logger.info("Nothing to read.") + return None + return {"sync": self.sync_token} def request_params( self, stream_slice: Mapping[str, Any] = None, **kwargs