source-hubspot-native: make capturing property history optional
The propertiesWithHistory field is often very large, and it incurs significant cost
both in the volume of data transferred and in the complexity of the resulting
document schema.

Going forward, requesting propertiesWithHistory will be disabled by default; it
can be enabled by opting in to the new configuration option.

This commit adds the configuration option and the plumbing to get it where it
needs to be. A follow-up change will make the option take effect once all
pre-existing tasks have had it enabled, so that their behavior is unchanged.
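
The option itself lands in the connector's configuration models, which are among the changed files but not shown in the excerpt below. As a rough sketch only, an opt-in flag like this might look as follows in a Pydantic-style config; the class and field names here are assumptions, not the actual option:

    # Hypothetical sketch of the opt-in flag; the class and field names are
    # assumptions, since the config models file is not part of this excerpt.
    from pydantic import BaseModel, Field

    class Advanced(BaseModel):
        capture_property_history: bool = Field(
            default=False,  # History capture is now opt-in.
            title="Capture Property History",
            description="Include propertiesWithHistory when fetching CRM objects.",
        )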
williamhbaker committed Jan 8, 2025
1 parent 29bee35 commit 446c3d0
Showing 5 changed files with 86 additions and 643 deletions.
98 changes: 54 additions & 44 deletions source-hubspot-native/source_hubspot_native/api.py
@@ -90,6 +90,7 @@ async def fetch_page_with_associations(
# Closed over via functools.partial:
cls: type[CRMObject],
http: HTTPSession,
+with_history: bool,
object_name: str,
# Remainder is common.FetchPageFn:
log: Logger,
@@ -123,10 +124,12 @@ async def fetch_page_with_associations(
property_names = ",".join(props)

input = {
"limit": 50, # Maximum when requesting history.
"limit": 100,
"properties": property_names,
"propertiesWithHistory": property_names,
}
if with_history:
input["propertiesWithHistory"] = property_names
input["limit"] = 50
if len(cls.ASSOCIATED_ENTITIES) > 0:
input["associations"] = ",".join(cls.ASSOCIATED_ENTITIES)
if page:
@@ -157,7 +160,8 @@ async def fetch_page_with_associations(
output[idx].updatedAt = doc.updatedAt # We'll discard this document per the check a little further down.

output[idx].properties.update(doc.properties)
-output[idx].propertiesWithHistory.update(doc.propertiesWithHistory)
+if with_history:
+    output[idx].propertiesWithHistory.update(doc.propertiesWithHistory)

for doc in output:
if doc.updatedAt < cutoff:
@@ -171,6 +175,7 @@ async def _fetch_batch(
log: Logger,
cls: type[CRMObject],
http: HTTPSession,
+with_history: bool,
object_name: str,
ids: Iterable[str],
) -> BatchResult[CRMObject]:
@@ -182,8 +187,9 @@ async def _fetch_batch(
input = {
"inputs": [{"id": id} for id in ids],
"properties": property_names,
"propertiesWithHistory": property_names,
}
if with_history:
input["propertiesWithHistory"] = property_names

_cls: Any = cls # Silence mypy false-positive.
return BatchResult[_cls].model_validate_json(
@@ -211,12 +217,13 @@ async def fetch_batch_with_associations(
log: Logger,
cls: type[CRMObject],
http: HTTPSession,
+with_history: bool,
object_name: str,
ids: list[str],
) -> BatchResult[CRMObject]:

batch, all_associated = await asyncio.gather(
-_fetch_batch(log, cls, http, object_name, ids),
+_fetch_batch(log, cls, http, with_history, object_name, ids),
asyncio.gather(
*(
fetch_association(log, cls, http, object_name, ids, e)
@@ -239,7 +246,7 @@ async def fetch_batch_with_associations(


FetchRecentFn = Callable[
-[Logger, HTTPSession, datetime, datetime | None],
+[Logger, HTTPSession, bool, datetime, datetime | None],
AsyncGenerator[tuple[datetime, str, Any], None],
]
'''
@@ -257,7 +264,7 @@ async def fetch_batch_with_associations(
'''

FetchDelayedFn = Callable[
-[Logger, HTTPSession, datetime, datetime],
+[Logger, HTTPSession, bool, datetime, datetime],
AsyncGenerator[tuple[datetime, str, Any], None],
]
'''
@@ -313,6 +320,7 @@ async def process_changes(
fetch_recent: FetchRecentFn,
fetch_delayed: FetchDelayedFn,
http: HTTPSession,
+with_history: bool,
# Remainder is common.FetchChangesFn:
log: Logger,
log_cursor: LogCursor,
@@ -359,7 +367,7 @@

max_ts: datetime = log_cursor
try:
-async for ts, key, obj in fetch_recent(log, http, log_cursor, fetch_recent_end_time):
+async for ts, key, obj in fetch_recent(log, http, with_history, log_cursor, fetch_recent_end_time):
if fetch_recent_end_time and ts > fetch_recent_end_time:
continue
elif ts > log_cursor:
@@ -391,7 +399,7 @@
if delayed_fetch_next_end - delayed_fetch_next_start > delayed_fetch_minimum_window:
# Poll the delayed stream for documents if we need to.
try:
-async for ts, key, obj in fetch_delayed(log, http, delayed_fetch_next_start, delayed_fetch_next_end):
+async for ts, key, obj in fetch_delayed(log, http, with_history, delayed_fetch_next_start, delayed_fetch_next_end):
if ts > delayed_fetch_next_end:
# In case the FetchDelayedFn is unable to filter based on
# `delayed_fetch_next_end`.
@@ -440,6 +448,7 @@ async def fetch_changes_with_associations(
fetcher: _FetchIdsFn,
log: Logger,
http: HTTPSession,
+with_history: bool,
since: datetime,
until: datetime | None,
) -> AsyncGenerator[tuple[datetime, str, CRMObject], None]:
@@ -479,14 +488,13 @@ async def _do_batch_fetch(batch: list[tuple[datetime, str]]) -> Iterable[tuple[d
dts = {id: dt for dt, id in batch}

documents: BatchResult[CRMObject] = await fetch_batch_with_associations(
-log, cls, http, object_name, [id for _, id in batch]
+log, cls, http, with_history, object_name, [id for _, id in batch]
)

return ((dts[str(doc.id)], str(doc.id), doc) for doc in documents.results)


async def _batches_gen() -> AsyncGenerator[Awaitable[Iterable[tuple[datetime, str, CRMObject]]], None]:
-for batch_it in itertools.batched(recent, 50):
+for batch_it in itertools.batched(recent, 50 if with_history else 100):
yield _do_batch_fetch(list(batch_it))

async for res in buffer_ordered(_batches_gen(), 3):
@@ -728,30 +736,30 @@ async def fetch_search_objects_modified_at(


def fetch_recent_custom_objects(
-object_name: str, log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+object_name: str, log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, CustomObject], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(object_name, log, http, since, until, page)

return fetch_changes_with_associations(
-object_name, CustomObject, do_fetch, log, http, since, until
+object_name, CustomObject, do_fetch, log, http, with_history, since, until
)


def fetch_delayed_custom_objects(
-object_name: str, log: Logger, http: HTTPSession, since: datetime, until: datetime
+object_name: str, log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, CustomObject], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(object_name, log, http, since, until, page)

return fetch_changes_with_associations(
-object_name, CustomObject, do_fetch, log, http, since, until
+object_name, CustomObject, do_fetch, log, http, with_history, since, until
)

def fetch_recent_companies(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None,
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None,
) -> AsyncGenerator[tuple[datetime, str, Company], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
@@ -772,24 +780,24 @@ async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetim


return fetch_changes_with_associations(
-Names.companies, Company, do_fetch, log, http, since, until
+Names.companies, Company, do_fetch, log, http, with_history, since, until
)


def fetch_delayed_companies(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, Company], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.companies, log, http, since, until, page)

return fetch_changes_with_associations(
-Names.companies, Company, do_fetch, log, http, since, until
+Names.companies, Company, do_fetch, log, http, with_history, since, until
)


def fetch_recent_contacts(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, Contact], None]:
async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
if count >= 9_900:
@@ -816,24 +824,24 @@ async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetim


return fetch_changes_with_associations(
-Names.contacts, Contact, do_fetch, log, http, since, until
+Names.contacts, Contact, do_fetch, log, http, with_history, since, until
)


def fetch_delayed_contacts(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, Contact], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.contacts, log, http, since, until, page, "lastmodifieddate")

return fetch_changes_with_associations(
-Names.contacts, Contact, do_fetch, log, http, since, until
+Names.contacts, Contact, do_fetch, log, http, with_history, since, until
)


def fetch_recent_deals(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, Deal], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
@@ -855,19 +863,19 @@ async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetim


return fetch_changes_with_associations(
-Names.deals, Deal, do_fetch, log, http, since, until
+Names.deals, Deal, do_fetch, log, http, with_history, since, until
)


def fetch_delayed_deals(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, Deal], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.deals, log, http, since, until, page)

return fetch_changes_with_associations(
-Names.deals, Deal, do_fetch, log, http, since, until
+Names.deals, Deal, do_fetch, log, http, with_history, since, until
)


@@ -895,21 +903,22 @@ async def _fetch_engagements(


def fetch_recent_engagements(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, Engagement], None]:
return fetch_changes_with_associations(
Names.engagements,
Engagement,
functools.partial(_fetch_engagements, log, http),
log,
http,
+with_history,
since,
until,
)


def fetch_delayed_engagements(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, Engagement], None]:
# There is no way to fetch engagements other than starting with the most
# recent and reading backward, so this is the same process as fetching
@@ -921,13 +930,14 @@ def fetch_delayed_engagements(
functools.partial(_fetch_engagements, log, http),
log,
http,
+with_history,
since,
until,
)


def fetch_recent_tickets(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, Ticket], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
@@ -944,67 +954,67 @@ async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetim


return fetch_changes_with_associations(
-Names.tickets, Ticket, do_fetch, log, http, since, until
+Names.tickets, Ticket, do_fetch, log, http, with_history, since, until
)


def fetch_delayed_tickets(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, Ticket], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.tickets, log, http, since, until, page)

return fetch_changes_with_associations(
-Names.tickets, Ticket, do_fetch, log, http, since, until
+Names.tickets, Ticket, do_fetch, log, http, with_history, since, until
)


def fetch_recent_products(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, Product], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.products, log, http, since, until, page)

return fetch_changes_with_associations(
-Names.products, Product, do_fetch, log, http, since, until
+Names.products, Product, do_fetch, log, http, with_history, since, until
)


def fetch_delayed_products(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, Product], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.products, log, http, since, until, page)

return fetch_changes_with_associations(
-Names.products, Product, do_fetch, log, http, since, until
+Names.products, Product, do_fetch, log, http, with_history, since, until
)


def fetch_recent_line_items(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, LineItem], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.line_items, log, http, since, until, page)

return fetch_changes_with_associations(
-Names.line_items, LineItem, do_fetch, log, http, since, until
+Names.line_items, LineItem, do_fetch, log, http, with_history, since, until
)


def fetch_delayed_line_items(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, with_history: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, LineItem], None]:

async def do_fetch(page: PageCursor, count: int) -> tuple[Iterable[tuple[datetime, str]], PageCursor]:
return await fetch_search_objects(Names.line_items, log, http, since, until, page)

return fetch_changes_with_associations(
-Names.line_items, LineItem, do_fetch, log, http, since, until
+Names.line_items, LineItem, do_fetch, log, http, with_history, since, until
)

async def list_custom_objects(
@@ -1077,13 +1087,13 @@ async def _fetch_email_events(


def fetch_recent_email_events(
-log: Logger, http: HTTPSession, since: datetime, until: datetime | None
+log: Logger, http: HTTPSession, _: bool, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, EmailEvent], None]:

return _fetch_email_events(log, http, since + timedelta(milliseconds=1), until)

def fetch_delayed_email_events(
-log: Logger, http: HTTPSession, since: datetime, until: datetime
+log: Logger, http: HTTPSession, _: bool, since: datetime, until: datetime
) -> AsyncGenerator[tuple[datetime, str, EmailEvent], None]:

return _fetch_email_events(log, http, since, until)
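
Taken together, the pattern of this diff is to thread one with_history boolean from the connector configuration down through functools.partial bindings into every request that can ask for propertiesWithHistory. A minimal sketch of the binding-time plumbing, assuming a config attribute like the one sketched above (only the parameter order of process_changes and the Fetch*Fn callables mirrors the diff; the other names are hypothetical):

    # Minimal sketch; `config.advanced.capture_property_history` is a
    # hypothetical attribute. Only the parameter order of process_changes,
    # FetchRecentFn, and FetchDelayedFn mirrors the diff above.
    import functools

    with_history = config.advanced.capture_property_history

    fetch_changes = functools.partial(
        process_changes,
        fetch_recent_companies,   # FetchRecentFn: (log, http, with_history, since, until)
        fetch_delayed_companies,  # FetchDelayedFn: (log, http, with_history, since, until)
        http,
        with_history,
    )
    # The framework later invokes fetch_changes as a common.FetchChangesFn,
    # supplying the remaining (log, log_cursor) arguments.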