Skip to content

Commit

Permalink
fix: make Kafka consumers channels size 1
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Oct 5, 2021
1 parent 64bf5d7 commit 4d2ad98
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
2 changes: 1 addition & 1 deletion runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) {
logger := s.logger.WithValues("partition", partition)
if _, ok := s.channels[partition]; !ok {
logger.Info("assigned partition")
s.channels[partition] = make(chan *kafka.Message, 256)
s.channels[partition] = make(chan *kafka.Message, 1)
go func() {
defer runtime.HandleCrash()
s.consumePartition(ctx, partition)
Expand Down
6 changes: 3 additions & 3 deletions test/kafka-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{
"TestKafkaAsyncSinkStress/.tps": 350,
"TestKafkaSinkStress/.tps": 200,
"TestKafkaSinkStress/.tps": 300,
"TestKafkaSinkStress/N=10,messageSize=100.tps": 200,
"TestKafkaSinkStress/N=10,messageSize=1000.tps": 150,
"TestKafkaSinkStress/N=50000.tps": 750,
"TestKafkaSinkStress/async=true.tps": 400,
"TestKafkaSinkStress/messageSize=1000.tps": 300,
"TestKafkaSinkStress/replicas=2.tps": 400,
"TestKafkaSourceStress/.tps": 350,
"TestKafkaSourceStress/.tps": 400,
"TestKafkaSourceStress/N=10,messageSize=100.tps": 450,
"TestKafkaSourceStress/N=10,messageSize=1000.tps": 650,
"TestKafkaSourceStress/N=50000.tps": 3150,
"TestKafkaSourceStress/messageSize=1000.tps": 850,
"TestKafkaSourceStress/replicas=2.tps": 950
"TestKafkaSourceStress/replicas=2.tps": 500
}

0 comments on commit 4d2ad98

Please sign in to comment.