Skip to content

Commit ca08d86

Browse files
authored
Merge branch 'main' into feat/producer-tx
2 parents ba2b4bc + 5e2c2ef commit ca08d86

15 files changed

+365
-103
lines changed

async_producer.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/eapache/go-resiliency/breaker"
1212
"github.com/eapache/queue"
13+
"github.com/rcrowley/go-metrics"
1314
)
1415

1516
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
@@ -85,6 +86,8 @@ type asyncProducer struct {
8586

8687
txnmgr *transactionManager
8788
txLock sync.Mutex
89+
90+
metricsRegistry metrics.Registry
8891
}
8992

9093
// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
@@ -117,15 +120,16 @@ func newAsyncProducer(client Client) (AsyncProducer, error) {
117120
}
118121

119122
p := &asyncProducer{
120-
client: client,
121-
conf: client.Config(),
122-
errors: make(chan *ProducerError),
123-
input: make(chan *ProducerMessage),
124-
successes: make(chan *ProducerMessage),
125-
retries: make(chan *ProducerMessage),
126-
brokers: make(map[*Broker]*brokerProducer),
127-
brokerRefs: make(map[*brokerProducer]int),
128-
txnmgr: txnmgr,
123+
client: client,
124+
conf: client.Config(),
125+
errors: make(chan *ProducerError),
126+
input: make(chan *ProducerMessage),
127+
successes: make(chan *ProducerMessage),
128+
retries: make(chan *ProducerMessage),
129+
brokers: make(map[*Broker]*brokerProducer),
130+
brokerRefs: make(map[*brokerProducer]int),
131+
txnmgr: txnmgr,
132+
metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry),
129133
}
130134

131135
// launch our singleton dispatchers
@@ -1228,6 +1232,8 @@ func (p *asyncProducer) shutdown() {
12281232
close(p.retries)
12291233
close(p.errors)
12301234
close(p.successes)
1235+
1236+
p.metricsRegistry.UnregisterAll()
12311237
}
12321238

