Skip to content

Commit 4a17d91

Browse files
committed
feat: send ApiVersionsRequest on broker open
Currently we don't use the actual response for anything, but lay the groundwork by sending an ApiVersionsRequest on broker open when the kafka version config has been set to 2.4.0.0 or newer. This is useful because the clientSoftwareName and version will show up in the server-side metrics. ``` kafka.server:clientSoftwareName=sarama,clientSoftwareVersion=1.30.0,listener=PLAINTEXT,networkProcessor=1,type=socket-server-metrics ```
1 parent 74aaffe commit 4a17d91

7 files changed

+53
-4
lines changed

admin_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -340,13 +340,15 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
340340
defer secondBroker.Close()
341341

342342
seedBroker.SetHandlerByMap(map[string]MockResponse{
343+
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
343344
"MetadataRequest": NewMockMetadataResponse(t).
344345
SetController(secondBroker.BrokerID()).
345346
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
346347
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
347348
})
348349

349350
secondBroker.SetHandlerByMap(map[string]MockResponse{
351+
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
350352
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
351353
})
352354

@@ -417,13 +419,15 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) {
417419
defer secondBroker.Close()
418420

419421
seedBroker.SetHandlerByMap(map[string]MockResponse{
422+
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
420423
"MetadataRequest": NewMockMetadataResponse(t).
421424
SetController(secondBroker.BrokerID()).
422425
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
423426
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
424427
})
425428

426429
secondBroker.SetHandlerByMap(map[string]MockResponse{
430+
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
427431
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
428432
})
429433

@@ -1335,6 +1339,7 @@ func TestDeleteOffset(t *testing.T) {
13351339
partition := int32(0)
13361340

13371341
handlerMap := map[string]MockResponse{
1342+
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
13381343
"MetadataRequest": NewMockMetadataResponse(t).
13391344
SetController(seedBroker.BrokerID()).
13401345
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),

api_versions_request.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package sarama
22

3-
// const defaultClientSoftwareName = "sarama"
3+
const defaultClientSoftwareName = "sarama"
44

55
type ApiVersionsRequest struct {
66
// Version defines the protocol version to use for encode and decode

broker.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,23 @@ func (b *Broker) Open(conf *Config) error {
153153
b.lock.Lock()
154154

155155
go withRecover(func() {
156-
defer b.lock.Unlock()
157-
156+
defer func() {
157+
b.lock.Unlock()
158+
159+
// Send an ApiVersionsRequest to identify the client (KIP-511).
160+
// Ideally Sarama would use the response to control protocol versions,
161+
// but for now just fire-and-forget just to send
162+
if conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest {
163+
_, err = b.ApiVersions(&ApiVersionsRequest{
164+
Version: 3,
165+
ClientSoftwareName: defaultClientSoftwareName,
166+
ClientSoftwareVersion: version(),
167+
})
168+
if err != nil {
169+
Logger.Printf("Error while sending ApiVersionsRequest to broker %s: %s\n", b.addr, err)
170+
}
171+
}
172+
}()
158173
dialer := conf.getDialer()
159174
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
160175
if b.connErr != nil {

broker_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
9393
// Set the broker id in order to validate local broker metrics
9494
broker.id = 0
9595
conf := NewTestConfig()
96+
conf.ApiVersionsRequest = false
9697
conf.Version = tt.version
9798
err := broker.Open(conf)
9899
if err != nil {

config.go

+6
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,11 @@ type Config struct {
427427
// in the background while user code is working, greatly improving throughput.
428428
// Defaults to 256.
429429
ChannelBufferSize int
430+
// ApiVersionsRequest determines whether Sarama should send an
431+
// ApiVersionsRequest message to each broker as part of its initial
432+
// connection. This defaults to `true` to match the official Java client
433+
// and most 3rdparty ones.
434+
ApiVersionsRequest bool
430435
// The version of Kafka that Sarama will assume it is running against.
431436
// Defaults to the oldest supported stable version. Since Kafka provides
432437
// backwards-compatibility, setting it to a version older than you have
@@ -492,6 +497,7 @@ func NewConfig() *Config {
492497

493498
c.ClientID = defaultClientID
494499
c.ChannelBufferSize = 256
500+
c.ApiVersionsRequest = true
495501
c.Version = DefaultVersion
496502
c.MetricRegistry = metrics.NewRegistry()
497503

real_decoder.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,9 @@ func (rd *realDecoder) getCompactString() (string, error) {
240240
}
241241

242242
length := int(n - 1)
243-
243+
if length < 0 {
244+
return "", errInvalidByteSliceLength
245+
}
244246
tmpStr := string(rd.raw[rd.off : rd.off+length])
245247
rd.off += length
246248
return tmpStr, nil

version.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package sarama
2+
3+
import "runtime/debug"
4+
5+
var v string
6+
7+
func version() string {
8+
if v == "" {
9+
bi, ok := debug.ReadBuildInfo()
10+
if ok {
11+
v = bi.Main.Version
12+
} else {
13+
// if we can't read a go module version then they're using a git
14+
// clone or vendored module so all we can do is report "dev" for
15+
// the version
16+
v = "dev"
17+
}
18+
}
19+
return v
20+
}

0 commit comments

Comments
 (0)