Skip to content

Commit

Permalink
materialize-sql: update_delay variant for idempotent apply pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Jan 24, 2024
1 parent 708a0b7 commit 493529e
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 71 deletions.
42 changes: 22 additions & 20 deletions materialize-databricks/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type transactor struct {
store struct {
conn *stdsql.Conn
round int
// number of documents stored in current transaction
// set to StoreIterator.Total
stored int
}
bindings []*binding

Expand Down Expand Up @@ -511,12 +514,9 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
if err != nil {
return nil, m.FinishedOperation(fmt.Errorf("creating checkpoint json: %w", err))
}
var commitOp = func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}

return &pf.ConnectorState{UpdatedJson: checkpointJSON}, sql.CommitWithDelay(ctx, d.store.round, d.updateDelay, it.Total, commitOp)
d.store.stored = it.Total
return &pf.ConnectorState{UpdatedJson: checkpointJSON}, nil
}, nil
}

Expand All @@ -530,27 +530,29 @@ func renderWithFiles(tpl string, files ...string) string {
// Acknowledge merges data from temporary table to main table
// TODO: run these queries concurrently for improved performance
func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
log.Info("store: starting committing changes")
for _, q := range d.cp.Queries {
if _, err := d.store.conn.ExecContext(ctx, q); err != nil {
// When doing a recovery apply, it may be the case that some tables & files have already been deleted after being applied
// it is okay to skip them in this case
if d.cpRecovery {
if strings.Contains(err.Error(), "PATH_NOT_FOUND") || strings.Contains(err.Error(), "Path does not exist") || strings.Contains(err.Error(), "Table doesn't exist") || strings.Contains(err.Error(), "TABLE_OR_VIEW_NOT_FOUND") {
continue
return sql.AcknowledgeWithDelay(ctx, d.store.round, d.updateDelay, d.store.stored, func(ctx context.Context) (*pf.ConnectorState, error) {
log.Info("store: starting committing changes")
for _, q := range d.cp.Queries {
if _, err := d.store.conn.ExecContext(ctx, q); err != nil {
// When doing a recovery apply, it may be the case that some tables & files have already been deleted after being applied
// it is okay to skip them in this case
if d.cpRecovery {
if strings.Contains(err.Error(), "PATH_NOT_FOUND") || strings.Contains(err.Error(), "Path does not exist") || strings.Contains(err.Error(), "Table doesn't exist") || strings.Contains(err.Error(), "TABLE_OR_VIEW_NOT_FOUND") {
continue
}
}
return nil, fmt.Errorf("query %q failed: %w", q, err)
}
return nil, fmt.Errorf("query %q failed: %w", q, err)
}
}
log.Info("store: finished committing changes")
log.Info("store: finished committing changes")

// Cleanup files and tables
d.deleteFiles(ctx, d.cp.ToDelete)
// Cleanup files and tables
d.deleteFiles(ctx, d.cp.ToDelete)

d.cpRecovery = false
d.cpRecovery = false

return nil, nil
return nil, nil
})
}

func (d *transactor) Destroy() {
Expand Down
106 changes: 56 additions & 50 deletions materialize-snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ type transactor struct {
conn *stdsql.Conn
fence *sql.Fence
round int
// number of documents stored in current transaction
// set to StoreIterator.Total
stored int
}
templates map[string]*template.Template
bindings []*binding
Expand Down Expand Up @@ -506,6 +509,7 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
return nil, m.FinishedOperation(fmt.Errorf("creating checkpoint json: %w", err))
}

d.store.stored = it.Total
return &pf.ConnectorState{UpdatedJson: checkpointJSON, MergePatch: true}, nil
}, nil
}
Expand All @@ -516,62 +520,64 @@ func renderWithDir(tpl string, dir string) string {

// Acknowledge merges data from temporary table to main table
func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
var asyncCtx = sf.WithAsyncMode(ctx)
log.Info("store: starting committing changes")

// Run the queries using AsyncMode, which means that `ExecContext` will not block
// until the query is successful, rather we will store the results of these queries
// in a map so that we can then call `RowsAffected` on them, blocking until
// the queries are actually executed and done
var results = make(map[string]stdsql.Result)
for stateKey, item := range d.cp {
if len(item.Query) == 0 {
continue
}
// we skip queries that belong to tables which do not have a binding anymore
// since these tables might be deleted already
if !d.hasStateKey(stateKey) {
continue
}

log.WithField("table", item.Table).Info("store: starting query")
if result, err := d.store.conn.ExecContext(asyncCtx, item.Query); err != nil {
return nil, fmt.Errorf("query %q failed: %w", item.Query, err)
} else {
results[stateKey] = result
}
}
return sql.AcknowledgeWithDelay(ctx, d.store.round, d.updateDelay, d.store.stored, func(ctx context.Context) (*pf.ConnectorState, error) {
var asyncCtx = sf.WithAsyncMode(ctx)
log.Info("store: starting committing changes")

// Run the queries using AsyncMode, which means that `ExecContext` will not block
// until the query is successful, rather we will store the results of these queries
// in a map so that we can then call `RowsAffected` on them, blocking until
// the queries are actually executed and done
var results = make(map[string]stdsql.Result)
for stateKey, item := range d.cp {
if len(item.Query) == 0 {
continue
}
// we skip queries that belong to tables which do not have a binding anymore
// since these tables might be deleted already
if !d.hasStateKey(stateKey) {
continue
}

for stateKey, r := range results {
var item = d.cp[stateKey]
if _, err := r.RowsAffected(); err != nil {
return nil, fmt.Errorf("query failed: %w", err)
log.WithField("table", item.Table).Info("store: starting query")
if result, err := d.store.conn.ExecContext(asyncCtx, item.Query); err != nil {
return nil, fmt.Errorf("query %q failed: %w", item.Query, err)
} else {
results[stateKey] = result
}
}
log.WithField("table", item.Table).Info("store: finished query")

d.deleteFiles(ctx, []string{item.ToDelete})
}
for stateKey, r := range results {
var item = d.cp[stateKey]
if _, err := r.RowsAffected(); err != nil {
return nil, fmt.Errorf("query failed: %w", err)
}
log.WithField("table", item.Table).Info("store: finished query")

log.Info("store: finished committing changes")
d.deleteFiles(ctx, []string{item.ToDelete})
}

// After having applied the checkpoint, we try to clean up the checkpoint in the ack response
// so that a restart of the connector does not need to run the same queries again
// Note that this is an best-effort "attempt" and there is no guarantee that this checkpoint update
// can actually be committed
// Important to note that in this case we do not reset the checkpoint for all bindings, but only the ones
// that have been committed in this transaction. The reason is that it may be the case that a binding
// which has been disabled right after a failed attempt to run its queries, must be able to recover by enabling
// the binding and running the queries that are pending for its last transaction.
var checkpointClear = make(checkpoint)
for _, b := range d.bindings {
checkpointClear[b.target.StateKey] = nil
}
var checkpointJSON, err = json.Marshal(checkpointClear)
if err != nil {
return nil, fmt.Errorf("creating checkpoint clearing json: %w", err)
}
log.Info("store: finished committing changes")

// After having applied the checkpoint, we try to clean up the checkpoint in the ack response
// so that a restart of the connector does not need to run the same queries again
// Note that this is an best-effort "attempt" and there is no guarantee that this checkpoint update
// can actually be committed
// Important to note that in this case we do not reset the checkpoint for all bindings, but only the ones
// that have been committed in this transaction. The reason is that it may be the case that a binding
// which has been disabled right after a failed attempt to run its queries, must be able to recover by enabling
// the binding and running the queries that are pending for its last transaction.
var checkpointClear = make(checkpoint)
for _, b := range d.bindings {
checkpointClear[b.target.StateKey] = nil
}
var checkpointJSON, err = json.Marshal(checkpointClear)
if err != nil {
return nil, fmt.Errorf("creating checkpoint clearing json: %w", err)
}

return &pf.ConnectorState{UpdatedJson: json.RawMessage(checkpointJSON), MergePatch: true}, nil
return &pf.ConnectorState{UpdatedJson: json.RawMessage(checkpointJSON), MergePatch: true}, nil
})
}

func (d *transactor) hasStateKey(stateKey string) bool {
Expand Down
64 changes: 63 additions & 1 deletion materialize-sql/update_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

m "github.com/estuary/connectors/go/protocols/materialize"
pf "github.com/estuary/flow/go/protocols/flow"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -49,7 +50,9 @@ func CommitWithDelay(ctx context.Context, round int, delay time.Duration, stored
return err
}

if round == 1 {
// round zero may mean the transactor has not yet started a new transaction and is
// acknowledging the prior transaction
if round <= 1 {
// Always skip the delay on the first round of transactions, which is often an
// artificially small transaction of the immediately-ready documents.
log.Debug("will not delay commit acknowledgement of the first transaction")
Expand Down Expand Up @@ -81,6 +84,65 @@ func CommitWithDelay(ctx context.Context, round int, delay time.Duration, stored
})
}

// AcknowledgeWithDelay wraps a commitFn in a function that may add additional delay prior to
// returning. When used in transactor.Acknowledge from a materialization utilizing idempotent apply commits,
// this can spread out transaction processing and result in fewer, larger transactions which may be
// desirable to reduce warehouse compute costs or comply with rate limits. The delay is bypassed if
// the actual commit operation takes longer than the configured delay, or if the number of stored
// documents is large (see storeThreshold above).
//
// Delaying the return from this function delays acknowledgement back to the runtime that the commit
// has finished. The commit will still apply to the endpoint, but holding back the runtime
// acknowledgement will delay the start of the next transaction while allowing the runtime to
// continue combining over documents for the next transaction.
//
// It is always possible for a connector to restart between committing to the endpoint and sending
// the runtime acknowledgement of that commit. The chance of this happening is greater when
// intentionally adding a delay between these events. In the case of idempotent apply pattern
// connectors this means that on restart the first call to transactor.Acknowledge will attempt to
// commit the changes again (which may in this case be a no-op). This time since the round will be zero
// there will be no delay.
func AcknowledgeWithDelay(ctx context.Context, round int, delay time.Duration, stored int, commitFn func(context.Context) (*pf.ConnectorState, error)) (*pf.ConnectorState, error) {
started := time.Now()

state, err := commitFn(ctx)
if err != nil {
return nil, err
}

// round zero may mean the transactor has not yet started a new transaction and is
// acknowledging the prior transaction
if round <= 1 {
// Always skip the delay on the first round of transactions, which is often an
// artificially small transaction of the immediately-ready documents.
log.Debug("will not delay commit acknowledgement of the first transaction")
return state, nil
}

remainingDelay := delay - time.Since(started)

logEntry := log.WithFields(log.Fields{
"stored": stored,
"storedThreshold": storeThreshold,
"remainingDelay": remainingDelay.String(),
"configuredDelay": delay.String(),
})

if stored > storeThreshold || remainingDelay <= 0 {
logEntry.Debug("will acknowledge commit without further delay")
return state, nil
}

logEntry.Debug("delaying before acknowledging commit")

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(remainingDelay):
return state, nil
}
}

// ParseDelay parses the delay Go duration string, returning and error if it is not valid, the
// parsed value if it is not an empty string, and the defaultUpdateDelay otherwise.
func ParseDelay(delay string) (time.Duration, error) {
Expand Down

0 comments on commit 493529e

Please sign in to comment.