From 92b645b37930fd9681b8e5d400e2684c39720834 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Fri, 16 Feb 2024 12:35:31 -0600 Subject: [PATCH] source-{bigquery,postgres}-batch: Apply schedule refactor --- source-bigquery-batch/.snapshots/TestSpec | 8 ++-- source-bigquery-batch/driver.go | 47 ++++++++++------------- source-bigquery-batch/main.go | 14 +++---- source-bigquery-batch/main_test.go | 2 +- source-mysql-batch/driver.go | 5 +-- source-postgres-batch/.snapshots/TestSpec | 8 ++-- source-postgres-batch/driver.go | 47 ++++++++++------------- source-postgres-batch/main.go | 14 +++---- source-postgres-batch/main_test.go | 2 +- 9 files changed, 66 insertions(+), 81 deletions(-) diff --git a/source-bigquery-batch/.snapshots/TestSpec b/source-bigquery-batch/.snapshots/TestSpec index 44aae58ae5..71c3b37e04 100644 --- a/source-bigquery-batch/.snapshots/TestSpec +++ b/source-bigquery-batch/.snapshots/TestSpec @@ -27,8 +27,8 @@ "properties": { "poll": { "type": "string", - "title": "Default Poll Interval", - "description": "How often to execute fetch queries. Defaults to 24 hours if unset." + "title": "Default Polling Schedule", + "description": "When and how often to execute fetch queries. Defaults to '24h' if unset." } }, "additionalProperties": false, @@ -73,8 +73,8 @@ }, "poll": { "type": "string", - "title": "Poll Interval", - "description": "How often to execute the fetch query (overrides the connector default setting)", + "title": "Polling Schedule", + "description": "When and how often to execute the fetch query (overrides the connector default setting)", "order": 1 } }, diff --git a/source-bigquery-batch/driver.go b/source-bigquery-batch/driver.go index 8cda7bc447..d8e03b3650 100644 --- a/source-bigquery-batch/driver.go +++ b/source-bigquery-batch/driver.go @@ -14,6 +14,7 @@ import ( "cloud.google.com/go/bigquery" "github.com/estuary/connectors/go/encrow" + "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" pc "github.com/estuary/flow/go/protocols/capture" @@ -49,7 +50,7 @@ type Resource struct { Name string `json:"name" jsonschema:"title=Name,description=The unique name of this resource." jsonschema_extras:"order=0"` Template string `json:"template" jsonschema:"title=Query Template,description=The query template (pkg.go.dev/text/template) which will be rendered and then executed." jsonschema_extras:"multiline=true,order=3"` Cursor []string `json:"cursor,omitempty" jsonschema:"title=Cursor Columns,description=The names of columns which should be persisted between query executions as a cursor." jsonschema_extras:"order=2"` - PollInterval string `json:"poll,omitempty" jsonschema:"title=Poll Interval,description=How often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"` + PollSchedule string `json:"poll,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"` } // Validate checks that the resource spec possesses all required properties. @@ -69,9 +70,9 @@ func (r Resource) Validate() error { if slices.Contains(r.Cursor, "") { return fmt.Errorf("cursor column names can't be empty (got %q)", r.Cursor) } - if r.PollInterval != "" { - if _, err := time.ParseDuration(r.PollInterval); err != nil { - return fmt.Errorf("invalid poll interval %q: %w", r.PollInterval, err) + if r.PollSchedule != "" { + if err := schedule.Validate(r.PollSchedule); err != nil { + return fmt.Errorf("invalid polling schedule %q: %w", r.PollSchedule, err) } } return nil @@ -575,7 +576,7 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error { "name": res.Name, "tmpl": res.Template, "cursor": res.Cursor, - "poll": res.PollInterval, + "poll": res.PollSchedule, }).Info("starting worker") var queryTemplate, err = template.New("query").Parse(res.Template) @@ -616,34 +617,26 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template "CursorFields": quotedCursorNames, } - // Polling interval can be configured per binding. If unset, falls back to the - // connector global polling interval. - var pollStr = c.Config.Advanced.PollInterval - if res.PollInterval != "" { - pollStr = res.PollInterval + // Polling schedule can be configured per binding. If unset, falls back to the + // connector global polling schedule. + var pollScheduleStr = c.Config.Advanced.PollSchedule + if res.PollSchedule != "" { + pollScheduleStr = res.PollSchedule } - pollInterval, err := time.ParseDuration(pollStr) + var pollSchedule, err = schedule.Parse(pollScheduleStr) if err != nil { - return fmt.Errorf("invalid poll interval %q: %w", res.PollInterval, err) + return fmt.Errorf("failed to parse polling schedule %q: %w", pollScheduleStr, err) } - - // Sleep until it's been more than since the last iteration. - if !state.LastPolled.IsZero() && time.Since(state.LastPolled) < pollInterval { - var sleepDuration = time.Until(state.LastPolled.Add(pollInterval)) - log.WithFields(log.Fields{ - "name": res.Name, - "wait": sleepDuration.String(), - "poll": pollInterval.String(), - }).Info("waiting for next poll") - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(sleepDuration): - } + log.WithFields(log.Fields{ + "name": res.Name, + "poll": pollScheduleStr, + }).Info("waiting for next scheduled poll") + if err := schedule.WaitForNext(ctx, pollSchedule, state.LastPolled); err != nil { + return err } log.WithFields(log.Fields{ "name": res.Name, - "poll": pollInterval.String(), + "poll": pollScheduleStr, "prev": state.LastPolled.Format(time.RFC3339Nano), }).Info("ready to poll") diff --git a/source-bigquery-batch/main.go b/source-bigquery-batch/main.go index 2b286d05a5..8353e73436 100644 --- a/source-bigquery-batch/main.go +++ b/source-bigquery-batch/main.go @@ -5,9 +5,9 @@ import ( "encoding/json" "fmt" "strings" - "time" "cloud.google.com/go/bigquery" + "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" log "github.com/sirupsen/logrus" @@ -25,7 +25,7 @@ type Config struct { } type advancedConfig struct { - PollInterval string `json:"poll,omitempty" jsonschema:"title=Default Poll Interval,description=How often to execute fetch queries. Defaults to 24 hours if unset."` + PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Defaults to '24h' if unset."` } // Validate checks that the configuration possesses all required properties. @@ -46,9 +46,9 @@ func (c *Config) Validate() error { if !json.Valid([]byte(c.CredentialsJSON)) { return fmt.Errorf("service account credentials must be valid JSON, and the provided credentials were not") } - if c.Advanced.PollInterval != "" { - if _, err := time.ParseDuration(c.Advanced.PollInterval); err != nil { - return fmt.Errorf("invalid default poll interval %q: %w", c.Advanced.PollInterval, err) + if c.Advanced.PollSchedule != "" { + if err := schedule.Validate(c.Advanced.PollSchedule); err != nil { + return fmt.Errorf("invalid default polling schedule %q: %w", c.Advanced.PollSchedule, err) } } return nil @@ -56,8 +56,8 @@ func (c *Config) Validate() error { // SetDefaults fills in the default values for unset optional parameters. func (c *Config) SetDefaults() { - if c.Advanced.PollInterval == "" { - c.Advanced.PollInterval = "24h" + if c.Advanced.PollSchedule == "" { + c.Advanced.PollSchedule = "24h" } } diff --git a/source-bigquery-batch/main_test.go b/source-bigquery-batch/main_test.go index 4b443d5e2e..a7d163ca6e 100644 --- a/source-bigquery-batch/main_test.go +++ b/source-bigquery-batch/main_test.go @@ -190,7 +190,7 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec { ProjectID: *projectID, Dataset: *testDataset, Advanced: advancedConfig{ - PollInterval: "200ms", + PollSchedule: "200ms", }, } diff --git a/source-mysql-batch/driver.go b/source-mysql-batch/driver.go index 98ee025eb8..685f779551 100644 --- a/source-mysql-batch/driver.go +++ b/source-mysql-batch/driver.go @@ -669,8 +669,8 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template "CursorFields": quotedCursorNames, } - // Polling interval can be configured per binding. If unset, falls back to the - // connector global polling interval. + // Polling schedule can be configured per binding. If unset, falls back to the + // connector global polling schedule. var pollScheduleStr = c.Config.Advanced.PollSchedule if res.PollSchedule != "" { pollScheduleStr = res.PollSchedule @@ -679,7 +679,6 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template if err != nil { return fmt.Errorf("failed to parse polling schedule %q: %w", pollScheduleStr, err) } - log.WithFields(log.Fields{ "name": res.Name, "poll": pollScheduleStr, diff --git a/source-postgres-batch/.snapshots/TestSpec b/source-postgres-batch/.snapshots/TestSpec index 27750c3de8..0b8b222d72 100644 --- a/source-postgres-batch/.snapshots/TestSpec +++ b/source-postgres-batch/.snapshots/TestSpec @@ -31,8 +31,8 @@ "properties": { "poll": { "type": "string", - "title": "Default Poll Interval", - "description": "How often to execute fetch queries. Defaults to 5 minutes if unset." + "title": "Default Polling Schedule", + "description": "When and how often to execute fetch queries. Defaults to '5m' if unset." }, "sslmode": { "type": "string", @@ -91,8 +91,8 @@ }, "poll": { "type": "string", - "title": "Poll Interval", - "description": "How often to execute the fetch query (overrides the connector default setting)", + "title": "Polling Schedule", + "description": "When and how often to execute the fetch query (overrides the connector default setting)", "order": 1 } }, diff --git a/source-postgres-batch/driver.go b/source-postgres-batch/driver.go index 2f2af619b5..0c9a4f64ba 100644 --- a/source-postgres-batch/driver.go +++ b/source-postgres-batch/driver.go @@ -13,6 +13,7 @@ import ( "time" "github.com/estuary/connectors/go/encrow" + "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" pc "github.com/estuary/flow/go/protocols/capture" @@ -47,7 +48,7 @@ type Resource struct { Name string `json:"name" jsonschema:"title=Name,description=The unique name of this resource." jsonschema_extras:"order=0"` Template string `json:"template" jsonschema:"title=Query Template,description=The query template (pkg.go.dev/text/template) which will be rendered and then executed." jsonschema_extras:"multiline=true,order=3"` Cursor []string `json:"cursor" jsonschema:"title=Cursor Columns,description=The names of columns which should be persisted between query executions as a cursor." jsonschema_extras:"order=2"` - PollInterval string `json:"poll,omitempty" jsonschema:"title=Poll Interval,description=How often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"` + PollSchedule string `json:"poll,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"` } // Validate checks that the resource spec possesses all required properties. @@ -67,9 +68,9 @@ func (r Resource) Validate() error { if slices.Contains(r.Cursor, "") { return fmt.Errorf("cursor column names can't be empty (got %q)", r.Cursor) } - if r.PollInterval != "" { - if _, err := time.ParseDuration(r.PollInterval); err != nil { - return fmt.Errorf("invalid poll interval %q: %w", r.PollInterval, err) + if r.PollSchedule != "" { + if err := schedule.Validate(r.PollSchedule); err != nil { + return fmt.Errorf("invalid polling schedule %q: %w", r.PollSchedule, err) } } return nil @@ -596,7 +597,7 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error { "name": res.Name, "tmpl": res.Template, "cursor": res.Cursor, - "poll": res.PollInterval, + "poll": res.PollSchedule, }).Info("starting worker") var queryTemplate, err = template.New("query").Parse(res.Template) @@ -632,34 +633,26 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template "CursorFields": quotedCursorNames, } - // Polling interval can be configured per binding. If unset, falls back to the - // connector global polling interval. - var pollStr = c.Config.Advanced.PollInterval - if res.PollInterval != "" { - pollStr = res.PollInterval + // Polling schedule can be configured per binding. If unset, falls back to the + // connector global polling schedule. + var pollScheduleStr = c.Config.Advanced.PollSchedule + if res.PollSchedule != "" { + pollScheduleStr = res.PollSchedule } - pollInterval, err := time.ParseDuration(pollStr) + var pollSchedule, err = schedule.Parse(pollScheduleStr) if err != nil { - return fmt.Errorf("invalid poll interval %q: %w", res.PollInterval, err) + return fmt.Errorf("failed to parse polling schedule %q: %w", pollScheduleStr, err) } - - // Sleep until it's been more than since the last iteration. - if !state.LastPolled.IsZero() && time.Since(state.LastPolled) < pollInterval { - var sleepDuration = time.Until(state.LastPolled.Add(pollInterval)) - log.WithFields(log.Fields{ - "name": res.Name, - "wait": sleepDuration.String(), - "poll": pollInterval.String(), - }).Info("waiting for next poll") - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(sleepDuration): - } + log.WithFields(log.Fields{ + "name": res.Name, + "poll": pollScheduleStr, + }).Info("waiting for next scheduled poll") + if err := schedule.WaitForNext(ctx, pollSchedule, state.LastPolled); err != nil { + return err } log.WithFields(log.Fields{ "name": res.Name, - "poll": pollInterval.String(), + "poll": pollScheduleStr, "prev": state.LastPolled.Format(time.RFC3339Nano), }).Info("ready to poll") diff --git a/source-postgres-batch/main.go b/source-postgres-batch/main.go index d8687896e4..d0c3f3711f 100644 --- a/source-postgres-batch/main.go +++ b/source-postgres-batch/main.go @@ -7,8 +7,8 @@ import ( "fmt" "net/url" "strings" - "time" + "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" log "github.com/sirupsen/logrus" @@ -27,7 +27,7 @@ type Config struct { } type advancedConfig struct { - PollInterval string `json:"poll,omitempty" jsonschema:"title=Default Poll Interval,description=How often to execute fetch queries. Defaults to 5 minutes if unset."` + PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Defaults to '5m' if unset."` SSLMode string `json:"sslmode,omitempty" jsonschema:"title=SSL Mode,description=Overrides SSL connection behavior by setting the 'sslmode' parameter.,enum=disable,enum=allow,enum=prefer,enum=require,enum=verify-ca,enum=verify-full"` } @@ -43,9 +43,9 @@ func (c *Config) Validate() error { return fmt.Errorf("missing '%s'", req[0]) } } - if c.Advanced.PollInterval != "" { - if _, err := time.ParseDuration(c.Advanced.PollInterval); err != nil { - return fmt.Errorf("invalid default poll interval %q: %w", c.Advanced.PollInterval, err) + if c.Advanced.PollSchedule != "" { + if err := schedule.Validate(c.Advanced.PollSchedule); err != nil { + return fmt.Errorf("invalid default polling schedule %q: %w", c.Advanced.PollSchedule, err) } } return nil @@ -60,8 +60,8 @@ func (c *Config) SetDefaults() { c.Address += ":5432" } - if c.Advanced.PollInterval == "" { - c.Advanced.PollInterval = "5m" + if c.Advanced.PollSchedule == "" { + c.Advanced.PollSchedule = "5m" } } diff --git a/source-postgres-batch/main_test.go b/source-postgres-batch/main_test.go index 0ae461a3e7..8fbd00b06f 100644 --- a/source-postgres-batch/main_test.go +++ b/source-postgres-batch/main_test.go @@ -99,7 +99,7 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec { Password: *dbCapturePass, Database: *dbName, Advanced: advancedConfig{ - PollInterval: "200ms", + PollSchedule: "200ms", }, }