Skip to content

Commit 3de95be

Browse files
authored
Merge branch 'main' into kip-580-exponential-backoff-func
2 parents ff2dc5f + 60592f6 commit 3de95be

File tree

2 files changed

+34
-6
lines changed

2 files changed

+34
-6
lines changed

client.go

+14
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ type Client interface {
113113
// LeastLoadedBroker retrieves broker that has the least responses pending
114114
LeastLoadedBroker() *Broker
115115

116+
// check if partition is readable
117+
PartitionNotReadable(topic string, partition int32) bool
118+
116119
// Close shuts down all broker connections managed by this client. It is required
117120
// to call this function before a client object passes out of scope, as it will
118121
// otherwise leak memory. You must close any Producers or Consumers using a client
@@ -1283,3 +1286,14 @@ type nopCloserClient struct {
12831286
func (ncc *nopCloserClient) Close() error {
12841287
return nil
12851288
}
1289+
1290+
func (client *client) PartitionNotReadable(topic string, partition int32) bool {
1291+
client.lock.RLock()
1292+
defer client.lock.RUnlock()
1293+
1294+
pm := client.metadata[topic][partition]
1295+
if pm == nil {
1296+
return true
1297+
}
1298+
return pm.Leader == -1
1299+
}

consumer_group.go

+20-6
Original file line numberDiff line numberDiff line change
@@ -861,18 +861,32 @@ func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims
861861
return nil, err
862862
}
863863

864-
// start consuming
864+
// start consuming each topic partition in its own goroutine
865865
for topic, partitions := range claims {
866866
for _, partition := range partitions {
867-
sess.waitGroup.Add(1)
868-
867+
sess.waitGroup.Add(1) // increment wait group before spawning goroutine
869868
go func(topic string, partition int32) {
870869
defer sess.waitGroup.Done()
871-
872-
// cancel the as session as soon as the first
873-
// goroutine exits
870+
// cancel the group session as soon as any of the consume calls return
874871
defer sess.cancel()
875872

873+
// if partition not currently readable, wait for it to become readable
874+
if sess.parent.client.PartitionNotReadable(topic, partition) {
875+
timer := time.NewTimer(5 * time.Second)
876+
defer timer.Stop()
877+
878+
for sess.parent.client.PartitionNotReadable(topic, partition) {
879+
select {
880+
case <-ctx.Done():
881+
return
882+
case <-parent.closed:
883+
return
884+
case <-timer.C:
885+
timer.Reset(5 * time.Second)
886+
}
887+
}
888+
}
889+
876890
// consume a single topic/partition, blocking
877891
sess.consume(topic, partition)
878892
}(topic, partition)

0 commit comments

Comments
 (0)