From 4c6ae3cbb4e5ed7b8d866535986697310b0e8bf5 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Wed, 26 Feb 2025 13:03:30 -0500 Subject: [PATCH] materialize-boilerplate: fix staged files cleanup again In f4a82c4 a partial fix was implemented to prevent attempts to clean up the same files again when a binding had data for one transaction but not a subsequent one. That change fixed that issue, but it also made it so that staged files just never got cleaned up, because `f.started` is set to `false` by `f.flush()`, which is always called before the cleanup. This commit is the complete fix: it clears out the list of uploaded files after sending them off for deletion. --- materialize-boilerplate/staged_files.go | 30 ++++++++++++------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/materialize-boilerplate/staged_files.go b/materialize-boilerplate/staged_files.go index e781e001fe..55c8ad6164 100644 --- a/materialize-boilerplate/staged_files.go +++ b/materialize-boilerplate/staged_files.go @@ -46,7 +46,6 @@ type StagedFiles[T any] struct { flushOnNextBinding bool stagedFiles []stagedFile[T] lastBinding int - didCleanupCurrent bool } // NewStagedFiles creates a StagedFiles instance, which is used for staging data @@ -94,7 +93,6 @@ func (sf *StagedFiles[T]) EncodeRow(ctx context.Context, binding int, row []any) } } sf.lastBinding = binding - sf.didCleanupCurrent = false return sf.stagedFiles[binding].encodeRow(ctx, row) } @@ -110,24 +108,24 @@ func (sf *StagedFiles[T]) Flush(binding int) ([]string, error) { // to use both as part of a deferred call, and an inline call to check for // deletion errors.
func (sf *StagedFiles[T]) CleanupCurrentTransaction(ctx context.Context) error { - if !sf.didCleanupCurrent { - sf.didCleanupCurrent = true - - var uris []string - for _, f := range sf.stagedFiles { - if !f.started { - continue - } - - for _, u := range f.uploaded { - uris = append(uris, sf.client.URI(u)) - } + var uris []string + for i := range sf.stagedFiles { + f := &sf.stagedFiles[i] + if len(f.uploaded) == 0 { + continue } - return sf.client.Delete(ctx, uris) + for _, u := range f.uploaded { + uris = append(uris, sf.client.URI(u)) + } + f.uploaded = nil } - return nil + if len(uris) == 0 { + return nil + } + + return sf.client.Delete(ctx, uris) } // CleanupCheckpoint deletes files specified int he list of URIs. This can be