Skip to content

Commit

Permalink
source: rotate through topics/partitions as we fetch
Browse files Browse the repository at this point in the history
We previously rotated through partitions as we evaluated whether they
should be added to fetch requests, but this was meaningless because the
partitions are added to a two-layer map, which has random, mostly
not-actually-random range ordering.

We now keep our evaluated cursors in order, meaning we do eventually
rotate through partitions in order. Per topic, the partitions rotate
until we get have one full rotation, and then we rotate topics. Around
it goes.
  • Loading branch information
twmb committed Dec 7, 2021
1 parent 6bbdaa2 commit bc391a3
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,9 @@ type fetchRequest struct {
numOffsets int
usedOffsets usedOffsets

torder []string // order of topics to write
porder map[string][]int32 // per topic, order of partitions to write

topic2id map[string][16]byte
id2topic map[[16]byte]string

Expand All @@ -1510,15 +1513,18 @@ func (f *fetchRequest) addCursor(c *cursor) {
f.usedOffsets = make(usedOffsets)
f.id2topic = make(map[[16]byte]string)
f.topic2id = make(map[string][16]byte)
f.porder = make(map[string][]int32)
}
partitions := f.usedOffsets[c.topic]
if partitions == nil {
partitions = make(map[int32]*cursorOffsetNext)
f.usedOffsets[c.topic] = partitions
f.id2topic[c.topicID] = c.topic
f.topic2id[c.topic] = c.topicID
f.torder = append(f.torder, c.topic)
}
partitions[c.partition] = c.use()
f.porder[c.topic] = append(f.porder[c.topic], c.partition)
f.numOffsets++
}

Expand Down Expand Up @@ -1547,7 +1553,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets))
}

for topic, partitions := range f.usedOffsets {
for _, topic := range f.torder {
partitions := f.usedOffsets[topic]

var reqTopic *kmsg.FetchRequestTopic
sessionTopic := f.session.lookupTopic(topic)
Expand All @@ -1557,7 +1564,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
usedTopic = make(map[int32]struct{}, len(partitions))
}

for partition, cursorOffsetNext := range partitions {
for _, partition := range f.porder[topic] {
cursorOffsetNext := partitions[partition]

if usedTopic != nil {
usedTopic[partition] = struct{}{}
Expand Down

0 comments on commit bc391a3

Please sign in to comment.