From 0d01f7456b4dca1d987cd431e3925092611fbb99 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 23 Aug 2021 14:54:16 -0600 Subject: [PATCH] add RoundRobinPartitioner This actually showed that OnNewBatch should not be a required method on the Partitioner interface but instead should be an optional extension. If it is required, then every message published to round robin goes to a new batch: - partition on 1? - no, new batch - partition on 2 - inc to 3 All partitioning starts on odd partitions, then completes on even. Removing OnNewBatch gets us to true round robin. Closes #66. --- pkg/kgo/partitioner.go | 47 +++++++++++++++++++++++++++++++++++++----- pkg/kgo/producer.go | 6 ++++-- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/pkg/kgo/partitioner.go b/pkg/kgo/partitioner.go index c33a45d7..0b4387b7 100644 --- a/pkg/kgo/partitioner.go +++ b/pkg/kgo/partitioner.go @@ -23,9 +23,6 @@ type Partitioner interface { // TopicPartitioner partitions records in an individual topic. type TopicPartitioner interface { - // OnNewBatch is called when producing a record if that record would - // trigger a new batch on its current partition. - OnNewBatch() // RequiresConsistency returns true if a record must hash to the same // partition even if a partition is down. // If true, a record may hash to a partition that cannot be written to @@ -36,7 +33,20 @@ type TopicPartitioner interface { Partition(r *Record, n int) int } -// TopicBackupPartitioner is an optional extension interface to the +// TopicPartitionerOnNewBatch is an optional extension interface to +// TopicPartitioner that calls OnNewBatch before any new batch is created. If +// buffering a record would cause a new batch, OnNewBatch is called. +// +// This interface allows for partitioner implementations that effectively pin +// to a partition until a new batch is created, after which the partitioner can +// choose which next partition to use. +type TopicPartitionerOnNewBatch interface { + // OnNewBatch is called when producing a record if that record would + // trigger a new batch on its current partition. + OnNewBatch() +} + +// TopicBackupPartitioner is an optional extension interface to // TopicPartitioner that can partition by the number of records buffered. // // If a partitioner implements this interface, the Partition function will @@ -105,7 +115,6 @@ type basicTopicPartitioner struct { fn func(*Record, int) int } -func (*basicTopicPartitioner) OnNewBatch() {} func (*basicTopicPartitioner) RequiresConsistency(*Record) bool { return true } func (b *basicTopicPartitioner) Partition(r *Record, n int) int { return b.fn(r, n) } @@ -123,6 +132,34 @@ func ManualPartitioner() Partitioner { }) } +// RoundRobinPartitioner is a partitioner that round-robin's through all +// available partitions. This algorithm has lower throughput and causes higher +// CPU load on brokers, but can be useful if you want to ensure an even +// distribution of records to partitions. +func RoundRobinPartitioner() Partitioner { + return new(roundRobinPartitioner) +} + +type roundRobinPartitioner struct{} + +func (*roundRobinPartitioner) ForTopic(string) TopicPartitioner { + return new(roundRobinTopicPartitioner) +} + +type roundRobinTopicPartitioner struct { + on int +} + +func (*roundRobinTopicPartitioner) RequiresConsistency(*Record) bool { return false } +func (r *roundRobinTopicPartitioner) Partition(_ *Record, n int) int { + if r.on >= n { + r.on = 0 + } + ret := r.on + r.on++ + return ret +} + // LeastBackupPartitioner prioritizes partitioning by three factors, in order: // // 1) pin to the current pick until there is a new batch diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index ebede442..ed6f86a0 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -423,9 +423,11 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart partition := mapping[pick] - processed := partition.records.bufferRecord(pr, true) // KIP-480 + onNewBatch, _ := parts.partitioner.(TopicPartitionerOnNewBatch) + abortOnNewBatch := onNewBatch != nil + processed := partition.records.bufferRecord(pr, abortOnNewBatch) // KIP-480 if !processed { - parts.partitioner.OnNewBatch() + onNewBatch.OnNewBatch() if tlp != nil { parts.lb.mapping = mapping