Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

estuary-cdk: additional SIGQUIT logging & patching requests.Session.send for Airbyte imports #2380

Merged
merged 3 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions estuary-cdk/estuary_cdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ def dump_all_tasks(signum, frame):
if task is this_task:
continue

log.info("Attempting to inject SIGQUIT exception into task.", {
"task.get_name()": task.get_name(),
"task.get_coro().__name__": task.get_coro().__name__,
})

# Reach inside the task coroutine to inject an exception, which
# will unwind the task stack and lets us print a precise stack trace.
try:
Expand Down
15 changes: 15 additions & 0 deletions estuary-cdk/estuary_cdk/requests_session_send_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import requests

# Airbyte's CDK does not set a timeout for HTTP requests, so we patch it to always have a timeout.

DEFAULT_TIMEOUT = 60 * 60 * 6 # 6 hours

lib_send = requests.Session.send

def send(*args, **kwargs):
if kwargs.get("timeout", None) is None:
kwargs["timeout"] = DEFAULT_TIMEOUT

return lib_send(*args, **kwargs)

setattr(requests.Session, "send", send)
3 changes: 2 additions & 1 deletion estuary-cdk/estuary_cdk/shim_airbyte_cdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ async def _run(
await asyncio.sleep(0)

if task.stopping.event.is_set():
task.log.debug(f"Airbyte shim is yielding to stop.")
task.log.info(f"Airbyte shim is yielding to stop.")
return

if record := message.record:
Expand Down Expand Up @@ -449,5 +449,6 @@ async def _run(
raise RuntimeError("unexpected AirbyteMessage", message)

# Emit a final checkpoint before exiting.
task.log.info("Emitting final checkpoint for sweep.")
task.checkpoint(connector_state, merge_patch=False)
return None
1 change: 1 addition & 0 deletions source-airtable/source_airtable/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
from estuary_cdk import shim_airbyte_cdk, flow
Expand Down
1 change: 1 addition & 0 deletions source-asana/source_asana/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
from estuary_cdk import shim_airbyte_cdk, flow
Expand Down
1 change: 1 addition & 0 deletions source-brevo/source_brevo/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
from estuary_cdk import shim_airbyte_cdk, flow
Expand Down
1 change: 1 addition & 0 deletions source-google-ads/source_google_ads/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
from estuary_cdk import shim_airbyte_cdk, flow
Expand Down
1 change: 1 addition & 0 deletions source-hubspot/source_hubspot/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-iterable/source_iterable/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-jira-native/source_jira_native/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-klaviyo/source_klaviyo/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-linkedin-ads-v2/source_linkedin_ads_v2/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-linkedin-pages/source_linkedin_pages/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first. # noqa
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
from estuary_cdk import shim_airbyte_cdk, flow
Expand Down
1 change: 1 addition & 0 deletions source-mixpanel-native/source_mixpanel_native/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-recharge/source_recharge/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-twilio/source_twilio/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import urllib
Expand Down
1 change: 1 addition & 0 deletions source-zendesk-support/source_zendesk_support/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import estuary_cdk.pydantic_polyfill # Must be first.
import estuary_cdk.requests_session_send_patch # Must be second.

import asyncio
import json
Expand Down
Loading