Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

materialize-mysql: flush load batches when bindings change #1297

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions materialize-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ WORKDIR /builder
# Download & compile dependencies early. Doing this separately allows for layer
# caching opportunities when no dependencies are updated.
COPY go.* ./
RUN go mod download
# RUN go mod download

# Copy in the latest flowctl-go for usage by tests.
COPY flow-bin/flowctl-go /usr/local/bin/flowctl-go
Expand All @@ -21,9 +21,11 @@ COPY materialize-sql ./materialize-sql
COPY testsupport ./testsupport

# Test and build the connector.
RUN go test -tags nozstd -v ./materialize-sql/...
RUN go test -tags nozstd,nodb -v ./materialize-mysql/...
RUN go build -tags nozstd -v -o ./connector ./materialize-mysql/...
# RUN go test -tags nozstd -v ./materialize-sql/...
# RUN go test -tags nozstd,nodb -v ./materialize-mysql/...
RUN --mount=type=cache,id=gomod,target=/go/pkg/mod \
--mount=type=cache,id=gobuild,target=/root/.cache/go-build \
go build -tags nozstd -v -o ./connector ./materialize-mysql/...

# Runtime Stage
################################################################################
Expand Down
56 changes: 38 additions & 18 deletions materialize-mysql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,12 +572,12 @@ type batchMeta struct {
}

func (batch batchMeta) Write(converted []any) error {
record, err := rowToCSVRecord(converted)
if err != nil {
if record, err := rowToCSVRecord(converted); err != nil {
return fmt.Errorf("error encoding row to CSV: %w", err)
} else if err := batch.w.Write(record); err != nil {
return fmt.Errorf("writing csv record: %w", err)
}

batch.w.Write(record)
batch.w.Flush()
if err := batch.w.Error(); err != nil {
return fmt.Errorf("writing csv to buffer: %w", err)
Expand All @@ -586,7 +586,7 @@ func (batch batchMeta) Write(converted []any) error {
return nil
}

func setupBatch(ctx context.Context, readerSuffix string) batchMeta {
func setupBatch(readerSuffix string) batchMeta {
var buff bytes.Buffer
var writer = csv.NewWriter(&buff)

Expand All @@ -604,7 +604,7 @@ func setupBatch(ctx context.Context, readerSuffix string) batchMeta {
}
}

func drainBatch(ctx context.Context, txn *stdsql.Tx, query string, batch batchMeta) error {
func drainBatch(ctx context.Context, txn *stdsql.Tx, query string, batch *batchMeta) error {
batch.w.Flush()

if err := batch.w.Error(); err != nil {
Expand All @@ -618,7 +618,7 @@ func drainBatch(ctx context.Context, txn *stdsql.Tx, query string, batch batchMe
return nil
}

func drainUpdateBatch(ctx context.Context, txn *stdsql.Tx, b *binding, batch batchMeta) error {
func drainUpdateBatch(ctx context.Context, txn *stdsql.Tx, b *binding, batch *batchMeta) error {
if err := drainBatch(ctx, txn, b.updateLoadSQL, batch); err != nil {
return fmt.Errorf("store batch update on %q: %w", b.target.Identifier, err)
}
Expand All @@ -643,10 +643,26 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
}
defer txn.Rollback()

var lastBinding = -1
var batches = make(map[int]batchMeta)
for it.Next() {
if lastBinding == -1 {
lastBinding = it.Binding
}

var b = d.bindings[it.Binding]

if lastBinding != it.Binding {
// Keys are iterated in groups ordered by binding, so when the binding changes the prior binding's batch is complete: drain it now.
lastBatch := batches[lastBinding]
if lastBatch.buff.Len() > 0 {
if err := drainBatch(ctx, txn, d.bindings[lastBinding].loadLoadSQL, &lastBatch); err != nil {
return fmt.Errorf("load batch insert on %q: %w", b.target.Identifier, err)
}
}
lastBinding = it.Binding
}

if converted, err := b.target.ConvertKey(it.Key); err != nil {
return fmt.Errorf("converting Load key: %w", err)
} else {
Expand Down Expand Up @@ -674,15 +690,16 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
}

if _, ok := batches[it.Binding]; !ok {
batches[it.Binding] = setupBatch(ctx, fmt.Sprintf("load_%d", it.Binding))
batches[it.Binding] = setupBatch(fmt.Sprintf("load_%d", it.Binding))
}

if err := batches[it.Binding].Write(converted); err != nil {
return fmt.Errorf("load writing data to batch on %q: %w", b.target.Identifier, err)
}

if batches[it.Binding].buff.Len() > batchSizeThreshold {
if err := drainBatch(ctx, txn, b.loadLoadSQL, batches[it.Binding]); err != nil {
batch := batches[it.Binding]
if err := drainBatch(ctx, txn, b.loadLoadSQL, &batch); err != nil {
return fmt.Errorf("load batch insert on %q: %w", b.target.Identifier, err)
}
}
Expand All @@ -694,8 +711,8 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
continue
}
var b = d.bindings[bindingIndex]

if err := drainBatch(ctx, txn, b.loadLoadSQL, batches[it.Binding]); err != nil {
batch := batches[it.Binding]
if err := drainBatch(ctx, txn, b.loadLoadSQL, &batch); err != nil {
return fmt.Errorf("load batch insert on %q: %w", b.target.Identifier, err)
}
}
Expand Down Expand Up @@ -772,14 +789,15 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
var insert = inserts[lastBinding]
var b = d.bindings[lastBinding]
if insert.buff.Len() > 0 {
if err := drainBatch(ctx, txn, b.storeLoadSQL, insert); err != nil {
if err := drainBatch(ctx, txn, b.storeLoadSQL, &insert); err != nil {
return nil, fmt.Errorf("store batch insert on %q: %w", b.target.Identifier, err)
}
}

var update = updates[lastBinding]
if update.buff.Len() > 0 {
if err := drainUpdateBatch(ctx, txn, b, updates[lastBinding]); err != nil {
batch := updates[lastBinding]
if err := drainUpdateBatch(ctx, txn, b, &batch); err != nil {
return nil, fmt.Errorf("store batch update on %q: %w", b.target.Identifier, err)
}
}
Expand Down Expand Up @@ -817,8 +835,8 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
}

if _, ok := inserts[it.Binding]; !ok {
inserts[it.Binding] = setupBatch(ctx, fmt.Sprintf("store_%d", it.Binding))
updates[it.Binding] = setupBatch(ctx, fmt.Sprintf("update_%d", it.Binding))
inserts[it.Binding] = setupBatch(fmt.Sprintf("store_%d", it.Binding))
updates[it.Binding] = setupBatch(fmt.Sprintf("update_%d", it.Binding))
}

if it.Exists {
Expand All @@ -827,7 +845,8 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
}

if updates[it.Binding].buff.Len() > batchSizeThreshold {
if err := drainUpdateBatch(ctx, txn, b, updates[it.Binding]); err != nil {
batch := updates[it.Binding]
if err := drainUpdateBatch(ctx, txn, b, &batch); err != nil {
return nil, fmt.Errorf("store batch update on %q: %w", b.target.Identifier, err)
}
}
Expand All @@ -837,7 +856,8 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
}

if inserts[it.Binding].buff.Len() > batchSizeThreshold {
if err := drainBatch(ctx, txn, b.storeLoadSQL, inserts[it.Binding]); err != nil {
batch := inserts[it.Binding]
if err := drainBatch(ctx, txn, b.storeLoadSQL, &batch); err != nil {
return nil, fmt.Errorf("store batch insert on %q: %w", b.target.Identifier, err)
}
}
Expand All @@ -850,7 +870,7 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
}

var b = d.bindings[bindingIndex]
if err := drainBatch(ctx, txn, b.storeLoadSQL, insert); err != nil {
if err := drainBatch(ctx, txn, b.storeLoadSQL, &insert); err != nil {
return nil, fmt.Errorf("store batch insert on %q: %w", b.target.Identifier, err)
}
}
Expand All @@ -861,7 +881,7 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
}

var b = d.bindings[bindingIndex]
if err := drainUpdateBatch(ctx, txn, b, update); err != nil {
if err := drainUpdateBatch(ctx, txn, b, &update); err != nil {
return nil, fmt.Errorf("store batch update on %q: %w", b.target.Identifier, err)
}
}
Expand Down
Loading
Loading