Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-intercom-native: dynamically reduce conversation_parts window size #2439

Merged
merged 2 commits into from
Feb 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions source-intercom-native/source_intercom_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ async def fetch_tickets(
url = f"{API}/tickets/search"
body = _generate_conversations_or_tickets_search_body(start, end)

count = 0

while True:
response = TicketsSearchResponse.model_validate_json(
await http.request(log, url, "POST", json=body)
Expand Down Expand Up @@ -294,7 +292,6 @@ async def fetch_tickets(

if ticket.updated_at > start:
yield ticket
count += 1

if response.pages.next is None:
yield _s_to_dt(last_seen_ts)
Expand All @@ -318,8 +315,6 @@ async def fetch_conversations(
url = f"{API}/conversations/search"
body = _generate_conversations_or_tickets_search_body(start, end)

count = 0

while True:
response = ConversationsSearchResponse.model_validate_json(
await http.request(log, url, "POST", json=body)
Expand Down Expand Up @@ -349,7 +344,6 @@ async def fetch_conversations(

if conversation.updated_at > start:
yield conversation
count += 1

if response.pages.next is None:
yield _s_to_dt(last_seen_ts)
Expand All @@ -358,24 +352,67 @@ async def fetch_conversations(
body = _generate_conversations_or_tickets_search_body(start, end, response.pages.next.starting_after)


async def _determine_window_end(
http: HTTPSession,
start: datetime,
initial_end: datetime,
log: Logger,
) -> datetime:
MAX_PAGE_QUANTITY = 25
MIN_WINDOW_SIZE = timedelta(seconds=1)
url = f"{API}/conversations/search"
end = min(initial_end, datetime.now(tz=UTC))

while True:
body = _generate_conversations_or_tickets_search_body(_dt_to_s(start), _dt_to_s(end))
response = ConversationsSearchResponse.model_validate_json(
await http.request(log, url, "POST", json=body)
)

total_pages = response.pages.total_pages

if total_pages <= MAX_PAGE_QUANTITY:
return end

delta = (end - start) / 2
# Conversation searches do not have subsecond precision, so we cut off any microseconds from the window size.
reduced_window_size = delta - timedelta(microseconds=delta.microseconds)

if reduced_window_size <= MIN_WINDOW_SIZE:
return start + MIN_WINDOW_SIZE

end = start + reduced_window_size


async def fetch_conversations_parts(
http: HTTPSession,
window_size: int,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[TimestampedResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)
url = f"{API}/conversations/search"

start = _dt_to_s(log_cursor)
max_end = _dt_to_s(log_cursor + timedelta(days=window_size))
end = min(max_end, _dt_to_s(datetime.now(tz=UTC)))
max_end_dt = log_cursor + timedelta(days=window_size)
max_end = _dt_to_s(max_end_dt)

# Since conversation_parts cannot checkpoint until a complete date window is checked,
# _determine_window_end constricts windows to a size that should complete in a reasonable
# amount of time.
end = _dt_to_s(
await _determine_window_end(
http=http,
start=log_cursor,
initial_end=max_end_dt,
log=log
)
)

last_seen_ts = start

url = f"{API}/conversations/search"
body = _generate_conversations_or_tickets_search_body(start, end)

should_log_progress = _is_large_date_window(start, end)
while True:
response = ConversationsSearchResponse.model_validate_json(
await http.request(log, url, "POST", json=body)
Expand All @@ -387,6 +424,8 @@ async def fetch_conversations_parts(
if total_pages == 0:
break

should_log_progress = _is_large_date_window(start, end) or total_pages > 5

if should_log_progress and _is_page_number_to_log(page_num, total_pages):
log.info(f"Processing page {page_num} of {total_pages}.", {
'window_start': _s_to_dt(start),
Expand Down
Loading