Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Fixed Kafka disconnection issue (#1179)
Browse files Browse the repository at this point in the history
* Mhhh ok that's how it should be fixed

Signed-off-by: Francesco Guardiani <[email protected]>

* Fixed data race

Signed-off-by: Francesco Guardiani <[email protected]>

* Implemented propagation of Close()

Signed-off-by: Francesco Guardiani <[email protected]>
  • Loading branch information
slinkydeveloper authored Apr 30, 2020
1 parent 560f979 commit f05867c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
14 changes: 14 additions & 0 deletions kafka/channel/pkg/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ func TestDispatcher(t *testing.T) {
deadLetterWg.Wait()
transformationsWg.Wait()
receiverWg.Wait()

// Try to close consumer groups
err = dispatcher.UpdateHostToChannelMap(&multichannelfanout.Config{})
if err != nil {
t.Fatal(err)
}

failed, err = dispatcher.UpdateKafkaConsumers(&multichannelfanout.Config{})
if err != nil {
t.Fatal(err)
}
if len(failed) != 0 {
t.Fatal(err)
}
}

func createReverseProxy(t *testing.T, host string) *httputil.ReverseProxy {
Expand Down
26 changes: 21 additions & 5 deletions kafka/common/pkg/kafka/consumer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ type kafkaConsumerGroupFactoryImpl struct {
}

type customConsumerGroup struct {
closeFn context.CancelFunc
handlerErrorChannel chan error
sarama.ConsumerGroup
}

// Merge handler errors chan and consumer group error chan
func (c customConsumerGroup) Errors() <-chan error {
func (c *customConsumerGroup) Errors() <-chan error {
errors := make(chan error, 10)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -65,6 +66,11 @@ func (c customConsumerGroup) Errors() <-chan error {
return errors
}

func (c *customConsumerGroup) Close() error {
c.closeFn()
return c.ConsumerGroup.Close()
}

var _ sarama.ConsumerGroup = (*customConsumerGroup)(nil)

func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics []string, logger *zap.Logger, handler KafkaConsumerHandler) (sarama.ConsumerGroup, error) {
Expand All @@ -76,14 +82,24 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics

consumerHandler := NewConsumerHandler(logger, handler)

ctx, cancelFn := context.WithCancel(context.Background())

go func() {
err2 := consumerGroup.Consume(context.TODO(), topics, &consumerHandler)
if err2 != nil {
consumerHandler.errors <- err2
for {
if err2 := consumerGroup.Consume(context.Background(), topics, &consumerHandler); err2 != nil {
consumerHandler.errors <- err2
}
// Let's check if Consume stopped because of closing
select {
case <-ctx.Done():
return
default:
continue
}
}
}()

return customConsumerGroup{consumerHandler.errors, consumerGroup}, err
return &customConsumerGroup{cancelFn, consumerHandler.errors, consumerGroup}, err
}

func NewConsumerGroupFactory(client sarama.Client) KafkaConsumerGroupFactory {
Expand Down
19 changes: 12 additions & 7 deletions kafka/common/pkg/kafka/consumer_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka
import (
"context"
"errors"
"sync"
"testing"

"github.com/Shopify/sarama"
Expand All @@ -32,14 +33,17 @@ type mockConsumerGroup struct {
mustGenerateConsumerGroupError bool
mustGenerateHandlerError bool
consumeMustReturnError bool
generateErrorOnce sync.Once
}

func (m mockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
func (m *mockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
if m.mustGenerateHandlerError {
go func() {
h := handler.(*saramaConsumerHandler)
h.errors <- errors.New("cgh")
_ = h.Cleanup(nil)
m.generateErrorOnce.Do(func() {
h := handler.(*saramaConsumerHandler)
h.errors <- errors.New("cgh")
_ = h.Cleanup(nil)
})
}()
}
if m.consumeMustReturnError {
Expand All @@ -48,7 +52,7 @@ func (m mockConsumerGroup) Consume(ctx context.Context, topics []string, handler
return nil
}

func (m mockConsumerGroup) Errors() <-chan error {
func (m *mockConsumerGroup) Errors() <-chan error {
ch := make(chan error)
go func() {
if m.mustGenerateConsumerGroupError {
Expand All @@ -59,18 +63,19 @@ func (m mockConsumerGroup) Errors() <-chan error {
return ch
}

func (m mockConsumerGroup) Close() error {
func (m *mockConsumerGroup) Close() error {
return nil
}

func mockedNewConsumerGroupFromClient(mockInputMessageCh chan *sarama.ConsumerMessage, mustGenerateConsumerGroupError bool, mustGenerateHandlerError bool, consumeMustReturnError bool, mustFail bool) func(groupID string, client sarama.Client) (group sarama.ConsumerGroup, e error) {
if !mustFail {
return func(groupID string, client sarama.Client) (group sarama.ConsumerGroup, e error) {
return mockConsumerGroup{
return &mockConsumerGroup{
mockInputMessageCh: mockInputMessageCh,
mustGenerateConsumerGroupError: mustGenerateConsumerGroupError,
mustGenerateHandlerError: mustGenerateHandlerError,
consumeMustReturnError: consumeMustReturnError,
generateErrorOnce: sync.Once{},
}, nil
}
} else {
Expand Down

0 comments on commit f05867c

Please sign in to comment.