Skip to content

Commit

Permalink
source: use forgotten topics for sessions
Browse files Browse the repository at this point in the history
Previously, if we paused a partition, we would still receive data for
that partition in fetch responses because we did not signal that we are
no longer consuming the topic. The would result in wasted skipped data.

As well, if a partition moved from one broker to another, we never
"forgot" it on the old, and hygiene wise, that meant we would still be
asking for it.

We now use forgotten topics properly so that if we used something
previously in the session but are not now, we forget it and properly
track our state.

In a way, this completes KIP-227, whereas before it was mostly done.
  • Loading branch information
twmb committed Nov 12, 2021
1 parent 9a07b59 commit 7cd959c
Showing 1 changed file with 61 additions and 12 deletions.
73 changes: 61 additions & 12 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,7 @@ func (s *source) createReq() *fetchRequest {
isolationLevel: s.cl.cfg.isolationLevel,

// We copy a view of the session for the request, which allows
// us to reset the source (resetting only its fields without
// modifying the prior map) while the request may be reading
// its copy of the original fields.
// modify source while the request may be reading its copy.
session: s.session,
}

Expand Down Expand Up @@ -715,13 +713,6 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
s.session.bumpEpoch(resp.SessionID)
}

// If we moved any partitions to preferred replicas, we reset the
// session. We do this after bumping the epoch just to ensure that we
// have truly reset the session. (TODO switch to usingForgottenTopics)
if len(preferreds) > 0 {
s.session.reset()
}

if updateMeta && !reloadOffsets.loadWithSessionNow(consumerSession, updateWhy) {
s.cl.triggerUpdateMetadataNow(updateWhy)
}
Expand Down Expand Up @@ -773,6 +764,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// expect.
topicOffsets, ok := req.usedOffsets[topic]
if !ok {
s.cl.cfg.logger.Log(LogLevelWarn, "broker returned topic from fetch that we did not ask for",
"broker", logID(s.nodeID),
"topic", topic,
)
continue
}

Expand All @@ -786,6 +781,11 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
partition := rp.Partition
partOffset, ok := topicOffsets[partition]
if !ok {
s.cl.cfg.logger.Log(LogLevelWarn, "broker returned partition from fetch that we did not ask for",
"broker", logID(s.nodeID),
"topic", topic,
"partition", partition,
)
continue
}

Expand Down Expand Up @@ -1536,12 +1536,30 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
req.SessionEpoch = f.session.epoch
req.Rack = f.rack

// We track which partitions we add in this request; any partitions
// missing that are already in the session get added to forgotten
// topics at the end.
var sessionUsed map[string]map[int32]struct{}
if !f.session.killed {
sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets))
}

for topic, partitions := range f.usedOffsets {

var reqTopic *kmsg.FetchRequestTopic
sessionTopic := f.session.lookupTopic(topic)

var usedTopic map[int32]struct{}
if sessionUsed != nil {
usedTopic = make(map[int32]struct{}, len(partitions))
}

for partition, cursorOffsetNext := range partitions {

if usedTopic != nil {
usedTopic[partition] = struct{}{}
}

if !sessionTopic.hasPartitionAt(
partition,
cursorOffsetNext.offset,
Expand All @@ -1566,6 +1584,38 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
}
}

if sessionUsed != nil {
sessionUsed[topic] = usedTopic
}
}

// Now for everything that we did not use in our session, add it to
// forgotten topics and remove it from the session.
if sessionUsed != nil {
for topic, partitions := range f.session.used {
var forgottenTopic *kmsg.FetchRequestForgottenTopic
topicUsed := sessionUsed[topic]
for partition := range partitions {
if topicUsed != nil {
if _, partitionUsed := topicUsed[partition]; partitionUsed {
continue
}
}
if forgottenTopic == nil {
t := kmsg.NewFetchRequestForgottenTopic()
t.Topic = topic
t.TopicID = f.topic2id[topic]
req.ForgottenTopics = append(req.ForgottenTopics, t)
forgottenTopic = &req.ForgottenTopics[len(req.ForgottenTopics)-1]
}
forgottenTopic.Partitions = append(forgottenTopic.Partitions, partition)
delete(partitions, partition)
}
if len(partitions) == 0 {
delete(f.session.used, topic)
}
}
}

return req.AppendTo(dst)
Expand All @@ -1582,8 +1632,7 @@ func (f *fetchRequest) ResponseKind() kmsg.Response {
}

// fetchSessions, introduced in KIP-227, allow us to send less information back
// and forth to a Kafka broker. Rather than relying on forgotten topics to
// remove partitions from a session, we just simply reset the session.
// and forth to a Kafka broker.
type fetchSession struct {
id int32
epoch int32
Expand Down

0 comments on commit 7cd959c

Please sign in to comment.