LeastBackupPartitioner: fix, speedup
The docs indicate that if multiple partitions have the same backup, a
random partition will be chosen. The random choice was forgotten, so
when things were going fast, we always chose the same partition. We now
choose one at random using reservoir sampling with k=1 (i.e., a very
easy iterative random choice).
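
For illustration, a standalone sketch of that iterative random choice (not
the library code; the function name and sample data are made up). It keeps
the smallest buffered count seen so far and, whenever a candidate ties that
minimum, replaces the current pick with probability 1/npicked, which leaves
every tied index equally likely:

package main

import (
	"fmt"
	"math"
	"math/rand"
)

// pickLeastBackup returns the index of the smallest value in buffered,
// breaking ties uniformly at random via reservoir sampling with k=1.
func pickLeastBackup(rng *rand.Rand, buffered []int64) int {
	pick, least, npicked := -1, int64(math.MaxInt64), 0
	for i, b := range buffered {
		if b < least {
			least, pick, npicked = b, i, 1
		} else if b == least {
			npicked++
			// The k-th tied index replaces the pick with probability 1/k
			// and survives each later tie with probability (j-1)/j; the
			// product telescopes to 1/m for each of the m tied indices.
			if rng.Intn(npicked) == 0 {
				pick = i
			}
		}
	}
	return pick
}

func main() {
	rng := rand.New(rand.NewSource(1))
	fmt.Println(pickLeastBackup(rng, []int64{3, 0, 7, 0})) // prints 1 or 3
}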

Additionally, using an interface seems to be a good bit faster than
using a func. The LeastBackupPartitioner is now faster for me locally
than the StickyKeyPartitioner.
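
The speedup claim is the kind of thing a micro-benchmark can show. A rough,
hypothetical sketch (not the repository's benchmark suite) that compares
draining a fixed number of buffered counts through a small interface versus
through a captured closure:

package partbench

import "testing"

type iter interface{ Next() (int, int64) }

type sliceIter struct{ buf []int64 }

func (s *sliceIter) Next() (int, int64) {
	last := len(s.buf) - 1
	v := s.buf[last]
	s.buf = s.buf[:last]
	return last, v
}

func BenchmarkInterface(b *testing.B) {
	buf := make([]int64, 64)
	for i := 0; i < b.N; i++ {
		var it iter = &sliceIter{buf: buf}
		for range buf {
			it.Next()
		}
	}
}

func BenchmarkFunc(b *testing.B) {
	buf := make([]int64, 64)
	for i := 0; i < b.N; i++ {
		on := 0
		next := func() (int, int64) { on++; return on - 1, buf[on-1] }
		for range buf {
			next()
		}
	}
}

Running go test -bench . on such a file is how one would check the claim
locally; absolute numbers will vary by machine and Go version.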
twmb committed Aug 23, 2021
1 parent 841c44d commit 5be804d
Showing 3 changed files with 56 additions and 35 deletions.
75 changes: 49 additions & 26 deletions pkg/kgo/partitioner.go
@@ -45,22 +45,35 @@ type TopicBackupPartitioner interface {
TopicPartitioner

// PartitionByBackup is similar to Partition, but has an additional
// backupFn function. This function will return the number of buffered
// records per index. The function can only be called up to n times,
// calling it any more will panic.
PartitionByBackup(r *Record, n int, backupFn func() (int, int64)) int
// backupIter. This iterator will return the number of buffered records
// per partition index. The iterator's Next function can only be called
// up to n times, calling it any more will panic.
PartitionByBackup(r *Record, n int, backupIter TopicBackupIter) int
}

type lpInput struct {
on int
// TopicBackupIter iterates through partition indices.
type TopicBackupIter interface {
// Next returns the next partition index and the total buffered records
// for the partition. If Rem returns 0, calling this function again
// will panic.
Next() (int, int64)
// Rem returns the number of elements left to iterate through.
Rem() int
}

type leastBackupInput struct {
mapping []*topicPartition
}

func (i *lpInput) next() (int, int64) {
on := i.on
buffered := atomic.LoadInt64(&i.mapping[on].records.buffered)
i.on++
return on, buffered
func (i *leastBackupInput) Next() (int, int64) {
last := len(i.mapping) - 1
buffered := atomic.LoadInt64(&i.mapping[last].records.buffered)
i.mapping = i.mapping[:last]
return last, buffered
}

func (i *leastBackupInput) Rem() int {
return len(i.mapping)
}

// BasicConsistentPartitioner wraps a single function to provide a Partitioner
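
To make the Next/Rem contract of TopicBackupIter (above) concrete, a small
hedged sketch of a toy iterator and a consumer loop (the stub type and names
are invented; only the two methods mirror the interface). Next is called
exactly Rem() times, once per remaining partition, popping from the end the
way leastBackupInput does:

package main

import "fmt"

// backupIter mirrors the Next/Rem contract of TopicBackupIter.
type backupIter interface {
	Next() (int, int64)
	Rem() int
}

// stubIter is a toy implementation over a fixed slice of buffered counts.
type stubIter struct{ buffered []int64 }

func (s *stubIter) Next() (int, int64) {
	last := len(s.buffered) - 1
	v := s.buffered[last]
	s.buffered = s.buffered[:last]
	return last, v
}

func (s *stubIter) Rem() int { return len(s.buffered) }

func main() {
	var it backupIter = &stubIter{buffered: []int64{5, 1, 9}}
	for it.Rem() > 0 { // calling Next once Rem reaches 0 would panic in kgo
		idx, buffered := it.Next()
		fmt.Printf("partition %d has %d buffered records\n", idx, buffered)
	}
}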
@@ -113,27 +126,28 @@ func ManualPartitioner() Partitioner {
// LeastBackupPartitioner prioritizes partitioning by three factors, in order:
//
// 1) pin to the current pick until there is a new batch
// 2) on new batch, choose the least backed up partition
// 2) on new batch, choose the least backed up partition (the partition with
// the fewest buffered records)
// 3) if multiple partitions are equally least-backed-up, choose one at random
//
// This algorithm prioritizes lead-backed-up throughput, which may result in
// unequal partitioning. It is likely that the algorithm will talk most to the
// This algorithm prioritizes least-backed-up throughput, which may result in
// unequal partitioning. It is likely that this algorithm will talk most to the
// broker that it has the best connection to.
//
// This algorithm is resilient to brokers going down: if a few brokers die, it
// is possible your throughput will be so high that the maximum buffered
// records will be reached in any offline partitions before metadata responds
// that the broker is offline. With the standard partitioning algorithms, the
// only recovery is if the partition is remapped or if the broker comes back
// online. With the lead backup partitioner, downed partitions will see slight
// backup, but then the other partitions that are still accepting writes will
// get all of the writes.
// records will be reached in the now-offline partitions before metadata
// responds that the broker is offline. With the standard partitioning
// algorithms, the only recovery is if the partition is remapped or if the
// broker comes back online. With the least backup partitioner, downed
// partitions will see slight backup, but then the other partitions that are
// still accepting writes will get all of the writes and your client will not
// be blocked.
//
// Under ideal scenarios (no broker / connection issues), StickyPartitioner is
// faster than LeastBackupPartitioner due to the sticky partitioner doing less
// work while partitioning. This partitioner is only recommended if you are a
// producer consistently dealing with flaky connections or problematic brokers
// and do not mind uneven load on your brokers.
// equivalent to LeastBackupPartitioner. This partitioner is only recommended
// if you are a producer consistently dealing with flaky connections or
// problematic brokers and do not mind uneven load on your brokers.
func LeastBackupPartitioner() Partitioner {
return new(leastBackupPartitioner)
}
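
For context on how a producer would opt in, a minimal hypothetical sketch
(assuming kgo's NewClient, SeedBrokers, and RecordPartitioner options; the
broker address is a placeholder):

package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder address
		kgo.RecordPartitioner(kgo.LeastBackupPartitioner()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	// Produce as usual; records are now partitioned by least backup.
}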
@@ -148,25 +162,34 @@ func (*leastBackupPartitioner) ForTopic(string) TopicPartitioner {
func newLeastBackupTopicPartitioner() leastBackupTopicPartitioner {
return leastBackupTopicPartitioner{
onPart: -1,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

type leastBackupTopicPartitioner struct {
onPart int
rng *rand.Rand
}

func (p *leastBackupTopicPartitioner) OnNewBatch() { p.onPart = -1 }
func (*leastBackupTopicPartitioner) RequiresConsistency(*Record) bool { return false }
func (*leastBackupTopicPartitioner) Partition(*Record, int) int { panic("unreachable") }

func (p *leastBackupTopicPartitioner) PartitionByBackup(_ *Record, n int, backupFn func() (int, int64)) int {
func (p *leastBackupTopicPartitioner) PartitionByBackup(_ *Record, n int, backup TopicBackupIter) int {
if p.onPart == -1 || p.onPart >= n {
leastBackup := int64(math.MaxInt64)
npicked := 0
for ; n > 0; n-- {
pick, backup := backupFn()
pick, backup := backup.Next()
if backup < leastBackup {
leastBackup = backup
p.onPart = pick
npicked = 1
} else {
npicked++ // reservoir sampling with k = 1
if p.rng.Intn(npicked) == 0 {
p.onPart = pick
}
}
}
}
14 changes: 6 additions & 8 deletions pkg/kgo/producer.go
@@ -408,12 +408,11 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
var pick int
tlp, _ := parts.partitioner.(TopicBackupPartitioner)
if tlp != nil {
if parts.lpInput == nil {
parts.lpInput = new(lpInput)
if parts.lb == nil {
parts.lb = new(leastBackupInput)
}
parts.lpInput.on = 0
parts.lpInput.mapping = mapping
pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
parts.lb.mapping = mapping
pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lb)
} else {
pick = parts.partitioner.Partition(pr.Record, len(mapping))
}
@@ -429,9 +428,8 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
parts.partitioner.OnNewBatch()

if tlp != nil {
parts.lpInput.on = 0
parts.lpInput.mapping = mapping
pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lpInput.next)
parts.lb.mapping = mapping
pick = tlp.PartitionByBackup(pr.Record, len(mapping), parts.lb)
} else {
pick = parts.partitioner.Partition(pr.Record, len(mapping))
}
2 changes: 1 addition & 1 deletion pkg/kgo/topics_and_partitions.go
@@ -19,7 +19,7 @@ type topicPartitions struct {

partsMu sync.Mutex
partitioner TopicPartitioner
lpInput *lpInput // for partitioning if the partitioner is a LoadTopicPartitioner
lb *leastBackupInput // for partitioning if the partitioner is a TopicBackupPartitioner
}

func (t *topicPartitions) load() *topicPartitionsData { return t.v.Load().(*topicPartitionsData) }
