diff --git a/pkg/kgo/partitioner.go b/pkg/kgo/partitioner.go
index a58652a0..c33a45d7 100644
--- a/pkg/kgo/partitioner.go
+++ b/pkg/kgo/partitioner.go
@@ -45,22 +45,35 @@ type TopicBackupPartitioner interface {
 	TopicPartitioner
 
 	// PartitionByBackup is similar to Partition, but has an additional
-	// backupFn function. This function will return the number of buffered
-	// records per index. The function can only be called up to n times,
-	// calling it any more will panic.
-	PartitionByBackup(r *Record, n int, backupFn func() (int, int64)) int
+	// backupIter. This iterator will return the number of buffered records
+	// per partition index. The iterator's Next function can only be called
+	// up to n times; calling it more than that will panic.
+	PartitionByBackup(r *Record, n int, backupIter TopicBackupIter) int
 }
 
-type lpInput struct {
-	on      int
+// TopicBackupIter iterates through partition indices.
+type TopicBackupIter interface {
+	// Next returns the next partition index and the total buffered records
+	// for the partition. If Rem returns 0, calling this function again
+	// will panic.
+	Next() (int, int64)
+	// Rem returns the number of elements left to iterate through.
+	Rem() int
+}
+
+type leastBackupInput struct {
 	mapping []*topicPartition
 }
 
-func (i *lpInput) next() (int, int64) {
-	on := i.on
-	buffered := atomic.LoadInt64(&i.mapping[on].records.buffered)
-	i.on++
-	return on, buffered
+func (i *leastBackupInput) Next() (int, int64) {
+	last := len(i.mapping) - 1
+	buffered := atomic.LoadInt64(&i.mapping[last].records.buffered)
+	i.mapping = i.mapping[:last]
+	return last, buffered
+}
+
+func (i *leastBackupInput) Rem() int {
+	return len(i.mapping)
 }
 
 // BasicConsistentPartitioner wraps a single function to provide a Partitioner
@@ -113,27 +126,28 @@ func ManualPartitioner() Partitioner {
 // LeastBackupPartitioner prioritizes partitioning by three factors, in order:
 //
 // 1) pin to the current pick until there is a new batch
-// 2) on new batch, choose the least backed up partition
+// 2) on new batch, choose the least backed up partition (the partition with
+//    the fewest buffered records)
 // 3) if multiple partitions are equally least-backed-up, choose one at random
 //
-// This algorithm prioritizes lead-backed-up throughput, which may result in
-// unequal partitioning. It is likely that the algorithm will talk most to the
+// This algorithm prioritizes least-backed-up throughput, which may result in
+// unequal partitioning. It is likely that this algorithm will talk most to the
 // broker that it has the best connection to.
 //
 // This algorithm is resilient to brokers going down: if a few brokers die, it
 // is possible your throughput will be so high that the maximum buffered
-// records will be reached in any offline partitions before metadata responds
-// that the broker is offline. With the standard partitioning algorithms, the
-// only recovery is if the partition is remapped or if the broker comes back
-// online. With the lead backup partitioner, downed partitions will see slight
-// backup, but then the other partitions that are still accepting writes will
-// get all of the writes.
+// records will be reached in the now-offline partitions before metadata
+// responds that the broker is offline. With the standard partitioning
+// algorithms, the only recovery is if the partition is remapped or if the
+// broker comes back online. With the least backup partitioner, downed
+// partitions will see slight backup, but then the other partitions that are
+// still accepting writes will get all of the writes and your client will not
+// be blocked.
 //
 // Under ideal scenarios (no broker / connection issues), StickyPartitioner is
-// faster than LeastBackupPartitioner due to the sticky partitioner doing less
-// work while partitioning. This partitioner is only recommended if you are a
-// producer consistently dealing with flaky connections or problematic brokers
-// and do not mind uneven load on your brokers.
+// equivalent to LeastBackupPartitioner. This partitioner is only recommended
+// if you are a producer consistently dealing with flaky connections or
+// problematic brokers and do not mind uneven load on your brokers.
 func LeastBackupPartitioner() Partitioner {
 	return new(leastBackupPartitioner)
 }
@@ -148,25 +162,34 @@ func (*leastBackupPartitioner) ForTopic(string) TopicPartitioner {
 func newLeastBackupTopicPartitioner() leastBackupTopicPartitioner {
 	return leastBackupTopicPartitioner{
 		onPart: -1,
+		rng:    rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
 type leastBackupTopicPartitioner struct {
 	onPart int
+	rng    *rand.Rand
 }
 
 func (p *leastBackupTopicPartitioner) OnNewBatch()                    { p.onPart = -1 }
 func (*leastBackupTopicPartitioner) RequiresConsistency(*Record) bool { return false }
 func (*leastBackupTopicPartitioner) Partition(*Record, int) int       { panic("unreachable") }
 
-func (p *leastBackupTopicPartitioner) PartitionByBackup(_ *Record, n int, backupFn func() (int, int64)) int {
+func (p *leastBackupTopicPartitioner) PartitionByBackup(_ *Record, n int, backup TopicBackupIter) int {
 	if p.onPart == -1 || p.onPart >= n {
 		leastBackup := int64(math.MaxInt64)
+		npicked := 0
 		for ; n > 0; n-- {
-			pick, backup := backupFn()
+			pick, backup := backup.Next()
 			if backup < leastBackup {
 				leastBackup = backup
 				p.onPart = pick
+				npicked = 1
+			} else if backup == leastBackup {
+				npicked++ // reservoir sampling with k = 1
+				if p.rng.Intn(npicked) == 0 {
+					p.onPart = pick
+				}
 			}
 		}
 	}
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index fbe97de1..ebede442 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -408,12 +408,11 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
 	var pick int
 	tlp, _ := parts.partitioner.(TopicBackupPartitioner)
 	if tlp != nil {
-		if parts.lpInput == nil {
-			parts.lpInput = new(lpInput)
+		if parts.lb == nil {
+			parts.lb = new(leastBackupInput)
 		}
-		parts.lpInput.on = 0
-		parts.lpInput.mapping = mapping
-		pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
+		parts.lb.mapping = mapping
+		pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lb)
 	} else {
 		pick = parts.partitioner.Partition(pr.Record, len(mapping))
 	}
@@ -429,9 +428,8 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
 		parts.partitioner.OnNewBatch()
 
 		if tlp != nil {
-			parts.lpInput.on = 0
-			parts.lpInput.mapping = mapping
-			pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
+			parts.lb.mapping = mapping
+			pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lb)
 		} else {
 			pick = parts.partitioner.Partition(pr.Record, len(mapping))
 		}
diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go
index cb7cd27c..bdc47918 100644
--- a/pkg/kgo/topics_and_partitions.go
+++ b/pkg/kgo/topics_and_partitions.go
@@ -19,7 +19,7 @@ type topicPartitions struct {
 
 	partsMu     sync.Mutex
 	partitioner TopicPartitioner
-	lpInput     *lpInput // for partitioning if the partitioner is a LoadTopicPartitioner
+	lb          *leastBackupInput // for partitioning if the partitioner is a TopicBackupPartitioner
 }
 
 func (t *topicPartitions) load() *topicPartitionsData { return t.v.Load().(*topicPartitionsData) }
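
For reference, a minimal usage sketch of the partitioner this change documents, as seen from application code. It assumes the existing kgo.NewClient, kgo.SeedBrokers, kgo.RecordPartitioner, Produce, and Flush APIs plus an example broker at localhost:9092; none of that is part of this patch.

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Opt into least-backup partitioning when constructing the client.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // example seed broker
		kgo.RecordPartitioner(kgo.LeastBackupPartitioner()),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Produce as usual: records stick to one partition per batch, and each
	// new batch moves to whichever partition currently has the least backup.
	cl.Produce(context.Background(), &kgo.Record{
		Topic: "example-topic",
		Value: []byte("hello"),
	}, func(_ *kgo.Record, err error) {
		if err != nil {
			// handle or log the produce error
		}
	})

	// Wait for buffered records to be flushed before exiting.
	if err := cl.Flush(context.Background()); err != nil {
		panic(err)
	}
}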
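
The exported TopicBackupIter also lets callers write their own backup-aware partitioners outside the package. Below is a hypothetical sketch (the firstIdle* names and the package name are illustrative, not part of the library): it grabs any partition that currently has zero buffered records and otherwise falls back to the least backed up one, with no stickiness.

package partitionerx

import (
	"math"

	"github.com/twmb/franz-go/pkg/kgo"
)

// firstIdlePartitioner is a hypothetical user-defined partitioner built on
// the TopicBackupPartitioner / TopicBackupIter interfaces.
type firstIdlePartitioner struct{}

func (*firstIdlePartitioner) ForTopic(string) kgo.TopicPartitioner {
	return new(firstIdleTopicPartitioner)
}

type firstIdleTopicPartitioner struct{}

func (*firstIdleTopicPartitioner) OnNewBatch()                          {} // no per-batch state to reset
func (*firstIdleTopicPartitioner) RequiresConsistency(*kgo.Record) bool { return false }
func (*firstIdleTopicPartitioner) Partition(*kgo.Record, int) int       { panic("unreachable") }

// PartitionByBackup returns any partition with zero buffered records, falling
// back to the least backed up partition. Next is called at most Rem() times,
// honoring the TopicBackupIter contract.
func (*firstIdleTopicPartitioner) PartitionByBackup(_ *kgo.Record, _ int, iter kgo.TopicBackupIter) int {
	least := int64(math.MaxInt64)
	pick := 0
	for iter.Rem() > 0 {
		p, buffered := iter.Next()
		if buffered == 0 {
			return p // an idle partition: take it immediately
		}
		if buffered < least {
			least, pick = buffered, p
		}
	}
	return pick
}

Such a partitioner would be selected the same way as above, via kgo.RecordPartitioner(new(firstIdlePartitioner)).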
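
The tie-break inside PartitionByBackup is single-element reservoir sampling: every additional partition that matches the current minimum replaces the pick with probability 1/npicked, which leaves all tied partitions equally likely after one pass and without buffering the ties. A standalone sketch of the same loop (the function name and sample data are illustrative only):

package main

import (
	"fmt"
	"math"
	"math/rand"
)

// pickLeast mirrors the selection loop in PartitionByBackup: it returns the
// index of the minimum value, breaking ties uniformly at random.
func pickLeast(rng *rand.Rand, buffered []int64) int {
	least := int64(math.MaxInt64)
	pick, npicked := -1, 0
	for i, b := range buffered {
		if b < least {
			least, pick, npicked = b, i, 1
		} else if b == least {
			npicked++
			if rng.Intn(npicked) == 0 { // keep the i-th tie with probability 1/npicked
				pick = i
			}
		}
	}
	return pick
}

func main() {
	rng := rand.New(rand.NewSource(1))
	counts := make(map[int]int)
	for i := 0; i < 30000; i++ {
		counts[pickLeast(rng, []int64{3, 0, 5, 0, 0})]++ // partitions 1, 3, and 4 tie at 0
	}
	fmt.Println(counts) // each of 1, 3, and 4 is picked roughly a third of the time
}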