Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

estuary-cdk: allow a cron expression to re-initialize a binding's state #2437

Merged
merged 4 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions estuary-cdk/estuary_cdk/capture/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from pydantic import AwareDatetime, BaseModel, Field, NonNegativeInt

from ..cron import next_fire
from ..flow import (
AccessToken,
BaseOAuth2Credentials,
Expand Down Expand Up @@ -112,6 +113,25 @@ def path(self) -> list[str]:
_ResourceConfig = TypeVar("_ResourceConfig", bound=ResourceConfig)


# Validation pattern for a five-field cron expression, in this order:
#   minute (0-59), hour (0-23), day of month (1-31), month (1-12), day of week (0-6).
# Fields are separated by one or more whitespace characters. Each field accepts
# a single number, an `a-b` range, `*`, or a `*/n` step, as well as a
# comma-separated list of those. Named months/weekdays and stepped ranges such
# as `0-30/5` are not matched.
# The final `|^$` alternative also admits the empty string, meaning "no schedule".
# NOTE(review): a zero step (e.g. `*/0` in the minute field) and descending
# ranges (e.g. `5-3`) match this pattern — confirm the cron evaluator tolerates them.
CRON_REGEX = (r"^"
r"((?:[0-5]?\d(?:-[0-5]?\d)?|\*(?:/[0-5]?\d)?)(?:,(?:[0-5]?\d(?:-[0-5]?\d)?|\*(?:/[0-5]?\d)?))*)\s+" # minute
r"((?:[01]?\d|2[0-3]|(?:[01]?\d|2[0-3])-(?:[01]?\d|2[0-3])|\*(?:/[01]?\d|/2[0-3])?)(?:,(?:[01]?\d|2[0-3]|(?:[01]?\d|2[0-3])-(?:[01]?\d|2[0-3])|\*(?:/[01]?\d|/2[0-3])?))*)\s+" # hour
r"((?:0?[1-9]|[12]\d|3[01]|(?:0?[1-9]|[12]\d|3[01])-(?:0?[1-9]|[12]\d|3[01])|\*(?:/[0-9]|/1[0-9]|/2[0-9]|/3[01])?)(?:,(?:0?[1-9]|[12]\d|3[01]|(?:0?[1-9]|[12]\d|3[01])-(?:0?[1-9]|[12]\d|3[01])|\*(?:/[0-9]|/1[0-9]|/2[0-9]|/3[01])?))*)\s+" # day of month
r"((?:[1-9]|1[0-2]|(?:[1-9]|1[0-2])-(?:[1-9]|1[0-2])|\*(?:/[1-9]|/1[0-2])?)(?:,(?:[1-9]|1[0-2]|(?:[1-9]|1[0-2])-(?:[1-9]|1[0-2])|\*(?:/[1-9]|/1[0-2])?))*)\s+" # month
r"((?:[0-6]|(?:[0-6])-(?:[0-6])|\*(?:/[0-6])?)(?:,(?:[0-6]|(?:[0-6])-(?:[0-6])|\*(?:/[0-6])?))*)" # day of week
r"$|^$" # Empty string to signify no schedule
)


# Resource configuration extended with an optional cron `schedule` field.
# The field defaults to the empty string (no schedule) and its value is
# validated against CRON_REGEX via the `pattern` constraint.
class ResourceConfigWithSchedule(ResourceConfig):
    schedule: str = Field(
        default="",
        title="Schedule",
        description="Schedule to automatically rebackfill this binding. Accepts a cron expression.",
        pattern=CRON_REGEX
    )


class BaseResourceState(abc.ABC, BaseModel, extra="forbid"):
"""
AbstractResourceState is a base class for ResourceState classes.
Expand Down Expand Up @@ -169,6 +189,8 @@ class Snapshot(BaseModel, extra="forbid"):

snapshot: Snapshot | None = Field(default=None, description="Snapshot progress")

last_initialized: datetime | None = Field(default=None, description="The last time this state was initialized.")


_ResourceState = TypeVar("_ResourceState", bound=ResourceState)

Expand Down Expand Up @@ -409,19 +431,55 @@ async def _run(task: Task):
)
)

soonest_future_scheduled_initialization: datetime | None = None

for index, (binding, resource) in enumerate(resolved_bindings):
state: _ResourceState | None = open.state.bindingStateV1.get(
binding.stateKey
)

if state is None or binding.stateKey in backfill_requests:
should_initialize = state is None or binding.stateKey in backfill_requests

if state:
if state.last_initialized is None:
state.last_initialized = datetime.now(tz=UTC)
task.checkpoint(
ConnectorState(
bindingStateV1={binding.stateKey: state}
)
)

if isinstance(binding.resourceConfig, ResourceConfigWithSchedule):
cron_schedule = binding.resourceConfig.schedule
next_scheduled_initialization = next_fire(cron_schedule, state.last_initialized)

if next_scheduled_initialization and next_scheduled_initialization < datetime.now(tz=UTC):
# Re-initialize the binding if we missed a scheduled re-initialization.
should_initialize = True
if state.backfill:
task.log.warning(
f"Scheduled backfill for binding {resource.name} is taking precedence over its ongoing backfill."
" Please extend the binding's configured cron schedule if you'd like the previous backfill to"
" complete before the next scheduled backfill starts."
)

next_scheduled_initialization = next_fire(cron_schedule, datetime.now(tz=UTC))

if next_scheduled_initialization and soonest_future_scheduled_initialization:
soonest_future_scheduled_initialization = min(soonest_future_scheduled_initialization, next_scheduled_initialization)
elif next_scheduled_initialization:
soonest_future_scheduled_initialization = next_scheduled_initialization

if should_initialize:
# Checkpoint the binding's initialized state prior to any processing.
state = resource.initial_state
state.last_initialized = datetime.now(tz=UTC)

task.checkpoint(
ConnectorState(
bindingStateV1={binding.stateKey: resource.initial_state}
bindingStateV1={binding.stateKey: state}
)
)
state = resource.initial_state

resource.open(
binding,
Expand All @@ -431,6 +489,18 @@ async def _run(task: Task):
resolved_bindings
)

async def scheduled_stop(future_dt: datetime | None) -> None:
if not future_dt:
return None

sleep_duration = future_dt - datetime.now(tz=UTC)
await asyncio.sleep(sleep_duration.total_seconds())
task.stopping.event.set()

# Gracefully exit to ensure relatively close adherence to any bindings'
# re-initialization schedules.
asyncio.create_task(scheduled_stop(soonest_future_scheduled_initialization))

return (response.Opened(explicitAcknowledgements=False), _run)


Expand Down
25 changes: 25 additions & 0 deletions estuary-cdk/estuary_cdk/cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from datetime import datetime, UTC, timedelta

import pycron

# Default look-ahead window for next_fire when the caller gives no explicit end.
# NOTE(review): the name suggests connectors restart at least this often — confirm.
CONNECTOR_RESTART_INTERVAL = timedelta(hours=24)
# Step used by next_fire when scanning candidate datetimes (one minute at a time).
SMALLEST_CRON_INCREMENT = timedelta(minutes=1)


# next_fire returns the earliest datetime between start and end (exclusive) that matches the cron expression.
def next_fire(
cron_expression: str,
start: datetime,
end: datetime = datetime.now(tz=UTC) + CONNECTOR_RESTART_INTERVAL
) -> datetime | None:
if not cron_expression:
return None

dt = start.replace(second=0, microsecond=0)

while dt < end:
dt += SMALLEST_CRON_INCREMENT
if pycron.is_now(cron_expression, dt):
return dt

return None
1 change: 1 addition & 0 deletions estuary-cdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ orjson = "^3.9.15"
pydantic = ">1.10,<3"
xxhash = "^3.4.1"
ijson = "^3.3.0"
pycron = "^3.1.2"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
Expand Down
52 changes: 52 additions & 0 deletions estuary-cdk/tests/test_cron_regex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pytest
import re
from estuary_cdk.capture.common import CRON_REGEX

pattern = re.compile(CRON_REGEX)

# Expressions that CRON_REGEX is expected to accept.
_VALID_CRON_EXPRESSIONS = [
    # Empty input
    "",
    # Wildcards for all positions.
    "* * * * *",
    # Steps
    "*/5 * * * *",
    # Minimum values
    "0 0 1 1 0",
    # Maximum values
    "59 23 31 12 6",
    # Lists
    "0,30,1 0,12 1,4,5,23 3,4,6 1,4",
    # Ranges
    "0-59 0-23 1-31 1-12 0-6",
]


@pytest.mark.parametrize("cron", _VALID_CRON_EXPRESSIONS)
def test_valid_cron(cron):
    """Each accepted expression must produce a match against the compiled pattern."""
    matched = pattern.match(cron)
    assert matched is not None

# Expressions that CRON_REGEX is expected to reject.
_INVALID_CRON_EXPRESSIONS = [
    # Number of arguments
    "*",
    "* *",
    "* * * *",
    "* * * * * *",
    # Invalid characters
    "abc123 * * * *",
    # Negative numbers
    "-1 * * * *",
    # Beyond min/max
    "60 * * * *",
    "* 24 * * *",
    "* * 0 * *",
    "* * 32 * *",
    "* * * 0 *",
    "* * * 13 *",
    "* * * * 7",
    # Invalid list syntax
    ",0,1 * * * *",
    "0,1, * * * *",
    # Invalid range syntax
    "0-1- * * * *",
    "0- * * * *",
]


@pytest.mark.parametrize("cron", _INVALID_CRON_EXPRESSIONS)
def test_invalid_cron(cron):
    """Each malformed expression must fail to match the compiled pattern."""
    matched = pattern.match(cron)
    assert matched is None
Loading
Loading