Skip to content

Commit 0f272ce

Browse files
authored
Merge pull request #390 from nobl9/fix-table-load-race
Fix Table Setup race condition
2 parents bbfecd8 + 6766232 commit 0f272ce

File tree

2 files changed

+6
-28
lines changed

2 files changed

+6
-28
lines changed

partition_table.go

+5-27
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ const (
1818

1919
// internal offset we use to detect if the offset has never been stored locally
2020
offsetNotStored int64 = -3
21-
22-
consumerDrainTimeout = time.Second
2321
)
2422

2523
// Backoff is used for adding backoff capabilities to the restarting
@@ -404,42 +402,22 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
404402
}
405403

406404
func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {
407-
timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
408-
defer cancel()
409-
410405
errg, _ := multierr.NewErrGroup(context.Background())
411406

412407
// drain errors channel
413408
errg.Go(func() error {
414409
var errs *multierror.Error
415-
416-
for {
417-
select {
418-
case <-timeoutCtx.Done():
419-
p.log.Printf("draining errors channel timed out")
420-
return errs
421-
case err, ok := <-cons.Errors():
422-
if !ok {
423-
return errs
424-
}
425-
errs = multierror.Append(errs, err)
426-
}
410+
for err := range cons.Errors() {
411+
errs = multierror.Append(errs, err)
427412
}
413+
return errs
428414
})
429415

430416
// drain message channel
431417
errg.Go(func() error {
432-
for {
433-
select {
434-
case <-timeoutCtx.Done():
435-
p.log.Printf("draining messages channel timed out")
436-
return nil
437-
case _, ok := <-cons.Messages():
438-
if !ok {
439-
return nil
440-
}
441-
}
418+
for range cons.Messages() {
442419
}
420+
return nil
443421
})
444422

445423
return errg.Wait().ErrorOrNil()

systemtest/processor_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func TestRebalanceSharePartitions(t *testing.T) {
447447
require.Equal(t, 0, p1Passive)
448448

449449
p2, cancelP2, p2Done := runProc(createProc())
450-
pollTimed(t, "p2 started", 10, p2.Recovered)
450+
pollTimed(t, "p2 started", 20, p2.Recovered)
451451
pollTimed(t, "p1 still running", 10, p1.Recovered)
452452

453453
// now p1 and p2 share the partitions

0 commit comments

Comments
 (0)