Skip to content

Commit

Permalink
source-intercom-native: add config option to use /companies/scroll
Browse files Browse the repository at this point in the history
…endpoint

Intercom has two separate endpoints for retrieving companies:
`/companies/list` (limited to 10k companies) and `/companies/scroll`
(limited to only one "scroll" happening at a time). For users that have
more than 10k companies in their Intercom account, they will need to use
the `/companies/scroll` endpoint to ensure no data is missed.

The `use_companies_list_endpoint` option controls which of the two
company-related endpoints is used. Within the connector, an
`asyncio.Lock` is used to avoid attempting concurrent "scrolls" between
`companies` and `company_segments`.

The connector defaults to using `/companies/scroll`, preferring to
capture all data instead of potentially missing data if the user has >10k companies.

It may be worthwhile to remove the option to use `/companies/list` at a
later date. The main reasons I anticipate users would check the
`use_companies_list_endpoint` option are:
- they have another application using `/companies/scroll` that they
    can't disable (not sure how common that is)
- they want the slight speed boost in `companies` and `company_segments`
    (which seems relatively negligible with the testing I've done)
  • Loading branch information
Alex-Bair committed Jan 9, 2025
1 parent 1bbcc70 commit 04590ba
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 60 deletions.
138 changes: 79 additions & 59 deletions source-intercom-native/source_intercom_native/api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from datetime import datetime, UTC, timedelta
import json
from logging import Logger
Expand All @@ -16,6 +17,7 @@
ConversationResponse,
SegmentsResponse,
CompanyListResponse,
CompanyScrollResponse,
CompanySegmentsResponse,
)

Expand All @@ -25,6 +27,9 @@
COMPANIES_LIST_LIMIT = 10_000

COMPANIES_LIST_LIMIT_REACHED_REGEX = r"page limit reached, please use scroll API"
COMPANIES_SCROLL_IN_USE_BY_OTHER_APPLICATION_REGEX = r"scroll already exists for this workspace"

companies_scroll_lock = asyncio.Lock()

