diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 486708c6..34c648bf 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -1645,6 +1645,13 @@ func missingOrCodeP(t string, p int32, exists bool, code int16) error { return kerr.ErrorForCode(code) } +func noLeader(t string, p int32, l int32) error { + if l < 0 { + return fmt.Errorf("topic %s partition %d has no leader according to the metadata lookup", t, p, l) + } + return nil +} + // This is a helper for the sharded requests below; if mapping metadata fails // to load topics or partitions, we group the failures by error. // @@ -1760,6 +1767,10 @@ func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]i unknowns.err(err, t, partition) continue } + if err := noLeader(t, p.Partition, p.Leader); err != nil { + unknowns.err(err, t, partition) + continue + } brokerReq := brokerReqs[p.Leader] if brokerReq == nil { @@ -2178,6 +2189,10 @@ func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request) ([ unknowns.err(err, t, partition) continue } + if err := noLeader(t, p.Partition, p.Leader); err != nil { + unknowns.err(err, t, partition) + continue + } brokerReq := brokerReqs[p.Leader] if brokerReq == nil { @@ -2272,6 +2287,10 @@ func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Requ unknowns.err(err, t, partition) continue } + if err := noLeader(t, p.Partition, p.Leader); err != nil { + unknowns.err(err, t, partition) + continue + } brokerReq := brokerReqs[p.Leader] if brokerReq == nil {