def _cron_field(value: str, step: str) -> str:
    """Build the capturing sub-pattern for one cron field.

    A field is a comma-separated list of items. Each item is either a single
    value, a ``value-value`` range, or ``*`` optionally followed by ``/step``.

    ``value`` and ``step`` are regex fragments (alternations allowed); they
    are wrapped in non-capturing groups here so callers need not do so.
    """
    item = rf"(?:(?:{value})(?:-(?:{value}))?|\*(?:/(?:{step}))?)"
    return rf"({item}(?:,{item})*)"


# One (value, step) pair per cron field, in field order. Steps start at 1:
# a step of 0 is not meaningful in cron, and matchers that implement `*/N`
# as `value % N == 0` would divide by zero on it.
_CRON_FIELDS = (
    _cron_field(r"[0-5]?\d", r"0?[1-9]|[1-5]\d"),                 # minute: 0-59
    _cron_field(r"[01]?\d|2[0-3]", r"0?[1-9]|1\d|2[0-3]"),        # hour: 0-23
    _cron_field(r"0?[1-9]|[12]\d|3[01]", r"[1-9]|[12]\d|3[01]"),  # day of month: 1-31
    _cron_field(r"[1-9]|1[0-2]", r"[1-9]|1[0-2]"),                # month: 1-12
    _cron_field(r"[0-6]", r"[1-6]"),                              # day of week: 0-6
)

# Five whitespace-separated fields, or the empty string to signify no
# schedule. The pattern deliberately avoids look-arounds so it stays usable
# by regex engines that do not support them.
CRON_REGEX = r"^" + r"\s+".join(_CRON_FIELDS) + r"$|^$"
@@ -169,6 +189,8 @@ class Snapshot(BaseModel, extra="forbid"): snapshot: Snapshot | None = Field(default=None, description="Snapshot progress") + last_initialized: datetime | None = Field(default=None, description="The last time this state was initialized.") + _ResourceState = TypeVar("_ResourceState", bound=ResourceState) @@ -409,19 +431,55 @@ async def _run(task: Task): ) ) + soonest_future_scheduled_initialization: datetime | None = None + for index, (binding, resource) in enumerate(resolved_bindings): state: _ResourceState | None = open.state.bindingStateV1.get( binding.stateKey ) - if state is None or binding.stateKey in backfill_requests: + should_initialize = state is None or binding.stateKey in backfill_requests + + if state: + if state.last_initialized is None: + state.last_initialized = datetime.now(tz=UTC) + task.checkpoint( + ConnectorState( + bindingStateV1={binding.stateKey: state} + ) + ) + + if isinstance(binding.resourceConfig, ResourceConfigWithSchedule): + cron_schedule = binding.resourceConfig.schedule + next_scheduled_initialization = next_fire(cron_schedule, state.last_initialized) + + if next_scheduled_initialization and next_scheduled_initialization < datetime.now(tz=UTC): + # Re-initialize the binding if we missed a scheduled re-initialization. + should_initialize = True + if state.backfill: + task.log.warning( + f"Scheduled backfill for binding {resource.name} is taking precedence over its ongoing backfill." + " Please extend the binding's configured cron schedule if you'd like the previous backfill to" + " complete before the next scheduled backfill starts." 
+ ) + + next_scheduled_initialization = next_fire(cron_schedule, datetime.now(tz=UTC)) + + if next_scheduled_initialization and soonest_future_scheduled_initialization: + soonest_future_scheduled_initialization = min(soonest_future_scheduled_initialization, next_scheduled_initialization) + elif next_scheduled_initialization: + soonest_future_scheduled_initialization = next_scheduled_initialization + + if should_initialize: # Checkpoint the binding's initialized state prior to any processing. + state = resource.initial_state + state.last_initialized = datetime.now(tz=UTC) + task.checkpoint( ConnectorState( - bindingStateV1={binding.stateKey: resource.initial_state} + bindingStateV1={binding.stateKey: state} ) ) - state = resource.initial_state resource.open( binding, @@ -431,6 +489,18 @@ async def _run(task: Task): resolved_bindings ) + async def scheduled_stop(future_dt: datetime | None) -> None: + if not future_dt: + return None + + sleep_duration = future_dt - datetime.now(tz=UTC) + await asyncio.sleep(sleep_duration.total_seconds()) + task.stopping.event.set() + + # Gracefully exit to ensure relatively close adherence to any bindings' + # re-initialization schedules. + asyncio.create_task(scheduled_stop(soonest_future_scheduled_initialization)) + return (response.Opened(explicitAcknowledgements=False), _run) diff --git a/estuary-cdk/estuary_cdk/cron.py b/estuary-cdk/estuary_cdk/cron.py new file mode 100644 index 0000000000..459a861cda --- /dev/null +++ b/estuary-cdk/estuary_cdk/cron.py @@ -0,0 +1,25 @@ +from datetime import datetime, UTC, timedelta + +import pycron + +CONNECTOR_RESTART_INTERVAL = timedelta(hours=24) +SMALLEST_CRON_INCREMENT = timedelta(minutes=1) + + +# next_fire returns the earliest datetime between start and end (exclusive) that matches the cron expression. 
+def next_fire( + cron_expression: str, + start: datetime, + end: datetime = datetime.now(tz=UTC) + CONNECTOR_RESTART_INTERVAL +) -> datetime | None: + if not cron_expression: + return None + + dt = start.replace(second=0, microsecond=0) + + while dt < end: + dt += SMALLEST_CRON_INCREMENT + if pycron.is_now(cron_expression, dt): + return dt + + return None diff --git a/estuary-cdk/pyproject.toml b/estuary-cdk/pyproject.toml index 0db8eb1db3..5e8e1b9630 100644 --- a/estuary-cdk/pyproject.toml +++ b/estuary-cdk/pyproject.toml @@ -14,6 +14,7 @@ orjson = "^3.9.15" pydantic = ">1.10,<3" xxhash = "^3.4.1" ijson = "^3.3.0" +pycron = "^3.1.2" [tool.poetry.group.dev.dependencies] debugpy = "^1.8.0" diff --git a/estuary-cdk/tests/test_cron_regex.py b/estuary-cdk/tests/test_cron_regex.py new file mode 100644 index 0000000000..6949bfba97 --- /dev/null +++ b/estuary-cdk/tests/test_cron_regex.py @@ -0,0 +1,52 @@ +import pytest +import re +from estuary_cdk.capture.common import CRON_REGEX + +pattern = re.compile(CRON_REGEX) + +@pytest.mark.parametrize("cron", [ + # Empty input + "", + # Wildcards for all positions. + "* * * * *", + # Steps + "*/5 * * * *", + # Minimum values + "0 0 1 1 0", + # Maximum values + "59 23 31 12 6", + # Lists + "0,30,1 0,12 1,4,5,23 3,4,6 1,4", + # Ranges + "0-59 0-23 1-31 1-12 0-6", +]) +def test_valid_cron(cron): + assert pattern.match(cron) is not None + +@pytest.mark.parametrize("cron", [ + # Number of arguments + "*", + "* *", + "* * * *", + "* * * * * *", + # Invalid characters + "abc123 * * * *", + # Negative numbers + "-1 * * * *", + # Beyond min/max + "60 * * * *", + "* 24 * * *", + "* * 0 * *", + "* * 32 * *", + "* * * 0 *", + "* * * 13 *", + "* * * * 7", + # Invalid list syntax + ",0,1 * * * *", + "0,1, * * * *", + # Invalid range syntax + "0-1- * * * *", + "0- * * * *", +]) +def test_invalid_cron(cron): + assert pattern.match(cron) is None