Skip to content

Commit

Permalink
estuary-cdk: extend `ResourceConfig` to allow a cron expression for …
Browse files Browse the repository at this point in the history
…re-initializing a resource's state on a schedule

For a couple of capture connectors, bindings need to be backfilled on some schedule.
So far, we've been manually backfilling these bindings every day/week,
and it would be nice if a resource config level setting existed to
automatically backfill.

This commit extends the existing `ResourceConfig` with the
`ResourceConfigWithSchedule` class, allowing resources to specify a
schedule for their state to be re-initialized (i.e. backfill the binding).
This schedule must either be an empty string or a valid cron expression
accepted by `pycron`.

In order to know when a binding's state was last initialized, the
`last_initialized` property has been added to all bindings' states.

Since the resource config is the same for all bindings in a
connector, a connector that needs to use `ResourceConfigWithSchedule`
for at least one binding must use it for all bindings.

`pycron` was chosen as the cron package for a few reasons:
- The typically clear choice for a Python cron package, `croniter`, may be
  unpublished from PyPI in the near future due to the maintainer's
  concerns around EU CRA laws & liabilities. It is not clear that a
  different maintainer will take over & keep `croniter` available, and
  beyond forking & maintaining our own `croniter` repo, I don't see a
  longterm solution to using that package in our code.
- `pycron` has the minimal functionality we need & appears to be somewhat
  actively maintained & used by others.

If there are bindings with cron schedules, we spawn a coroutine to stop
the connector at the soonest future scheduled re-initialization time to
adhere to the bindings' schedules relatively closely.
  • Loading branch information
Alex-Bair committed Feb 24, 2025
1 parent d76a67f commit e893266
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 3 deletions.
76 changes: 73 additions & 3 deletions estuary-cdk/estuary_cdk/capture/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from pydantic import AwareDatetime, BaseModel, Field, NonNegativeInt

