Skip to content

Commit

Permalink
source-mysql-batch: Add 'daily at 6:00Z' polling schedule
Browse files Browse the repository at this point in the history
Previously we just had duration strings to specify polling
intervals for batch captures. This commit factors out that
duration handling into a separate 'schedule' helper package
and defines another type of schedule: `"daily at HH:MMZ"`
which consistently executes at the requested time of day (in UTC).

This refactoring will be applied to other batch SQL captures
in a follow-up commit.
  • Loading branch information
willdonnelly committed Feb 16, 2024
1 parent 3e06e50 commit b4ef86d
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 36 deletions.
75 changes: 75 additions & 0 deletions go/schedule/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package schedule

import (
"context"
"fmt"
"strings"
"time"
)

// A Schedule represents a sequence of points in time when an action should be performed.
//
// Values of this type are produced by Parse from a textual description. The
// implementations in this file are periodicSchedule (a fixed duration after the
// reference time) and dailySchedule (a fixed time of day in UTC).
type Schedule interface {
// Next returns the earliest instant in time greater than `afterTime` which
// satisfies the schedule.
//
// Callers typically pass the time of the previous execution as `afterTime`
// and treat the result as the time of the following one.
Next(afterTime time.Time) time.Time
}

// Validate reports whether desc is a well-formed schedule description. It
// returns nil when Parse accepts the description, and the error produced by
// Parse otherwise. The parsed schedule itself is discarded.
func Validate(desc string) error {
	if _, err := Parse(desc); err != nil {
		return err
	}
	return nil
}

// Parse turns a textual schedule description into an object with the Schedule interface.
//
// Two forms of description are recognized, tried in this order:
//
//   - A duration string accepted by time.ParseDuration (such as "24h" or "30m"),
//     which yields a periodicSchedule with that period.
//   - The form "daily at HH:MMZ" (such as "daily at 13:00Z"), which yields a
//     dailySchedule for that time of day.
//
// Any other input results in a nil Schedule and an error which quotes the
// offending text.
func Parse(desc string) (Schedule, error) {
	if pollInterval, err := time.ParseDuration(desc); err == nil {
		return &periodicSchedule{Period: pollInterval}, nil
	}
	if strings.HasPrefix(desc, "daily at ") {
		var timeStr = strings.TrimPrefix(desc, "daily at ")
		var timeOfDay, err = time.Parse("15:04Z", timeStr)
		if err != nil {
			// Quote the text the user supplied. When parsing fails the returned
			// time value is not meaningful, so printing it would hide the input
			// that actually caused the error.
			return nil, fmt.Errorf("invalid time %q (time of day should look like '13:00Z'): %w", timeStr, err)
		}
		return &dailySchedule{TimeOfDay: timeOfDay}, nil
	}
	return nil, fmt.Errorf("invalid polling schedule %q", desc)
}

// periodicSchedule is a Schedule whose next instant lies a fixed duration
// after the reference time.
type periodicSchedule struct {
	// Period is the offset which Next adds to the reference time.
	Period time.Duration
}

// Next returns the instant exactly one Period after the given time.
func (s *periodicSchedule) Next(after time.Time) time.Time {
	var next = after.Add(s.Period)
	return next
}

// dailySchedule is a Schedule which is satisfied once per day, at a fixed
// time of day in UTC.
type dailySchedule struct {
	// TimeOfDay supplies the hour, minute and second of the daily instant.
	// Its date components are not used by Next.
	TimeOfDay time.Time
}

// Next returns the first occurrence of the configured time of day which is
// strictly later than the given time.
func (s *dailySchedule) Next(after time.Time) time.Time {
	// Start from the configured time of day on the UTC calendar date of the
	// reference time.
	year, month, day := after.UTC().Date()
	hour, minute, second := s.TimeOfDay.Clock()
	next := time.Date(year, month, day, hour, minute, second, 0, time.UTC)

	// If today's occurrence is still ahead of the reference time, use it.
	if next.After(after) {
		return next
	}
	// Otherwise it has already been reached or passed, so the answer is the
	// same time of day on the following date.
	return next.AddDate(0, 0, 1)
}

// WaitForNext sleeps until the next scheduled execution time, or until the
// context is cancelled.
//
// The target instant is s.Next(after). If that instant is not in the future,
// WaitForNext returns nil immediately. Otherwise it returns nil once the wait
// has elapsed, or ctx.Err() if the context is done first.
func WaitForNext(ctx context.Context, s Schedule, after time.Time) error {
	var d = time.Until(s.Next(after))
	if d <= 0 {
		return nil
	}

	// Use an explicit timer rather than time.After so that it can be stopped
	// when the context is cancelled. Waits here may be long (a daily schedule
	// can wait most of a day), and an unstoppable timer would otherwise stay
	// allocated until it fires on Go runtimes older than 1.23.
	var timer = time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}
