From 3e025741012402ebd95e5d38d6daaace51733cea Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 5 Oct 2022 22:32:35 -0600 Subject: [PATCH] kgo: occasionally use seed brokers when choosing "random" brokers See the embedded comments -- previously, we could get into a pathological case where all discovered brokers are down. We would never be able to refresh metadata because we would only try talking to discovered brokers. --- pkg/kgo/client.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 1c1286ad..b8e0f86b 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -404,8 +404,12 @@ func (cl *Client) broker() *broker { cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below defer cl.brokersMu.Unlock() + // Every time we loop through all discovered brokers, we issue one + // request to the next seed. This ensures that if all discovered + // brokers are down, we will *eventually* loop through seeds and + // hopefully have a reachable seed. var b *broker - if len(cl.brokers) > 0 { + if len(cl.brokers) > 0 && int(cl.anyBrokerIdx) < len(cl.brokers) { cl.anyBrokerIdx %= int32(len(cl.brokers)) b = cl.brokers[cl.anyBrokerIdx] cl.anyBrokerIdx++ @@ -413,6 +417,13 @@ func (cl *Client) broker() *broker { cl.anySeedIdx %= int32(len(cl.seeds)) b = cl.seeds[cl.anySeedIdx] cl.anySeedIdx++ + + // If we have brokers, we ranged past discovered brokers. + // We now reset the anyBrokerIdx to begin ranging through + // discovered brokers again. + if len(cl.brokers) > 0 { + cl.anyBrokerIdx %= int32(len(cl.brokers)) + } } return b }