Skip to content

Commit

Permalink
fix: change Kafka to use stats for pending
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Oct 2, 2021
1 parent 63ea3fa commit dc03e6c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 25 deletions.
56 changes: 32 additions & 24 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
Expand All @@ -28,6 +29,7 @@ type kafkaSource struct {
wg *sync.WaitGroup
channels map[int32]chan *kafka.Message
process source.Process
totalLag int64
}

func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monitor.Interface, consumerGroupID, sourceName, sourceURN string, replica int, x dfv1.KafkaSource, process source.Process) (source.Interface, error) {
Expand All @@ -45,6 +47,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit
} else {
config["auto.offset.reset"] = "latest"
}
config["statistics.interval.ms"] = 5 * 1000
logger.Info("Kafka config", "config", sharedutil.MustJSON(sharedkafka.RedactConfigMap(config)))
// https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_example/consumer_example.go
consumer, err := kafka.NewConsumer(&config)
Expand Down Expand Up @@ -112,6 +115,22 @@ func (s *kafkaSource) revokedPartition(ctx context.Context, partition int32) {
s.mntr.RevokedPartition(ctx, s.sourceURN, partition)
}

// Stats is the subset of the librdkafka statistics JSON document needed to
// compute consumer lag. Only the per-topic, per-partition `consumer_lag`
// values are decoded; every other field in the document is ignored.
type Stats struct {
	// Topics is keyed by topic name.
	Topics map[string]struct {
		// Partitions is keyed by the partition's JSON object key.
		Partitions map[string]struct {
			// ConsumerLag is the lag reported for the partition. librdkafka
			// reports a negative value (-1) when the lag is not known.
			ConsumerLag int64 `json:"consumer_lag"`
		} `json:"partitions"`
	} `json:"topics"`
}

// totalLag returns the sum of the known consumer lag over every partition of
// topic. It returns 0 when the topic is absent from the stats or none of its
// partitions report a lag.
func (s Stats) totalLag(topic string) int64 {
	var totalLag int64
	// Indexing a missing topic yields a zero value whose Partitions map is
	// nil; ranging over a nil map is a no-op, so no existence check is needed.
	for _, p := range s.Topics[topic].Partitions {
		// Negative values are "unknown" sentinels (e.g. unassigned partitions
		// and librdkafka's internal "-1" partition entry). Adding them would
		// subtract from the real lag, so only positive lag is counted.
		if p.ConsumerLag > 0 {
			totalLag += p.ConsumerLag
		}
	}
	return totalLag
}

func (s *kafkaSource) startPollLoop(ctx context.Context) {
s.logger.Info("starting poll loop")
for {
Expand All @@ -134,6 +153,15 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) {
}()
s.channels[e.TopicPartition.Partition] <- e
}()
case *kafka.Stats:
// https://github.com/edenhill/librdkafka/wiki/Consumer-lag-monitoring
// https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/stats_example/stats_example.go
stats := &Stats{}
if err := json.Unmarshal([]byte(e.String()), stats); err != nil {
s.logger.Error(err, "failed to unmarshall stats")
} else {
s.totalLag = stats.totalLag(s.topic)
}
case kafka.Error:
s.logger.Error(fmt.Errorf("%v", e), "poll error")
case nil:
Expand All @@ -158,31 +186,11 @@ func (s *kafkaSource) Close() error {
}

func (s *kafkaSource) GetPending(context.Context) (uint64, error) {
// TODO - only works for assigned partitions
toppars, err := s.consumer.Assignment()
if err != nil {
return 0, err
}
toppars, err = s.consumer.Committed(toppars, 3*1000)
if err != nil {
return 0, err
}
var low, high int64
var pending int64
for _, t := range toppars {
low, high, err = s.consumer.QueryWatermarkOffsets(*t.Topic, t.Partition, 3*1000)
if err != nil {
return 0, err
}
offset := int64(t.Offset)
if t.Offset == kafka.OffsetInvalid {
offset = low
}
if d := high - offset; d > 0 {
pending += d
}
if s.totalLag >= 0 {
return uint64(s.totalLag), nil
} else {
return 0, nil
}
return uint64(pending), nil
}

func (s *kafkaSource) rebalanced(ctx context.Context, event kafka.Event) error {
Expand Down
1 change: 1 addition & 0 deletions test/kafka-stress/kafka_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestKafkaSourceStress(t *testing.T) {
defer StartTPSReporter(t, "main", prefix, n)()
go PumpKafkaTopic(topic, n, prefix, Params.MessageSize)
WaitForPending()
WaitForNothingPending()
WaitForTotalSunkMessages(n, Params.Timeout)
}

Expand Down
6 changes: 5 additions & 1 deletion test/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import (
)

func WaitForPending() {
ExpectMetric("pending", Gt(0))
ExpectMetric("sources_pending", Gt(0))
}

// WaitForNothingPending calls ExpectMetric for the "sources_pending" metric
// with an Eq(0) matcher, i.e. it expects no messages to be pending.
// NOTE(review): presumably ExpectMetric polls until the matcher is satisfied
// or a timeout elapses — confirm against its definition.
func WaitForNothingPending() {
	const metric = "sources_pending"
	ExpectMetric(metric, Eq(0))
}

func WaitForTotalSourceMessages(v int) {
Expand Down

0 comments on commit dc03e6c

Please sign in to comment.