From 04590ba59886956a33baf72eb7a0916fcb390811 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Wed, 8 Jan 2025 18:18:58 -0500 Subject: [PATCH] source-intercom-native: add config option to use `/companies/scroll` endpoint Intercom has two separate endpoints for retrieving companies: `/companies/list` (limited to 10k companies) and `/companies/scroll` (limited to only one "scroll" happening at a time). For users that have more than 10k companies in their Intercom account, they will need to use the `/companies/scroll` endpoint to ensure no data is missed. The `use_companies_list_endpoint` option controls which of the two company-related endpoints is used. Within the connector, an `asyncio.Lock` is used to avoid attempting concurrent "scrolls" between `companies` and `company_segments`. The connector defaults to using `/companies/scroll`, preferring to capture all data instead of potentially missing data if the user has >10k companies. It may be worthwhile to remove the option to use `/companies/list` at a later date. 
The main reasons I anticipate users would check the `use_companies_list_endpoint` option are: - they have another application using `/companies/scroll` that they can't disable (not sure how common that is) - they want the slight speed boost in `companies` and `company_segments` (which seems relatively negligible with the testing I've done) --- .../source_intercom_native/api.py | 138 ++++++++++-------- .../source_intercom_native/models.py | 15 ++ .../source_intercom_native/resources.py | 54 ++++++- .../snapshots__spec__capture.stdout.json | 6 + 4 files changed, 153 insertions(+), 60 deletions(-) diff --git a/source-intercom-native/source_intercom_native/api.py b/source-intercom-native/source_intercom_native/api.py index b32f7dfe09..57a598a79e 100644 --- a/source-intercom-native/source_intercom_native/api.py +++ b/source-intercom-native/source_intercom_native/api.py @@ -1,3 +1,4 @@ +import asyncio from datetime import datetime, UTC, timedelta import json from logging import Logger @@ -16,6 +17,7 @@ ConversationResponse, SegmentsResponse, CompanyListResponse, + CompanyScrollResponse, CompanySegmentsResponse, ) @@ -25,6 +27,9 @@ COMPANIES_LIST_LIMIT = 10_000 COMPANIES_LIST_LIMIT_REACHED_REGEX = r"page limit reached, please use scroll API" +COMPANIES_SCROLL_IN_USE_BY_OTHER_APPLICATION_REGEX = r"scroll already exists for this workspace" + +companies_scroll_lock = asyncio.Lock() def _dt_to_s(dt: datetime) -> int: return int(dt.timestamp()) @@ -379,18 +384,11 @@ async def fetch_segments( yield _s_to_dt(last_seen_ts) -async def fetch_companies( +async def _list_companies( http: HTTPSession, log: Logger, - log_cursor: LogCursor, -) -> AsyncGenerator[TimestampedResource | LogCursor, None]: - assert isinstance(log_cursor, datetime) - - log_cursor_ts = _dt_to_s(log_cursor) - last_seen_ts = log_cursor_ts - +) -> AsyncGenerator[TimestampedResource, None]: url = f"{API}/companies/list" - current_page = 1 params = { "per_page": 60, "page": current_page } @@ -406,8 +404,6 @@ async def fetch_companies( ) 
except HTTPError as err: # End pagination and checkpoint any documents if we hit the limit for the /companies/list endpoint. - # If support for the /companies/scroll endpoint is added, we can re-evaluate whether to break or - # fail here & tell users to use the /companies/scroll endpoint option. if err.code == 400 and bool(re.search(COMPANIES_LIST_LIMIT_REACHED_REGEX, err.message, re.DOTALL)): break else: @@ -416,14 +412,11 @@ async def fetch_companies( if not exceeds_list_limit and response.total_count > COMPANIES_LIST_LIMIT: log.warning(f"{response.total_count} companies found." " This is greater than the maximum number of companies returned by the /companies/list endpoint, and the connector could be missing data." - f" Contact Estuary support to request this stream use the /companies/scroll endpoint to retrieve more than {COMPANIES_LIST_LIMIT} companies.") + f" Consider configuring the connector to use the /companies/scroll endpoint to retrieve more than {COMPANIES_LIST_LIMIT} companies.") exceeds_list_limit = True for company in response.data: - if company.updated_at > last_seen_ts: - last_seen_ts = company.updated_at - if company.updated_at > log_cursor_ts: - yield company + yield company if current_page >= response.pages.total_pages: break @@ -431,6 +424,59 @@ async def fetch_companies( current_page += 1 params['page'] = current_page + +async def _scroll_companies( + http: HTTPSession, + log: Logger, +) -> AsyncGenerator[TimestampedResource, None]: + url = f"{API}/companies/scroll" + params = {} + + # The Intercom API only lets a single "scroll" through /companies/scroll happen at a time, + # and the companies_scroll_lock prevents the connector from attempting multiple concurrent "scrolls". 
+ async with companies_scroll_lock: + while True: + try: + response = CompanyScrollResponse.model_validate_json( + await http.request(log, url, method="GET", params=params) + ) + except HTTPError as err: + if err.code == 400 and bool(re.search(COMPANIES_SCROLL_IN_USE_BY_OTHER_APPLICATION_REGEX, err.message, re.DOTALL)): + log.fatal( + "Unable to access the /companies/scroll endpoint because it's in use by a different application." + " Please ensure no other application is using the /companies/scroll endpoint or " + " configure the connector to use the alternative /companies/list endpoint." + ) + raise + + if len(response.data) == 0: + break + + for company in response.data: + yield company + + params['scroll_param'] = response.scroll_param + + +async def fetch_companies( + http: HTTPSession, + use_list_endpoint: bool, + log: Logger, + log_cursor: LogCursor, +) -> AsyncGenerator[TimestampedResource | LogCursor, None]: + assert isinstance(log_cursor, datetime) + + log_cursor_ts = _dt_to_s(log_cursor) + last_seen_ts = log_cursor_ts + + companies_func = _list_companies if use_list_endpoint else _scroll_companies + + async for company in companies_func(http, log): + if company.updated_at > last_seen_ts: + last_seen_ts = company.updated_at + if company.updated_at > log_cursor_ts: + yield company + # Results are not returned sorted by a timestamp field, # so we can't yield a cursor until pagination is complete. 
if last_seen_ts > log_cursor_ts: @@ -439,6 +485,7 @@ async def fetch_companies( async def fetch_company_segments( http: HTTPSession, + use_list_endpoint: bool, log: Logger, log_cursor: LogCursor, ) -> AsyncGenerator[TimestampedResource | LogCursor, None]: @@ -447,54 +494,27 @@ async def fetch_company_segments( log_cursor_ts = _dt_to_s(log_cursor) last_seen_ts = log_cursor_ts - url = f"{API}/companies/list" - - current_page = 1 - params = { - "per_page": 60, - "page": current_page - } - - exceeds_list_limit = False - - while True: - try: - response = CompanyListResponse.model_validate_json( - await http.request(log, url, method="POST", params=params) - ) - except HTTPError as err: - # End pagination and checkpoint any documents if we hit the limit for the /companies/list endpoint. - # If support for the /companies/scroll endpoint is added, we can re-evaluate whether to break or - # fail here & tell users to use the /companies/scroll endpoint option. - if err.code == 400 and bool(re.search(COMPANIES_LIST_LIMIT_REACHED_REGEX, err.message, re.DOTALL)): - break - else: - raise - - if not exceeds_list_limit and response.total_count > COMPANIES_LIST_LIMIT: - log.warning(f"{response.total_count} companies found." - " This is greater than the maximum number of companies returned by the /companies/list endpoint, and the connector could be missing data." 
- f" Contact Estuary support to request this stream use the /companies/scroll endpoint to retrieve more than {COMPANIES_LIST_LIMIT} companies.") - exceeds_list_limit = True + company_ids: list[str] = [] - for company in response.data: - segments_url = f"{API}/companies/{company.id}/segments" + companies_func = _list_companies if use_list_endpoint else _scroll_companies - company_segments = CompanySegmentsResponse.model_validate_json( - await http.request(log, segments_url) - ) + # Fetch & buffer company ids to avoid using the /companies/scroll endpoint for longer than necessary and + # avoid exceeding the one minute timeout for a single "scroll" if we were to fetch segments while scrolling. + async for company in companies_func(http, log): + company_ids.append(company.id) - for segment in company_segments.data: - if segment.updated_at > last_seen_ts: - last_seen_ts = segment.updated_at - if segment.updated_at > log_cursor_ts: - yield segment + for id in company_ids: + segments_url = f"{API}/companies/{id}/segments" - if current_page >= response.pages.total_pages: - break + company_segments = CompanySegmentsResponse.model_validate_json( + await http.request(log, segments_url) + ) - current_page += 1 - params['page'] = current_page + for segment in company_segments.data: + if segment.updated_at > last_seen_ts: + last_seen_ts = segment.updated_at + if segment.updated_at > log_cursor_ts: + yield segment # Results are not returned sorted by a timestamp field, # so we can't yield a cursor until pagination is complete. 
diff --git a/source-intercom-native/source_intercom_native/models.py b/source-intercom-native/source_intercom_native/models.py index 49690c751e..ee1625ae02 100644 --- a/source-intercom-native/source_intercom_native/models.py +++ b/source-intercom-native/source_intercom_native/models.py @@ -67,6 +67,11 @@ class Advanced(BaseModel): default=5, gt=0, )] + use_companies_list_endpoint: bool = Field( + description="If selected, the /companies/list endpoint is used instead of the /companies/scroll endpoint. Typically, leave as the default unless the connector's logs indicate otherwise.", + title="Use /companies/list endpoint", + default=False, + ) advanced: Advanced = Field( default_factory=Advanced, #type: ignore @@ -156,11 +161,21 @@ class Pagination(BaseModel, extra="forbid"): pages: Pagination +class CompanyScrollResponse(BaseModel, extra="allow"): + data: list[TimestampedResource] + scroll_param: str + + ClientSideFilteringResourceFetchChangesFn = Callable[ [HTTPSession, Logger, LogCursor], AsyncGenerator[TimestampedResource | LogCursor, None], ] +CompanyResourceFetchChangesFn = Callable[ + [HTTPSession, bool, Logger, LogCursor], + AsyncGenerator[TimestampedResource | LogCursor, None], +] + IncrementalDateWindowResourceFetchChangesFn = Callable[ [HTTPSession, int, Logger, LogCursor], AsyncGenerator[TimestampedResource | LogCursor, None], diff --git a/source-intercom-native/source_intercom_native/resources.py b/source-intercom-native/source_intercom_native/resources.py index d378fe2e1a..b5f7b328ef 100644 --- a/source-intercom-native/source_intercom_native/resources.py +++ b/source-intercom-native/source_intercom_native/resources.py @@ -13,6 +13,7 @@ IntercomResource, TimestampedResource, ClientSideFilteringResourceFetchChangesFn, + CompanyResourceFetchChangesFn, IncrementalDateWindowResourceFetchChangesFn, OAUTH2_SPEC, ) @@ -51,8 +52,13 @@ # Each tuple contains the resource's name and its fetch function. 
CLIENT_SIDE_FILTERED_RESOURCES: list[tuple[str, ClientSideFilteringResourceFetchChangesFn]] = [ ("segments", fetch_segments), +] + +# Company-related resources. These are also filtered on the client side, but require +# an additional config setting to determine which endpoint to use. +COMPANY_RESOURCES: list[tuple[str, CompanyResourceFetchChangesFn]] = [ ("companies", fetch_companies), - ("company_segments", fetch_company_segments) + ("company_segments", fetch_company_segments), ] @@ -249,6 +255,51 @@ def open( return resources +def company_resources( + log: Logger, http: HTTPMixin, config: EndpointConfig +) -> list[common.Resource]: + + def open( + fetch_fn: CompanyResourceFetchChangesFn, + binding: CaptureBinding[ResourceConfig], + binding_index: int, + state: ResourceState, + task: Task, + all_bindings + ): + common.open_binding( + binding, + binding_index, + state, + task, + fetch_changes=functools.partial( + fetch_fn, + http, + config.advanced.use_companies_list_endpoint, + ) + ) + + resources = [ + common.Resource( + name=name, + key=["/id"], + model=TimestampedResource, + open=functools.partial(open, fetch_fn), + initial_state=ResourceState( + inc=ResourceState.Incremental(cursor=config.start_date), + ), + initial_config=ResourceConfig( + name=name, interval=timedelta(minutes=5) + ), + schema_inference=True, + ) + for (name, fetch_fn) in COMPANY_RESOURCES + ] + + return resources + + + async def all_resources( log: Logger, http: HTTPMixin, config: EndpointConfig ) -> list[common.Resource]: @@ -259,4 +310,5 @@ async def all_resources( *incremental_date_window_resources(log, http, config), conversations(log, http, config), *client_side_filtered_resources(log, http, config), + *company_resources(log, http, config), ] diff --git a/source-intercom-native/tests/snapshots/snapshots__spec__capture.stdout.json b/source-intercom-native/tests/snapshots/snapshots__spec__capture.stdout.json index ef7a6e5f18..6a26195692 100644 --- 
a/source-intercom-native/tests/snapshots/snapshots__spec__capture.stdout.json +++ b/source-intercom-native/tests/snapshots/snapshots__spec__capture.stdout.json @@ -31,6 +31,12 @@ "exclusiveMinimum": 0, "title": "Window Size", "type": "integer" + }, + "use_companies_list_endpoint": { + "default": false, + "description": "If selected, the /companies/list endpoint is used instead of the /companies/scroll endpoint. Typically, leave as the default unless the connector's logs indicate otherwise.", + "title": "Use /companies/list endpoint", + "type": "boolean" } }, "title": "Advanced",