Skip to content

Commit

Permalink
source-{bigquery,postgres}-batch: Apply schedule refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
willdonnelly committed Feb 16, 2024
1 parent b4ef86d commit 92b645b
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 81 deletions.
8 changes: 4 additions & 4 deletions source-bigquery-batch/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
"properties": {
"poll": {
"type": "string",
"title": "Default Poll Interval",
"description": "How often to execute fetch queries. Defaults to 24 hours if unset."
"title": "Default Polling Schedule",
"description": "When and how often to execute fetch queries. Defaults to '24h' if unset."
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -73,8 +73,8 @@
},
"poll": {
"type": "string",
"title": "Poll Interval",
"description": "How often to execute the fetch query (overrides the connector default setting)",
"title": "Polling Schedule",
"description": "When and how often to execute the fetch query (overrides the connector default setting)",
"order": 1
}
},
Expand Down
47 changes: 20 additions & 27 deletions source-bigquery-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"cloud.google.com/go/bigquery"
"github.com/estuary/connectors/go/encrow"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
pc "github.com/estuary/flow/go/protocols/capture"
Expand Down Expand Up @@ -49,7 +50,7 @@ type Resource struct {
Name string `json:"name" jsonschema:"title=Name,description=The unique name of this resource." jsonschema_extras:"order=0"`
Template string `json:"template" jsonschema:"title=Query Template,description=The query template (pkg.go.dev/text/template) which will be rendered and then executed." jsonschema_extras:"multiline=true,order=3"`
Cursor []string `json:"cursor,omitempty" jsonschema:"title=Cursor Columns,description=The names of columns which should be persisted between query executions as a cursor." jsonschema_extras:"order=2"`
PollInterval string `json:"poll,omitempty" jsonschema:"title=Poll Interval,description=How often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"`
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"`
}

