diff --git a/runner/sidecar/source/kafka/handler.go b/runner/sidecar/source/kafka/handler.go index 175fe0af..bd8b7d70 100644 --- a/runner/sidecar/source/kafka/handler.go +++ b/runner/sidecar/source/kafka/handler.go @@ -39,6 +39,7 @@ func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error { func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { ctx := sess.Context() logger.Info("starting consuming claim", "partition", claim.Partition()) + defer sess.Commit() for msg := range claim.Messages() { if err := h.processMessage(ctx, msg); err != nil { logger.Error(err, "failed to process message") diff --git a/test/kafka-stress/test-results.json b/test/kafka-stress/test-results.json index dd19d435..aac447bb 100644 --- a/test/kafka-stress/test-results.json +++ b/test/kafka-stress/test-results.json @@ -1,12 +1,12 @@ { - "TestKafkaSinkStress/.tps": 750, + "TestKafkaSinkStress/.tps": 600, "TestKafkaSinkStress/N=10,messageSize=100.tps": 200, "TestKafkaSinkStress/N=10,messageSize=1000.tps": 150, "TestKafkaSinkStress/N=50000.tps": 750, "TestKafkaSinkStress/async=true.tps": 400, "TestKafkaSinkStress/messageSize=1000.tps": 300, "TestKafkaSinkStress/replicas=2.tps": 400, - "TestKafkaSourceStress/.tps": 3450, + "TestKafkaSourceStress/.tps": 1300, "TestKafkaSourceStress/N=10,messageSize=100.tps": 450, "TestKafkaSourceStress/N=10,messageSize=1000.tps": 650, "TestKafkaSourceStress/N=50000.tps": 3150,