Skip to content

Commit

Permalink
feat: commit Kafka offset async (#411)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Oct 5, 2021
1 parent e056b27 commit 5bbe50a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 23 deletions.
54 changes: 34 additions & 20 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit
// https://docs.confluent.io/cloud/current/client-apps/optimizing/throughput.html
config["fetch.min.bytes"] = 100000
config["fetch.wait.max.ms"] = seconds / 2
// config["go.events.channel.enable"] = true
// config["max.poll.interval.ms"] = 300 * seconds
logger.Info("Kafka config", "config", sharedutil.MustJSON(sharedkafka.RedactConfigMap(config)))
// https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_example/consumer_example.go
consumer, err := kafka.NewConsumer(&config)
Expand Down Expand Up @@ -160,8 +162,7 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) {

func (s *kafkaSource) Close() error {
s.logger.Info("closing partition channels")
for key, ch := range s.channels {
delete(s.channels, key)
for _, ch := range s.channels {
close(ch)
}
s.logger.Info("waiting for partition consumers to finish")
Expand Down Expand Up @@ -195,29 +196,42 @@ func (s *kafkaSource) consumePartition(ctx context.Context, partition int32) {
logger := s.logger.WithValues("partition", partition)
logger.Info("consuming partition")
s.wg.Add(1)
var firstCommittedOffset, lastCommittedOffset int64 = -1, -1
var lastUncommitted *kafka.Message
commitLastUncommitted := func() {
if lastUncommitted != nil {
if _, err := s.consumer.CommitMessage(lastUncommitted); err != nil {
logger.Error(err, "failed to commit message", "offset", lastUncommitted.TopicPartition.Offset)
}
lastUncommitted = nil
}
}
defer func() {
logger.Info("done consuming partition", "firstCommittedOffset", firstCommittedOffset, "lastCommittedOffset", lastCommittedOffset)
logger.Info("committing last uncommitted message")
commitLastUncommitted()
logger.Info("done consuming partition")
s.wg.Done()
}()
for msg := range s.channels[partition] {
offset := int64(msg.TopicPartition.Offset)
logger := logger.WithValues("offset", offset)
if err := s.processMessage(ctx, msg); err != nil {
if errors.Is(err, context.Canceled) {
logger.Info("failed to process message", "err", err.Error())
} else {
logger.Error(err, "failed to process message")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
commitLastUncommitted()
case msg, ok := <-s.channels[partition]:
if !ok {
return
}
} else {
if _, err := s.consumer.CommitMessage(msg); err != nil {
logger.Error(err, "failed to commit message")
} else {
if firstCommittedOffset == -1 {
firstCommittedOffset = offset
logger.Info("offset", "firstCommittedOffset", firstCommittedOffset)
offset := int64(msg.TopicPartition.Offset)
logger := logger.WithValues("offset", offset)
println("offset=", offset)
if err := s.processMessage(ctx, msg); err != nil {
if errors.Is(err, context.Canceled) {
logger.Info("failed to process message", "err", err.Error())
} else {
logger.Error(err, "failed to process message")
}
lastCommittedOffset = offset
} else {
lastUncommitted = msg
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions test/kafka-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"TestKafkaAsyncSinkStress/.tps": 500,
"TestKafkaSinkStress/.tps": 450,
"TestKafkaAsyncSinkStress/.tps": 1300,
"TestKafkaSinkStress/.tps": 550,
"TestKafkaSinkStress/N=10,messageSize=100.tps": 200,
"TestKafkaSinkStress/N=10,messageSize=1000.tps": 150,
"TestKafkaSinkStress/N=50000.tps": 750,
"TestKafkaSinkStress/async=true.tps": 400,
"TestKafkaSinkStress/messageSize=1000.tps": 300,
"TestKafkaSinkStress/replicas=2.tps": 400,
"TestKafkaSourceStress/.tps": 450,
"TestKafkaSourceStress/.tps": 1950,
"TestKafkaSourceStress/N=10,messageSize=100.tps": 450,
"TestKafkaSourceStress/N=10,messageSize=1000.tps": 650,
"TestKafkaSourceStress/N=50000.tps": 3150,
Expand Down

0 comments on commit 5bbe50a

Please sign in to comment.