Commit

feat: Run Kafka offset commit loop every 1s, 2x faster, more robust.
alexec committed Sep 17, 2021
1 parent e85e697 commit d2bf012
Showing 4 changed files with 26 additions and 13 deletions.
29 changes: 21 additions & 8 deletions runner/sidecar/source/kafka/handler.go
@@ -3,43 +3,56 @@ package kafka
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/Shopify/sarama"
 	dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
 	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
 	"github.com/opentracing/opentracing-go"
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 type handler struct {
 	sourceName string
 	sourceURN  string
 	process    source.Process
-	i          int
 }
 
-func (handler) Setup(_ sarama.ConsumerGroupSession) error {
+func (h handler) Setup(sess sarama.ConsumerGroupSession) error {
 	logger.Info("Kafka handler set-up")
 	return nil
 }
 
-func (handler) Cleanup(_ sarama.ConsumerGroupSession) error {
+func (h handler) Cleanup(sess sarama.ConsumerGroupSession) error {
 	logger.Info("Kafka handler clean-up")
+	sess.Commit()
 	return nil
 }
 
 func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
-	ctx := sess.Context()
 	defer runtime.HandleCrash()
+	ctx, cancel := context.WithCancel(sess.Context())
+	defer cancel()
+	go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
+		logger.Info("starting Kafka offset committer")
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				sess.Commit()
+			}
+		}
+	}, time.Second, 1.2, true)
+	defer sess.Commit()
 	logger.Info("starting consuming claim", "partition", claim.Partition())
 	for msg := range claim.Messages() {
 		if err := h.processMessage(ctx, msg); err != nil {
 			logger.Error(err, "failed to process message")
 		} else {
 			sess.MarkMessage(msg, "")
-			h.i++
-			if h.i%dfv1.CommitN == 0 {
-				sess.Commit()
-			}
 		}
 	}
 	return nil
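Note (not part of the commit): the handler above now drives its own 1-second commit loop on top of sess.MarkMessage, replacing the every-CommitN-messages counter. For comparison only, sarama's client-side auto-commit can flush marked offsets on a similar interval; a minimal sketch, with placeholder broker and group names that are not taken from this repository:

package example

import (
	"time"

	"github.com/Shopify/sarama"
)

// newConsumerGroup sketches sarama's built-in interval auto-commit.
// Broker address and group ID below are placeholders, not from this repo.
func newConsumerGroup() (sarama.ConsumerGroup, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0
	// Offsets recorded via sess.MarkMessage are flushed roughly once per second.
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = time.Second
	return sarama.NewConsumerGroup([]string{"kafka-broker:9092"}, "example-group", config)
}

The explicit ticker in the handler keeps the commit cadence under the application's control and pairs with the sess.Commit() added to Cleanup, so offsets are also flushed when a rebalance ends the session.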
2 changes: 1 addition & 1 deletion runner/sidecar/source/kafka/kafka.go
@@ -51,7 +51,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, consumerGr
 	if err != nil {
 		return nil, err
 	}
-	h := handler{sourceName, sourceURN, process, 0}
+	h := handler{sourceName, sourceURN, process}
 	go wait.JitterUntil(func() {
 		defer runtime.HandleCrash()
 		ctx := context.Background()
4 changes: 2 additions & 2 deletions test/kafka-fmea/kafka_fmea_test.go
@@ -44,7 +44,7 @@ func TestKafkaFMEA_PodDeletedDisruption(t *testing.T) {
 	DeletePod("kafka-main-0") // delete the pod to see that we recover and continue to process messages
 	WaitForPod("kafka-main-0")
 
-	ExpectKafkaTopicCount(sinkTopic, n, n+CommitN*2, 2*time.Minute)
+	ExpectKafkaTopicCount(sinkTopic, n, n, 2*time.Minute)
 	defer StartPortForward("kafka-main-0")()
 	WaitForNoErrors()
 }
@@ -117,5 +117,5 @@ func TestKafkaFMEA_PipelineDeletedDisruption(t *testing.T) {
 	WaitForPodsToBeDeleted()
 	CreatePipeline(pl)
 
-	ExpectKafkaTopicCount(sinkTopic, n, n+CommitN*2, time.Minute)
+	ExpectKafkaTopicCount(sinkTopic, n, n, time.Minute)
 }
4 changes: 2 additions & 2 deletions test/kafka-stress/test-results.json
@@ -1,11 +1,11 @@
 {
-  "TestKafkaSinkStress/.tps": 650,
+  "TestKafkaSinkStress/.tps": 850,
   "TestKafkaSinkStress/N=10,messageSize=100.tps": 200,
   "TestKafkaSinkStress/N=10,messageSize=1000.tps": 150,
   "TestKafkaSinkStress/async=true.tps": 400,
   "TestKafkaSinkStress/messageSize=1000.tps": 300,
   "TestKafkaSinkStress/replicas=2.tps": 400,
-  "TestKafkaSourceStress/.tps": 1250,
+  "TestKafkaSourceStress/.tps": 2750,
   "TestKafkaSourceStress/N=10,messageSize=100.tps": 450,
   "TestKafkaSourceStress/N=10,messageSize=1000.tps": 650,
   "TestKafkaSourceStress/messageSize=1000.tps": 850,
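For reference, these recorded stress results back the "2x faster" headline: TestKafkaSourceStress/.tps rises from 1250 to 2750 (roughly 2.2x), while TestKafkaSinkStress/.tps rises from 650 to 850 (roughly 1.3x).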
