From f9cd6255555eb3512eed891e1d30032948a3c592 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sun, 20 Feb 2022 22:01:04 -0700
Subject: [PATCH] consuming: handle exact offset consuming better

Previously, if an exact offset was before the start offset or after the
end, the client would loop on OffsetOutOfRange errors. Now, we validate
the bounds and reset either to the start offset or the end offset if the
exact offset is too low or too high, respectively.

We also better document this behavior.
---
 pkg/kgo/config.go   |   8 +-
 pkg/kgo/consumer.go | 189 +++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 176 insertions(+), 21 deletions(-)

diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 17552e96..fdb6a5a5 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -1165,9 +1165,13 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
 // ConsumeResetOffset sets the offset to restart consuming from when a
 // partition has no commits (for groups) or when beginning to consume a
 // partition (for direct partition consuming), or when a fetch sees an
-// OffsetOutOfRange error, overriding the default ConsumeStartOffset.
+// OffsetOutOfRange error, overriding the default NewOffset().AtStart(), i.e.,
+// the earliest offset.
 //
-// Defaults to: NewOffset().AtStart() / Earliest Offset
+// If you are choosing an exact offset to reset to (NewOffset().At(#)) and the
+// offset is before the partition's log start offset or after the high
+// watermark, this will reset to the start offset or end offset, respectively.
+// Relative offsets are only obeyed if they fall within bounds.
 func ConsumeResetOffset(offset Offset) ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }}
 }
diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index 91ea5ee6..672827c5 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -3,6 +3,7 @@ package kgo
 import (
 	"context"
 	"fmt"
+	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -1497,15 +1498,117 @@ func (l *loadedOffsets) addAll(as []loadedOffset) loadedOffsets {
 func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker, load offsetLoadMap, tps *topicsPartitions, results chan<- loadedOffsets) {
 	loaded := loadedOffsets{broker: broker.meta.NodeID, loadType: loadTypeList}
 
-	kresp, err := broker.waitResp(ctx, load.buildListReq(cl.cfg.isolationLevel))
-	if err != nil {
+	req1, req2 := load.buildListReq(cl.cfg.isolationLevel)
+	var (
+		wg     sync.WaitGroup
+		kresp2 kmsg.Response
+		err2   error
+	)
+	if req2 != nil {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			kresp2, err2 = broker.waitResp(ctx, req2)
+		}()
+	}
+	kresp, err := broker.waitResp(ctx, req1)
+	wg.Wait()
+	if err != nil || err2 != nil {
 		results <- loaded.addAll(load.errToLoaded(err))
 		return
 	}
 
 	topics := tps.load()
 	resp := kresp.(*kmsg.ListOffsetsResponse)
-	for _, rTopic := range resp.Topics {
+
+	// If we issued a second req to check that an exact offset is in
+	// bounds, then regrettably for safety, we have to ensure that the
+	// shapes of both responses match, and the topic & partition at each
+	// index matches. Anything that does not match is skipped (and would be
+	// a bug from Kafka), and we at the end return UnknownTopicOrPartition.
+	var resp2 *kmsg.ListOffsetsResponse
+	if req2 != nil {
+		resp2 = kresp2.(*kmsg.ListOffsetsResponse)
+		for _, r := range []*kmsg.ListOffsetsResponse{
+			resp,
+			resp2,
+		} {
+			ts := r.Topics
+			sort.Slice(ts, func(i, j int) bool {
+				return ts[i].Topic < ts[j].Topic
+			})
+			for i := range ts {
+				ps := ts[i].Partitions
+				sort.Slice(ps, func(i, j int) bool {
+					return ps[i].Partition < ps[j].Partition
+				})
+			}
+		}
+
+		lt := resp.Topics
+		rt := resp2.Topics
+		lkeept := lt[:0]
+		rkeept := rt[:0]
+		// Over each response, we only keep the topic if the topics match.
+		for len(lt) > 0 && len(rt) > 0 {
+			if lt[0].Topic < rt[0].Topic {
+				lt = lt[1:]
+				continue
+			}
+			if rt[0].Topic < lt[0].Topic {
+				rt = rt[1:]
+				continue
+			}
+			// As well, for topics that match, we only keep
+			// partitions that match. In this case, we also want
+			// both partitions to be error free, otherwise we keep
+			// an error on both. If one has old style offsets,
+			// both must.
+			lp := lt[0].Partitions
+			rp := rt[0].Partitions
+			lkeepp := lp[:0]
+			rkeepp := rp[:0]
+			for len(lp) > 0 && len(rp) > 0 {
+				if lp[0].Partition < rp[0].Partition {
+					lp = lp[1:]
+					continue
+				}
+				if rp[0].Partition < lp[0].Partition {
+					rp = rp[1:]
+					continue
+				}
+				if len(lp[0].OldStyleOffsets) > 0 && len(rp[0].OldStyleOffsets) == 0 ||
+					len(lp[0].OldStyleOffsets) == 0 && len(rp[0].OldStyleOffsets) > 0 {
+					lp = lp[1:]
+					rp = rp[1:]
+					continue
+				}
+				if lp[0].ErrorCode != 0 {
+					rp[0].ErrorCode = lp[0].ErrorCode
+				} else if rp[0].ErrorCode != 0 {
+					lp[0].ErrorCode = rp[0].ErrorCode
+				}
+				lkeepp = append(lkeepp, lp[0])
+				rkeepp = append(rkeepp, rp[0])
+				lp = lp[1:]
+				rp = rp[1:]
+			}
+			// Now we update the partitions in the topic we are
+			// keeping, and keep our topic.
+			lt[0].Partitions = lkeepp
+			rt[0].Partitions = rkeepp
+			lkeept = append(lkeept, lt[0])
+			rkeept = append(rkeept, rt[0])
+			lt = lt[1:]
+			rt = rt[1:]
+		}
+		// Finally, update each response with the topics we kept. The
+		// shapes and indices are the same.
+		resp.Topics = lkeept
+		resp2.Topics = rkeept
+	}
+
+	for i, rTopic := range resp.Topics {
 		topic := rTopic.Topic
 		loadParts, ok := load[topic]
 		if !ok {
@@ -1513,7 +1616,7 @@ func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker,
 		}
 
 		topicPartitions := topics.loadTopic(topic) // must be non-nil at this point
-		for _, rPartition := range rTopic.Partitions {
+		for j, rPartition := range rTopic.Partitions {
 			partition := rPartition.Partition
 			loadPart, ok := loadParts[partition]
 			if !ok {
@@ -1540,15 +1643,41 @@ func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker,
 				delete(load, topic)
 			}
 
-			offset := rPartition.Offset + loadPart.relative
-			if len(rPartition.OldStyleOffsets) > 0 { // if we have any, we used list offsets v0
-				offset = rPartition.OldStyleOffsets[0] + loadPart.relative
+			offset := rPartition.Offset
+			if len(rPartition.OldStyleOffsets) > 0 {
+				offset = rPartition.OldStyleOffsets[0] // list offsets v0
 			}
+
+			// If the user requested an exact offset, we asked for
+			// both the start and end offsets. We validate the
+			// exact offset (delta any relative) is within bounds.
+			//
+			// For start & end, we only obey relative if it is
+			// positive to the start or negative to the end.
 			if loadPart.at >= 0 {
-				offset = loadPart.at + loadPart.relative // we obey exact requests, even if they end up past the end
+				// We ensured the resp2 shape is as we want, so
+				// these lookups are safe and the partition has
+				// no error.
+				pend := &resp2.Topics[i].Partitions[j]
+				start := offset
+				end := pend.Offset
+				if len(pend.OldStyleOffsets) > 0 {
+					end = pend.OldStyleOffsets[0]
+				}
+				rel := loadPart.at + loadPart.relative
+				if rel >= start {
+					offset = rel
+				}
+				if rel >= end {
+					offset = end
+				}
+			} else if loadPart.at == -2 && loadPart.relative > 0 {
+				offset += loadPart.relative
+			} else if loadPart.at == -1 && loadPart.relative < 0 {
+				offset += loadPart.relative
 			}
 			if offset < 0 {
-				offset = 0
+				offset = 0 // sanity
 			}
 
 			loaded.add(loadedOffset{
@@ -1638,21 +1767,27 @@ func (cl *Client) loadEpochsForBrokerLoad(ctx context.Context, broker *broker, l
 	results <- loaded.addAll(load.errToLoaded(kerr.UnknownTopicOrPartition))
 }
 
-func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsRequest {
-	req := kmsg.NewPtrListOffsetsRequest()
-	req.ReplicaID = -1
-	req.IsolationLevel = isolationLevel
-	req.Topics = make([]kmsg.ListOffsetsRequestTopic, 0, len(o))
+// In general this returns one request, but if the user is using exact offsets
+// rather than start/end, then we issue both the start and end requests to
+// ensure the user's requested offset is within bounds.
+func (o offsetLoadMap) buildListReq(isolationLevel int8) (r1, r2 *kmsg.ListOffsetsRequest) {
+	r1 = kmsg.NewPtrListOffsetsRequest()
+	r1.ReplicaID = -1
+	r1.IsolationLevel = isolationLevel
+	r1.Topics = make([]kmsg.ListOffsetsRequestTopic, 0, len(o))
+	var createEnd bool
 	for topic, partitions := range o {
 		parts := make([]kmsg.ListOffsetsRequestTopicPartition, 0, len(partitions))
 		for partition, offset := range partitions {
 			// If this partition is using an exact offset request,
 			// then we are listing for a partition that was not yet
-			// loaded by the client (due to metadata). We use -1
-			// just to ensure the partition is loaded.
+			// loaded by the client (due to metadata). We use -2
+			// just to ensure things are loaded and to ensure we
+			// load the start offset to validate lower bounds.
 			timestamp := offset.at
 			if timestamp >= 0 {
-				timestamp = -1
+				timestamp = -2
+				createEnd = true
 			}
 			p := kmsg.NewListOffsetsRequestTopicPartition()
 			p.Partition = partition
@@ -1665,9 +1800,25 @@ func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsReques
 		t := kmsg.NewListOffsetsRequestTopic()
 		t.Topic = topic
 		t.Partitions = parts
-		req.Topics = append(req.Topics, t)
+		r1.Topics = append(r1.Topics, t)
+	}
+
+	if createEnd {
+		r2 = kmsg.NewPtrListOffsetsRequest()
+		*r2 = *r1
+		r2.Topics = append([]kmsg.ListOffsetsRequestTopic(nil), r1.Topics...)
+		for i := range r1.Topics {
+			l := &r2.Topics[i]
+			r := &r1.Topics[i]
+			*l = *r
+			l.Partitions = append([]kmsg.ListOffsetsRequestTopicPartition(nil), r.Partitions...)
+			for i := range l.Partitions {
+				l.Partitions[i].Timestamp = -1
+			}
+		}
 	}
-	return req
+
+	return r1, r2
 }
 
 func (o offsetLoadMap) buildEpochReq() *kmsg.OffsetForLeaderEpochRequest {
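
For reference, a minimal consumer sketch of the reset behavior documented in
the config.go change above. The broker address "localhost:9092" and topic
"my-topic" are placeholders for this sketch; only ConsumeResetOffset and
NewOffset().At are the options under discussion.

    package main

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        // Placeholder broker and topic; adjust for your setup.
        cl, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"),
            kgo.ConsumeTopics("my-topic"),
            // Ask to start at exact offset 200. With this patch, if 200 is
            // below the partition's log start offset the consumer resets to
            // the start, and if it is above the high watermark it resets to
            // the end, rather than looping on OffsetOutOfRange.
            kgo.ConsumeResetOffset(kgo.NewOffset().At(200)),
        )
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        fetches := cl.PollFetches(context.Background())
        fetches.EachRecord(func(r *kgo.Record) {
            fmt.Println(r.Offset, string(r.Value))
        })
    }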
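
The bounds check in listOffsetsForBrokerLoad reduces to the following rule for
an exact request. This is a standalone illustrative sketch, not code from the
patch: rel stands for the requested offset plus any relative adjustment, and
start/end stand for the listed log start offset and end offset (high
watermark).

    // clampExact sketches the validation above: obey the exact request only
    // when it is within [start, end), otherwise fall back to a bound.
    func clampExact(rel, start, end int64) int64 {
        offset := start // too low: fall back to the log start offset
        if rel >= start {
            offset = rel // in bounds: obey the exact request
        }
        if rel >= end {
            offset = end // too high: fall back to the end offset
        }
        return offset
    }

For example, clampExact(5, 10, 100) returns 10, and clampExact(500, 10, 100)
returns 100.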
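
To make the -2/-1 timestamp convention in buildListReq concrete, here is a
sketch of the pair of requests it effectively builds when one partition uses
an exact offset. The topic "foo" and partition 0 are hypothetical; this is an
illustration, not the function itself.

    package main

    import "github.com/twmb/franz-go/pkg/kmsg"

    // exampleListReqs sketches the two ListOffsets requests for one
    // exact-offset partition: r1 lists the log start offset (Timestamp -2)
    // and r2 lists the end offset (Timestamp -1).
    func exampleListReqs(isolationLevel int8) (r1, r2 *kmsg.ListOffsetsRequest) {
        p := kmsg.NewListOffsetsRequestTopicPartition()
        p.Partition = 0
        p.Timestamp = -2 // earliest: the log start offset

        t := kmsg.NewListOffsetsRequestTopic()
        t.Topic = "foo"
        t.Partitions = append(t.Partitions, p)

        r1 = kmsg.NewPtrListOffsetsRequest()
        r1.ReplicaID = -1
        r1.IsolationLevel = isolationLevel
        r1.Topics = append(r1.Topics, t)

        p.Timestamp = -1 // latest: the end offset (high watermark)
        t2 := kmsg.NewListOffsetsRequestTopic()
        t2.Topic = "foo"
        t2.Partitions = append(t2.Partitions, p)

        r2 = kmsg.NewPtrListOffsetsRequest()
        r2.ReplicaID = -1
        r2.IsolationLevel = isolationLevel
        r2.Topics = append(r2.Topics, t2)
        return r1, r2
    }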