diff --git a/materialize-boilerplate/staged_files.go b/materialize-boilerplate/staged_files.go index e781e001fe..55c8ad6164 100644 --- a/materialize-boilerplate/staged_files.go +++ b/materialize-boilerplate/staged_files.go @@ -46,7 +46,6 @@ type StagedFiles[T any] struct { flushOnNextBinding bool stagedFiles []stagedFile[T] lastBinding int - didCleanupCurrent bool } // NewStagedFiles creates a StagedFiles instance, which is used for staging data @@ -94,7 +93,6 @@ func (sf *StagedFiles[T]) EncodeRow(ctx context.Context, binding int, row []any) } } sf.lastBinding = binding - sf.didCleanupCurrent = false return sf.stagedFiles[binding].encodeRow(ctx, row) } @@ -110,24 +108,24 @@ func (sf *StagedFiles[T]) Flush(binding int) ([]string, error) { // to use both as part of a deferred call, and an inline call to check for // deletion errors. func (sf *StagedFiles[T]) CleanupCurrentTransaction(ctx context.Context) error { - if !sf.didCleanupCurrent { - sf.didCleanupCurrent = true - - var uris []string - for _, f := range sf.stagedFiles { - if !f.started { - continue - } - - for _, u := range f.uploaded { - uris = append(uris, sf.client.URI(u)) - } + var uris []string + for i := range sf.stagedFiles { + f := &sf.stagedFiles[i] + if len(f.uploaded) == 0 { + continue } - return sf.client.Delete(ctx, uris) + for _, u := range f.uploaded { + uris = append(uris, sf.client.URI(u)) + } + f.uploaded = nil } - return nil + if len(uris) == 0 { + return nil + } + + return sf.client.Delete(ctx, uris) } // CleanupCheckpoint deletes files specified int he list of URIs. This can be