From 9328e0360c8862fc8e93ad51eb16e0c276637b1f Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Sat, 22 Feb 2025 11:16:07 -0500 Subject: [PATCH 1/2] source-intercom-native: reduce window size for conversation_parts to a reasonable size In `conversation_parts`, we attempt to only yield parts that have been updated since the previous sweep. Because of this, we can only checkpoint after checking a full date window. If a date window contains a large number of updated conversations, it can take `conversation_parts` more than one day to check all those conversations for updated parts. This has been observed for at least one task; about 4,000 pages of conversations were updated within 1 day (the smallest configurable date window). This commit adds some dynamic reduction logic to the size of the date window that `conversation_parts` checks in a single `fetch_conversations_parts` invocation. If there are more than 25 pages of conversations, then the date window is split in half, repeating until the window contains at most 25 pages or reaches the one-second minimum. --- .../source_intercom_native/api.py | 53 +++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/source-intercom-native/source_intercom_native/api.py b/source-intercom-native/source_intercom_native/api.py index 3afc5f5ee8..c31a094d10 100644 --- a/source-intercom-native/source_intercom_native/api.py +++ b/source-intercom-native/source_intercom_native/api.py @@ -358,6 +358,38 @@ async def fetch_conversations( body = _generate_conversations_or_tickets_search_body(start, end, response.pages.next.starting_after) +async def _determine_window_end( + http: HTTPSession, + start: datetime, + initial_end: datetime, + log: Logger, +) -> datetime: + MAX_PAGE_QUANTITY = 25 + MIN_WINDOW_SIZE = timedelta(seconds=1) + url = f"{API}/conversations/search" + end = min(initial_end, datetime.now(tz=UTC)) + + while True: + body = _generate_conversations_or_tickets_search_body(_dt_to_s(start), _dt_to_s(end)) + response = ConversationsSearchResponse.model_validate_json( + await http.request(log, url, "POST", 
json=body) + ) + + total_pages = response.pages.total_pages + + if total_pages <= MAX_PAGE_QUANTITY: + return end + + delta = (end - start) / 2 + # Conversation searches do not have subsecond precision, so we cut off any microseconds from the window size. + reduced_window_size = delta - timedelta(microseconds=delta.microseconds) + + if reduced_window_size <= MIN_WINDOW_SIZE: + return start + MIN_WINDOW_SIZE + + end = start + reduced_window_size + + async def fetch_conversations_parts( http: HTTPSession, window_size: int, @@ -365,17 +397,28 @@ async def fetch_conversations_parts( log_cursor: LogCursor, ) -> AsyncGenerator[TimestampedResource | LogCursor, None]: assert isinstance(log_cursor, datetime) + url = f"{API}/conversations/search" start = _dt_to_s(log_cursor) - max_end = _dt_to_s(log_cursor + timedelta(days=window_size)) - end = min(max_end, _dt_to_s(datetime.now(tz=UTC))) + max_end_dt = log_cursor + timedelta(days=window_size) + max_end = _dt_to_s(max_end_dt) + + # Since conversation_parts cannot checkpoint until a complete date window is checked, + # _determine_window_end constricts windows to a size that should complete in a reasonable + # amount of time. 
+ end = _dt_to_s( + await _determine_window_end( + http=http, + start=log_cursor, + initial_end=max_end_dt, + log=log + ) + ) last_seen_ts = start - url = f"{API}/conversations/search" body = _generate_conversations_or_tickets_search_body(start, end) - should_log_progress = _is_large_date_window(start, end) while True: response = ConversationsSearchResponse.model_validate_json( await http.request(log, url, "POST", json=body) @@ -387,6 +430,8 @@ async def fetch_conversations_parts( if total_pages == 0: break + should_log_progress = _is_large_date_window(start, end) or total_pages > 5 + if should_log_progress and _is_page_number_to_log(page_num, total_pages): log.info(f"Processing page {page_num} of {total_pages}.", { 'window_start': _s_to_dt(start), From 21aa3fecf71d27fd9cd01a44ad31187267594f53 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Mon, 24 Feb 2025 08:57:42 -0500 Subject: [PATCH 2/2] source-intercom-native: remove unneeded counts from `conversations` and `tickets` After the change in commit 7702e444366d42fd45898e84bd354737f7ea35f5 to checkpoint after every page, the counts in `conversations` and `tickets` no longer served any purpose. I missed removing them in that earlier commit, so they're being removed now. 
--- source-intercom-native/source_intercom_native/api.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/source-intercom-native/source_intercom_native/api.py b/source-intercom-native/source_intercom_native/api.py index c31a094d10..925ba308a0 100644 --- a/source-intercom-native/source_intercom_native/api.py +++ b/source-intercom-native/source_intercom_native/api.py @@ -263,8 +263,6 @@ async def fetch_tickets( url = f"{API}/tickets/search" body = _generate_conversations_or_tickets_search_body(start, end) - count = 0 - while True: response = TicketsSearchResponse.model_validate_json( await http.request(log, url, "POST", json=body) @@ -294,7 +292,6 @@ async def fetch_tickets( if ticket.updated_at > start: yield ticket - count += 1 if response.pages.next is None: yield _s_to_dt(last_seen_ts) @@ -318,8 +315,6 @@ async def fetch_conversations( url = f"{API}/conversations/search" body = _generate_conversations_or_tickets_search_body(start, end) - count = 0 - while True: response = ConversationsSearchResponse.model_validate_json( await http.request(log, url, "POST", json=body) @@ -349,7 +344,6 @@ async def fetch_conversations( if conversation.updated_at > start: yield conversation - count += 1 if response.pages.next is None: yield _s_to_dt(last_seen_ts)