From f0789345dab8cb42671e31ca916cf4e9bd13b186 Mon Sep 17 00:00:00 2001 From: Franz Eichhorn Date: Tue, 28 Jun 2022 16:58:23 +0200 Subject: [PATCH] bugfix duplicate channel creation on every message visit (#387) --- partition_processor.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/partition_processor.go b/partition_processor.go index 999f5c98..e96ecb18 100644 --- a/partition_processor.go +++ b/partition_processor.go @@ -706,12 +706,13 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta } defer it.Release() + stopping := pp.stopping() for it.Next() { // add one that we were able to be put into the queue. // wg.Done will be called by the visit handler as commit wg.Add(1) select { - case <-pp.stopping(): + case <-stopping: drainVisitInput() wg.Done() return ErrVisitAborted @@ -741,7 +742,7 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta wg.Wait() }() select { - case <-pp.stopping(): + case <-stopping: drainVisitInput() return ErrVisitAborted case <-ctx.Done():