Skip to content

Commit

Permalink
Merge pull request #390 from nobl9/fix-table-load-race
Browse files Browse the repository at this point in the history
Fix Table Setup race condition
  • Loading branch information
mmreza79 authored Jul 12, 2022
2 parents bbfecd8 + 6766232 commit 0f272ce
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 28 deletions.
32 changes: 5 additions & 27 deletions partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ const (

// internal offset we use to detect if the offset has never been stored locally
offsetNotStored int64 = -3

consumerDrainTimeout = time.Second
)

// Backoff is used for adding backoff capabilities to the restarting
Expand Down Expand Up @@ -404,42 +402,22 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
}

func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {
timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
defer cancel()

errg, _ := multierr.NewErrGroup(context.Background())

// drain errors channel
errg.Go(func() error {
var errs *multierror.Error

for {
select {
case <-timeoutCtx.Done():
p.log.Printf("draining errors channel timed out")
return errs
case err, ok := <-cons.Errors():
if !ok {
return errs
}
errs = multierror.Append(errs, err)
}
for err := range cons.Errors() {
errs = multierror.Append(errs, err)
}
return errs
})

// drain message channel
errg.Go(func() error {
for {
select {
case <-timeoutCtx.Done():
p.log.Printf("draining messages channel timed out")
return nil
case _, ok := <-cons.Messages():
if !ok {
return nil
}
}
for range cons.Messages() {
}
return nil
})

return errg.Wait().ErrorOrNil()
Expand Down
2 changes: 1 addition & 1 deletion systemtest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestRebalanceSharePartitions(t *testing.T) {
require.Equal(t, 0, p1Passive)

p2, cancelP2, p2Done := runProc(createProc())
pollTimed(t, "p2 started", 10, p2.Recovered)
pollTimed(t, "p2 started", 20, p2.Recovered)
pollTimed(t, "p1 still running", 10, p1.Recovered)

// now p1 and p2 share the partitions
Expand Down

0 comments on commit 0f272ce

Please sign in to comment.