From ebf2f074423813d0220622721ee4438aa97c50a3 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 10 Jul 2021 01:02:33 -0600 Subject: [PATCH] support KIP-516 for Fetch This is fairly simple for the fetch request if we do not consider a topic being deleted and recreated. If we do, then things get complicated in the metadata code. I'm not sure we want to support from a client a topic being deleted and recreated, so we will punt on that until more of KIP-516 comes into play. --- generate/definitions/01_fetch | 20 +++++++++--- pkg/kgo/metadata.go | 1 + pkg/kgo/source.go | 18 ++++++++++- pkg/kmsg/generated.go | 57 +++++++++++++++++++++++++++++------ 4 files changed, 82 insertions(+), 14 deletions(-) diff --git a/generate/definitions/01_fetch b/generate/definitions/01_fetch index 198fc8e8..17a53ee8 100644 --- a/generate/definitions/01_fetch +++ b/generate/definitions/01_fetch @@ -7,7 +7,10 @@ // Note that starting in v3, Kafka began processing partitions in order, // meaning the order of partitions in the fetch request is important due to // potential size constraints. -FetchRequest => key 1, max version 12, flexible v12+ +// +// Starting in v13, topics must use UUIDs rather than their string name +// identifiers. +FetchRequest => key 1, max version 13, flexible v12+ // The cluster ID, if known. This is used to validate metadata fetches // prior to broker registration. ClusterID: nullable-string(null) // tag 0 @@ -48,7 +51,9 @@ FetchRequest => key 1, max version 12, flexible v12+ // Topic contains topics to try to fetch records for. Topics: [=>] // Topic is a topic to try to fetch records for. - Topic: string + Topic: string // v0-v12 + // TopicID is the uuid of the topic to fetch records for. + TopicID: uuid // v13+ // Partitions contains partitions in a topic to try to fetch records for. Partitions: [=>] // Partition is a partition in a topic to try to fetch records for. @@ -78,7 +83,10 @@ FetchRequest => key 1, max version 12, flexible v12+ // See KIP-227 for more details. ForgottenTopics: [=>] // v7+ // Topic is a topic to remove from being tracked (with the partitions below). - Topic: string + Topic: string // v7-v12 + // TopicID is the uuid of a topic to remove from being tracked (with the + // partitions below). + TopicID: uuid // v13+ // Partitions are partitions to remove from tracking for a topic. Partitions: [int32] // Rack of the consumer making this request (see KIP-392; introduced in @@ -106,7 +114,9 @@ FetchResponse => // for them. Topics: [=>] // Topic is a topic that records may have been received for. - Topic: string + Topic: string // v0-v12 + // TopicID is the uuid of a topic that records may have been received for. + TopicID: uuid // v13+ // Partitions contains partitions in a topic that records may have // been received for. Partitions: [=>] @@ -147,6 +157,8 @@ FetchResponse => // // OFFSET_OUT_OF_RANGE is returned if requesting an offset past the // current end offset or before the beginning offset. + // + // UNKNOWN_TOPIC_ID is returned if using uuid's and the uuid is unknown. ErrorCode: int16 // HighWatermark is the current high watermark for this partition, // that is, the current offset that is on all in sync replicas. diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index cac6cdfa..ae552740 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -394,6 +394,7 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]* cursor: &cursor{ topic: topicMeta.Topic, + topicID: topicMeta.TopicID, partition: partMeta.Partition, keepControl: cl.cfg.keepControl, cursorsIdx: -1, diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index b70b2832..cb0ba9a9 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -90,6 +90,7 @@ func (s *source) removeCursor(rm *cursor) { // cursor is where we are consuming from for an individual partition. type cursor struct { topic string + topicID [16]byte partition int32 keepControl bool // whether to keep control records @@ -684,8 +685,15 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe preferreds []cursorOffsetPreferred updateMeta bool ) + for _, rt := range resp.Topics { topic := rt.Topic + // v13 only uses topic IDs, so we have to map the response + // uuid's to our string topics. + if resp.Version >= 13 { + topic = req.id2topic[rt.TopicID] + } + // We always include all cursors on this source in the fetch; // we should not receive any topics or partitions we do not // expect. @@ -1392,6 +1400,9 @@ type fetchRequest struct { numOffsets int usedOffsets usedOffsets + topic2id map[string][16]byte + id2topic map[[16]byte]string + // Session is a copy of the source session at the time a request is // built. If the source is reset, the session it has is reset at the // field level only. Our view of the original session is still valid. @@ -1401,11 +1412,15 @@ type fetchRequest struct { func (f *fetchRequest) addCursor(c *cursor) { if f.usedOffsets == nil { f.usedOffsets = make(usedOffsets) + f.id2topic = make(map[[16]byte]string) + f.topic2id = make(map[string][16]byte) } partitions := f.usedOffsets[c.topic] if partitions == nil { partitions = make(map[int32]*cursorOffsetNext) f.usedOffsets[c.topic] = partitions + f.id2topic[c.topicID] = c.topic + f.topic2id[c.topic] = c.topicID } partitions[c.partition] = c.use() f.numOffsets++ @@ -1443,7 +1458,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { if reqTopic == nil { req.Topics = append(req.Topics, kmsg.FetchRequestTopic{ - Topic: topic, + Topic: topic, + TopicID: f.topic2id[topic], }) reqTopic = &req.Topics[len(req.Topics)-1] } diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 3f999e2a..857d20c7 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -3059,6 +3059,9 @@ type FetchRequestTopic struct { // Topic is a topic to try to fetch records for. Topic string + // TopicID is the uuid of the topic to fetch records for. + TopicID [16]byte // v13+ + // Partitions contains partitions in a topic to try to fetch records for. Partitions []FetchRequestTopicPartition @@ -3082,7 +3085,11 @@ func NewFetchRequestTopic() FetchRequestTopic { type FetchRequestForgottenTopic struct { // Topic is a topic to remove from being tracked (with the partitions below). - Topic string + Topic string // v7+ + + // TopicID is the uuid of a topic to remove from being tracked (with the + // partitions below). + TopicID [16]byte // v13+ // Partitions are partitions to remove from tracking for a topic. Partitions []int32 @@ -3114,6 +3121,9 @@ func NewFetchRequestForgottenTopic() FetchRequestForgottenTopic { // Note that starting in v3, Kafka began processing partitions in order, // meaning the order of partitions in the fetch request is important due to // potential size constraints. +// +// Starting in v13, topics must use UUIDs rather than their string name +// identifiers. type FetchRequest struct { // Version is the version of this message used with a Kafka broker. Version int16 @@ -3188,7 +3198,7 @@ type FetchRequest struct { } func (*FetchRequest) Key() int16 { return 1 } -func (*FetchRequest) MaxVersion() int16 { return 12 } +func (*FetchRequest) MaxVersion() int16 { return 13 } func (v *FetchRequest) SetVersion(version int16) { v.Version = version } func (v *FetchRequest) GetVersion() int16 { return v.Version } func (v *FetchRequest) IsFlexible() bool { return v.Version >= 12 } @@ -3245,7 +3255,7 @@ func (v *FetchRequest) AppendTo(dst []byte) []byte { } for i := range v { v := &v[i] - { + if version >= 0 && version <= 12 { v := v.Topic if isFlexible { dst = kbin.AppendCompactString(dst, v) @@ -3253,6 +3263,10 @@ func (v *FetchRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } + if version >= 13 { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } { v := v.Partitions if isFlexible { @@ -3307,7 +3321,7 @@ func (v *FetchRequest) AppendTo(dst []byte) []byte { } for i := range v { v := &v[i] - { + if version >= 7 && version <= 12 { v := v.Topic if isFlexible { dst = kbin.AppendCompactString(dst, v) @@ -3315,6 +3329,10 @@ func (v *FetchRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } + if version >= 13 { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } { v := v.Partitions if isFlexible { @@ -3429,7 +3447,7 @@ func (v *FetchRequest) ReadFrom(src []byte) error { v := &a[i] v.Default() s := v - { + if version >= 0 && version <= 12 { var v string if isFlexible { v = b.CompactString() @@ -3438,6 +3456,10 @@ func (v *FetchRequest) ReadFrom(src []byte) error { } s.Topic = v } + if version >= 13 { + v := b.Uuid() + s.TopicID = v + } { v := s.Partitions a := v @@ -3514,7 +3536,7 @@ func (v *FetchRequest) ReadFrom(src []byte) error { v := &a[i] v.Default() s := v - { + if version >= 7 && version <= 12 { var v string if isFlexible { v = b.CompactString() @@ -3523,6 +3545,10 @@ func (v *FetchRequest) ReadFrom(src []byte) error { } s.Topic = v } + if version >= 13 { + v := b.Uuid() + s.TopicID = v + } { v := s.Partitions a := v @@ -3757,6 +3783,8 @@ type FetchResponseTopicPartition struct { // // OFFSET_OUT_OF_RANGE is returned if requesting an offset past the // current end offset or before the beginning offset. + // + // UNKNOWN_TOPIC_ID is returned if using uuid's and the uuid is unknown. ErrorCode int16 // HighWatermark is the current high watermark for this partition, @@ -3861,6 +3889,9 @@ type FetchResponseTopic struct { // Topic is a topic that records may have been received for. Topic string + // TopicID is the uuid of a topic that records may have been received for. + TopicID [16]byte // v13+ + // Partitions contains partitions in a topic that records may have // been received for. Partitions []FetchResponseTopicPartition @@ -3922,7 +3953,7 @@ type FetchResponse struct { } func (*FetchResponse) Key() int16 { return 1 } -func (*FetchResponse) MaxVersion() int16 { return 12 } +func (*FetchResponse) MaxVersion() int16 { return 13 } func (v *FetchResponse) SetVersion(version int16) { v.Version = version } func (v *FetchResponse) GetVersion() int16 { return v.Version } func (v *FetchResponse) IsFlexible() bool { return v.Version >= 12 } @@ -3955,7 +3986,7 @@ func (v *FetchResponse) AppendTo(dst []byte) []byte { } for i := range v { v := &v[i] - { + if version >= 0 && version <= 12 { v := v.Topic if isFlexible { dst = kbin.AppendCompactString(dst, v) @@ -3963,6 +3994,10 @@ func (v *FetchResponse) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } + if version >= 13 { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } { v := v.Partitions if isFlexible { @@ -4186,7 +4221,7 @@ func (v *FetchResponse) ReadFrom(src []byte) error { v := &a[i] v.Default() s := v - { + if version >= 0 && version <= 12 { var v string if isFlexible { v = b.CompactString() @@ -4195,6 +4230,10 @@ func (v *FetchResponse) ReadFrom(src []byte) error { } s.Topic = v } + if version >= 13 { + v := b.Uuid() + s.TopicID = v + } { v := s.Partitions a := v