diff --git a/source-intercom-native/source_intercom_native/api.py b/source-intercom-native/source_intercom_native/api.py
index 3afc5f5ee8..925ba308a0 100644
--- a/source-intercom-native/source_intercom_native/api.py
+++ b/source-intercom-native/source_intercom_native/api.py
@@ -263,8 +263,6 @@ async def fetch_tickets(
     url = f"{API}/tickets/search"
     body = _generate_conversations_or_tickets_search_body(start, end)
 
-    count = 0
-
     while True:
         response = TicketsSearchResponse.model_validate_json(
             await http.request(log, url, "POST", json=body)
@@ -294,7 +292,6 @@ async def fetch_tickets(
 
             if ticket.updated_at > start:
                 yield ticket
-                count += 1
 
         if response.pages.next is None:
             yield _s_to_dt(last_seen_ts)
@@ -318,8 +315,6 @@ async def fetch_conversations(
     url = f"{API}/conversations/search"
     body = _generate_conversations_or_tickets_search_body(start, end)
 
-    count = 0
-
     while True:
         response = ConversationsSearchResponse.model_validate_json(
             await http.request(log, url, "POST", json=body)
@@ -349,7 +344,6 @@ async def fetch_conversations(
 
             if conversation.updated_at > start:
                 yield conversation
-                count += 1
 
         if response.pages.next is None:
             yield _s_to_dt(last_seen_ts)
@@ -358,6 +352,38 @@ async def fetch_conversations(
         body = _generate_conversations_or_tickets_search_body(start, end, response.pages.next.starting_after)
 
 
+async def _determine_window_end(
+    http: HTTPSession,
+    start: datetime,
+    initial_end: datetime,
+    log: Logger,
+) -> datetime:
+    MAX_PAGE_QUANTITY = 25
+    MIN_WINDOW_SIZE = timedelta(seconds=1)
+    url = f"{API}/conversations/search"
+    end = min(initial_end, datetime.now(tz=UTC))
+
+    while True:
+        body = _generate_conversations_or_tickets_search_body(_dt_to_s(start), _dt_to_s(end))
+        response = ConversationsSearchResponse.model_validate_json(
+            await http.request(log, url, "POST", json=body)
+        )
+
+        total_pages = response.pages.total_pages
+
+        if total_pages <= MAX_PAGE_QUANTITY:
+            return end
+
+        delta = (end - start) / 2
+        # Conversation searches do not have subsecond precision, so we cut off any microseconds from the window size.
+        reduced_window_size = delta - timedelta(microseconds=delta.microseconds)
+
+        if reduced_window_size <= MIN_WINDOW_SIZE:
+            return start + MIN_WINDOW_SIZE
+
+        end = start + reduced_window_size
+
+
 async def fetch_conversations_parts(
     http: HTTPSession,
     window_size: int,
@@ -365,17 +391,28 @@ async def fetch_conversations_parts(
     log_cursor: LogCursor,
 ) -> AsyncGenerator[TimestampedResource | LogCursor, None]:
     assert isinstance(log_cursor, datetime)
+    url = f"{API}/conversations/search"
 
     start = _dt_to_s(log_cursor)
-    max_end = _dt_to_s(log_cursor + timedelta(days=window_size))
-    end = min(max_end, _dt_to_s(datetime.now(tz=UTC)))
+    max_end_dt = log_cursor + timedelta(days=window_size)
+    max_end = _dt_to_s(max_end_dt)
+
+    # Since conversation_parts cannot checkpoint until a complete date window is checked,
+    # _determine_window_end constricts windows to a size that should complete in a reasonable
+    # amount of time.
+    end = _dt_to_s(
+        await _determine_window_end(
+            http=http,
+            start=log_cursor,
+            initial_end=max_end_dt,
+            log=log
+        )
+    )
 
     last_seen_ts = start
 
-    url = f"{API}/conversations/search"
     body = _generate_conversations_or_tickets_search_body(start, end)
-    should_log_progress = _is_large_date_window(start, end)
 
     while True:
         response = ConversationsSearchResponse.model_validate_json(
             await http.request(log, url, "POST", json=body)
@@ -387,6 +424,8 @@ async def fetch_conversations_parts(
         if total_pages == 0:
             break
 
+        should_log_progress = _is_large_date_window(start, end) or total_pages > 5
+
        if should_log_progress and _is_page_number_to_log(page_num, total_pages):
             log.info(f"Processing page {page_num} of {total_pages}.", {
                 'window_start': _s_to_dt(start),
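
Reviewer note, separate from the patch itself: the standalone sketch below illustrates the window-halving strategy that _determine_window_end applies. The real Intercom /conversations/search request is replaced by a hypothetical count_pages stub, and the stub's even "one page per hour" distribution, the function name determine_window_end, and the __main__ usage are illustrative assumptions only; the constants mirror the change above.

# Illustrative sketch only: mimics the halving loop in _determine_window_end using a
# hypothetical count_pages stub instead of a real POST to /conversations/search.
from datetime import datetime, timedelta, timezone

MAX_PAGE_QUANTITY = 25
MIN_WINDOW_SIZE = timedelta(seconds=1)


def count_pages(start: datetime, end: datetime) -> int:
    # Hypothetical stand-in: assume results are spread evenly at roughly one page per hour.
    return int((end - start) / timedelta(hours=1))


def determine_window_end(start: datetime, initial_end: datetime) -> datetime:
    end = min(initial_end, datetime.now(tz=timezone.utc))
    while True:
        if count_pages(start, end) <= MAX_PAGE_QUANTITY:
            return end
        delta = (end - start) / 2
        # Drop sub-second precision, mirroring how the patch trims microseconds from the window.
        reduced = delta - timedelta(microseconds=delta.microseconds)
        if reduced <= MIN_WINDOW_SIZE:
            return start + MIN_WINDOW_SIZE
        end = start + reduced


if __name__ == "__main__":
    cursor = datetime.now(tz=timezone.utc) - timedelta(days=30)
    # A 30-day window (~720 "pages" under the stub) is repeatedly halved until it fits in 25 pages.
    print(determine_window_end(cursor, cursor + timedelta(days=30)))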