diff --git a/runner/sidecar/source/kafka/kafka.go b/runner/sidecar/source/kafka/kafka.go index 099ebe97..ca0c89d4 100644 --- a/runner/sidecar/source/kafka/kafka.go +++ b/runner/sidecar/source/kafka/kafka.go @@ -28,6 +28,7 @@ type kafkaSource struct { consumer *kafka.Consumer topic string wg *sync.WaitGroup + mu sync.Mutex // for channels channels map[int32]chan *kafka.Message process source.Process totalLag int64 @@ -72,6 +73,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit sourceURN: sourceURN, consumer: consumer, topic: x.Topic, + mu: sync.Mutex{}, channels: map[int32]chan *kafka.Message{}, // partition -> messages wg: &sync.WaitGroup{}, process: process, @@ -107,6 +109,8 @@ func (s *kafkaSource) processMessage(ctx context.Context, msg *kafka.Message) er func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) { logger := s.logger.WithValues("partition", partition) + s.mu.Lock() + defer s.mu.Unlock() if _, ok := s.channels[partition]; !ok { logger.Info("assigned partition") s.channels[partition] = make(chan *kafka.Message, 256) @@ -117,6 +121,8 @@ func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) { } func (s *kafkaSource) revokedPartition(partition int32) { + s.mu.Lock() + defer s.mu.Unlock() if _, ok := s.channels[partition]; ok { s.logger.Info("revoked partition", "partition", partition) close(s.channels[partition]) @@ -168,10 +174,12 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) { func (s *kafkaSource) Close() error { s.logger.Info("closing partition channels") + s.mu.Lock() for key, ch := range s.channels { delete(s.channels, key) close(ch) } + s.mu.Unlock() s.logger.Info("waiting for partition consumers to finish") s.wg.Wait() s.logger.Info("closing consumer")