source-braintree-native: split incremental streams into distinct incremental & backfill processes #2295

Merged · 2 commits · Jan 27, 2025
148 changes: 147 additions & 1 deletion source-braintree-native/source_braintree_native/api.py
@@ -30,6 +30,11 @@
SEARCH_LIMIT = 10_000
TRANSACTION_SEARCH_LIMIT = 50_000

# Disputes are searched using the received_date field, which is a date, not a datetime. Using a window size
# smaller than 24 hours (1 day) would make the stream slower than necessary and fetch duplicate results from
# Braintree, so we enforce that dispute date windows are at least 24 hours wide.
MIN_DISPUTES_WINDOW_SIZE = 24

TRANSACTION_SEARCH_FIELDS = [
'authorization_expired_at',
'authorized_at',
@@ -318,6 +323,39 @@ async def fetch_customers(
yield end


async def backfill_customers(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
page: PageCursor | None,
cutoff: LogCursor,
) -> AsyncGenerator[IncrementalResource | PageCursor, None]:
assert isinstance(page, str)
assert isinstance(cutoff, datetime)

start = _str_to_dt(page)

if start >= cutoff:
return

end, collection = await _determine_window_end(
search_method=braintree_gateway.customer.search,
search_range_node_builder=CustomerSearch.created_at,
start=start,
initial_end=min(start + timedelta(hours=window_size), cutoff),
search_limit=SEARCH_LIMIT,
log=log
)

async for object in _async_iterator_wrapper(collection):
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at < cutoff:
yield doc

yield _dt_to_str(end)


async def fetch_credit_card_verifications(
braintree_gateway: BraintreeGateway,
window_size: int,
@@ -350,6 +388,39 @@ async def fetch_credit_card_verifications(
yield end


async def backfill_credit_card_verifications(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
page: PageCursor | None,
cutoff: LogCursor,
) -> AsyncGenerator[IncrementalResource | PageCursor, None]:
assert isinstance(page, str)
assert isinstance(cutoff, datetime)

start = _str_to_dt(page)

if start >= cutoff:
return

end, collection = await _determine_window_end(
search_method=braintree_gateway.verification.search,
search_range_node_builder=CreditCardVerificationSearch.created_at,
start=start,
initial_end=min(start + timedelta(hours=window_size), cutoff),
search_limit=SEARCH_LIMIT,
log=log
)

async for object in _async_iterator_wrapper(collection):
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at < cutoff:
yield doc

yield _dt_to_str(end)


async def fetch_subscriptions(
braintree_gateway: BraintreeGateway,
window_size: int,
@@ -382,6 +453,39 @@ async def fetch_subscriptions(
yield end


async def backfill_subscriptions(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
page: PageCursor | None,
cutoff: LogCursor,
) -> AsyncGenerator[IncrementalResource | PageCursor, None]:
assert isinstance(page, str)
assert isinstance(cutoff, datetime)

start = _str_to_dt(page)

if start >= cutoff:
return

end, collection = await _determine_window_end(
search_method=braintree_gateway.subscription.search,
search_range_node_builder=SubscriptionSearch.created_at,
start=start,
initial_end=min(start + timedelta(hours=window_size), cutoff),
search_limit=SEARCH_LIMIT,
log=log
)

async for object in _async_iterator_wrapper(collection):
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at < cutoff:
yield doc

yield _dt_to_str(end)


def _are_same_day(start: datetime, end: datetime) -> bool:
return start.date() == end.date()

@@ -394,7 +498,7 @@ async def fetch_disputes(
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(hours=window_size)
window_end = log_cursor + timedelta(hours=max(window_size, MIN_DISPUTES_WINDOW_SIZE))
end = min(window_end, datetime.now(tz=UTC))

# Braintree does not let us search disputes based on the created_at field, and the received_date field is
@@ -428,3 +532,45 @@ async def fetch_disputes(
yield most_recent_created_at
else:
yield end


async def backfill_disputes(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
page: PageCursor | None,
cutoff: LogCursor,
) -> AsyncGenerator[IncrementalResource | PageCursor, None]:
assert isinstance(page, str)
assert isinstance(cutoff, datetime)

start = _str_to_dt(page)

if start >= cutoff:
return

window_end = start + timedelta(hours=max(window_size, MIN_DISPUTES_WINDOW_SIZE))
end = min(window_end, cutoff)

# Due to the potential day difference between received_date and created_at,
# always search the previous day for results created in this date window as well.
search_start = start - timedelta(days=1)

search_result = await asyncio.to_thread(
braintree_gateway.dispute.search,
DisputeSearch.received_date.between(search_start, end),
)

count = 0

async for object in _async_iterator_wrapper(search_result.disputes):
count += 1
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if start < doc.created_at <= end:
yield doc

if count >= SEARCH_LIMIT:
raise RuntimeError(_search_limit_error_message(count, "disputes"))

yield _dt_to_str(end)
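
The four new `backfill_*` functions above all follow the same windowed pattern: page forward through fixed-size date windows from the serialized `page` cursor up to the `cutoff`, emit only documents created strictly before the cutoff, and checkpoint the window end as the next page cursor. Below is a minimal stand-alone sketch of that pattern; `fake_search` and `Record` are hypothetical stand-ins for the Braintree search call and the validated `IncrementalResource` document, and the loop is collapsed here for readability — the real functions process one window per invocation and rely on the CDK to re-invoke them with the checkpointed cursor.

```python
# Stand-alone sketch (not the connector's code) of the windowed backfill pattern
# used by the new backfill_* functions. It omits the search-limit window shrinking
# done by _determine_window_end and collapses the per-invocation paging into one
# loop. fake_search and Record are hypothetical stand-ins for the Braintree search
# call and the validated IncrementalResource document.
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterator


@dataclass
class Record:
    id: str
    created_at: datetime


def fake_search(start: datetime, end: datetime) -> list[Record]:
    # Stand-in for braintree_gateway.<resource>.search(...created_at.between(start, end)).
    return [Record(id="example", created_at=start + timedelta(minutes=30))]


def backfill_pages(start: datetime, cutoff: datetime, window_hours: int) -> Iterator[Record | str]:
    while start < cutoff:
        end = min(start + timedelta(hours=window_hours), cutoff)
        for record in fake_search(start, end):
            # Documents created at or after the cutoff belong to the incremental stream.
            if record.created_at < cutoff:
                yield record
        yield end.isoformat()  # checkpoint the next page cursor
        start = end


if __name__ == "__main__":
    cutoff = datetime(2025, 1, 27, tzinfo=timezone.utc)
    for item in backfill_pages(cutoff - timedelta(hours=3), cutoff, window_hours=1):
        print(item)
```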
6 changes: 6 additions & 0 deletions source-braintree-native/source_braintree_native/models.py
@@ -8,6 +8,7 @@
BaseDocument,
ConnectorState as GenericConnectorState,
LogCursor,
PageCursor,
ResourceConfig,
ResourceState,
)
@@ -86,3 +87,8 @@ class Transaction(IncrementalResource):
[BraintreeGateway, int, Logger, LogCursor],
AsyncGenerator[IncrementalResource | LogCursor, None],
]

IncrementalResourceFetchPageFn = Callable[
[BraintreeGateway, int, Logger, PageCursor, LogCursor],
AsyncGenerator[IncrementalResource | PageCursor, None],
]
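
The new `IncrementalResourceFetchPageFn` alias mirrors the existing `IncrementalResourceFetchChangesFn`, but is driven by a `PageCursor` (a serialized window start) plus the fixed `cutoff` rather than a single `LogCursor`. A simplified, self-contained sketch of the two shapes — with the gateway, window size, and logger parameters omitted and the estuary-cdk types replaced by plain aliases — is shown below for illustration only.

```python
# Illustrative only: simplified shapes of the two callables, with plain aliases
# standing in for the estuary-cdk types so the snippet is self-contained.
from datetime import datetime
from typing import AsyncGenerator, Callable

LogCursor = datetime   # incremental cursor: resume point for fetch_changes
PageCursor = str       # backfill cursor: serialized window start for fetch_page
Doc = dict             # stand-in for IncrementalResource

# fetch_changes: yields documents and updated log cursors.
FetchChangesFn = Callable[[LogCursor], AsyncGenerator[Doc | LogCursor, None]]

# fetch_page: also receives the backfill cutoff, and yields documents and next page cursors.
FetchPageFn = Callable[[PageCursor, LogCursor], AsyncGenerator[Doc | PageCursor, None]]
```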
32 changes: 23 additions & 9 deletions source-braintree-native/source_braintree_native/resources.py
@@ -17,6 +17,7 @@
FullRefreshResource,
IncrementalResource,
IncrementalResourceFetchChangesFn,
IncrementalResourceFetchPageFn,
)

from .api import (
@@ -25,9 +26,13 @@
fetch_transactions,
backfill_transactions,
fetch_credit_card_verifications,
backfill_credit_card_verifications,
fetch_customers,
backfill_customers,
fetch_disputes,
backfill_disputes,
fetch_subscriptions,
backfill_subscriptions,
)


@@ -39,12 +44,12 @@
("plans", "plan", None),
]

# Supported incremental resources and their corresponding name and fetch_changes function.
INCREMENTAL_RESOURCES: list[tuple[str, IncrementalResourceFetchChangesFn]] = [
("credit_card_verifications", fetch_credit_card_verifications),
("customers", fetch_customers),
("disputes", fetch_disputes),
("subscriptions", fetch_subscriptions),
# Supported incremental resources and their corresponding name, fetch_changes function, and fetch_page function.
INCREMENTAL_RESOURCES: list[tuple[str, IncrementalResourceFetchChangesFn, IncrementalResourceFetchPageFn]] = [
("credit_card_verifications", fetch_credit_card_verifications, backfill_credit_card_verifications),
("customers", fetch_customers, backfill_customers),
("disputes", fetch_disputes, backfill_disputes),
("subscriptions", fetch_subscriptions, backfill_subscriptions),
]


@@ -122,6 +127,7 @@ def incremental_resources(

def open(
fetch_changes_fn: IncrementalResourceFetchChangesFn,
fetch_page_fn: IncrementalResourceFetchPageFn,
gateway: BraintreeGateway,
window_size: int,
binding: CaptureBinding[ResourceConfig],
@@ -140,23 +146,31 @@ def open(
gateway,
window_size,
),
fetch_page=functools.partial(
fetch_page_fn,
gateway,
window_size,
)
)

cutoff = datetime.now(tz=UTC).replace(microsecond=0)

return [
common.Resource(
name=name,
key=["/id"],
model=IncrementalResource,
open=functools.partial(open, fetch_changes_fn, _create_gateway(config), config.advanced.window_size),
open=functools.partial(open, fetch_changes_fn, fetch_page_fn, _create_gateway(config), config.advanced.window_size),
initial_state=ResourceState(
inc=ResourceState.Incremental(cursor=config.start_date),
inc=ResourceState.Incremental(cursor=cutoff),
backfill=ResourceState.Backfill(next_page=_dt_to_str(config.start_date), cutoff=cutoff)
),
initial_config=ResourceConfig(
name=name, interval=timedelta(minutes=5)
),
schema_inference=True,
)
for (name, fetch_changes_fn) in INCREMENTAL_RESOURCES
for (name, fetch_changes_fn, fetch_page_fn) in INCREMENTAL_RESOURCES
]
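
The resulting `initial_state` splits each binding's work at a single cutoff taken when the bindings are opened: the incremental cursor starts at the cutoff, while the backfill starts from `config.start_date` and stops once its page cursor reaches the cutoff, so the two processes never cover the same instant twice. Below is a stand-alone sketch of that split, with plain dataclasses standing in for the estuary-cdk `ResourceState` types and `isoformat()` standing in for `_dt_to_str`.

```python
# Stand-alone sketch of the state split introduced here, with plain dataclasses
# standing in for the estuary-cdk ResourceState types. The incremental cursor
# starts at "now" (the cutoff), and the backfill covers [start_date, cutoff).
from dataclasses import dataclass
from datetime import UTC, datetime


@dataclass
class IncrementalState:
    cursor: datetime   # fetch_changes resumes from here


@dataclass
class BackfillState:
    next_page: str     # serialized window start for fetch_page
    cutoff: datetime   # backfill stops once next_page reaches this


def initial_state(start_date: datetime) -> tuple[IncrementalState, BackfillState]:
    cutoff = datetime.now(tz=UTC).replace(microsecond=0)
    return (
        IncrementalState(cursor=cutoff),
        BackfillState(next_page=start_date.isoformat(), cutoff=cutoff),
    )


if __name__ == "__main__":
    inc, backfill = initial_state(datetime(2024, 1, 1, tzinfo=UTC))
    print(inc, backfill)
```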

