partitioner: rename LeastLoad to LeastBackup
"Least load" implies that we are sending to the least-loaded broker,
not that we are partitioning based on the client's own internal load.
"Least backup" better conveys records backing up to a broker, which is
more in the spirit of what this partitioner is about.
twmb committed Aug 20, 2021
1 parent 589c5e5 commit caadb8b
Showing 2 changed files with 32 additions and 32 deletions.
58 changes: 29 additions & 29 deletions pkg/kgo/partitioner.go
@@ -36,19 +36,19 @@ type TopicPartitioner interface {
	Partition(r *Record, n int) int
}

-// TopicLoadPartitioner is an optional extension interface to the
-// TopicPartitioner that can partition by load.
+// TopicBackupPartitioner is an optional extension interface to the
+// TopicPartitioner that can partition by the number of records buffered.
//
// If a partitioner implements this interface, the Partition function will
// never be called.
-type TopicLoadPartitioner interface {
+type TopicBackupPartitioner interface {
	TopicPartitioner

-	// PartitionByLoad is similar to Partition, but has an additional
-	// loadFn function. This function will return the number of buffered
+	// PartitionByBackup is similar to Partition, but has an additional
+	// backupFn function. This function will return the number of buffered
	// records per index. The function can only be called up to n times,
	// calling it any more will panic.
-	PartitionByLoad(r *Record, n int, loadFn func() (int, int64)) int
+	PartitionByBackup(r *Record, n int, backupFn func() (int, int64)) int
}

type lpInput struct {
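
As context for the renamed interface: below is a minimal sketch of what a
third-party TopicBackupPartitioner could look like, assuming only the method
set visible in this diff (ForTopic, OnNewBatch, RequiresConsistency,
Partition, PartitionByBackup). The "first idle" strategy and every name in
the sketch are illustrative, not part of this commit.

package demo

import "github.com/twmb/franz-go/pkg/kgo"

// firstIdle is a hypothetical Partitioner: pick the first partition with
// nothing buffered, falling back to partition 0.
type firstIdle struct{}

func (*firstIdle) ForTopic(string) kgo.TopicPartitioner { return new(firstIdleTopic) }

type firstIdleTopic struct{}

func (*firstIdleTopic) OnNewBatch()                          {}
func (*firstIdleTopic) RequiresConsistency(*kgo.Record) bool { return false }

// Per the interface docs above, Partition is never called when
// PartitionByBackup is implemented.
func (*firstIdleTopic) Partition(*kgo.Record, int) int { panic("unreachable") }

// backupFn may be called at most n times; each call reports a partition
// index and how many records are currently buffered for that partition.
func (*firstIdleTopic) PartitionByBackup(_ *kgo.Record, n int, backupFn func() (int, int64)) int {
	for ; n > 0; n-- {
		if pick, buffered := backupFn(); buffered == 0 {
			return pick
		}
	}
	return 0
}
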
@@ -110,13 +110,13 @@ func ManualPartitioner() Partitioner {
	})
}

-// LeastLoadPartitioner prioritizes partitioning by three factors, in order:
+// LeastBackupPartitioner prioritizes partitioning by three factors, in order:
//
// 1) pin to the current pick until there is a new batch
-// 2) on new batch, choose the least loaded partition
-// 3) if multiple partitions are equally least-loaded, choose one at random
+// 2) on new batch, choose the least backed up partition
+// 3) if multiple partitions are equally least-backed-up, choose one at random
//
-// This algorithm prioritizes least-loaded throughput, which may result in
+// This algorithm prioritizes least-backed-up throughput, which may result in
// unequal partitioning. It is likely that the algorithm will talk most to the
// broker that it has the best connection to.
//
@@ -125,47 +125,47 @@ func ManualPartitioner() Partitioner {
// records will be reached in any offline partitions before metadata responds
// that the broker is offline. With the standard partitioning algorithms, the
// only recovery is if the partition is remapped or if the broker comes back
-// online. With the least load partitioner, downed partitions will see slight
+// online. With the least backup partitioner, downed partitions will see slight
// backup, but then the other partitions that are still accepting writes will
-// get all of the load.
+// get all of the writes.
//
// Under ideal scenarios (no broker / connection issues), StickyPartitioner is
-// faster than LeastLoadPartitioner due to the sticky partitioner doing less
+// faster than LeastBackupPartitioner due to the sticky partitioner doing less
// work while partitioning. This partitioner is only recommended if you are a
// producer consistently dealing with flaky connections or problematic brokers
// and do not mind uneven load on your brokers.
-func LeastLoadPartitioner() Partitioner {
-	return new(leastLoadPartitioner)
+func LeastBackupPartitioner() Partitioner {
+	return new(leastBackupPartitioner)
}

-type leastLoadPartitioner struct{}
+type leastBackupPartitioner struct{}

-func (*leastLoadPartitioner) ForTopic(string) TopicPartitioner {
-	p := newLeastLoadTopicPartitioner()
+func (*leastBackupPartitioner) ForTopic(string) TopicPartitioner {
+	p := newLeastBackupTopicPartitioner()
	return &p
}

-func newLeastLoadTopicPartitioner() leastLoadTopicPartitioner {
-	return leastLoadTopicPartitioner{
+func newLeastBackupTopicPartitioner() leastBackupTopicPartitioner {
+	return leastBackupTopicPartitioner{
		onPart: -1,
	}
}

-type leastLoadTopicPartitioner struct {
+type leastBackupTopicPartitioner struct {
	onPart int
}

-func (p *leastLoadTopicPartitioner) OnNewBatch()                    { p.onPart = -1 }
-func (*leastLoadTopicPartitioner) RequiresConsistency(*Record) bool { return false }
-func (*leastLoadTopicPartitioner) Partition(*Record, int) int       { panic("unreachable") }
+func (p *leastBackupTopicPartitioner) OnNewBatch()                    { p.onPart = -1 }
+func (*leastBackupTopicPartitioner) RequiresConsistency(*Record) bool { return false }
+func (*leastBackupTopicPartitioner) Partition(*Record, int) int       { panic("unreachable") }

-func (p *leastLoadTopicPartitioner) PartitionByLoad(_ *Record, n int, loadFn func() (int, int64)) int {
+func (p *leastBackupTopicPartitioner) PartitionByBackup(_ *Record, n int, backupFn func() (int, int64)) int {
	if p.onPart == -1 || p.onPart >= n {
-		leastLoad := int64(math.MaxInt64)
+		leastBackup := int64(math.MaxInt64)
		for ; n > 0; n-- {
-			pick, load := loadFn()
-			if load < leastLoad {
-				leastLoad = load
+			pick, backup := backupFn()
+			if backup < leastBackup {
+				leastBackup = backup
				p.onPart = pick
			}
		}
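
For callers, adopting the renamed partitioner is a one-line option change. A
hedged sketch, assuming the client's RecordPartitioner and SeedBrokers
options (neither is part of this diff); the broker address is a placeholder.

package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		// Pin to a partition per batch, preferring whichever partition
		// currently has the fewest buffered records.
		kgo.RecordPartitioner(kgo.LeastBackupPartitioner()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
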
6 changes: 3 additions & 3 deletions pkg/kgo/producer.go
@@ -406,14 +406,14 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
	}

	var pick int
-	tlp, _ := parts.partitioner.(TopicLoadPartitioner)
+	tlp, _ := parts.partitioner.(TopicBackupPartitioner)
	if tlp != nil {
		if parts.lpInput == nil {
			parts.lpInput = new(lpInput)
		}
		parts.lpInput.on = 0
		parts.lpInput.mapping = mapping
-		pick = tlp.PartitionByLoad(pr.Record, len(mapping), parts.lpInput.next)
+		pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
	} else {
		pick = parts.partitioner.Partition(pr.Record, len(mapping))
	}
@@ -431,7 +431,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
	if tlp != nil {
		parts.lpInput.on = 0
		parts.lpInput.mapping = mapping
-		pick = tlp.PartitionByLoad(pr.Record, len(mapping), parts.lpInput.next)
+		pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
	} else {
		pick = parts.partitioner.Partition(pr.Record, len(mapping))
	}
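
To make the backupFn contract concrete, here is a self-contained sketch that
drives PartitionByBackup by hand with a toy backupFn. The backlog numbers and
topic name are illustrative; in the real producer code above, the function
passed in is parts.lpInput.next rather than user code.

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	backlog := []int64{10, 3, 7} // pretend per-partition buffered record counts
	next := 0
	backupFn := func() (int, int64) { // may be called at most len(backlog) times
		pick, buffered := next, backlog[next]
		next++
		return pick, buffered
	}

	tp := kgo.LeastBackupPartitioner().ForTopic("demo")
	bp := tp.(kgo.TopicBackupPartitioner) // the extension interface renamed in this commit

	pick := bp.PartitionByBackup(new(kgo.Record), len(backlog), backupFn)
	fmt.Println("picked partition:", pick) // the least backed up index (1 here)
}
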
