diff --git a/estuary-cdk/estuary_cdk/__init__.py b/estuary-cdk/estuary_cdk/__init__.py index 591d48e77b..bf4b7d8297 100644 --- a/estuary-cdk/estuary_cdk/__init__.py +++ b/estuary-cdk/estuary_cdk/__init__.py @@ -95,6 +95,11 @@ def dump_all_tasks(signum, frame): if task is this_task: continue + log.info("Attempting to inject SIGQUIT exception into task.", { + "task.get_name()": task.get_name(), + "task.get_coro().__name__": task.get_coro().__name__, + }) + # Reach inside the task coroutine to inject an exception, which # will unwind the task stack and lets us print a precise stack trace. try: diff --git a/estuary-cdk/estuary_cdk/requests_session_send_patch.py b/estuary-cdk/estuary_cdk/requests_session_send_patch.py new file mode 100644 index 0000000000..e4fbcbdcdb --- /dev/null +++ b/estuary-cdk/estuary_cdk/requests_session_send_patch.py @@ -0,0 +1,15 @@ +import requests + +# Airbyte's CDK does not set a timeout for HTTP requests, so we patch it to always have a timeout. + +DEFAULT_TIMEOUT = 60 * 60 * 6 # 6 hours + +lib_send = requests.Session.send + +def send(*args, **kwargs): + if kwargs.get("timeout", None) is None: + kwargs["timeout"] = DEFAULT_TIMEOUT + + return lib_send(*args, **kwargs) + +setattr(requests.Session, "send", send) diff --git a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py index d902c2e9c9..ef0fcdf718 100644 --- a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py +++ b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py @@ -352,7 +352,7 @@ async def _run( await asyncio.sleep(0) if task.stopping.event.is_set(): - task.log.debug(f"Airbyte shim is yielding to stop.") + task.log.info(f"Airbyte shim is yielding to stop.") return if record := message.record: @@ -449,5 +449,6 @@ async def _run( raise RuntimeError("unexpected AirbyteMessage", message) # Emit a final checkpoint before exiting. + task.log.info("Emitting final checkpoint for sweep.") task.checkpoint(connector_state, merge_patch=False) return None diff --git a/source-airtable/source_airtable/__main__.py b/source-airtable/source_airtable/__main__.py index d5bdb20c5c..a3fc0ff8bb 100644 --- a/source-airtable/source_airtable/__main__.py +++ b/source-airtable/source_airtable/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-asana/source_asana/__main__.py b/source-asana/source_asana/__main__.py index f40486290f..892b0bd90e 100644 --- a/source-asana/source_asana/__main__.py +++ b/source-asana/source_asana/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-brevo/source_brevo/__main__.py b/source-brevo/source_brevo/__main__.py index 364b551c66..cea31f646a 100644 --- a/source-brevo/source_brevo/__main__.py +++ b/source-brevo/source_brevo/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio diff --git a/source-facebook-marketing/source_facebook_marketing/__main__.py b/source-facebook-marketing/source_facebook_marketing/__main__.py index 4947caebe0..2178251662 100644 --- a/source-facebook-marketing/source_facebook_marketing/__main__.py +++ b/source-facebook-marketing/source_facebook_marketing/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-google-ads/source_google_ads/__main__.py b/source-google-ads/source_google_ads/__main__.py index ae2741e73f..1b7ac3dcfc 100644 --- a/source-google-ads/source_google_ads/__main__.py +++ b/source-google-ads/source_google_ads/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-hubspot/source_hubspot/__main__.py b/source-hubspot/source_hubspot/__main__.py index e800bdb77d..33d63798b4 100644 --- a/source-hubspot/source_hubspot/__main__.py +++ b/source-hubspot/source_hubspot/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-iterable/source_iterable/__main__.py b/source-iterable/source_iterable/__main__.py index e7959592dc..81a7c17160 100644 --- a/source-iterable/source_iterable/__main__.py +++ b/source-iterable/source_iterable/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-jira-native/source_jira_native/__main__.py b/source-jira-native/source_jira_native/__main__.py index 0e206e15ba..a1feada166 100644 --- a/source-jira-native/source_jira_native/__main__.py +++ b/source-jira-native/source_jira_native/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-klaviyo/source_klaviyo/__main__.py b/source-klaviyo/source_klaviyo/__main__.py index 09f128ae36..99fe4b1483 100644 --- a/source-klaviyo/source_klaviyo/__main__.py +++ b/source-klaviyo/source_klaviyo/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py b/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py index f230784673..562f8ae77d 100644 --- a/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py +++ b/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-linkedin-pages/source_linkedin_pages/__main__.py b/source-linkedin-pages/source_linkedin_pages/__main__.py index 6d0bc61f86..3f5d26fa62 100644 --- a/source-linkedin-pages/source_linkedin_pages/__main__.py +++ b/source-linkedin-pages/source_linkedin_pages/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. # noqa +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-mixpanel-native/source_mixpanel_native/__main__.py b/source-mixpanel-native/source_mixpanel_native/__main__.py index 452dda612c..ab12c184ad 100644 --- a/source-mixpanel-native/source_mixpanel_native/__main__.py +++ b/source-mixpanel-native/source_mixpanel_native/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-recharge/source_recharge/__main__.py b/source-recharge/source_recharge/__main__.py index 9f6a1b89c1..f5189e4355 100644 --- a/source-recharge/source_recharge/__main__.py +++ b/source-recharge/source_recharge/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-twilio/source_twilio/__main__.py b/source-twilio/source_twilio/__main__.py index 7dfdae9c17..a6d64c6ea8 100644 --- a/source-twilio/source_twilio/__main__.py +++ b/source-twilio/source_twilio/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-zendesk-support/source_zendesk_support/__main__.py b/source-zendesk-support/source_zendesk_support/__main__.py index f44a639ff6..6ac265fb2d 100644 --- a/source-zendesk-support/source_zendesk_support/__main__.py +++ b/source-zendesk-support/source_zendesk_support/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import json