64 changes: 64 additions & 0 deletions go/schedule/schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package schedule

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestPeriodicSchedule checks that schedules described by a duration string
// yield an instant exactly one period after the reference time.
func TestPeriodicSchedule(t *testing.T) {
	type testCase struct {
		Schedule string
		After    string
		Expect   string
	}
	var cases = []testCase{
		{"1h", "2024-02-15T05:00:00Z", "2024-02-15T06:00:00Z"},
		{"2h", "2024-02-15T12:34:56Z", "2024-02-15T14:34:56Z"},
		{"6h", "2024-02-15T19:34:56Z", "2024-02-16T01:34:56Z"},
		{"24h", "2024-02-15T19:34:56Z", "2024-02-16T19:34:56Z"},
		{"168h", "2024-02-15T19:34:56Z", "2024-02-22T19:34:56Z"},
		{"30m", "2024-02-15T19:34:56Z", "2024-02-15T20:04:56Z"},
		{"10m", "2024-02-15T19:34:56Z", "2024-02-15T19:44:56Z"},
		{"1m", "2024-02-15T19:34:56Z", "2024-02-15T19:35:56Z"},
		{"30s", "2024-02-15T19:34:56Z", "2024-02-15T19:35:26Z"},
		{"10s", "2024-02-15T19:34:56Z", "2024-02-15T19:35:06Z"},
		{"1s", "2024-02-15T19:34:56Z", "2024-02-15T19:34:57Z"},
	}

	for _, tc := range cases {
		var sched, parseErr = Parse(tc.Schedule)
		require.NoError(t, parseErr)

		var after, timeErr = time.Parse(time.RFC3339, tc.After)
		require.NoError(t, timeErr)

		// Compare in RFC 3339 text form so expectations stay readable.
		var actual = sched.Next(after).Format(time.RFC3339)
		require.Equal(t, tc.Expect, actual)
	}
}

// TestDailySchedule checks "daily at HH:MMZ" schedules against reference times
// before, exactly at, and after the configured time of day, for both one- and
// two-digit hour spellings.
func TestDailySchedule(t *testing.T) {
	type testCase struct {
		Schedule string
		After    string
		Expect   string
	}
	var cases = []testCase{
		{"daily at 6:00Z", "2024-02-15T01:00:00Z", "2024-02-15T06:00:00Z"},
		{"daily at 6:00Z", "2024-02-15T05:00:00Z", "2024-02-15T06:00:00Z"},
		{"daily at 6:00Z", "2024-02-15T06:00:00Z", "2024-02-16T06:00:00Z"},
		{"daily at 6:00Z", "2024-02-15T07:00:00Z", "2024-02-16T06:00:00Z"},
		{"daily at 06:00Z", "2024-02-15T01:00:00Z", "2024-02-15T06:00:00Z"},
		{"daily at 06:00Z", "2024-02-15T05:00:00Z", "2024-02-15T06:00:00Z"},
		{"daily at 06:00Z", "2024-02-15T06:00:00Z", "2024-02-16T06:00:00Z"},
		{"daily at 06:00Z", "2024-02-15T07:00:00Z", "2024-02-16T06:00:00Z"},
		{"daily at 13:55Z", "2024-02-15T01:00:00Z", "2024-02-15T13:55:00Z"},
		{"daily at 13:55Z", "2024-02-15T13:54:00Z", "2024-02-15T13:55:00Z"},
		{"daily at 13:55Z", "2024-02-15T13:55:00Z", "2024-02-16T13:55:00Z"},
		{"daily at 13:55Z", "2024-02-15T13:56:00Z", "2024-02-16T13:55:00Z"},
		{"daily at 13:55Z", "2024-02-15T23:00:00Z", "2024-02-16T13:55:00Z"},
	}

	for _, tc := range cases {
		var sched, parseErr = Parse(tc.Schedule)
		require.NoError(t, parseErr)

		var after, timeErr = time.Parse(time.RFC3339, tc.After)
		require.NoError(t, timeErr)

		// Compare in RFC 3339 text form so expectations stay readable.
		var actual = sched.Next(after).Format(time.RFC3339)
		require.Equal(t, tc.Expect, actual)
	}
}
8 changes: 4 additions & 4 deletions source-mysql-batch/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
"properties": {
"poll": {
"type": "string",
"title": "Default Poll Interval",
"description": "How often to execute fetch queries. Defaults to 24 hours if unset."
"title": "Default Polling Schedule",
"description": "When and how often to execute fetch queries. Defaults to '24h' if unset."
},
"dbname": {
"type": "string",
Expand Down Expand Up @@ -76,8 +76,8 @@
},
"poll": {
"type": "string",
"title": "Poll Interval",
"description": "How often to execute the fetch query (overrides the connector default setting)",
"title": "Polling Schedule",
"description": "When and how often to execute the fetch query (overrides the connector default setting)",
"order": 1
}
},
Expand Down
42 changes: 18 additions & 24 deletions source-mysql-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/estuary/connectors/go/encrow"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
pc "github.com/estuary/flow/go/protocols/capture"
Expand Down Expand Up @@ -50,7 +51,7 @@ type Resource struct {
Name string `json:"name" jsonschema:"title=Name,description=The unique name of this resource." jsonschema_extras:"order=0"`
Template string `json:"template" jsonschema:"title=Query Template,description=The query template (pkg.go.dev/text/template) which will be rendered and then executed." jsonschema_extras:"multiline=true,order=3"`
Cursor []string `json:"cursor,omitempty" jsonschema:"title=Cursor Columns,description=The names of columns which should be persisted between query executions as a cursor." jsonschema_extras:"order=2"`
PollInterval string `json:"poll,omitempty" jsonschema:"title=Poll Interval,description=How often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"`
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"`
}

