From eba13ca98a212e2918017763d562dd07b060222e Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Mon, 3 Feb 2025 11:15:29 -0800 Subject: [PATCH 1/4] source-zendesk-support-native: validate start date is at least the epoch Using a start date before the epoch (i.e. a negative timestamp) in requests to Zendesk results in a 400 error. Validating the start date isn't before the epoch should avoid these errors. --- .../source_zendesk_support_native/models.py | 4 ++++ .../source_zendesk_support_native/resources.py | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/source-zendesk-support-native/source_zendesk_support_native/models.py b/source-zendesk-support-native/source_zendesk_support_native/models.py index 0049afa60a..b0e8910843 100644 --- a/source-zendesk-support-native/source_zendesk_support_native/models.py +++ b/source-zendesk-support-native/source_zendesk_support_native/models.py @@ -20,6 +20,9 @@ from pydantic import AfterValidator, AwareDatetime, BaseModel, Field +EPOCH = datetime(1970, 1, 1, tzinfo=UTC) + + def urlencode_field(field: str): return "{{#urlencode}}{{{ " + field + " }}}{{/urlencode}}" @@ -88,6 +91,7 @@ class EndpointConfig(BaseModel): description="UTC date and time in the format YYYY-MM-DDTHH:MM:SSZ. Any data generated before this date will not be replicated. 
If left blank, the start date will be set to 30 days before the present.", title="Start Date", default_factory=default_start_date, + ge=EPOCH, ) credentials: OAuth2Credentials | ApiToken = Field( discriminator="credentials_title", diff --git a/source-zendesk-support-native/source_zendesk_support_native/resources.py b/source-zendesk-support-native/source_zendesk_support_native/resources.py index 12607f801a..972057a78b 100644 --- a/source-zendesk-support-native/source_zendesk_support_native/resources.py +++ b/source-zendesk-support-native/source_zendesk_support_native/resources.py @@ -18,6 +18,7 @@ ResourceState, TimestampedResource, ZendeskResource, + EPOCH, CLIENT_SIDE_FILTERED_CURSOR_PAGINATED_RESOURCES, FULL_REFRESH_CURSOR_PAGINATED_RESOURCES, INCREMENTAL_CURSOR_EXPORT_RESOURCES, @@ -46,8 +47,6 @@ TIME_PARAMETER_DELAY, ) -EPOCH = datetime(1970, 1, 1, tzinfo=UTC) - async def validate_credentials( log: Logger, http: HTTPMixin, config: EndpointConfig ): From 5e762ac874ae994a13b2c34acfdd9cb964edd4fd Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Mon, 3 Feb 2025 13:26:09 -0800 Subject: [PATCH 2/4] source-zendesk-support-native: don't fetch child resources while iterating through a tickets response With the change to streaming incremental export responses, ticket child resource streams ended up trying to maintain a long-lived tickets response while fetching child resources for each ticket within the response. This caused `aiohttp` TimeoutErrors. To get around this, we fetch all tickets in a single response then fetch child resources for those tickets. 
--- .../source_zendesk_support_native/api.py | 79 ++++++++++++++----- .../source_zendesk_support_native/models.py | 6 ++ 2 files changed, 65 insertions(+), 20 deletions(-) diff --git a/source-zendesk-support-native/source_zendesk_support_native/api.py b/source-zendesk-support-native/source_zendesk_support_native/api.py index 8af6d5f251..56908d4aea 100644 --- a/source-zendesk-support-native/source_zendesk_support_native/api.py +++ b/source-zendesk-support-native/source_zendesk_support_native/api.py @@ -12,6 +12,7 @@ FullRefreshCursorPaginatedResponse, ZendeskResource, TimestampedResource, + AbbreviatedTicket, IncrementalCursorExportResponse, TicketsResponse, UsersResponse, @@ -466,16 +467,37 @@ async def fetch_ticket_child_resources( tickets_generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, cursor, log) - async for result in tickets_generator: - if isinstance(result, str): - yield (result,) - else: - status = getattr(result, "status") - if status == "deleted": - continue + tickets: list[AbbreviatedTicket] = [] + + while True: + # Fetching comments for each ticket as we're streaming a tickets response often triggers aiohttp's TimeoutError. + # To avoid these TimeoutErrors, we fetch all ticket ids in a single response, then fetch the child resources for + # those ticket ids. 
+ + next_page_cursor: str | None = None + async for result in tickets_generator: + if isinstance(result, TimestampedResource): + tickets.append(AbbreviatedTicket( + id=result.id, + status=getattr(result, "status"), + updated_at=result.updated_at + )) + elif isinstance(result, str): + next_page_cursor = result + break - async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, result.id, log): - yield child_resource + if len(tickets) > 0 and next_page_cursor: + for ticket in tickets: + if ticket.status == 'deleted': + continue + + async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, ticket.id, log): + yield child_resource + + yield (next_page_cursor,) + tickets = [] + elif not next_page_cursor: + break async def backfill_ticket_child_resources( @@ -494,18 +516,35 @@ async def backfill_ticket_child_resources( generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log) - async for result in generator: - if isinstance(result, str): - yield result - elif result.updated_at < cutoff: - status = getattr(result, "status") - if status == "deleted": - continue + tickets: list[AbbreviatedTicket] = [] - async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, result.id, log): - yield child_resource - else: - return + while True: + next_page_cursor: str | None = None + async for result in generator: + if isinstance(result, TimestampedResource): + tickets.append(AbbreviatedTicket( + id=result.id, + status=getattr(result, "status"), + updated_at=result.updated_at + )) + elif isinstance(result, str): + next_page_cursor = result + break + + if len(tickets) > 0 and next_page_cursor: + for ticket in tickets: + if ticket.updated_at >= cutoff: + return + if ticket.status == 'deleted': + continue + + async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, ticket.id, log): + yield 
child_resource + + yield next_page_cursor + tickets = [] + elif not next_page_cursor: + break async def fetch_audit_logs( diff --git a/source-zendesk-support-native/source_zendesk_support_native/models.py b/source-zendesk-support-native/source_zendesk_support_native/models.py index b0e8910843..50207956cf 100644 --- a/source-zendesk-support-native/source_zendesk_support_native/models.py +++ b/source-zendesk-support-native/source_zendesk_support_native/models.py @@ -188,6 +188,12 @@ class TicketCommentsResponse(IncrementalCursorPaginatedResponse): resources: list[ZendeskResource] = Field(alias="comments") +class AbbreviatedTicket(BaseModel): + id: int + status: str + updated_at: AwareDatetime + + # Resources that are fetched by following the tickets stream & fetching resources for updated tickets in a separate request. # Tuples contain the name, path, and response model for each resource. TICKET_CHILD_RESOURCES: list[tuple[str, str, type[IncrementalCursorPaginatedResponse]]] = [ From e6fea69687ab81b039718d9ac006cbc8501c065e Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Mon, 3 Feb 2025 13:42:35 -0800 Subject: [PATCH 3/4] source-zendesk-support-native: use smaller page size to avoid 504 responses in `ticket_metrics` Side loading metric sets can cause Zendesk's API to take a while to respond. Using the max page size of 1,000 has caused the intermediate routers/gateways to return a 504 response. This commit reduces the page size for `ticket_metrics` to 500 to mitigate these 504 responses. 
--- .../source_zendesk_support_native/api.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/source-zendesk-support-native/source_zendesk_support_native/api.py b/source-zendesk-support-native/source_zendesk_support_native/api.py index 56908d4aea..e8f0e8c48a 100644 --- a/source-zendesk-support-native/source_zendesk_support_native/api.py +++ b/source-zendesk-support-native/source_zendesk_support_native/api.py @@ -25,6 +25,7 @@ ) CURSOR_PAGINATION_PAGE_SIZE = 100 +MAX_INCREMENTAL_EXPORT_PAGE_SIZE = 1000 MAX_SATISFACTION_RATINGS_WINDOW_SIZE = timedelta(days=30) # Zendesk errors out if a start or end time parameter is more recent than 60 seconds in the past. TIME_PARAMETER_DELAY = timedelta(seconds=60) @@ -321,7 +322,8 @@ async def _fetch_incremental_cursor_export_resources( start_date: datetime | None, cursor: str | None, log: Logger, - sideload_params: dict[str, str] | None = None + sideload_params: dict[str, str] | None = None, + page_size: int = MAX_INCREMENTAL_EXPORT_PAGE_SIZE, ) -> AsyncGenerator[TimestampedResource | str, None]: url = f"{url_base(subdomain)}/incremental" match name: @@ -334,7 +336,10 @@ async def _fetch_incremental_cursor_export_resources( case _: raise RuntimeError(f"Unknown incremental cursor pagination resource type {name}.") - params: dict[str, str | int] = {} + params: dict[str, str | int] = { + "per_page": page_size, + } + if sideload_params: params.update(sideload_params) @@ -656,7 +661,12 @@ async def fetch_ticket_metrics( "include": "metric_sets" } - generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, cursor, log, sideload_params) + # Sideloading metric sets can cause Zendesk's API to take a while to respond. If we try to use the max page size, + # Zendesk can take long enough to respond that the intermediate routers/gateways return a 504 response. + # The page size used for ticket metrics is reduced to 500 to mitigate these 504 responses. 
+ page_size = 500 + + generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, cursor, log, sideload_params, page_size) async for result in generator: if isinstance(result, str): @@ -684,7 +694,9 @@ async def backfill_ticket_metrics( "include": "metric_sets" } - generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log, sideload_params) + page_size = 500 + + generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log, sideload_params, page_size) async for result in generator: From 501c407a8ad56c05c7dcd03bc57dfb1bdfb294b4 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Mon, 3 Feb 2025 13:48:00 -0800 Subject: [PATCH 4/4] source-zendesk-support-native: minor renaming --- .../source_zendesk_support_native/api.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source-zendesk-support-native/source_zendesk_support_native/api.py b/source-zendesk-support-native/source_zendesk_support_native/api.py index e8f0e8c48a..d6659e6841 100644 --- a/source-zendesk-support-native/source_zendesk_support_native/api.py +++ b/source-zendesk-support-native/source_zendesk_support_native/api.py @@ -425,7 +425,7 @@ async def backfill_incremental_cursor_export_resources( return -async def _fetch_ticket_child_resource( +async def _fetch_ticket_child_resources( http: HTTPSession, subdomain: str, path: str, @@ -496,7 +496,7 @@ async def fetch_ticket_child_resources( if ticket.status == 'deleted': continue - async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, ticket.id, log): + async for child_resource in _fetch_ticket_child_resources(http, subdomain, path, response_model, ticket.id, log): yield child_resource yield (next_page_cursor,) @@ -519,13 +519,13 @@ async def backfill_ticket_child_resources( assert isinstance(page, str) assert isinstance(cutoff, datetime) - generator = 
_fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log) + tickets_generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log) tickets: list[AbbreviatedTicket] = [] while True: next_page_cursor: str | None = None - async for result in generator: + async for result in tickets_generator: if isinstance(result, TimestampedResource): tickets.append(AbbreviatedTicket( id=result.id, @@ -543,7 +543,7 @@ async def backfill_ticket_child_resources( if ticket.status == 'deleted': continue - async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, ticket.id, log): + async for child_resource in _fetch_ticket_child_resources(http, subdomain, path, response_model, ticket.id, log): yield child_resource yield next_page_cursor