Skip to content

Commit fcd0ea5

Browse files
authored
fix: track and supply leader epoch to FetchRequest (#2389)
Include the consumer's view of the leader in Fetch requests, as per KIP-320. Contributes-to: #2365
1 parent 72ea327 commit fcd0ea5

File tree

5 files changed

+48
-30
lines changed

5 files changed

+48
-30
lines changed

client.go

+19-11
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ type Client interface {
5050
// topic/partition, as determined by querying the cluster metadata.
5151
Leader(topic string, partitionID int32) (*Broker, error)
5252

53+
// LeaderAndEpoch returns the leader and its epoch for the current
54+
// topic/partition, as determined by querying the cluster metadata.
55+
LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error)
56+
5357
// Replicas returns the set of all replica IDs for the given partition.
5458
Replicas(topic string, partitionID int32) ([]int32, error)
5559

@@ -452,21 +456,25 @@ func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32,
452456
}
453457

454458
func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
459+
leader, _, err := client.LeaderAndEpoch(topic, partitionID)
460+
return leader, err
461+
}
462+
463+
func (client *client) LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error) {
455464
if client.Closed() {
456-
return nil, ErrClosedClient
465+
return nil, -1, ErrClosedClient
457466
}
458467

459-
leader, err := client.cachedLeader(topic, partitionID)
460-
468+
leader, epoch, err := client.cachedLeader(topic, partitionID)
461469
if leader == nil {
462470
err = client.RefreshMetadata(topic)
463471
if err != nil {
464-
return nil, err
472+
return nil, -1, err
465473
}
466-
leader, err = client.cachedLeader(topic, partitionID)
474+
leader, epoch, err = client.cachedLeader(topic, partitionID)
467475
}
468476

469-
return leader, err
477+
return leader, epoch, err
470478
}
471479

472480
func (client *client) RefreshBrokers(addrs []string) error {
@@ -848,7 +856,7 @@ func (client *client) setPartitionCache(topic string, partitionSet partitionType
848856
return ret
849857
}
850858

851-
func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
859+
func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, int32, error) {
852860
client.lock.RLock()
853861
defer client.lock.RUnlock()
854862

@@ -857,18 +865,18 @@ func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, er
857865
metadata, ok := partitions[partitionID]
858866
if ok {
859867
if errors.Is(metadata.Err, ErrLeaderNotAvailable) {
860-
return nil, ErrLeaderNotAvailable
868+
return nil, -1, ErrLeaderNotAvailable
861869
}
862870
b := client.brokers[metadata.Leader]
863871
if b == nil {
864-
return nil, ErrLeaderNotAvailable
872+
return nil, -1, ErrLeaderNotAvailable
865873
}
866874
_ = b.Open(client.conf)
867-
return b, nil
875+
return b, metadata.LeaderEpoch, nil
868876
}
869877
}
870878

871-
return nil, ErrUnknownTopicOrPartition
879+
return nil, -1, ErrUnknownTopicOrPartition
872880
}
873881

874882
func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {

consumer.go

+18-11
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
165165
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
166166
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
167167
feeder: make(chan *FetchResponse, 1),
168+
leaderEpoch: invalidLeaderEpoch,
168169
preferredReadReplica: invalidPreferredReplicaID,
169170
trigger: make(chan none, 1),
170171
dying: make(chan none),
@@ -175,9 +176,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
175176
return nil, err
176177
}
177178

178-
var leader *Broker
179-
var err error
180-
if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
179+
leader, epoch, err := c.client.LeaderAndEpoch(child.topic, child.partition)
180+
if err != nil {
181181
return nil, err
182182
}
183183

@@ -188,6 +188,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
188188
go withRecover(child.dispatcher)
189189
go withRecover(child.responseFeeder)
190190

191+
child.leaderEpoch = epoch
191192
child.broker = c.refBrokerConsumer(leader)
192193
child.broker.input <- child
193194

