Skip to content

Commit

Permalink
fix: Partial revert of 76e1d95 (#360)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Sep 20, 2021
1 parent 2176e72 commit a618b38
Showing 1 changed file with 12 additions and 43 deletions.
55 changes: 12 additions & 43 deletions runner/sidecar/source/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,26 @@ package kafka
import (
"context"
"fmt"
"sync"
"time"

"github.com/Shopify/sarama"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
)

type handler struct {
sourceName string
sourceURN string
process source.Process
offsetCommitter bool
mu sync.Mutex
sourceName string
sourceURN string
process source.Process
i int
}

func newHandler(sourceName, sourceURN string, process source.Process) sarama.ConsumerGroupHandler {
return &handler{
sourceName: sourceName,
sourceURN: sourceURN,
process: process,
offsetCommitter: false,
mu: sync.Mutex{},
sourceName: sourceName,
sourceURN: sourceURN,
process: process,
}
}

Expand All @@ -39,49 +33,24 @@ func (h *handler) Setup(sess sarama.ConsumerGroupSession) error {

func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error {
logger.Info("Kafka handler clean-up")
sess.Commit()
return nil
}

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
defer runtime.HandleCrash()
ctx, cancel := context.WithCancel(sess.Context())
defer cancel()
h.startOffsetCommitter(sess, ctx)
ctx := sess.Context()
logger.Info("starting consuming claim", "partition", claim.Partition())
for msg := range claim.Messages() {
if err := h.processMessage(ctx, msg); err != nil {
logger.Error(err, "failed to process message")
} else {
sess.MarkMessage(msg, "")
}
}
return nil
}

func (h *handler) startOffsetCommitter(sess sarama.ConsumerGroupSession, ctx context.Context) {
h.mu.Lock()
defer h.mu.Unlock()
// this func will be called once per partition, but we only need one committer,
// so return if one exists
if h.offsetCommitter {
return
}
h.offsetCommitter = true
go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
logger.Info("starting Kafka offset committer")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
h.offsetCommitter = false
sess.Commit()
case <-ticker.C:
h.i++
if h.i%dfv1.CommitN == 0 {
sess.Commit()
}
}
}, time.Second, 1.2, true)
}
return nil
}

func (h *handler) processMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {
Expand Down

0 comments on commit a618b38

Please sign in to comment.