Skip to content

Commit

Permalink
materialize-boilerplate: fix staged files cleanup again
Browse files Browse the repository at this point in the history
In f4a82c4 a partial fix was implemented to prevent attempts to clean up the same
files again when a binding had data for one transaction but not for a subsequent one.

That change fixed the issue, but it also meant that staged files were never
cleaned up, because `f.started` is set to `false` by `f.flush()`,
which is always called before the cleanup.

This commit is the complete fix: it clears out the list of uploaded files after
sending them off for deletion.
  • Loading branch information
williamhbaker committed Feb 27, 2025
1 parent 49f5235 commit 4c6ae3c
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions materialize-boilerplate/staged_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type StagedFiles[T any] struct {
flushOnNextBinding bool
stagedFiles []stagedFile[T]
lastBinding int
didCleanupCurrent bool
}

// NewStagedFiles creates a StagedFiles instance, which is used for staging data
Expand Down Expand Up @@ -94,7 +93,6 @@ func (sf *StagedFiles[T]) EncodeRow(ctx context.Context, binding int, row []any)
}
}
sf.lastBinding = binding
sf.didCleanupCurrent = false

return sf.stagedFiles[binding].encodeRow(ctx, row)
}
Expand All @@ -110,24 +108,24 @@ func (sf *StagedFiles[T]) Flush(binding int) ([]string, error) {
// to use both as part of a deferred call, and an inline call to check for
// deletion errors.
func (sf *StagedFiles[T]) CleanupCurrentTransaction(ctx context.Context) error {
if !sf.didCleanupCurrent {
sf.didCleanupCurrent = true

var uris []string
for _, f := range sf.stagedFiles {
if !f.started {
continue
}

for _, u := range f.uploaded {
uris = append(uris, sf.client.URI(u))
}
var uris []string
for i := range sf.stagedFiles {
f := &sf.stagedFiles[i]
if len(f.uploaded) == 0 {
continue
}

return sf.client.Delete(ctx, uris)
for _, u := range f.uploaded {
uris = append(uris, sf.client.URI(u))
}
f.uploaded = nil
}

return nil
if len(uris) == 0 {
return nil
}

return sf.client.Delete(ctx, uris)
}

// CleanupCheckpoint deletes files specified in the list of URIs. This can be
Expand Down

0 comments on commit 4c6ae3c

Please sign in to comment.