Commit 2441dcb

fix: clear preferredReadReplica if broker shutdown
After Sarama had been given a preferred replica to consume from, it was mistakenly latching onto that value and never unsetting it when the preferred replica broker was shut down and left the cluster metadata.

Fetches continued to work as long as that broker remained down, because they were now being sent to the leader, which serviced them itself as it had no better preferred replica to point the client at. However, consumption would hang once the broker came back online: the leader would stop returning records in the FetchResponse and instead just return the preferred replica ID, expecting the client to send its FetchRequests there. Because the partitionConsumer had latched the stale preferredReadReplica value, it never dispatched to (re-)connect to the preferred replica and simply kept sending FetchRequests to the leader, receiving no records back.

Contributes-to: #2090

Signed-off-by: Dominic Evans <[email protected]>
1 parent: 9d82f86
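For context, not part of this commit: the preferred read replica only comes into play when KIP-392 follower fetching is enabled on the client, i.e. when the consumer's RackID matches a broker's broker.rack and the negotiated protocol version is at least 2.4.0. A minimal sketch of such a configuration, assuming the Shopify/sarama import path in use at the time of this commit and placeholder broker addresses and rack name:

package main

import "github.com/Shopify/sarama"

// newFollowerFetchingConsumer builds a consumer whose fetches may be redirected
// to a follower replica in the same rack, exercising the preferredReadReplica path.
func newFollowerFetchingConsumer(addrs []string) (sarama.Consumer, error) {
	config := sarama.NewConfig()
	config.ClientID = "follower-fetch-example" // placeholder
	config.Version = sarama.V2_8_0_0           // KIP-392 requires brokers and protocol >= 2.4.0
	config.RackID = "rack-a"                   // placeholder; must match a broker's broker.rack
	return sarama.NewConsumer(addrs, config)
}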

File tree

3 files changed: +160 -2 lines

consumer.go (+12 -2)
@@ -351,7 +351,6 @@ func (child *partitionConsumer) dispatcher() {
 				child.broker = nil
 			}
 
-			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
 			if err := child.dispatch(); err != nil {
 				child.sendError(err)
 				child.trigger <- none{}
@@ -372,6 +371,14 @@ func (child *partitionConsumer) preferredBroker() (*Broker, error) {
 		if err == nil {
 			return broker, nil
 		}
+		Logger.Printf(
+			"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
+			child.topic, child.partition, child.preferredReadReplica)
+
+		// if we couldn't find it, discard the replica preference and trigger a
+		// metadata refresh whilst falling back to consuming from the leader again
+		child.preferredReadReplica = invalidPreferredReplicaID
+		_ = child.consumer.client.RefreshMetadata(child.topic)
 	}
 
 	// if preferred replica cannot be found fallback to leader
@@ -856,6 +863,9 @@ func (bc *brokerConsumer) handleResponses() {
 			if preferredBroker, err := child.preferredBroker(); err == nil {
 				if bc.broker.ID() != preferredBroker.ID() {
 					// not an error but needs redispatching to consume from preferred replica
+					Logger.Printf(
+						"consumer/broker/%d abandoned in favour of preferred replica broker/%d\n",
+						bc.broker.ID(), preferredBroker.ID())
 					child.trigger <- none{}
 					delete(bc.subscriptions, child)
 				}
@@ -864,7 +874,7 @@ func (bc *brokerConsumer) handleResponses() {
 		}
 
 		// Discard any replica preference.
-		child.preferredReadReplica = -1
+		child.preferredReadReplica = invalidPreferredReplicaID
 
 		switch result {
 		case errTimedOut:
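Stated outside the diff as a hypothetical, simplified helper (the name and signature below are illustrative, not Sarama's internals): if the preferred replica can no longer be resolved from cluster metadata, the preference is discarded, a metadata refresh is requested, and fetching falls back to the partition leader.

// Hypothetical sketch of the fallback this commit restores; the real logic
// lives in partitionConsumer.preferredBroker in consumer.go above.
func resolveFetchTarget(client Client, topic string, partition int32, preferredReplica *int32) (*Broker, error) {
	if *preferredReplica >= 0 {
		if broker, err := client.Broker(*preferredReplica); err == nil {
			return broker, nil
		}
		// the preferred replica has left the cluster metadata: forget it
		// and refresh metadata before falling back to the leader
		*preferredReplica = -1 // i.e. invalidPreferredReplicaID
		_ = client.RefreshMetadata(topic)
	}
	return client.Leader(topic, partition)
}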
new functional test file (+127)
@@ -0,0 +1,127 @@
+//go:build functional
+// +build functional
+
+package sarama
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestConsumerFetchFollowerFailover(t *testing.T) {
+	const (
+		topic  = "test.1"
+		numMsg = 1000
+	)
+
+	newConfig := func() *Config {
+		config := NewConfig()
+		config.ClientID = t.Name()
+		config.Version = V2_8_0_0
+		config.Producer.Return.Successes = true
+		return config
+	}
+
+	config := newConfig()
+
+	// pick a partition and find the ID for one of the follower brokers
+	admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer admin.Close()
+
+	metadata, err := admin.DescribeTopics([]string{topic})
+	if err != nil {
+		t.Fatal(err)
+	}
+	partition := metadata[0].Partitions[0]
+	leader := metadata[0].Partitions[0].Leader
+	follower := int32(-1)
+	for _, replica := range partition.Replicas {
+		if replica == leader {
+			continue
+		}
+		follower = replica
+		break
+	}
+
+	t.Logf("topic %s has leader kafka-%d and our chosen follower is kafka-%d", topic, leader, follower)
+
+	// match our rackID to the given broker so our requests should end up fetching from that follower
+	config.RackID = strconv.FormatInt(int64(follower), 10)
+
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pc, err := consumer.ConsumePartition(topic, partition.ID, OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		pc.Close()
+		consumer.Close()
+	}()
+
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(numMsg)
+
+	go func() {
+		for i := 0; i < numMsg; i++ {
+			msg := &ProducerMessage{
+				Topic: topic, Key: nil, Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i))}
+			if _, offset, err := producer.SendMessage(msg); err != nil {
+				t.Error(i, err)
+			} else if offset%50 == 0 {
+				t.Logf("sent: %d\n", offset)
+			}
+			wg.Done()
+			time.Sleep(time.Millisecond * 25)
+		}
+	}()
+
+	i := 0
+
+	for ; i < numMsg/8; i++ {
+		msg := <-pc.Messages()
+		if msg.Offset%50 == 0 {
+			t.Logf("recv: %d\n", msg.Offset)
+		}
+	}
+
+	if err := stopDockerTestBroker(context.Background(), follower); err != nil {
+		t.Fatal(err)
+	}
+
+	for ; i < numMsg/3; i++ {
+		msg := <-pc.Messages()
+		if msg.Offset%50 == 0 {
+			t.Logf("recv: %d\n", msg.Offset)
+		}
+	}
+
+	if err := startDockerTestBroker(context.Background(), follower); err != nil {
+		t.Fatal(err)
+	}
+
+	for ; i < numMsg; i++ {
+		msg := <-pc.Messages()
+		if msg.Offset%50 == 0 {
+			t.Logf("recv: %d\n", msg.Offset)
+		}
+	}
+
+	wg.Wait()
+}

functional_test.go (+21)
@@ -221,6 +221,7 @@ func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error
 	toxiproxyHost := toxiproxyURL.Hostname()
 
 	env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
+	env.Proxies = map[string]*toxiproxy.Proxy{}
 	for i := 1; i <= 5; i++ {
 		proxyName := fmt.Sprintf("kafka%d", i)
 		proxy, err := env.ToxiproxyClient.Proxy(proxyName)
@@ -262,6 +263,26 @@ func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) er
 	return nil
 }
 
+func startDockerTestBroker(ctx context.Context, brokerID int32) error {
+	c := exec.Command("docker-compose", "start", fmt.Sprintf("kafka-%d", brokerID))
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	if err := c.Run(); err != nil {
+		return fmt.Errorf("failed to run docker-compose to start test broker kafka-%d: %w", brokerID, err)
+	}
+	return nil
+}
+
+func stopDockerTestBroker(ctx context.Context, brokerID int32) error {
+	c := exec.Command("docker-compose", "stop", fmt.Sprintf("kafka-%d", brokerID))
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	if err := c.Run(); err != nil {
+		return fmt.Errorf("failed to run docker-compose to stop test broker kafka-%d: %w", brokerID, err)
+	}
+	return nil
+}
+
 func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
 	Logger.Println("creating test topics")
 	var testTopicNames []string