def _dt_to_s(dt: datetime) -> int:
return int(dt.timestamp())
Expand Down Expand Up @@ -379,18 +384,11 @@ async def fetch_segments(
yield _s_to_dt(last_seen_ts)


async def fetch_companies(
async def _list_companies(
http: HTTPSession,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[TimestampedResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)

log_cursor_ts = _dt_to_s(log_cursor)
last_seen_ts = log_cursor_ts

) -> AsyncGenerator[TimestampedResource, None]:
url = f"{API}/companies/list"

current_page = 1
params = {
"per_page": 60,
Expand All @@ -406,8 +404,6 @@ async def fetch_companies(
)
except HTTPError as err:
# End pagination and checkpoint any documents if we hit the limit for the /companies/list endpoint.
# If support for the /companies/scroll endpoint is added, we can re-evaluate whether to break or
# fail here & tell users to use the /companies/scroll endpoint option.
if err.code == 400 and bool(re.search(COMPANIES_LIST_LIMIT_REACHED_REGEX, err.message, re.DOTALL)):
break
else:
Expand All @@ -416,21 +412,71 @@ async def fetch_companies(
if not exceeds_list_limit and response.total_count > COMPANIES_LIST_LIMIT:
log.warning(f"{response.total_count} companies found."
" This is greater than the maximum number of companies returned by the /companies/list endpoint, and the connector could be missing data."
f" Contact Estuary support to request this stream use the /companies/scroll endpoint to retrieve more than {COMPANIES_LIST_LIMIT} companies.")
f" Consider configuring the connector to use the /companies/scroll endpoint to retrieve more than {COMPANIES_LIST_LIMIT} companies.")
exceeds_list_limit = True

for company in response.data:
if company.updated_at > last_seen_ts:
last_seen_ts = company.updated_at
if company.updated_at > log_cursor_ts:
yield company
yield company

if current_page >= response.pages.total_pages:
break

current_page += 1
params['page'] = current_page


async def _scroll_companies(
    http: HTTPSession,
    log: Logger,
) -> AsyncGenerator[TimestampedResource, None]:
    """Yield every company returned by the /companies/scroll endpoint.

    Requests are repeated, passing back the `scroll_param` from the previous
    response, until a response with no companies is received. The whole
    scroll runs while holding `companies_scroll_lock`.

    Raises:
        HTTPError: re-raised for any failed request. When the failure is a
            400 whose message indicates a scroll already exists for the
            workspace, a fatal log message is emitted first.
    """
    url = f"{API}/companies/scroll"
    params: dict[str, str] = {}

    # The Intercom API only lets a single "scroll" through /companies/scroll happen at a time,
    # and the companies_scroll_lock prevents the connector from attempting multiple concurrent "scrolls".
    async with companies_scroll_lock:
        while True:
            try:
                response = CompanyScrollResponse.model_validate_json(
                    await http.request(log, url, method="GET", params=params)
                )
            except HTTPError as err:
                scroll_in_use = err.code == 400 and re.search(
                    COMPANIES_SCROLL_IN_USE_BY_OTHER_APPLICATION_REGEX,
                    err.message,
                    re.DOTALL,
                )
                if scroll_in_use:
                    log.fatal(
                        "Unable to access the /companies/scroll endpoint because it's in use by a different application."
                        " Please ensure no other application is using the /companies/scroll endpoint or"
                        " configure the connector to use the alternative /companies/list endpoint."
                    )
                # Always propagate: the log above only adds guidance for the known failure mode.
                raise

            # An empty page marks the end of the scroll.
            if not response.data:
                break

            for company in response.data:
                yield company

            # Continue the same scroll on the next request.
            params["scroll_param"] = response.scroll_param


async def fetch_companies(
http: HTTPSession,
use_list_endpoint: bool,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[TimestampedResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)

log_cursor_ts = _dt_to_s(log_cursor)
last_seen_ts = log_cursor_ts

companies_func = _list_companies if use_list_endpoint else _scroll_companies

async for company in companies_func(http, log):
if company.updated_at > last_seen_ts:
last_seen_ts = company.updated_at
if company.updated_at > log_cursor_ts:
yield company

# Results are not returned sorted by a timestamp field,
# so we can't yield a cursor until pagination is complete.
if last_seen_ts > log_cursor_ts:
Expand All @@ -439,6 +485,7 @@ async def fetch_companies(

async def fetch_company_segments(
http: HTTPSession,
use_list_endpoint: bool,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[TimestampedResource | LogCursor, None]:
Expand All @@ -447,54 +494,27 @@ async def fetch_company_segments(
log_cursor_ts = _dt_to_s(log_cursor)
last_seen_ts = log_cursor_ts

url = f"{API}/companies/list"

current_page = 1
params = {
"per_page": 60,
"page": current_page
}

exceeds_list_limit = False

while True:
try:
response = CompanyListResponse.model_validate_json(
await http.request(log, url, method="POST", params=params)
)
except HTTPError as err:
# End pagination and checkpoint any documents if we hit the limit for the /companies/list endpoint.
# If support for the /companies/scroll endpoint is added, we can re-evaluate whether to break or
# fail here & tell users to use the /companies/scroll endpoint option.
if err.code == 400 and bool(re.search(COMPANIES_LIST_LIMIT_REACHED_REGEX, err.message, re.DOTALL)):
break
else:
raise

if not exceeds_list_limit and response.total_count > COMPANIES_LIST_LIMIT:
log.warning(f"{response.total_count} companies found."
" This is greater than the maximum number of companies returned by the /companies/list endpoint, and the connector could be missing data."
f" Contact Estuary support to request this stream use the /companies/scroll endpoint to retrieve more than {COMPANIES_LIST_LIMIT} companies.")
exceeds_list_limit = True
company_ids: list[str] = []

for company in response.data:
segments_url = f"{API}/companies/{company.id}/segments"
companies_func = _list_companies if use_list_endpoint else _scroll_companies

company_segments = CompanySegmentsResponse.model_validate_json(
await http.request(log, segments_url)
)
# Fetch & buffer company ids to avoid using the /companies/scroll endpoint for longer than necessary and
# avoid exceeding the one minute timeout for a single "scroll" if we were to fetch segments while scrolling.
async for company in companies_func(http, log):
company_ids.append(company.id)

for segment in company_segments.data:
if segment.updated_at > last_seen_ts:
last_seen_ts = segment.updated_at
if segment.updated_at > log_cursor_ts:
yield segment
for id in company_ids:
segments_url = f"{API}/companies/{id}/segments"

if current_page >= response.pages.total_pages:
break
company_segments = CompanySegmentsResponse.model_validate_json(
await http.request(log, segments_url)
)

current_page += 1
params['page'] = current_page
for segment in company_segments.data:
if segment.updated_at > last_seen_ts:
last_seen_ts = segment.updated_at
if segment.updated_at > log_cursor_ts:
yield segment

# Results are not returned sorted by a timestamp field,
# so we can't yield a cursor until pagination is complete.
Expand Down
15 changes: 15 additions & 0 deletions source-intercom-native/source_intercom_native/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class Advanced(BaseModel):
default=5,
gt=0,
)]
use_companies_list_endpoint: bool = Field(
description="If selected, the /companies/list endpoint is used instead of the /companies/scroll endpoint. Typically, leave as the default unless the connector's logs indicate otherwise.",
title="Use /companies/list endpoint",
default=False,
)

advanced: Advanced = Field(
default_factory=Advanced, #type: ignore
Expand Down Expand Up @@ -156,11 +161,21 @@ class Pagination(BaseModel, extra="forbid"):
pages: Pagination


# Response body of a single request to the /companies/scroll endpoint.
# Unknown fields are accepted (extra="allow").
class CompanyScrollResponse(BaseModel, extra="allow"):
# Companies returned by this request.
data: list[TimestampedResource]
# Token to send back as the `scroll_param` query parameter on the next request.
scroll_param: str


ClientSideFilteringResourceFetchChangesFn = Callable[
[HTTPSession, Logger, LogCursor],
AsyncGenerator[TimestampedResource | LogCursor, None],
]

# Signature of the company-related fetch functions. Positional arguments are:
# the HTTP session, the flag selecting /companies/list over /companies/scroll,
# the logger, and the log cursor.
CompanyResourceFetchChangesFn = Callable[
[HTTPSession, bool, Logger, LogCursor],
AsyncGenerator[TimestampedResource | LogCursor, None],
]

IncrementalDateWindowResourceFetchChangesFn = Callable[
[HTTPSession, int, Logger, LogCursor],
AsyncGenerator[TimestampedResource | LogCursor, None],
Expand Down
54 changes: 53 additions & 1 deletion source-intercom-native/source_intercom_native/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
IntercomResource,
TimestampedResource,
ClientSideFilteringResourceFetchChangesFn,
CompanyResourceFetchChangesFn,
IncrementalDateWindowResourceFetchChangesFn,
OAUTH2_SPEC,
)
Expand Down Expand Up @@ -51,8 +52,13 @@
# Each tuple contains the resource's name and its fetch function.
CLIENT_SIDE_FILTERED_RESOURCES: list[tuple[str, ClientSideFilteringResourceFetchChangesFn]] = [
("segments", fetch_segments),
]

# Company-related resources. These are also filtered on the client side, but require
# an additional config setting to determine which endpoint to use.
COMPANY_RESOURCES: list[tuple[str, CompanyResourceFetchChangesFn]] = [
("companies", fetch_companies),
("company_segments", fetch_company_segments)
("company_segments", fetch_company_segments),
]


Expand Down Expand Up @@ -249,6 +255,51 @@ def open(
return resources


def company_resources(
    log: Logger, http: HTTPMixin, config: EndpointConfig
) -> list[common.Resource]:
    """Build one capture resource per entry in COMPANY_RESOURCES.

    Each resource's fetch function is bound to the HTTP session and to the
    `use_companies_list_endpoint` advanced setting when its binding is opened.
    """

    def open(
        fetch_fn: CompanyResourceFetchChangesFn,
        binding: CaptureBinding[ResourceConfig],
        binding_index: int,
        state: ResourceState,
        task: Task,
        all_bindings
    ):
        # Pre-fill the leading arguments so the remaining call signature
        # is just (log, log_cursor).
        bound_fetch = functools.partial(
            fetch_fn,
            http,
            config.advanced.use_companies_list_endpoint,
        )
        common.open_binding(
            binding,
            binding_index,
            state,
            task,
            fetch_changes=bound_fetch,
        )

    resources: list[common.Resource] = []
    for name, fetch_fn in COMPANY_RESOURCES:
        resource = common.Resource(
            name=name,
            key=["/id"],
            model=TimestampedResource,
            open=functools.partial(open, fetch_fn),
            initial_state=ResourceState(
                inc=ResourceState.Incremental(cursor=config.start_date),
            ),
            initial_config=ResourceConfig(
                name=name, interval=timedelta(minutes=5)
            ),
            schema_inference=True,
        )
        resources.append(resource)

    return resources



async def all_resources(
log: Logger, http: HTTPMixin, config: EndpointConfig
) -> list[common.Resource]:
Expand All @@ -259,4 +310,5 @@ async def all_resources(
*incremental_date_window_resources(log, http, config),
conversations(log, http, config),
*client_side_filtered_resources(log, http, config),
*company_resources(log, http, config),
]
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
"exclusiveMinimum": 0,
"title": "Window Size",
"type": "integer"
},
"use_companies_list_endpoint": {
"default": false,
"description": "If selected, the /companies/list endpoint is used instead of the /companies/scroll endpoint. Typically, leave as the default unless the connector's logs indicate otherwise.",
"title": "Use /companies/list endpoint",
"type": "boolean"
}
},
"title": "Advanced",
Expand Down

0 comments on commit 04590ba

Please sign in to comment.