// Validate checks that the resource spec possesses all required properties.
Expand All @@ -69,9 +70,9 @@ func (r Resource) Validate() error {
if slices.Contains(r.Cursor, "") {
return fmt.Errorf("cursor column names can't be empty (got %q)", r.Cursor)
}
if r.PollInterval != "" {
if _, err := time.ParseDuration(r.PollInterval); err != nil {
return fmt.Errorf("invalid poll interval %q: %w", r.PollInterval, err)
if r.PollSchedule != "" {
if err := schedule.Validate(r.PollSchedule); err != nil {
return fmt.Errorf("invalid polling schedule %q: %w", r.PollSchedule, err)
}
}
return nil
Expand Down Expand Up @@ -575,7 +576,7 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error {
"name": res.Name,
"tmpl": res.Template,
"cursor": res.Cursor,
"poll": res.PollInterval,
"poll": res.PollSchedule,
}).Info("starting worker")

var queryTemplate, err = template.New("query").Parse(res.Template)
Expand Down Expand Up @@ -616,34 +617,26 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
"CursorFields": quotedCursorNames,
}

// Polling interval can be configured per binding. If unset, falls back to the
// connector global polling interval.
var pollStr = c.Config.Advanced.PollInterval
if res.PollInterval != "" {
pollStr = res.PollInterval
// Polling schedule can be configured per binding. If unset, falls back to the
// connector global polling schedule.
var pollScheduleStr = c.Config.Advanced.PollSchedule
if res.PollSchedule != "" {
pollScheduleStr = res.PollSchedule
}
pollInterval, err := time.ParseDuration(pollStr)
var pollSchedule, err = schedule.Parse(pollScheduleStr)
if err != nil {
return fmt.Errorf("invalid poll interval %q: %w", res.PollInterval, err)
return fmt.Errorf("failed to parse polling schedule %q: %w", pollScheduleStr, err)
}

// Sleep until it's been more than <pollInterval> since the last iteration.
if !state.LastPolled.IsZero() && time.Since(state.LastPolled) < pollInterval {
var sleepDuration = time.Until(state.LastPolled.Add(pollInterval))
log.WithFields(log.Fields{
"name": res.Name,
"wait": sleepDuration.String(),
"poll": pollInterval.String(),
}).Info("waiting for next poll")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(sleepDuration):
}
log.WithFields(log.Fields{
"name": res.Name,
"poll": pollScheduleStr,
}).Info("waiting for next scheduled poll")
if err := schedule.WaitForNext(ctx, pollSchedule, state.LastPolled); err != nil {
return err
}
log.WithFields(log.Fields{
"name": res.Name,
"poll": pollInterval.String(),
"poll": pollScheduleStr,
"prev": state.LastPolled.Format(time.RFC3339Nano),
}).Info("ready to poll")

Expand Down
14 changes: 7 additions & 7 deletions source-bigquery-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"cloud.google.com/go/bigquery"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
log "github.com/sirupsen/logrus"
Expand All @@ -25,7 +25,7 @@ type Config struct {
}

type advancedConfig struct {
PollInterval string `json:"poll,omitempty" jsonschema:"title=Default Poll Interval,description=How often to execute fetch queries. Defaults to 24 hours if unset."`
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Defaults to '24h' if unset."`
}

// Validate checks that the configuration possesses all required properties.
Expand All @@ -46,18 +46,18 @@ func (c *Config) Validate() error {
if !json.Valid([]byte(c.CredentialsJSON)) {
return fmt.Errorf("service account credentials must be valid JSON, and the provided credentials were not")
}
if c.Advanced.PollInterval != "" {
if _, err := time.ParseDuration(c.Advanced.PollInterval); err != nil {
return fmt.Errorf("invalid default poll interval %q: %w", c.Advanced.PollInterval, err)
if c.Advanced.PollSchedule != "" {
if err := schedule.Validate(c.Advanced.PollSchedule); err != nil {
return fmt.Errorf("invalid default polling schedule %q: %w", c.Advanced.PollSchedule, err)
}
}
return nil
}

// SetDefaults fills in the default values for unset optional parameters.
func (c *Config) SetDefaults() {
if c.Advanced.PollInterval == "" {
c.Advanced.PollInterval = "24h"
if c.Advanced.PollSchedule == "" {
c.Advanced.PollSchedule = "24h"
}
}

Expand Down
2 changes: 1 addition & 1 deletion source-bigquery-batch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec {
ProjectID: *projectID,
Dataset: *testDataset,
Advanced: advancedConfig{
PollInterval: "200ms",
PollSchedule: "200ms",
},
}

Expand Down
5 changes: 2 additions & 3 deletions source-mysql-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
"CursorFields": quotedCursorNames,
}

// Polling interval can be configured per binding. If unset, falls back to the
// connector global polling interval.
// Polling schedule can be configured per binding. If unset, falls back to the
// connector global polling schedule.
var pollScheduleStr = c.Config.Advanced.PollSchedule
if res.PollSchedule != "" {
pollScheduleStr = res.PollSchedule
Expand All @@ -679,7 +679,6 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
if err != nil {
return fmt.Errorf("failed to parse polling schedule %q: %w", pollScheduleStr, err)
}

log.WithFields(log.Fields{
"name": res.Name,
"poll": pollScheduleStr,
Expand Down
8 changes: 4 additions & 4 deletions source-postgres-batch/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
"properties": {
"poll": {
"type": "string",
"title": "Default Poll Interval",
"description": "How often to execute fetch queries. Defaults to 5 minutes if unset."
"title": "Default Polling Schedule",
"description": "When and how often to execute fetch queries. Defaults to '5m' if unset."
},
"sslmode": {
"type": "string",
Expand Down Expand Up @@ -91,8 +91,8 @@
},
"poll": {
"type": "string",
"title": "Poll Interval",
"description": "How often to execute the fetch query (overrides the connector default setting)",
"title": "Polling Schedule",
"description": "When and how often to execute the fetch query (overrides the connector default setting)",
"order": 1
}
},
Expand Down
47 changes: 20 additions & 27 deletions source-postgres-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/estuary/connectors/go/encrow"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
pc "github.com/estuary/flow/go/protocols/capture"
Expand Down Expand Up @@ -47,7 +48,7 @@ type Resource struct {
Name string `json:"name" jsonschema:"title=Name,description=The unique name of this resource." jsonschema_extras:"order=0"`
Template string `json:"template" jsonschema:"title=Query Template,description=The query template (pkg.go.dev/text/template) which will be rendered and then executed." jsonschema_extras:"multiline=true,order=3"`
Cursor []string `json:"cursor" jsonschema:"title=Cursor Columns,description=The names of columns which should be persisted between query executions as a cursor." jsonschema_extras:"order=2"`
PollInterval string `json:"poll,omitempty" jsonschema:"title=Poll Interval,description=How often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"`
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to execute the fetch query (overrides the connector default setting)" jsonschema_extras:"order=1"`
}

// Validate checks that the resource spec possesses all required properties.
Expand All @@ -67,9 +68,9 @@ func (r Resource) Validate() error {
if slices.Contains(r.Cursor, "") {
return fmt.Errorf("cursor column names can't be empty (got %q)", r.Cursor)
}
if r.PollInterval != "" {
if _, err := time.ParseDuration(r.PollInterval); err != nil {
return fmt.Errorf("invalid poll interval %q: %w", r.PollInterval, err)
if r.PollSchedule != "" {
if err := schedule.Validate(r.PollSchedule); err != nil {
return fmt.Errorf("invalid polling schedule %q: %w", r.PollSchedule, err)
}
}
return nil
Expand Down Expand Up @@ -596,7 +597,7 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error {
"name": res.Name,
"tmpl": res.Template,
"cursor": res.Cursor,
"poll": res.PollInterval,
"poll": res.PollSchedule,
}).Info("starting worker")

var queryTemplate, err = template.New("query").Parse(res.Template)
Expand Down Expand Up @@ -632,34 +633,26 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
"CursorFields": quotedCursorNames,
}

// Polling interval can be configured per binding. If unset, falls back to the
// connector global polling interval.
var pollStr = c.Config.Advanced.PollInterval
if res.PollInterval != "" {
pollStr = res.PollInterval
// Polling schedule can be configured per binding. If unset, falls back to the
// connector global polling schedule.
var pollScheduleStr = c.Config.Advanced.PollSchedule
if res.PollSchedule != "" {
pollScheduleStr = res.PollSchedule
}
pollInterval, err := time.ParseDuration(pollStr)
var pollSchedule, err = schedule.Parse(pollScheduleStr)
if err != nil {
return fmt.Errorf("invalid poll interval %q: %w", res.PollInterval, err)
return fmt.Errorf("failed to parse polling schedule %q: %w", pollScheduleStr, err)
}

// Sleep until it's been more than <pollInterval> since the last iteration.
if !state.LastPolled.IsZero() && time.Since(state.LastPolled) < pollInterval {
var sleepDuration = time.Until(state.LastPolled.Add(pollInterval))
log.WithFields(log.Fields{
"name": res.Name,
"wait": sleepDuration.String(),
"poll": pollInterval.String(),
}).Info("waiting for next poll")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(sleepDuration):
}
log.WithFields(log.Fields{
"name": res.Name,
"poll": pollScheduleStr,
}).Info("waiting for next scheduled poll")
if err := schedule.WaitForNext(ctx, pollSchedule, state.LastPolled); err != nil {
return err
}
log.WithFields(log.Fields{
"name": res.Name,
"poll": pollInterval.String(),
"poll": pollScheduleStr,
"prev": state.LastPolled.Format(time.RFC3339Nano),
}).Info("ready to poll")

Expand Down
14 changes: 7 additions & 7 deletions source-postgres-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"fmt"
"net/url"
"strings"
"time"

"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
log "github.com/sirupsen/logrus"
Expand All @@ -27,7 +27,7 @@ type Config struct {
}

type advancedConfig struct {
PollInterval string `json:"poll,omitempty" jsonschema:"title=Default Poll Interval,description=How often to execute fetch queries. Defaults to 5 minutes if unset."`
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Defaults to '5m' if unset."`
SSLMode string `json:"sslmode,omitempty" jsonschema:"title=SSL Mode,description=Overrides SSL connection behavior by setting the 'sslmode' parameter.,enum=disable,enum=allow,enum=prefer,enum=require,enum=verify-ca,enum=verify-full"`
}

Expand All @@ -43,9 +43,9 @@ func (c *Config) Validate() error {
return fmt.Errorf("missing '%s'", req[0])
}
}
if c.Advanced.PollInterval != "" {
if _, err := time.ParseDuration(c.Advanced.PollInterval); err != nil {
return fmt.Errorf("invalid default poll interval %q: %w", c.Advanced.PollInterval, err)
if c.Advanced.PollSchedule != "" {
if err := schedule.Validate(c.Advanced.PollSchedule); err != nil {
return fmt.Errorf("invalid default polling schedule %q: %w", c.Advanced.PollSchedule, err)
}
}
return nil
Expand All @@ -60,8 +60,8 @@ func (c *Config) SetDefaults() {
c.Address += ":5432"
}

if c.Advanced.PollInterval == "" {
c.Advanced.PollInterval = "5m"
if c.Advanced.PollSchedule == "" {
c.Advanced.PollSchedule = "5m"
}
}

Expand Down
2 changes: 1 addition & 1 deletion source-postgres-batch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec {
Password: *dbCapturePass,
Database: *dbName,
Advanced: advancedConfig{
PollInterval: "200ms",
PollSchedule: "200ms",
},
}

Expand Down

0 comments on commit 92b645b

Please sign in to comment.