from ..cron import next_fire
from ..flow import (
AccessToken,
BaseOAuth2Credentials,
Expand Down Expand Up @@ -112,6 +113,25 @@ def path(self) -> list[str]:
_ResourceConfig = TypeVar("_ResourceConfig", bound=ResourceConfig)


# CRON_REGEX validates a five-field cron expression (minute, hour, day of
# month, month, day of week) or the empty string, which signifies "no schedule".
#
# Each field accepts a comma-separated list whose items are a single value, a
# `low-high` range, `*`, or a `*/step`. A step of zero is rejected in every
# field: it is meaningless as a schedule, and evaluating it requires a modulo
# by zero. The pattern deliberately avoids lookarounds so that it stays usable
# in regex engines that do not support them.
CRON_REGEX = (
    r"^"
    r"((?:[0-5]?\d(?:-[0-5]?\d)?|\*(?:/(?:0?[1-9]|[1-5]\d))?)(?:,(?:[0-5]?\d(?:-[0-5]?\d)?|\*(?:/(?:0?[1-9]|[1-5]\d))?))*)\s+"  # minute
    r"((?:[01]?\d|2[0-3]|(?:[01]?\d|2[0-3])-(?:[01]?\d|2[0-3])|\*(?:/0?[1-9]|/1\d|/2[0-3])?)(?:,(?:[01]?\d|2[0-3]|(?:[01]?\d|2[0-3])-(?:[01]?\d|2[0-3])|\*(?:/0?[1-9]|/1\d|/2[0-3])?))*)\s+"  # hour
    r"((?:0?[1-9]|[12]\d|3[01]|(?:0?[1-9]|[12]\d|3[01])-(?:0?[1-9]|[12]\d|3[01])|\*(?:/[1-9]|/1[0-9]|/2[0-9]|/3[01])?)(?:,(?:0?[1-9]|[12]\d|3[01]|(?:0?[1-9]|[12]\d|3[01])-(?:0?[1-9]|[12]\d|3[01])|\*(?:/[1-9]|/1[0-9]|/2[0-9]|/3[01])?))*)\s+"  # day of month
    r"((?:[1-9]|1[0-2]|(?:[1-9]|1[0-2])-(?:[1-9]|1[0-2])|\*(?:/[1-9]|/1[0-2])?)(?:,(?:[1-9]|1[0-2]|(?:[1-9]|1[0-2])-(?:[1-9]|1[0-2])|\*(?:/[1-9]|/1[0-2])?))*)\s+"  # month
    r"((?:[0-6]|(?:[0-6])-(?:[0-6])|\*(?:/[1-6])?)(?:,(?:[0-6]|(?:[0-6])-(?:[0-6])|\*(?:/[1-6])?))*)"  # day of week
    r"$|^$"  # Empty string to signify no schedule
)


class ResourceConfigWithSchedule(ResourceConfig):
    # Extends ResourceConfig with an optional `schedule` setting.
    #
    # `schedule` defaults to the empty string and is constrained by CRON_REGEX,
    # so only values matching that pattern pass validation.
    schedule: str = Field(
        default="",
        title="Schedule",
        description="Schedule to automatically rebackfill this binding. Accepts a cron expression.",
        pattern=CRON_REGEX,
    )


class BaseResourceState(abc.ABC, BaseModel, extra="forbid"):
"""
AbstractResourceState is a base class for ResourceState classes.
Expand Down Expand Up @@ -169,6 +189,8 @@ class Snapshot(BaseModel, extra="forbid"):

snapshot: Snapshot | None = Field(default=None, description="Snapshot progress")

last_initialized: datetime | None = Field(default=None, description="The last time this state was initialized.")


_ResourceState = TypeVar("_ResourceState", bound=ResourceState)

Expand Down Expand Up @@ -409,19 +431,55 @@ async def _run(task: Task):
)
)

soonest_future_scheduled_initialization: datetime | None = None

for index, (binding, resource) in enumerate(resolved_bindings):
state: _ResourceState | None = open.state.bindingStateV1.get(
binding.stateKey
)

if state is None or binding.stateKey in backfill_requests:
should_initialize = state is None or binding.stateKey in backfill_requests

if state:
if state.last_initialized is None:
state.last_initialized = datetime.now(tz=UTC)
task.checkpoint(
ConnectorState(
bindingStateV1={binding.stateKey: state}
)
)

if isinstance(binding.resourceConfig, ResourceConfigWithSchedule):
cron_schedule = binding.resourceConfig.schedule
next_scheduled_initialization = next_fire(cron_schedule, state.last_initialized)

if next_scheduled_initialization and next_scheduled_initialization < datetime.now(tz=UTC):
# Re-initialize the binding if we missed a scheduled re-initialization.
should_initialize = True
if state.backfill:
task.log.warning(
f"Scheduled backfill for binding {resource.name} is taking precedence over its ongoing backfill."
" Please extend the binding's configured cron schedule if you'd like the previous backfill to"
" complete before the next scheduled backfill starts."
)

next_scheduled_initialization = next_fire(cron_schedule, datetime.now(tz=UTC))

if next_scheduled_initialization and soonest_future_scheduled_initialization:
soonest_future_scheduled_initialization = min(soonest_future_scheduled_initialization, next_scheduled_initialization)
elif next_scheduled_initialization:
soonest_future_scheduled_initialization = next_scheduled_initialization

if should_initialize:
# Checkpoint the binding's initialized state prior to any processing.
state = resource.initial_state
state.last_initialized = datetime.now(tz=UTC)

task.checkpoint(
ConnectorState(
bindingStateV1={binding.stateKey: resource.initial_state}
bindingStateV1={binding.stateKey: state}
)
)
state = resource.initial_state

resource.open(
binding,
Expand All @@ -431,6 +489,18 @@ async def _run(task: Task):
resolved_bindings
)

async def scheduled_stop(future_dt: datetime | None) -> None:
if not future_dt:
return None

sleep_duration = future_dt - datetime.now(tz=UTC)
await asyncio.sleep(sleep_duration.total_seconds())
task.stopping.event.set()

# Gracefully exit to ensure relatively close adherence to any bindings'
# re-initialization schedules.
asyncio.create_task(scheduled_stop(soonest_future_scheduled_initialization))

return (response.Opened(explicitAcknowledgements=False), _run)


Expand Down
25 changes: 25 additions & 0 deletions estuary-cdk/estuary_cdk/cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from datetime import datetime, UTC, timedelta

import pycron

CONNECTOR_RESTART_INTERVAL = timedelta(hours=24)
SMALLEST_CRON_INCREMENT = timedelta(minutes=1)


# next_fire returns the earliest datetime between start and end (exclusive) that matches the cron expression.
def next_fire(
cron_expression: str,
start: datetime,
end: datetime = datetime.now(tz=UTC) + CONNECTOR_RESTART_INTERVAL
) -> datetime | None:
if not cron_expression:
return None

dt = start.replace(second=0, microsecond=0)

while dt < end:
dt += SMALLEST_CRON_INCREMENT
if pycron.is_now(cron_expression, dt):
return dt

return None
1 change: 1 addition & 0 deletions estuary-cdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ orjson = "^3.9.15"
pydantic = ">1.10,<3"
xxhash = "^3.4.1"
ijson = "^3.3.0"
pycron = "^3.1.2"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
Expand Down
52 changes: 52 additions & 0 deletions estuary-cdk/tests/test_cron_regex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pytest
import re
from estuary_cdk.capture.common import CRON_REGEX

# Compiled once at import so that every parametrized case below reuses it.
pattern = re.compile(CRON_REGEX)

@pytest.mark.parametrize(
    "cron",
    [
        "",  # Empty input
        "* * * * *",  # Wildcards for all positions.
        "*/5 * * * *",  # Steps
        "0 0 1 1 0",  # Minimum values
        "59 23 31 12 6",  # Maximum values
        "0,30,1 0,12 1,4,5,23 3,4,6 1,4",  # Lists
        "0-59 0-23 1-31 1-12 0-6",  # Ranges
    ],
)
def test_valid_cron(cron):
    """Each well-formed expression, including the empty string, must match."""
    matched = pattern.match(cron)
    assert matched is not None

@pytest.mark.parametrize(
    "cron",
    [
        # Number of arguments
        "*",
        "* *",
        "* * * *",
        "* * * * * *",
        "abc123 * * * *",  # Invalid characters
        "-1 * * * *",  # Negative numbers
        # Beyond min/max
        "60 * * * *",
        "* 24 * * *",
        "* * 0 * *",
        "* * 32 * *",
        "* * * 0 *",
        "* * * 13 *",
        "* * * * 7",
        # Invalid list syntax
        ",0,1 * * * *",
        "0,1, * * * *",
        # Invalid range syntax
        "0-1- * * * *",
        "0- * * * *",
    ],
)
def test_invalid_cron(cron):
    """Each malformed expression must be rejected by the pattern."""
    matched = pattern.match(cron)
    assert matched is None

0 comments on commit e893266

Please sign in to comment.