// Validate checks that the resource spec possesses all required properties.
Expand All @@ -70,9 +71,9 @@ func (r Resource) Validate() error {
if slices.Contains(r.Cursor, "") {
return fmt.Errorf("cursor column names can't be empty (got %q)", r.Cursor)
}
if r.PollInterval != "" {
if _, err := time.ParseDuration(r.PollInterval); err != nil {
return fmt.Errorf("invalid poll interval %q: %w", r.PollInterval, err)
if r.PollSchedule != "" {
if err := schedule.Validate(r.PollSchedule); err != nil {
return fmt.Errorf("invalid polling schedule %q: %w", r.PollSchedule, err)
}
}
return nil
Expand Down Expand Up @@ -577,7 +578,7 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error {
"name": res.Name,
"tmpl": res.Template,
"cursor": res.Cursor,
"poll": res.PollInterval,
"poll": res.PollSchedule,
}).Info("starting worker")

var queryTemplate, err = template.New("query").Parse(res.Template)
Expand Down Expand Up @@ -670,32 +671,25 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template

// Polling interval can be configured per binding. If unset, falls back to the
// connector global polling interval.
var pollStr = c.Config.Advanced.PollInterval
if res.PollInterval != "" {
pollStr = res.PollInterval
var pollScheduleStr = c.Config.Advanced.PollSchedule
if res.PollSchedule != "" {
pollScheduleStr = res.PollSchedule
}
pollInterval, err := time.ParseDuration(pollStr)
var pollSchedule, err = schedule.Parse(pollScheduleStr)
if err != nil {
return fmt.Errorf("invalid poll interval %q: %w", res.PollInterval, err)
return fmt.Errorf("failed to parse polling schedule %q: %w", pollScheduleStr, err)
}

// Sleep until it's been more than <pollInterval> since the last iteration.
if !state.LastPolled.IsZero() && time.Since(state.LastPolled) < pollInterval {
var sleepDuration = time.Until(state.LastPolled.Add(pollInterval))
log.WithFields(log.Fields{
"name": res.Name,
"wait": sleepDuration.String(),
"poll": pollInterval.String(),
}).Info("waiting for next poll")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(sleepDuration):
}
log.WithFields(log.Fields{
"name": res.Name,
"poll": pollScheduleStr,
}).Info("waiting for next scheduled poll")
if err := schedule.WaitForNext(ctx, pollSchedule, state.LastPolled); err != nil {
return err
}
log.WithFields(log.Fields{
"name": res.Name,
"poll": pollInterval.String(),
"poll": pollScheduleStr,
"prev": state.LastPolled.Format(time.RFC3339Nano),
}).Info("ready to poll")

Expand Down
14 changes: 7 additions & 7 deletions source-mysql-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

cerrors "github.com/estuary/connectors/go/connector-errors"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
"github.com/go-mysql-org/go-mysql/client"
Expand All @@ -27,7 +27,7 @@ type Config struct {
}

type advancedConfig struct {
PollInterval string `json:"poll,omitempty" jsonschema:"title=Default Poll Interval,description=How often to execute fetch queries. Defaults to 24 hours if unset."`
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Defaults to '24h' if unset."`
DBName string `json:"dbname,omitempty" jsonschema:"title=Database Name,description=The name of database to connect to. In general this shouldn't matter. The connector can discover and capture from all databases it's authorized to access."`
}

Expand All @@ -43,9 +43,9 @@ func (c *Config) Validate() error {
return fmt.Errorf("missing '%s'", req[0])
}
}
if c.Advanced.PollInterval != "" {
if _, err := time.ParseDuration(c.Advanced.PollInterval); err != nil {
return fmt.Errorf("invalid default poll interval %q: %w", c.Advanced.PollInterval, err)
if c.Advanced.PollSchedule != "" {
if err := schedule.Validate(c.Advanced.PollSchedule); err != nil {
return fmt.Errorf("invalid default polling schedule %q: %w", c.Advanced.PollSchedule, err)
}
}
return nil
Expand All @@ -60,8 +60,8 @@ func (c *Config) SetDefaults() {
c.Address += ":3306"
}

if c.Advanced.PollInterval == "" {
c.Advanced.PollInterval = "24h"
if c.Advanced.PollSchedule == "" {
c.Advanced.PollSchedule = "24h"
}
}

Expand Down
2 changes: 1 addition & 1 deletion source-mysql-batch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec {
User: *dbCaptureUser,
Password: *dbCapturePass,
Advanced: advancedConfig{
PollInterval: "200ms",
PollSchedule: "200ms",
},
}

Expand Down

0 comments on commit b4ef86d

Please sign in to comment.