From bc391a31c0eb2ad202f6636e37d9326475b6e706 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 6 Dec 2021 22:07:59 -0700 Subject: [PATCH] source: rotate through topics/partitions as we fetch We previously rotated through partitions as we evaluated whether they should be added to fetch requests, but this was meaningless because the partitions are added to a two-layer map, which has random, mostly not-actually-random range ordering. We now keep our evaluated cursors in order, meaning we do eventually rotate through partitions in order. Per topic, the partitions rotate until we get have one full rotation, and then we rotate topics. Around it goes. --- pkg/kgo/source.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 4b9a8f20..3d6ef59d 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1496,6 +1496,9 @@ type fetchRequest struct { numOffsets int usedOffsets usedOffsets + torder []string // order of topics to write + porder map[string][]int32 // per topic, order of partitions to write + topic2id map[string][16]byte id2topic map[[16]byte]string @@ -1510,6 +1513,7 @@ func (f *fetchRequest) addCursor(c *cursor) { f.usedOffsets = make(usedOffsets) f.id2topic = make(map[[16]byte]string) f.topic2id = make(map[string][16]byte) + f.porder = make(map[string][]int32) } partitions := f.usedOffsets[c.topic] if partitions == nil { @@ -1517,8 +1521,10 @@ func (f *fetchRequest) addCursor(c *cursor) { f.usedOffsets[c.topic] = partitions f.id2topic[c.topicID] = c.topic f.topic2id[c.topic] = c.topicID + f.torder = append(f.torder, c.topic) } partitions[c.partition] = c.use() + f.porder[c.topic] = append(f.porder[c.topic], c.partition) f.numOffsets++ } @@ -1547,7 +1553,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets)) } - for topic, partitions := range f.usedOffsets { + for _, topic := range f.torder { + partitions := f.usedOffsets[topic] var reqTopic *kmsg.FetchRequestTopic sessionTopic := f.session.lookupTopic(topic) @@ -1557,7 +1564,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { usedTopic = make(map[int32]struct{}, len(partitions)) } - for partition, cursorOffsetNext := range partitions { + for _, partition := range f.porder[topic] { + cursorOffsetNext := partitions[partition] if usedTopic != nil { usedTopic[partition] = struct{}{}