From 7cd959c47b31722f250f7c0210c39f74db0e1f13 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 11 Nov 2021 18:34:27 -0700 Subject: [PATCH] source: use forgotten topics for sessions Previously, if we paused a partition, we would still receive data for that partition in fetch responses because we did not signal that we are no longer consuming the topic. This would result in wasted skipped data. As well, if a partition moved from one broker to another, we never "forgot" it on the old broker, and hygiene-wise, that meant we would still be asking for it. We now use forgotten topics properly so that if we used something previously in the session but are not now, we forget it and properly track our state. In a way, this completes KIP-227, whereas before it was mostly done. --- pkg/kgo/source.go | 73 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 12 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 6086969c..4dae4090 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -432,9 +432,7 @@ func (s *source) createReq() *fetchRequest { isolationLevel: s.cl.cfg.isolationLevel, // We copy a view of the session for the request, which allows - // us to reset the source (resetting only its fields without - // modifying the prior map) while the request may be reading - // its copy of the original fields. + // us to modify the source while the request may be reading its copy. session: s.session, } @@ -715,13 +713,6 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct s.session.bumpEpoch(resp.SessionID) } - // If we moved any partitions to preferred replicas, we reset the - // session. We do this after bumping the epoch just to ensure that we - // have truly reset the session. 
(TODO switch to usingForgottenTopics) - if len(preferreds) > 0 { - s.session.reset() - } - if updateMeta && !reloadOffsets.loadWithSessionNow(consumerSession, updateWhy) { s.cl.triggerUpdateMetadataNow(updateWhy) } @@ -773,6 +764,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // expect. topicOffsets, ok := req.usedOffsets[topic] if !ok { + s.cl.cfg.logger.Log(LogLevelWarn, "broker returned topic from fetch that we did not ask for", + "broker", logID(s.nodeID), + "topic", topic, + ) continue } @@ -786,6 +781,11 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe partition := rp.Partition partOffset, ok := topicOffsets[partition] if !ok { + s.cl.cfg.logger.Log(LogLevelWarn, "broker returned partition from fetch that we did not ask for", + "broker", logID(s.nodeID), + "topic", topic, + "partition", partition, + ) continue } @@ -1536,12 +1536,30 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { req.SessionEpoch = f.session.epoch req.Rack = f.rack + // We track which partitions we add in this request; any partitions + // missing that are already in the session get added to forgotten + // topics at the end. 
+ var sessionUsed map[string]map[int32]struct{} + if !f.session.killed { + sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets)) + } + for topic, partitions := range f.usedOffsets { var reqTopic *kmsg.FetchRequestTopic sessionTopic := f.session.lookupTopic(topic) + var usedTopic map[int32]struct{} + if sessionUsed != nil { + usedTopic = make(map[int32]struct{}, len(partitions)) + } + for partition, cursorOffsetNext := range partitions { + + if usedTopic != nil { + usedTopic[partition] = struct{}{} + } + if !sessionTopic.hasPartitionAt( partition, cursorOffsetNext.offset, @@ -1566,6 +1584,38 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { reqTopic.Partitions = append(reqTopic.Partitions, reqPartition) } } + + if sessionUsed != nil { + sessionUsed[topic] = usedTopic + } + } + + // Now for everything that we did not use in our session, add it to + // forgotten topics and remove it from the session. + if sessionUsed != nil { + for topic, partitions := range f.session.used { + var forgottenTopic *kmsg.FetchRequestForgottenTopic + topicUsed := sessionUsed[topic] + for partition := range partitions { + if topicUsed != nil { + if _, partitionUsed := topicUsed[partition]; partitionUsed { + continue + } + } + if forgottenTopic == nil { + t := kmsg.NewFetchRequestForgottenTopic() + t.Topic = topic + t.TopicID = f.topic2id[topic] + req.ForgottenTopics = append(req.ForgottenTopics, t) + forgottenTopic = &req.ForgottenTopics[len(req.ForgottenTopics)-1] + } + forgottenTopic.Partitions = append(forgottenTopic.Partitions, partition) + delete(partitions, partition) + } + if len(partitions) == 0 { + delete(f.session.used, topic) + } + } } return req.AppendTo(dst) @@ -1582,8 +1632,7 @@ func (f *fetchRequest) ResponseKind() kmsg.Response { } // fetchSessions, introduced in KIP-227, allow us to send less information back -// and forth to a Kafka broker. 
Rather than relying on forgotten topics to -// remove partitions from a session, we just simply reset the session. +// and forth to a Kafka broker. type fetchSession struct { id int32 epoch int32