From bb19a8b4c5bc191aea1ae45d4fb495f516b62a9b Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Tue, 11 Feb 2025 10:21:40 -0500 Subject: [PATCH 1/3] estuary-cdk: additional logging to debug infrequent hanging imported connectors There have been instances where imported Airbyte connectors just hang without any apparent network traffic. Debugging this has been difficult, so this commit adds some additional logging to help troubleshoot what coroutines exist when a SIGQUIT is injected. --- estuary-cdk/estuary_cdk/__init__.py | 5 +++++ estuary-cdk/estuary_cdk/shim_airbyte_cdk.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/estuary-cdk/estuary_cdk/__init__.py b/estuary-cdk/estuary_cdk/__init__.py index 591d48e77b..bf4b7d8297 100644 --- a/estuary-cdk/estuary_cdk/__init__.py +++ b/estuary-cdk/estuary_cdk/__init__.py @@ -95,6 +95,11 @@ def dump_all_tasks(signum, frame): if task is this_task: continue + log.info("Attempting to inject SIGQUIT exception into task.", { + "task.get_name()": task.get_name(), + "task.get_coro().__name__": task.get_coro().__name__, + }) + # Reach inside the task coroutine to inject an exception, which # will unwind the task stack and lets us print a precise stack trace. try: diff --git a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py index d902c2e9c9..ef0fcdf718 100644 --- a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py +++ b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py @@ -352,7 +352,7 @@ async def _run( await asyncio.sleep(0) if task.stopping.event.is_set(): - task.log.debug(f"Airbyte shim is yielding to stop.") + task.log.info(f"Airbyte shim is yielding to stop.") return if record := message.record: @@ -449,5 +449,6 @@ async def _run( raise RuntimeError("unexpected AirbyteMessage", message) # Emit a final checkpoint before exiting. + task.log.info("Emitting final checkpoint for sweep.") task.checkpoint(connector_state, merge_patch=False) return None From 6b3b4560c042590522e277b530850709cff40f66 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Tue, 11 Feb 2025 10:23:36 -0500 Subject: [PATCH 2/3] estuary-cdk: patch requests session send method to always have a timeout The `requests` package does not have a default timeout for its `send` method. I did not see any code in version 0.52.10 of the `airbyte_cdk` (the version almost all of the imported connectors use) that passes a timeout argument, so I'm patching the `send` method to use a default timeout if one isn't provided. This is to help troubleshoot & hopefully avoid imported Airbyte connectors that infrequently hang without any network traffic. --- .../estuary_cdk/requests_session_send_patch.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 estuary-cdk/estuary_cdk/requests_session_send_patch.py diff --git a/estuary-cdk/estuary_cdk/requests_session_send_patch.py b/estuary-cdk/estuary_cdk/requests_session_send_patch.py new file mode 100644 index 0000000000..e4fbcbdcdb --- /dev/null +++ b/estuary-cdk/estuary_cdk/requests_session_send_patch.py @@ -0,0 +1,15 @@ +import requests + +# Airbyte's CDK does not set a timeout for HTTP requests, so we patch it to always have a timeout. + +DEFAULT_TIMEOUT = 60 * 60 * 6 # 6 hours + +lib_send = requests.Session.send + +def send(*args, **kwargs): + if kwargs.get("timeout", None) is None: + kwargs["timeout"] = DEFAULT_TIMEOUT + + return lib_send(*args, **kwargs) + +setattr(requests.Session, "send", send) From 4e17524b98ffb8b5984cf15457c7d0439262b79c Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Tue, 11 Feb 2025 17:04:33 -0500 Subject: [PATCH 3/3] source-* (Airbyte imports): patch default timeout into requests package --- source-airtable/source_airtable/__main__.py | 1 + source-asana/source_asana/__main__.py | 1 + source-brevo/source_brevo/__main__.py | 1 + source-facebook-marketing/source_facebook_marketing/__main__.py | 1 + source-google-ads/source_google_ads/__main__.py | 1 + source-hubspot/source_hubspot/__main__.py | 1 + source-iterable/source_iterable/__main__.py | 1 + source-jira-native/source_jira_native/__main__.py | 1 + source-klaviyo/source_klaviyo/__main__.py | 1 + source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py | 1 + source-linkedin-pages/source_linkedin_pages/__main__.py | 1 + source-mixpanel-native/source_mixpanel_native/__main__.py | 1 + source-recharge/source_recharge/__main__.py | 1 + source-twilio/source_twilio/__main__.py | 1 + source-zendesk-support/source_zendesk_support/__main__.py | 1 + 15 files changed, 15 insertions(+) diff --git a/source-airtable/source_airtable/__main__.py b/source-airtable/source_airtable/__main__.py index d5bdb20c5c..a3fc0ff8bb 100644 --- a/source-airtable/source_airtable/__main__.py +++ b/source-airtable/source_airtable/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-asana/source_asana/__main__.py b/source-asana/source_asana/__main__.py index f40486290f..892b0bd90e 100644 --- a/source-asana/source_asana/__main__.py +++ b/source-asana/source_asana/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-brevo/source_brevo/__main__.py b/source-brevo/source_brevo/__main__.py index 364b551c66..cea31f646a 100644 --- a/source-brevo/source_brevo/__main__.py +++ b/source-brevo/source_brevo/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio diff --git a/source-facebook-marketing/source_facebook_marketing/__main__.py b/source-facebook-marketing/source_facebook_marketing/__main__.py index 4947caebe0..2178251662 100644 --- a/source-facebook-marketing/source_facebook_marketing/__main__.py +++ b/source-facebook-marketing/source_facebook_marketing/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-google-ads/source_google_ads/__main__.py b/source-google-ads/source_google_ads/__main__.py index ae2741e73f..1b7ac3dcfc 100644 --- a/source-google-ads/source_google_ads/__main__.py +++ b/source-google-ads/source_google_ads/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-hubspot/source_hubspot/__main__.py b/source-hubspot/source_hubspot/__main__.py index e800bdb77d..33d63798b4 100644 --- a/source-hubspot/source_hubspot/__main__.py +++ b/source-hubspot/source_hubspot/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-iterable/source_iterable/__main__.py b/source-iterable/source_iterable/__main__.py index e7959592dc..81a7c17160 100644 --- a/source-iterable/source_iterable/__main__.py +++ b/source-iterable/source_iterable/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-jira-native/source_jira_native/__main__.py b/source-jira-native/source_jira_native/__main__.py index 0e206e15ba..a1feada166 100644 --- a/source-jira-native/source_jira_native/__main__.py +++ b/source-jira-native/source_jira_native/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-klaviyo/source_klaviyo/__main__.py b/source-klaviyo/source_klaviyo/__main__.py index 09f128ae36..99fe4b1483 100644 --- a/source-klaviyo/source_klaviyo/__main__.py +++ b/source-klaviyo/source_klaviyo/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py b/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py index f230784673..562f8ae77d 100644 --- a/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py +++ b/source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-linkedin-pages/source_linkedin_pages/__main__.py b/source-linkedin-pages/source_linkedin_pages/__main__.py index 6d0bc61f86..3f5d26fa62 100644 --- a/source-linkedin-pages/source_linkedin_pages/__main__.py +++ b/source-linkedin-pages/source_linkedin_pages/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. # noqa +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio from estuary_cdk import shim_airbyte_cdk, flow diff --git a/source-mixpanel-native/source_mixpanel_native/__main__.py b/source-mixpanel-native/source_mixpanel_native/__main__.py index 452dda612c..ab12c184ad 100644 --- a/source-mixpanel-native/source_mixpanel_native/__main__.py +++ b/source-mixpanel-native/source_mixpanel_native/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-recharge/source_recharge/__main__.py b/source-recharge/source_recharge/__main__.py index 9f6a1b89c1..f5189e4355 100644 --- a/source-recharge/source_recharge/__main__.py +++ b/source-recharge/source_recharge/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-twilio/source_twilio/__main__.py b/source-twilio/source_twilio/__main__.py index 7dfdae9c17..a6d64c6ea8 100644 --- a/source-twilio/source_twilio/__main__.py +++ b/source-twilio/source_twilio/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import urllib diff --git a/source-zendesk-support/source_zendesk_support/__main__.py b/source-zendesk-support/source_zendesk_support/__main__.py index f44a639ff6..6ac265fb2d 100644 --- a/source-zendesk-support/source_zendesk_support/__main__.py +++ b/source-zendesk-support/source_zendesk_support/__main__.py @@ -1,4 +1,5 @@ import estuary_cdk.pydantic_polyfill # Must be first. +import estuary_cdk.requests_session_send_patch # Must be second. import asyncio import json