From 67fdac005784a11bc8ff64c4ad1dd47ee99ef10b Mon Sep 17 00:00:00 2001 From: Will Baker Date: Wed, 8 Jan 2025 19:21:28 -0500 Subject: [PATCH 1/2] source-hubspot-native: enable the property history configuration This makes the property history configuration active, so that users can optionally opt-in to capturing property history. --- source-hubspot-native/config.yaml | 5 +++-- source-hubspot-native/source_hubspot_native/api.py | 8 ++++++++ source-hubspot-native/source_hubspot_native/resources.py | 5 +---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/source-hubspot-native/config.yaml b/source-hubspot-native/config.yaml index 0c3e9e764f..1a3fcf469c 100644 --- a/source-hubspot-native/config.yaml +++ b/source-hubspot-native/config.yaml @@ -4,6 +4,7 @@ credentials: credentials_title: OAuth Credentials refresh_token_sops: ENC[AES256_GCM,data:pv1aNqErp9iHsVpNk1uPExiwQfuAq2KFmAS9okXiBDAChIu4,iv:PWxe5ZwPd5GGOiW2MtEucGbrRbZs5U0zLYIvTsNBw5c=,tag:dblxNOWFmXynn1oqdslWBQ==,type:str] token_expiry_date: "2024-02-01T17:01:21.703Z" +capturePropertyHistory: true sops: kms: [] gcp_kms: @@ -13,8 +14,8 @@ sops: azure_kv: [] hc_vault: [] age: [] - lastmodified: "2024-02-15T19:21:33Z" - mac: ENC[AES256_GCM,data:Z2MzPnszwAZ9uT7GBh0YWPibdvq7KdCOYvxpbGhQw8b+O5picVD5T3I+UOZNNbxrE7sYHo+HJ2FrJTkqOrT7RZyfmmjdv/cZuy5ecRJSIyhJrTu4GXLCfeHRyj0bbjZ6kif/qUMG2POUO/ZJXV9exXbi0X5HQSWe2hgBxYAUk60=,iv:l3WdO+yhJjfeX9xA/QQO91EH2KP8ICnzvpfi4eBYfu8=,tag:NGQDHhl8byl/hrrzSsOpSQ==,type:str] + lastmodified: "2025-01-09T00:37:44Z" + mac: ENC[AES256_GCM,data:TAhWgN4kUjhDAXrGIchfwTjicxL0mPrJ0LjUAJ1DyxbLuohwkQ2vNZ138Zb8DnfPT6P9wxnaipD7sGtTf6PwyXBivavvQPptVsAYknnMTdDsnCTnXi82h91vtey5wVglsEyAYbv5RpJArQjysPJldN15NUr/MjJNnSEldoFGSoE=,iv:U8Yi0puOzVXNwDK4amI8AGaKL2Ek0TQQbYf/mIC97yE=,tag:JAzqHF0xSpvT8k/N4QSDQA==,type:str] pgp: [] encrypted_suffix: _sops version: 3.8.1 diff --git a/source-hubspot-native/source_hubspot_native/api.py b/source-hubspot-native/source_hubspot_native/api.py index 
1431739633..d7ff61fa86 100644 --- a/source-hubspot-native/source_hubspot_native/api.py +++ b/source-hubspot-native/source_hubspot_native/api.py @@ -497,8 +497,16 @@ async def _batches_gen() -> AsyncGenerator[Awaitable[Iterable[tuple[datetime, st for batch_it in itertools.batched(recent, 50 if with_history else 100): yield _do_batch_fetch(list(batch_it)) + total = len(recent) + if total >= 10_000: + log.info("will process large batch of changes with associations", {"total": total}) + + count = 0 async for res in buffer_ordered(_batches_gen(), 3): for ts, id, doc in res: + count += 1 + if count > 0 and count % 10_000 == 0: + log.info("fetching changes with associations", {"count": count, "total": total}) yield ts, id, doc diff --git a/source-hubspot-native/source_hubspot_native/resources.py b/source-hubspot-native/source_hubspot_native/resources.py index 62e510da85..e392e82afb 100644 --- a/source-hubspot-native/source_hubspot_native/resources.py +++ b/source-hubspot-native/source_hubspot_native/resources.py @@ -84,10 +84,7 @@ async def all_resources( # Docs reference: https://developers.hubspot.com/docs/api/crm/crm-custom-objects#retrieve-existing-custom-objects custom_object_path_components = [f"p_{n}" for n in custom_object_names] - # TODO(whb): Set this value from the endpoint configuration after all - # pre-existing tasks have had the option set to True. - # with_history = config.capturePropertyHistory - with_history = True + with_history = config.capturePropertyHistory custom_object_resources = [ crm_object_with_associations( From 729d298ea77dbb35694929e25a7c56093614ae64 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Wed, 8 Jan 2025 19:56:17 -0500 Subject: [PATCH 2/2] source-hubspot-native: retry failures to fetch batches with associations HubSpot APIs will sometimes return random 500 errors, or timeouts. 
The connector often needs to string together thousands of successful API calls to work through a delayed fetch of a time window, so a connector restart caused by these random errors results in a lot of downtime and wasted work. This adds logic to retry these requests a few times before crashing the connector. --- .../source_hubspot_native/api.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/source-hubspot-native/source_hubspot_native/api.py b/source-hubspot-native/source_hubspot_native/api.py index d7ff61fa86..3b287a654b 100644 --- a/source-hubspot-native/source_hubspot_native/api.py +++ b/source-hubspot-native/source_hubspot_native/api.py @@ -487,9 +487,19 @@ async def _do_batch_fetch(batch: list[tuple[datetime, str]]) -> Iterable[tuple[d # Enable lookup of datetimes for IDs from the result batch. dts = {id: dt for dt, id in batch} - documents: BatchResult[CRMObject] = await fetch_batch_with_associations( - log, cls, http, with_history, object_name, [id for _, id in batch] - ) + attempt = 1 + while True: + try: + documents: BatchResult[CRMObject] = await fetch_batch_with_associations( + log, cls, http, with_history, object_name, [id for _, id in batch] + ) + break + except Exception as e: + if attempt == 5: + raise + log.warning("failed to fetch batch with associations (will retry)", {"error": str(e), "attempt": attempt}) + await asyncio.sleep(attempt * 2) + attempt += 1 return ((dts[str(doc.id)], str(doc.id), doc) for doc in documents.results)