diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 4b9a8f20..3d6ef59d 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1496,6 +1496,9 @@ type fetchRequest struct { numOffsets int usedOffsets usedOffsets + torder []string // order of topics to write + porder map[string][]int32 // per topic, order of partitions to write + topic2id map[string][16]byte id2topic map[[16]byte]string @@ -1510,6 +1513,7 @@ func (f *fetchRequest) addCursor(c *cursor) { f.usedOffsets = make(usedOffsets) f.id2topic = make(map[[16]byte]string) f.topic2id = make(map[string][16]byte) + f.porder = make(map[string][]int32) } partitions := f.usedOffsets[c.topic] if partitions == nil { @@ -1517,8 +1521,10 @@ func (f *fetchRequest) addCursor(c *cursor) { f.usedOffsets[c.topic] = partitions f.id2topic[c.topicID] = c.topic f.topic2id[c.topic] = c.topicID + f.torder = append(f.torder, c.topic) } partitions[c.partition] = c.use() + f.porder[c.topic] = append(f.porder[c.topic], c.partition) f.numOffsets++ } @@ -1547,7 +1553,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets)) } - for topic, partitions := range f.usedOffsets { + for _, topic := range f.torder { + partitions := f.usedOffsets[topic] var reqTopic *kmsg.FetchRequestTopic sessionTopic := f.session.lookupTopic(topic) @@ -1557,7 +1564,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { usedTopic = make(map[int32]struct{}, len(partitions)) } - for partition, cursorOffsetNext := range partitions { + for _, partition := range f.porder[topic] { + cursorOffsetNext := partitions[partition] if usedTopic != nil { usedTopic[partition] = struct{}{}