12331239
func (p *asyncProducer) bumpIdempotentProducerEpoch() {

broker.go

+22-40
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ type Broker struct {
3333
responses chan *responsePromise
3434
done chan bool
3535

36-
registeredMetrics map[string]struct{}
37-
36+
metricRegistry metrics.Registry
3837
incomingByteRate metrics.Meter
3938
requestRate metrics.Meter
4039
fetchRate metrics.Meter
@@ -174,6 +173,8 @@ func (b *Broker) Open(conf *Config) error {
174173

175174
b.lock.Lock()
176175

176+
b.metricRegistry = newCleanupRegistry(conf.MetricRegistry)
177+
177178
go withRecover(func() {
178179
defer func() {
179180
b.lock.Unlock()
@@ -208,15 +209,15 @@ func (b *Broker) Open(conf *Config) error {
208209
b.conf = conf
209210

210211
// Create or reuse the global metrics shared between brokers
211-
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
212-
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
213-
b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", conf.MetricRegistry)
214-
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
215-
b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
216-
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
217-
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
218-
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
219-
b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
212+
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", b.metricRegistry)
213+
b.requestRate = metrics.GetOrRegisterMeter("request-rate", b.metricRegistry)
214+
b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", b.metricRegistry)
215+
b.requestSize = getOrRegisterHistogram("request-size", b.metricRegistry)
216+
b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", b.metricRegistry)
217+
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", b.metricRegistry)
218+
b.responseRate = metrics.GetOrRegisterMeter("response-rate", b.metricRegistry)
219+
b.responseSize = getOrRegisterHistogram("response-size", b.metricRegistry)
220+
b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", b.metricRegistry)
220221
// Do not gather metrics for seeded brokers (only used during bootstrap) because they share
221222
// the same id (-1) and are already exposed through the global metrics above
222223
if b.id >= 0 && !metrics.UseNilMetrics {
@@ -326,7 +327,7 @@ func (b *Broker) Close() error {
326327
b.done = nil
327328
b.responses = nil
328329

329-
b.unregisterMetrics()
330+
b.metricRegistry.UnregisterAll()
330331

331332
if err == nil {
332333
DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
@@ -442,7 +443,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
442443
return
443444
}
444445

445-
if err := versionedDecode(packets, res, request.version(), b.conf.MetricRegistry); err != nil {
446+
if err := versionedDecode(packets, res, request.version(), b.metricRegistry); err != nil {
446447
// Malformed response
447448
cb(nil, err)
448449
return
@@ -987,7 +988,7 @@ func (b *Broker) sendInternal(rb protocolBody, promise *responsePromise) error {
987988
}
988989

989990
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
990-
buf, err := encode(req, b.conf.MetricRegistry)
991+
buf, err := encode(req, b.metricRegistry)
991992
if err != nil {
992993
return err
993994
}
@@ -1037,7 +1038,7 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
10371038
func (b *Broker) handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error {
10381039
select {
10391040
case buf := <-promise.packets:
1040-
return versionedDecode(buf, res, req.version(), b.conf.MetricRegistry)
1041+
return versionedDecode(buf, res, req.version(), b.metricRegistry)
10411042
case err := <-promise.errors:
10421043
return err
10431044
}
@@ -1129,7 +1130,7 @@ func (b *Broker) responseReceiver() {
11291130
}
11301131

11311132
decodedHeader := responseHeader{}
1132-
err = versionedDecode(header, &decodedHeader, response.headerVersion, b.conf.MetricRegistry)
1133+
err = versionedDecode(header, &decodedHeader, response.headerVersion, b.metricRegistry)
11331134
if err != nil {
11341135
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
11351136
dead = err
@@ -1251,7 +1252,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
12511252
rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
12521253

12531254
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
1254-
buf, err := encode(req, b.conf.MetricRegistry)
1255+
buf, err := encode(req, b.metricRegistry)
12551256
if err != nil {
12561257
return err
12571258
}
@@ -1288,7 +1289,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
12881289
b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
12891290
res := &SaslHandshakeResponse{}
12901291

1291-
err = versionedDecode(payload, res, 0, b.conf.MetricRegistry)
1292+
err = versionedDecode(payload, res, 0, b.metricRegistry)
12921293
if err != nil {
12931294
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
12941295
return err
@@ -1630,38 +1631,19 @@ func (b *Broker) registerMetrics() {
16301631
b.brokerThrottleTime = b.registerHistogram("throttle-time-in-ms")
16311632
}
16321633

1633-
func (b *Broker) unregisterMetrics() {
1634-
for name := range b.registeredMetrics {
1635-
b.conf.MetricRegistry.Unregister(name)
1636-
}
1637-
b.registeredMetrics = nil
1638-
}
1639-
16401634
func (b *Broker) registerMeter(name string) metrics.Meter {
16411635
nameForBroker := getMetricNameForBroker(name, b)
1642-
if b.registeredMetrics == nil {
1643-
b.registeredMetrics = map[string]struct{}{}
1644-
}
1645-
b.registeredMetrics[nameForBroker] = struct{}{}
1646-
return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1636+
return metrics.GetOrRegisterMeter(nameForBroker, b.metricRegistry)
16471637
}
16481638

16491639
func (b *Broker) registerHistogram(name string) metrics.Histogram {
16501640
nameForBroker := getMetricNameForBroker(name, b)
1651-
if b.registeredMetrics == nil {
1652-
b.registeredMetrics = map[string]struct{}{}
1653-
}
1654-
b.registeredMetrics[nameForBroker] = struct{}{}
1655-
return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
1641+
return getOrRegisterHistogram(nameForBroker, b.metricRegistry)
16561642
}
16571643

16581644
func (b *Broker) registerCounter(name string) metrics.Counter {
16591645
nameForBroker := getMetricNameForBroker(name, b)
1660-
if b.registeredMetrics == nil {
1661-
b.registeredMetrics = map[string]struct{}{}
1662-
}
1663-
b.registeredMetrics[nameForBroker] = struct{}{}
1664-
return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
1646+
return metrics.GetOrRegisterCounter(nameForBroker, b.metricRegistry)
16651647
}
16661648

16671649
func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {

client_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"syscall"
99
"testing"
1010
"time"
11+
12+
"github.com/rcrowley/go-metrics"
1113
)
1214

1315
func safeClose(t testing.TB, c io.Closer) {
@@ -1096,3 +1098,25 @@ func TestInitProducerIDConnectionRefused(t *testing.T) {
10961098

10971099
safeClose(t, client)
10981100
}
1101+
1102+
func TestMetricsCleanup(t *testing.T) {
1103+
seedBroker := NewMockBroker(t, 1)
1104+
seedBroker.Returns(new(MetadataResponse))
1105+
1106+
config := NewTestConfig()
1107+
metrics.GetOrRegisterMeter("a", config.MetricRegistry)
1108+
1109+
client, err := NewClient([]string{seedBroker.Addr()}, config)
1110+
if err != nil {
1111+
t.Fatal(err)
1112+
}
1113+
safeClose(t, client)
1114+
1115+
// Wait for the asynchronous close to complete
1116+
time.Sleep(10 * time.Millisecond)
1117+
1118+
all := config.MetricRegistry.GetAll()
1119+
if len(all) != 1 || all["a"] == nil {
1120+
t.Errorf("expected 1 metric, found: %v", all)
1121+
}
1122+
}

config.go

+25-3
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,16 @@ type Config struct {
295295
}
296296
Rebalance struct {
297297
// Strategy for allocating topic partitions to members (unset by default; see GroupStrategies)
298+
// Deprecated: Strategy exists for historical compatibility
299+
// and should not be used. Please use GroupStrategies.
298300
Strategy BalanceStrategy
301+
302+
// GroupStrategies is the priority-ordered list of client-side consumer group
303+
// balancing strategies that will be offered to the coordinator. The first
304+
// strategy that all group members support will be chosen by the leader.
305+
// default: [BalanceStrategyRange]
306+
GroupStrategies []BalanceStrategy
307+
299308
// The maximum allowed time for each worker to join the group once a rebalance has begun.
300309
// This is basically a limit on the amount of time needed for all tasks to flush any pending
301310
// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
@@ -530,7 +539,7 @@ func NewConfig() *Config {
530539

531540
c.Consumer.Group.Session.Timeout = 10 * time.Second
532541
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
533-
c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
542+
c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange}
534543
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
535544
c.Consumer.Group.Rebalance.Retry.Max = 4
536545
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
@@ -775,6 +784,10 @@ func (c *Config) Validate() error {
775784
Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
776785
" and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
777786
}
787+
if c.Consumer.Group.Rebalance.Strategy != nil {
788+
Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" +
789+
" and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies")
790+
}
778791

779792
// validate IsolationLevel
780793
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
@@ -789,15 +802,24 @@ func (c *Config) Validate() error {
789802
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
790803
case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
791804
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
792-
case c.Consumer.Group.Rebalance.Strategy == nil:
793-
return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty")
805+
case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0:
806+
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty")
807+
case c.Consumer.Group.Rebalance.Strategy != nil && len(c.Consumer.Group.Rebalance.GroupStrategies) != 0:
808+
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time")
794809
case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
795810
return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
796811
case c.Consumer.Group.Rebalance.Retry.Max < 0:
797812
return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
798813
case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
799814
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
800815
}
816+
817+
for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies {
818+
if strategy == nil {
819+
return ConfigurationError("elements in Consumer.Group.Rebalance.GroupStrategies must not be empty")
820+
}
821+
}
822+
801823
if c.Consumer.Group.InstanceId != "" {
802824
if !c.Version.IsAtLeast(V2_3_0_0) {
803825
return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3")

consumer.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ type consumer struct {
104104
children map[string]map[int32]*partitionConsumer
105105
brokerConsumers map[*Broker]*brokerConsumer
106106
client Client
107+
metricRegistry metrics.Registry
107108
lock sync.Mutex
108109
}
109110

@@ -136,12 +137,14 @@ func newConsumer(client Client) (Consumer, error) {
136137
conf: client.Config(),
137138
children: make(map[string]map[int32]*partitionConsumer),
138139
brokerConsumers: make(map[*Broker]*brokerConsumer),
140+
metricRegistry: newCleanupRegistry(client.Config().MetricRegistry),
139141
}
140142

141143
return c, nil
142144
}
143145

144146
func (c *consumer) Close() error {
147+
c.metricRegistry.UnregisterAll()
145148
return c.client.Close()
146149
}
147150

@@ -678,13 +681,9 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
678681
}
679682

680683
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
681-
var (
682-
metricRegistry = child.conf.MetricRegistry
683-
consumerBatchSizeMetric metrics.Histogram
684-
)
685-
686-
if metricRegistry != nil {
687-
consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
684+
var consumerBatchSizeMetric metrics.Histogram
685+
if child.consumer != nil && child.consumer.metricRegistry != nil {
686+
consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", child.consumer.metricRegistry)
688687
}
689688

690689
// If request was throttled and empty we log and return without error
@@ -709,7 +708,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
709708
return nil, err
710709
}
711710

712-
consumerBatchSizeMetric.Update(int64(nRecs))
711+
if consumerBatchSizeMetric != nil {
712+
consumerBatchSizeMetric.Update(int64(nRecs))
713+
}
713714

714715
if block.PreferredReadReplica != invalidPreferredReplicaID {
715716
child.preferredReadReplica = block.PreferredReadReplica
@@ -944,6 +945,16 @@ func (bc *brokerConsumer) subscriptionConsumer() {
944945

945946
bc.acks.Add(len(bc.subscriptions))
946947
for child := range bc.subscriptions {
948+
if _, ok := response.Blocks[child.topic]; !ok {
949+
bc.acks.Done()
950+
continue
951+
}
952+
953+
if _, ok := response.Blocks[child.topic][child.partition]; !ok {
954+
bc.acks.Done()
955+
continue
956+
}
957+
947958
child.feeder <- response
948959
}
949960
bc.acks.Wait()

0 commit comments

Comments
 (0)