From caadb8bb80ee8e4901bcf484c44ee827a8b2af70 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Fri, 20 Aug 2021 12:29:42 -0600
Subject: [PATCH] partitioner: rename LeastLoad to LeastBackup

Least load implies that we are sending to the least-loaded broker, not
that we are partitioning based on the client's own internal load. Least
backup better conveys records backing up on the way to a broker, which
is more in the spirit of what this partitioner is about.
---
 pkg/kgo/partitioner.go | 58 +++++++++++++++++++++---------------------
 pkg/kgo/producer.go    |  6 ++---
 2 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/pkg/kgo/partitioner.go b/pkg/kgo/partitioner.go
index 69c35d77..a58652a0 100644
--- a/pkg/kgo/partitioner.go
+++ b/pkg/kgo/partitioner.go
@@ -36,19 +36,19 @@ type TopicPartitioner interface {
 	Partition(r *Record, n int) int
 }
 
-// TopicLoadPartitioner is an optional extension interface to the
-// TopicPartitioner that can partition by load.
+// TopicBackupPartitioner is an optional extension interface to the
+// TopicPartitioner that can partition by the number of records buffered.
 //
 // If a partitioner implements this interface, the Partition function will
 // never be called.
-type TopicLoadPartitioner interface {
+type TopicBackupPartitioner interface {
 	TopicPartitioner
 
-	// PartitionByLoad is similar to Partition, but has an additional
-	// loadFn function. This function will return the number of buffered
+	// PartitionByBackup is similar to Partition, but has an additional
+	// backupFn function. This function will return the number of buffered
 	// records per index. The function can only be called up to n times;
 	// calling it any more will panic.
-	PartitionByLoad(r *Record, n int, loadFn func() (int, int64)) int
+	PartitionByBackup(r *Record, n int, backupFn func() (int, int64)) int
 }
 
 type lpInput struct {
@@ -110,13 +110,13 @@ func ManualPartitioner() Partitioner {
 	})
 }
 
-// LeastLoadPartitioner prioritizes partitioning by three factors, in order:
+// LeastBackupPartitioner prioritizes partitioning by three factors, in order:
 //
 //     1) pin to the current pick until there is a new batch
-//     2) on new batch, choose the least loaded partition
-//     3) if multiple partitions are equally least-loaded, choose one at random
+//     2) on new batch, choose the least backed up partition
+//     3) if multiple partitions are equally least-backed-up, choose one at random
 //
-// This algorithm prioritizes least-loaded throughput, which may result in
+// This algorithm prioritizes least-backed-up throughput, which may result in
 // unequal partitioning. It is likely that the algorithm will talk most to the
 // broker that it has the best connection to.
 //
@@ -125,47 +125,47 @@ func ManualPartitioner() Partitioner {
 // records will be reached in any offline partitions before metadata responds
 // that the broker is offline. With the standard partitioning algorithms, the
 // only recovery is if the partition is remapped or if the broker comes back
-// online. With the least load partitioner, downed partitions will see slight
+// online. With the least backup partitioner, downed partitions will see slight
 // backup, but then the other partitions that are still accepting writes will
-// get all of the load.
+// get all of the writes.
 //
 // Under ideal scenarios (no broker / connection issues), StickyPartitioner is
-// faster than LeastLoadPartitioner due to the sticky partitioner doing less
+// faster than LeastBackupPartitioner due to the sticky partitioner doing less
 // work while partitioning. This partitioner is only recommended if you are a
 // producer consistently dealing with flaky connections or problematic brokers
 // and do not mind uneven load on your brokers.
-func LeastLoadPartitioner() Partitioner {
-	return new(leastLoadPartitioner)
+func LeastBackupPartitioner() Partitioner {
+	return new(leastBackupPartitioner)
 }
 
-type leastLoadPartitioner struct{}
+type leastBackupPartitioner struct{}
 
-func (*leastLoadPartitioner) ForTopic(string) TopicPartitioner {
-	p := newLeastLoadTopicPartitioner()
+func (*leastBackupPartitioner) ForTopic(string) TopicPartitioner {
+	p := newLeastBackupTopicPartitioner()
 	return &p
 }
 
-func newLeastLoadTopicPartitioner() leastLoadTopicPartitioner {
-	return leastLoadTopicPartitioner{
+func newLeastBackupTopicPartitioner() leastBackupTopicPartitioner {
+	return leastBackupTopicPartitioner{
 		onPart: -1,
 	}
 }
 
-type leastLoadTopicPartitioner struct {
+type leastBackupTopicPartitioner struct {
 	onPart int
 }
 
-func (p *leastLoadTopicPartitioner) OnNewBatch()                    { p.onPart = -1 }
-func (*leastLoadTopicPartitioner) RequiresConsistency(*Record) bool { return false }
-func (*leastLoadTopicPartitioner) Partition(*Record, int) int       { panic("unreachable") }
+func (p *leastBackupTopicPartitioner) OnNewBatch()                    { p.onPart = -1 }
+func (*leastBackupTopicPartitioner) RequiresConsistency(*Record) bool { return false }
+func (*leastBackupTopicPartitioner) Partition(*Record, int) int       { panic("unreachable") }
 
-func (p *leastLoadTopicPartitioner) PartitionByLoad(_ *Record, n int, loadFn func() (int, int64)) int {
+func (p *leastBackupTopicPartitioner) PartitionByBackup(_ *Record, n int, backupFn func() (int, int64)) int {
 	if p.onPart == -1 || p.onPart >= n {
-		leastLoad := int64(math.MaxInt64)
+		leastBackup := int64(math.MaxInt64)
 		for ; n > 0; n-- {
-			pick, load := loadFn()
-			if load < leastLoad {
-				leastLoad = load
+			pick, backup := backupFn()
+			if backup < leastBackup {
+				leastBackup = backup
 				p.onPart = pick
 			}
 		}
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 98bc7b4a..fbe97de1 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -406,14 +406,14 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
 	}
 
 	var pick int
-	tlp, _ := parts.partitioner.(TopicLoadPartitioner)
+	tlp, _ := parts.partitioner.(TopicBackupPartitioner)
 	if tlp != nil {
 		if parts.lpInput == nil {
 			parts.lpInput = new(lpInput)
 		}
 		parts.lpInput.on = 0
 		parts.lpInput.mapping = mapping
-		pick = tlp.PartitionByLoad(pr.Record, len(mapping), parts.lpInput.next)
+		pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
 	} else {
 		pick = parts.partitioner.Partition(pr.Record, len(mapping))
 	}
@@ -431,7 +431,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
 	if tlp != nil {
 		parts.lpInput.on = 0
 		parts.lpInput.mapping = mapping
-		pick = tlp.PartitionByLoad(pr.Record, len(mapping), parts.lpInput.next)
+		pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
 	} else {
 		pick = parts.partitioner.Partition(pr.Record, len(mapping))
 	}
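
A minimal sketch of how a producer opts into the renamed partitioner,
assuming the existing kgo client options SeedBrokers, DefaultProduceTopic,
and RecordPartitioner (none of which this patch touches) and the module
import path github.com/twmb/franz-go/pkg/kgo; the broker address and
topic name are placeholders:

	package main

	import (
		"context"
		"fmt"

		"github.com/twmb/franz-go/pkg/kgo"
	)

	func main() {
		// LeastBackupPartitioner is the renamed LeastLoadPartitioner; it is
		// wired into the client through the RecordPartitioner option.
		cl, err := kgo.NewClient(
			kgo.SeedBrokers("localhost:9092"),  // placeholder broker
			kgo.DefaultProduceTopic("events"),  // placeholder topic
			kgo.RecordPartitioner(kgo.LeastBackupPartitioner()),
		)
		if err != nil {
			panic(err)
		}
		defer cl.Close()

		// Records in the same batch stick to one partition; each new batch
		// re-picks the partition with the fewest buffered records.
		cl.Produce(context.Background(), &kgo.Record{Value: []byte("hello")},
			func(_ *kgo.Record, err error) {
				if err != nil {
					fmt.Println("produce error:", err)
				}
			})

		// Wait for buffered records to flush before exiting.
		cl.Flush(context.Background())
	}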