Skip to content

Commit

Permalink
add RoundRobinPartitioner
Browse files Browse the repository at this point in the history
This actually showed that OnNewBatch should not be a required method on
the Partitioner interface but instead should be an optional extension.
If it is required, then every message published to round robin goes to a
new batch:
- partition on 1?
  - no, new batch
- partition on 2
  - inc to 3
All partitioning starts on odd partitions, then completes on even.
Removing OnNewBatch gets us to true round robin.

Closes #66.
  • Loading branch information
twmb committed Aug 23, 2021
1 parent 5be804d commit 0d01f74
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
47 changes: 42 additions & 5 deletions pkg/kgo/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ type Partitioner interface {

// TopicPartitioner partitions records in an individual topic.
type TopicPartitioner interface {
// OnNewBatch is called when producing a record if that record would
// trigger a new batch on its current partition.
OnNewBatch()
// RequiresConsistency returns true if a record must hash to the same
// partition even if a partition is down.
// If true, a record may hash to a partition that cannot be written to
Expand All @@ -36,7 +33,20 @@ type TopicPartitioner interface {
Partition(r *Record, n int) int
}

// TopicBackupPartitioner is an optional extension interface to the
// TopicPartitionerOnNewBatch is an optional extension interface to
// TopicPartitioner that calls OnNewBatch before any new batch is created. If
// buffering a record would cause a new batch, OnNewBatch is called.
//
// This interface allows for partitioner implementations that effectively pin
// to a partition until a new batch is created, after which the partitioner can
// choose which next partition to use.
type TopicPartitionerOnNewBatch interface {
// OnNewBatch is called when producing a record if that record would
// trigger a new batch on its current partition.
OnNewBatch()
}

// TopicBackupPartitioner is an optional extension interface to
// TopicPartitioner that can partition by the number of records buffered.
//
// If a partitioner implements this interface, the Partition function will
Expand Down Expand Up @@ -105,7 +115,6 @@ type basicTopicPartitioner struct {
fn func(*Record, int) int
}

func (*basicTopicPartitioner) OnNewBatch() {}
func (*basicTopicPartitioner) RequiresConsistency(*Record) bool { return true }
func (b *basicTopicPartitioner) Partition(r *Record, n int) int { return b.fn(r, n) }

Expand All @@ -123,6 +132,34 @@ func ManualPartitioner() Partitioner {
})
}

// RoundRobinPartitioner is a partitioner that round-robin's through all
// available partitions. This algorithm has lower throughput and causes higher
// CPU load on brokers, but can be useful if you want to ensure an even
// distribution of records to partitions.
func RoundRobinPartitioner() Partitioner {
return new(roundRobinPartitioner)
}

type roundRobinPartitioner struct{}

func (*roundRobinPartitioner) ForTopic(string) TopicPartitioner {
return new(roundRobinTopicPartitioner)
}

type roundRobinTopicPartitioner struct {
on int
}

func (*roundRobinTopicPartitioner) RequiresConsistency(*Record) bool { return false }
func (r *roundRobinTopicPartitioner) Partition(_ *Record, n int) int {
if r.on >= n {
r.on = 0
}
ret := r.on
r.on++
return ret
}

// LeastBackupPartitioner prioritizes partitioning by three factors, in order:
//
// 1) pin to the current pick until there is a new batch
Expand Down
6 changes: 4 additions & 2 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,11 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart

partition := mapping[pick]

processed := partition.records.bufferRecord(pr, true) // KIP-480
onNewBatch, _ := parts.partitioner.(TopicPartitionerOnNewBatch)
abortOnNewBatch := onNewBatch != nil
processed := partition.records.bufferRecord(pr, abortOnNewBatch) // KIP-480
if !processed {
parts.partitioner.OnNewBatch()
onNewBatch.OnNewBatch()

if tlp != nil {
parts.lb.mapping = mapping
Expand Down

0 comments on commit 0d01f74

Please sign in to comment.