source-zendesk-support-native: misc fixes #2329

Merged · 4 commits · Feb 3, 2025
103 changes: 77 additions & 26 deletions source-zendesk-support-native/source_zendesk_support_native/api.py
@@ -12,6 +12,7 @@
FullRefreshCursorPaginatedResponse,
ZendeskResource,
TimestampedResource,
AbbreviatedTicket,
IncrementalCursorExportResponse,
TicketsResponse,
UsersResponse,
@@ -24,6 +25,7 @@
)

CURSOR_PAGINATION_PAGE_SIZE = 100
MAX_INCREMENTAL_EXPORT_PAGE_SIZE = 1000
MAX_SATISFACTION_RATINGS_WINDOW_SIZE = timedelta(days=30)
# Zendesk errors out if a start or end time parameter is more recent than 60 seconds in the past.
TIME_PARAMETER_DELAY = timedelta(seconds=60)
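
A hedged illustration of how a delay constant like this is typically applied when computing time parameters; the helper below is hypothetical and not part of this diff.

from datetime import UTC, datetime, timedelta

TIME_PARAMETER_DELAY = timedelta(seconds=60)

def safe_end_time(now: datetime | None = None) -> datetime:
    # Keep start/end time parameters at least 60 seconds in the past so
    # Zendesk does not error out on the request.
    now = now or datetime.now(tz=UTC)
    return now - TIME_PARAMETER_DELAY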
@@ -320,7 +322,8 @@ async def _fetch_incremental_cursor_export_resources(
start_date: datetime | None,
cursor: str | None,
log: Logger,
sideload_params: dict[str, str] | None = None
sideload_params: dict[str, str] | None = None,
page_size: int = MAX_INCREMENTAL_EXPORT_PAGE_SIZE,
) -> AsyncGenerator[TimestampedResource | str, None]:
url = f"{url_base(subdomain)}/incremental"
match name:
@@ -333,7 +336,10 @@
case _:
raise RuntimeError(f"Unknown incremental cursor pagination resource type {name}.")

params: dict[str, str | int] = {}
params: dict[str, str | int] = {
"per_page": page_size,
}

if sideload_params:
params.update(sideload_params)

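For context, a sketch of the request these parameters produce for the tickets stream, assuming Zendesk's cursor-based incremental export endpoint; the URL shape and values below are assumptions, not taken from this diff.

import urllib.parse

subdomain = "example"
cursor: str | None = None

params: dict[str, str | int] = {"per_page": 1000}
if cursor:
    params["cursor"] = cursor
else:
    # The first request of an export passes a start_time instead of a cursor.
    params["start_time"] = 1700000000

url = f"https://{subdomain}.zendesk.com/api/v2/incremental/tickets/cursor.json"
print(f"{url}?{urllib.parse.urlencode(params)}")
# https://example.zendesk.com/api/v2/incremental/tickets/cursor.json?per_page=1000&start_time=1700000000
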
@@ -419,7 +425,7 @@ async def backfill_incremental_cursor_export_resources(
return


async def _fetch_ticket_child_resource(
async def _fetch_ticket_child_resources(
http: HTTPSession,
subdomain: str,
path: str,
@@ -466,16 +472,37 @@ async def fetch_ticket_child_resources(

tickets_generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, cursor, log)

async for result in tickets_generator:
if isinstance(result, str):
yield (result,)
else:
status = getattr(result, "status")
if status == "deleted":
continue
tickets: list[AbbreviatedTicket] = []

async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, result.id, log):
yield child_resource
while True:
# Fetching comments for each ticket as we're streaming a tickets response often triggers aiohttp's TimeoutError.
# To avoid these TimeoutErrors, we fetch all ticket ids in a single response, then fetch the child resources for
# those ticket ids.

next_page_cursor: str | None = None
async for result in tickets_generator:
if isinstance(result, TimestampedResource):
tickets.append(AbbreviatedTicket(
id=result.id,
status=getattr(result, "status"),
updated_at=result.updated_at
))
elif isinstance(result, str):
next_page_cursor = result
break

if len(tickets) > 0 and next_page_cursor:
for ticket in tickets:
if ticket.status == 'deleted':
continue

async for child_resource in _fetch_ticket_child_resources(http, subdomain, path, response_model, ticket.id, log):
yield child_resource

yield (next_page_cursor,)
tickets = []
elif not next_page_cursor:
break

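Stripped of the Zendesk specifics, the page-buffering pattern above looks like the following minimal sketch, with hypothetical names and int ids standing in for tickets; the inner generator yields a cursor string after each page of results.

import asyncio
from typing import AsyncGenerator

async def pages(
    results: AsyncGenerator[int | str, None],
) -> AsyncGenerator[tuple[list[int], str], None]:
    # Buffer one full page of ids, closing the page when the generator
    # yields that page's cursor (a str). Stop once the generator is exhausted.
    page: list[int] = []
    while True:
        cursor: str | None = None
        async for result in results:
            if isinstance(result, str):
                cursor = result
                break
            page.append(result)
        if cursor is None:
            break
        yield page, cursor
        page = []

async def main() -> None:
    async def fake_results() -> AsyncGenerator[int | str, None]:
        for item in (1, 2, "cursor-a", 3, "cursor-b"):
            yield item

    async for page, cursor in pages(fake_results()):
        print(page, cursor)  # [1, 2] cursor-a, then [3] cursor-b

asyncio.run(main())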

async def backfill_ticket_child_resources(
Expand All @@ -492,20 +519,37 @@ async def backfill_ticket_child_resources(
assert isinstance(page, str)
assert isinstance(cutoff, datetime)

generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log)
tickets_generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log)

async for result in generator:
if isinstance(result, str):
yield result
elif result.updated_at < cutoff:
status = getattr(result, "status")
if status == "deleted":
continue
tickets: list[AbbreviatedTicket] = []

async for child_resource in _fetch_ticket_child_resource(http, subdomain, path, response_model, result.id, log):
yield child_resource
else:
return
while True:
next_page_cursor: str | None = None
async for result in tickets_generator:
if isinstance(result, TimestampedResource):
tickets.append(AbbreviatedTicket(
id=result.id,
status=getattr(result, "status"),
updated_at=result.updated_at
))
elif isinstance(result, str):
next_page_cursor = result
break

if len(tickets) > 0 and next_page_cursor:
for ticket in tickets:
if ticket.updated_at >= cutoff:
return
if ticket.status == 'deleted':
continue

async for child_resource in _fetch_ticket_child_resources(http, subdomain, path, response_model, ticket.id, log):
yield child_resource

yield next_page_cursor
tickets = []
elif not next_page_cursor:
break


async def fetch_audit_logs(
@@ -617,7 +661,12 @@ async def fetch_ticket_metrics(
"include": "metric_sets"
}

generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, cursor, log, sideload_params)
    # Sideloading metric sets can cause Zendesk's API to take a while to respond. If we use the max page size,
    # Zendesk can take long enough to respond that intermediate routers/gateways time out and return a 504 response.
    # The page size used for ticket metrics is reduced to 500 to mitigate these 504 responses.
page_size = 500

generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, cursor, log, sideload_params, page_size)

async for result in generator:
if isinstance(result, str):
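
Illustratively, the reduced page size and the sideload end up in the same params dict, mirroring how _fetch_incremental_cursor_export_resources builds it; the merge itself is just standard dict behavior.

page_size = 500
sideload_params = {"include": "metric_sets"}

params: dict[str, str | int] = {"per_page": page_size}
params.update(sideload_params)
print(params)  # {'per_page': 500, 'include': 'metric_sets'}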
@@ -645,7 +694,9 @@ async def backfill_ticket_metrics(
"include": "metric_sets"
}

generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log, sideload_params)
page_size = 500

generator = _fetch_incremental_cursor_export_resources(http, subdomain, "tickets", start_date, page, log, sideload_params, page_size)

async for result in generator:

@@ -20,6 +20,9 @@
from pydantic import AfterValidator, AwareDatetime, BaseModel, Field


EPOCH = datetime(1970, 1, 1, tzinfo=UTC)


def urlencode_field(field: str):
return "{{#urlencode}}{{{ " + field + " }}}{{/urlencode}}"

@@ -88,6 +91,7 @@ class EndpointConfig(BaseModel):
description="UTC date and time in the format YYYY-MM-DDTHH:MM:SSZ. Any data generated before this date will not be replicated. If left blank, the start date will be set to 30 days before the present.",
title="Start Date",
default_factory=default_start_date,
ge=EPOCH,
)
credentials: OAuth2Credentials | ApiToken = Field(
discriminator="credentials_title",
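
A minimal sketch of what the ge=EPOCH constraint buys: pydantic now rejects start dates earlier than the Unix epoch at config-validation time. The model below is a hypothetical stand-in for EndpointConfig, not part of this diff.

from datetime import UTC, datetime
from pydantic import AwareDatetime, BaseModel, Field, ValidationError

EPOCH = datetime(1970, 1, 1, tzinfo=UTC)

class ExampleConfig(BaseModel):
    start_date: AwareDatetime = Field(ge=EPOCH)

ExampleConfig(start_date=datetime(2024, 1, 1, tzinfo=UTC))  # accepted
try:
    ExampleConfig(start_date=datetime(1969, 12, 31, tzinfo=UTC))
except ValidationError as err:
    print(err)  # input should be greater than or equal to 1970-01-01 00:00:00+00:00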
@@ -184,6 +188,12 @@ class TicketCommentsResponse(IncrementalCursorPaginatedResponse):
resources: list[ZendeskResource] = Field(alias="comments")


class AbbreviatedTicket(BaseModel):
id: int
status: str
updated_at: AwareDatetime


# Resources that are fetched by following the tickets stream & fetching resources for updated tickets in a separate request.
# Tuples contain the name, path, and response model for each resource.
TICKET_CHILD_RESOURCES: list[tuple[str, str, type[IncrementalCursorPaginatedResponse]]] = [
@@ -18,6 +18,7 @@
ResourceState,
TimestampedResource,
ZendeskResource,
EPOCH,
CLIENT_SIDE_FILTERED_CURSOR_PAGINATED_RESOURCES,
FULL_REFRESH_CURSOR_PAGINATED_RESOURCES,
INCREMENTAL_CURSOR_EXPORT_RESOURCES,
@@ -46,8 +47,6 @@
TIME_PARAMETER_DELAY,
)

EPOCH = datetime(1970, 1, 1, tzinfo=UTC)

async def validate_credentials(
log: Logger, http: HTTPMixin, config: EndpointConfig
):