From a618b38ab6d628cc87c80f365ce3f1cd7a717aab Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 20 Sep 2021 14:44:53 -0700 Subject: [PATCH] fix: Partial revert of 76e1d95 (#360) Signed-off-by: Alex Collins --- runner/sidecar/source/kafka/handler.go | 55 ++++++-------------------- 1 file changed, 12 insertions(+), 43 deletions(-) diff --git a/runner/sidecar/source/kafka/handler.go b/runner/sidecar/source/kafka/handler.go index 68662256..175fe0af 100644 --- a/runner/sidecar/source/kafka/handler.go +++ b/runner/sidecar/source/kafka/handler.go @@ -3,32 +3,26 @@ package kafka import ( "context" "fmt" - "sync" - "time" "github.com/Shopify/sarama" dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source" "github.com/opentracing/opentracing-go" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" ) type handler struct { - sourceName string - sourceURN string - process source.Process - offsetCommitter bool - mu sync.Mutex + sourceName string + sourceURN string + process source.Process + i int } func newHandler(sourceName, sourceURN string, process source.Process) sarama.ConsumerGroupHandler { return &handler{ - sourceName: sourceName, - sourceURN: sourceURN, - process: process, - offsetCommitter: false, - mu: sync.Mutex{}, + sourceName: sourceName, + sourceURN: sourceURN, + process: process, } } @@ -39,49 +33,24 @@ func (h *handler) Setup(sess sarama.ConsumerGroupSession) error { func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error { logger.Info("Kafka handler clean-up") - sess.Commit() return nil } func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - defer runtime.HandleCrash() - ctx, cancel := context.WithCancel(sess.Context()) - defer cancel() - h.startOffsetCommitter(sess, ctx) + ctx := sess.Context() logger.Info("starting consuming claim", "partition", claim.Partition()) for msg := range claim.Messages() { if err := h.processMessage(ctx, msg); err != nil { logger.Error(err, "failed to process message") } else { sess.MarkMessage(msg, "") - } - } - return nil -} - -func (h *handler) startOffsetCommitter(sess sarama.ConsumerGroupSession, ctx context.Context) { - h.mu.Lock() - defer h.mu.Unlock() - // this func will be called once per partition, but we only need one committer, - // so return if one exists - if h.offsetCommitter { - return - } - h.offsetCommitter = true - go wait.JitterUntilWithContext(ctx, func(ctx context.Context) { - logger.Info("starting Kafka offset committer") - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - h.offsetCommitter = false - sess.Commit() - case <-ticker.C: + h.i++ + if h.i%dfv1.CommitN == 0 { sess.Commit() } } - }, time.Second, 1.2, true) + } + return nil } func (h *handler) processMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {