Skip to content

Commit

Permalink
support KIP-516 for Fetch
Browse files Browse the repository at this point in the history
This is fairly simple for the fetch request if we do not consider a
topic being deleted and recreated. If we do, then things get complicated
in the metadata code.

I'm not sure we want to support from a client a topic being deleted and
recreated, so we will punt on that until more of KIP-516 comes into
play.
  • Loading branch information
twmb committed Jul 10, 2021
1 parent 4fdc7e0 commit ebf2f07
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 14 deletions.
20 changes: 16 additions & 4 deletions generate/definitions/01_fetch
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
// Note that starting in v3, Kafka began processing partitions in order,
// meaning the order of partitions in the fetch request is important due to
// potential size constraints.
FetchRequest => key 1, max version 12, flexible v12+
//
// Starting in v13, topics must use UUIDs rather than their string name
// identifiers.
FetchRequest => key 1, max version 13, flexible v12+
// The cluster ID, if known. This is used to validate metadata fetches
// prior to broker registration.
ClusterID: nullable-string(null) // tag 0
Expand Down Expand Up @@ -48,7 +51,9 @@ FetchRequest => key 1, max version 12, flexible v12+
// Topic contains topics to try to fetch records for.
Topics: [=>]
// Topic is a topic to try to fetch records for.
Topic: string
Topic: string // v0-v12
// TopicID is the uuid of the topic to fetch records for.
TopicID: uuid // v13+
// Partitions contains partitions in a topic to try to fetch records for.
Partitions: [=>]
// Partition is a partition in a topic to try to fetch records for.
Expand Down Expand Up @@ -78,7 +83,10 @@ FetchRequest => key 1, max version 12, flexible v12+
// See KIP-227 for more details.
ForgottenTopics: [=>] // v7+
// Topic is a topic to remove from being tracked (with the partitions below).
Topic: string
Topic: string // v7-v12
// TopicID is the uuid of a topic to remove from being tracked (with the
// partitions below).
TopicID: uuid // v13+
// Partitions are partitions to remove from tracking for a topic.
Partitions: [int32]
// Rack of the consumer making this request (see KIP-392; introduced in
Expand Down Expand Up @@ -106,7 +114,9 @@ FetchResponse =>
// for them.
Topics: [=>]
// Topic is a topic that records may have been received for.
Topic: string
Topic: string // v0-v12
// TopicID is the uuid of a topic that records may have been received for.
TopicID: uuid // v13+
// Partitions contains partitions in a topic that records may have
// been received for.
Partitions: [=>]
Expand Down Expand Up @@ -147,6 +157,8 @@ FetchResponse =>
//
// OFFSET_OUT_OF_RANGE is returned if requesting an offset past the
// current end offset or before the beginning offset.
//
// UNKNOWN_TOPIC_ID is returned if using uuid's and the uuid is unknown.
ErrorCode: int16
// HighWatermark is the current high watermark for this partition,
// that is, the current offset that is on all in sync replicas.
Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*

cursor: &cursor{
topic: topicMeta.Topic,
topicID: topicMeta.TopicID,
partition: partMeta.Partition,
keepControl: cl.cfg.keepControl,
cursorsIdx: -1,
Expand Down
18 changes: 17 additions & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (s *source) removeCursor(rm *cursor) {
// cursor is where we are consuming from for an individual partition.
type cursor struct {
topic string
topicID [16]byte
partition int32

keepControl bool // whether to keep control records
Expand Down Expand Up @@ -684,8 +685,15 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
preferreds []cursorOffsetPreferred
updateMeta bool
)

for _, rt := range resp.Topics {
topic := rt.Topic
// v13 only uses topic IDs, so we have to map the response
// uuid's to our string topics.
if resp.Version >= 13 {
topic = req.id2topic[rt.TopicID]
}

// We always include all cursors on this source in the fetch;
// we should not receive any topics or partitions we do not
// expect.
Expand Down Expand Up @@ -1392,6 +1400,9 @@ type fetchRequest struct {
numOffsets int
usedOffsets usedOffsets

topic2id map[string][16]byte
id2topic map[[16]byte]string

// Session is a copy of the source session at the time a request is
// built. If the source is reset, the session it has is reset at the
// field level only. Our view of the original session is still valid.
Expand All @@ -1401,11 +1412,15 @@ type fetchRequest struct {
func (f *fetchRequest) addCursor(c *cursor) {
if f.usedOffsets == nil {
f.usedOffsets = make(usedOffsets)
f.id2topic = make(map[[16]byte]string)
f.topic2id = make(map[string][16]byte)
}
partitions := f.usedOffsets[c.topic]
if partitions == nil {
partitions = make(map[int32]*cursorOffsetNext)
f.usedOffsets[c.topic] = partitions
f.id2topic[c.topicID] = c.topic
f.topic2id[c.topic] = c.topicID
}
partitions[c.partition] = c.use()
f.numOffsets++
Expand Down Expand Up @@ -1443,7 +1458,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {

if reqTopic == nil {
req.Topics = append(req.Topics, kmsg.FetchRequestTopic{
Topic: topic,
Topic: topic,
TopicID: f.topic2id[topic],
})
reqTopic = &req.Topics[len(req.Topics)-1]
}
Expand Down
57 changes: 48 additions & 9 deletions pkg/kmsg/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ebf2f07

Please sign in to comment.