Skip to content

Commit

Permalink
fix: use one Kafka client for consumer/producer
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Aug 6, 2021
1 parent d6e01d2 commit 1b18462
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 20 deletions.
18 changes: 1 addition & 17 deletions runner/sidecar/shared/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"time"

Expand All @@ -14,30 +13,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
cache = make(map[string]sarama.Client) // hash(config) -> client
logger = sharedutil.NewLogger()
)

func GetClient(ctx context.Context, secretInterface corev1.SecretInterface, k dfv1.KafkaConfig, startOffset string) (*sarama.Config, sarama.Client, error) {
func GetClient(ctx context.Context, secretInterface corev1.SecretInterface, k dfv1.KafkaConfig) (*sarama.Config, sarama.Client, error) {
config, err := newConfig(ctx, secretInterface, k)
if err != nil {
return nil, nil, err
}
if startOffset == "First" {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
key := sharedutil.MustHash(k)
if client, ok := cache[key]; ok {
logger.Info("cache hit: reusing existing Kafka client")
return config, client, nil
}
logger.Info("cache miss: creating new Kafka client")
client, err := sarama.NewClient(k.Brokers, config)
if err != nil {
return nil, nil, err
}
cache[key] = client
return config, client, err
}

Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/sink/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type kafkaSink struct {
}

func New(ctx context.Context, secretInterface corev1.SecretInterface, x dfv1.Kafka) (sink.Interface, error) {
_, client, err := kafka.GetClient(ctx, secretInterface, x.KafkaConfig, "")
_, client, err := kafka.GetClient(ctx, secretInterface, x.KafkaConfig)
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Con
}

func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterName, namespace, pipelineName, stepName, sourceName string, x dfv1.KafkaSource, f source.Func) (source.Interface, error) {
config, client, err := kafka.GetClient(ctx, secretInterface, x.Kafka.KafkaConfig, string(x.StartOffset))
config, client, err := kafka.GetClient(ctx, secretInterface, x.Kafka.KafkaConfig)
if err != nil {
return nil, err
}
if x.StartOffset == "First" {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
// This ID can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).
groupID := sharedutil.MustHash(fmt.Sprintf("%s.%s.%s.%s.sources.%s", clusterName, namespace, pipelineName, stepName, sourceName))
logger.Info("Kafka consumer group ID", "groupID", groupID)
Expand All @@ -64,10 +67,11 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam
for {
logger.Info("starting Kafka consumption", "source", sourceName)
if err := consumerGroup.Consume(ctx, []string{x.Topic}, h); err != nil {
logger.Error(err, "failed to consume kafka topic", "source", sourceName)
if err == sarama.ErrClosedConsumerGroup {
logger.Info("failed to consume kafka topic", "error", err)
return
}
logger.Error(err, "failed to consume kafka topic", "source", sourceName)
}
}
}, 3*time.Second, 1.2, true, ctx.Done())
Expand Down

0 comments on commit 1b18462

Please sign in to comment.