@@ -400,6 +401,7 @@ type partitionConsumer struct {
400401
errors chan *ConsumerError
401402
feeder chan *FetchResponse
402403

404+
leaderEpoch int32
403405
preferredReadReplica int32
404406

405407
trigger, dying chan none
@@ -463,11 +465,11 @@ func (child *partitionConsumer) dispatcher() {
463465
close(child.feeder)
464466
}
465467

466-
func (child *partitionConsumer) preferredBroker() (*Broker, error) {
468+
func (child *partitionConsumer) preferredBroker() (*Broker, int32, error) {
467469
if child.preferredReadReplica >= 0 {
468470
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
469471
if err == nil {
470-
return broker, nil
472+
return broker, child.leaderEpoch, nil
471473
}
472474
Logger.Printf(
473475
"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
@@ -480,21 +482,21 @@ func (child *partitionConsumer) preferredBroker() (*Broker, error) {
480482
}
481483

482484
// if preferred replica cannot be found fallback to leader
483-
return child.consumer.client.Leader(child.topic, child.partition)
485+
return child.consumer.client.LeaderAndEpoch(child.topic, child.partition)
484486
}
485487

486488
func (child *partitionConsumer) dispatch() error {
487489
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
488490
return err
489491
}
490492

491-
broker, err := child.preferredBroker()
493+
broker, epoch, err := child.preferredBroker()
492494
if err != nil {
493495
return err
494496
}
495497

498+
child.leaderEpoch = epoch
496499
child.broker = child.consumer.refBrokerConsumer(broker)
497-
498500
child.broker.input <- child
499501

500502
return nil
@@ -987,7 +989,7 @@ func (bc *brokerConsumer) handleResponses() {
987989
child.responseResult = nil
988990

989991
if result == nil {
990-
if preferredBroker, err := child.preferredBroker(); err == nil {
992+
if preferredBroker, _, err := child.preferredBroker(); err == nil {
991993
if bc.broker.ID() != preferredBroker.ID() {
992994
// not an error but needs redispatching to consume from preferred replica
993995
Logger.Printf(
@@ -1014,7 +1016,12 @@ func (bc *brokerConsumer) handleResponses() {
10141016
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
10151017
close(child.trigger)
10161018
delete(bc.subscriptions, child)
1017-
} else if errors.Is(result, ErrUnknownTopicOrPartition) || errors.Is(result, ErrNotLeaderForPartition) || errors.Is(result, ErrLeaderNotAvailable) || errors.Is(result, ErrReplicaNotAvailable) {
1019+
} else if errors.Is(result, ErrUnknownTopicOrPartition) ||
1020+
errors.Is(result, ErrNotLeaderForPartition) ||
1021+
errors.Is(result, ErrLeaderNotAvailable) ||
1022+
errors.Is(result, ErrReplicaNotAvailable) ||
1023+
errors.Is(result, ErrFencedLeaderEpoch) ||
1024+
errors.Is(result, ErrUnknownLeaderEpoch) {
10181025
// not an error, but does need redispatching
10191026
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
10201027
bc.broker.ID(), child.topic, child.partition, result)
@@ -1092,7 +1099,7 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
10921099

10931100
for child := range bc.subscriptions {
10941101
if !child.IsPaused() {
1095-
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
1102+
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize, child.leaderEpoch)
10961103
}
10971104
}
10981105

fetch_request.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
302302
}
303303
}
304304

305-
func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
305+
func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32, leaderEpoch int32) {
306306
if r.blocks == nil {
307307
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
308308
}
@@ -320,7 +320,7 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int
320320
tmp.maxBytes = maxBytes
321321
tmp.fetchOffset = fetchOffset
322322
if r.Version >= 9 {
323-
tmp.currentLeaderEpoch = int32(-1)
323+
tmp.currentLeaderEpoch = leaderEpoch
324324
}
325325

326326
r.blocks[topic][partitionID] = tmp

fetch_request_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ var (
4141
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
4242
0x00, 0x00, 0x00, 0x01,
4343
0x00, 0x00, 0x00, 0x12, // partitionID
44-
0xFF, 0xFF, 0xFF, 0xFF, // currentLeaderEpoch
44+
0x00, 0x00, 0x00, 0x66, // currentLeaderEpoch
4545
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
4646
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // logStartOffset
4747
0x00, 0x00, 0x00, 0x56, // maxBytes
@@ -67,7 +67,7 @@ func TestFetchRequest(t *testing.T) {
6767
request := new(FetchRequest)
6868
request.MaxWaitTime = 0
6969
request.MinBytes = 0
70-
request.AddBlock("topic", 0x12, 0x34, 0x56)
70+
request.AddBlock("topic", 0x12, 0x34, 0x56, -1)
7171
testRequest(t, "one block", request, fetchRequestOneBlock)
7272
})
7373

@@ -76,18 +76,18 @@ func TestFetchRequest(t *testing.T) {
7676
request.Version = 4
7777
request.MaxBytes = 0xFF
7878
request.Isolation = ReadCommitted
79-
request.AddBlock("topic", 0x12, 0x34, 0x56)
79+
request.AddBlock("topic", 0x12, 0x34, 0x56, -1)
8080
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
8181
})
8282

83-
t.Run("one block v11 rackid", func(t *testing.T) {
83+
t.Run("one block v11 rackid and leader epoch", func(t *testing.T) {
8484
request := new(FetchRequest)
8585
request.Version = 11
8686
request.MaxBytes = 0xFF
8787
request.Isolation = ReadCommitted
8888
request.SessionID = 0xAA
8989
request.SessionEpoch = 0xEE
90-
request.AddBlock("topic", 0x12, 0x34, 0x56)
90+
request.AddBlock("topic", 0x12, 0x34, 0x56, 0x66)
9191
request.RackID = "rack01"
9292
testRequest(t, "one block v11 rackid", request, fetchRequestOneBlockV11)
9393
})

fetch_response.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import (
88
"github.com/rcrowley/go-metrics"
99
)
1010

11-
const invalidPreferredReplicaID = -1
11+
const (
12+
invalidLeaderEpoch = -1
13+
invalidPreferredReplicaID = -1
14+
)
1215

1316
type AbortedTransaction struct {
1417
// ProducerID contains the producer id associated with the aborted transaction.

0 commit comments

Comments
 (0)