diff --git a/admin.go b/admin.go index a88fe6b06..da220ed2a 100644 --- a/admin.go +++ b/admin.go @@ -6,6 +6,7 @@ import ( "io" "maps" "math/rand" + "net" "strconv" "sync" "time" @@ -309,6 +310,51 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada } func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { + if ca.conf.Version.IsAtLeast(V2_8_0_0) { + brokers, controllerID, err = ca.describeClusterUsingAPI() + if err == nil { + return brokers, controllerID, nil + } + if !errors.Is(err, ErrUnsupportedVersion) { + return nil, 0, err + } + } + return ca.describeClusterUsingMetadata() +} + +func (ca *clusterAdmin) describeClusterUsingAPI() (brokers []*Broker, controllerID int32, err error) { + var response *DescribeClusterResponse + err = ca.retryOnError(isRetriableControllerError, func() error { + controller, err := ca.Controller() + if err != nil { + return err + } + + request := NewDescribeClusterRequest(ca.conf.Version) + response, err = controller.DescribeCluster(request) + if err != nil { + return err + } + if !errors.Is(response.Err, ErrNoError) { + if isRetriableControllerError(response.Err) { + _, _ = ca.refreshController() + } + if response.ErrorMessage != nil && *response.ErrorMessage != "" { + return fmt.Errorf("%w: %s", response.Err, *response.ErrorMessage) + } + return response.Err + } + return nil + }) + if err != nil { + return nil, 0, err + } + + brokers = convertDescribeClusterBrokers(response.Brokers) + return brokers, response.ControllerID, nil +} + +func (ca *clusterAdmin) describeClusterUsingMetadata() (brokers []*Broker, controllerID int32, err error) { var response *MetadataResponse err = ca.retryOnError(isRetriableControllerError, func() error { controller, err := ca.Controller() @@ -324,12 +370,31 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32 return err }) if err != nil { - return nil, int32(0), err + return nil, 0, err } return response.Brokers, response.ControllerID, nil } +func convertDescribeClusterBrokers(entries []*DescribeClusterBroker) []*Broker { + // TODO: DescribeCluster brokers currently drop DescribeCluster-specific fields + // such as IsFenced (KIP-1073) and ClusterAuthorizedOperations because Broker + // has no equivalents yet. This keeps API parity with MetadataResponse for now, + // but the richer fields need to be surfaced in a future change. + if len(entries) == 0 { + return nil + } + result := make([]*Broker, 0, len(entries)) + for _, info := range entries { + addr := net.JoinHostPort(info.Host, strconv.Itoa(int(info.Port))) + b := NewBroker(addr) + b.id = info.BrokerID + b.rack = info.Rack + result = append(result, b) + } + return result +} + func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) { brokers := ca.client.Brokers() for _, b := range brokers { diff --git a/api_versions.go b/api_versions.go index e993432d0..cf0d0b871 100644 --- a/api_versions.go +++ b/api_versions.go @@ -81,4 +81,5 @@ const ( apiKeyAlterClientQuotas = 49 apiKeyDescribeUserScramCredentials = 50 apiKeyAlterUserScramCredentials = 51 + apiKeyDescribeCluster = 60 ) diff --git a/broker.go b/broker.go index 7c559cfcb..c56146eb5 100644 --- a/broker.go +++ b/broker.go @@ -400,6 +400,17 @@ func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error return response, nil } +func (b *Broker) DescribeCluster(request *DescribeClusterRequest) (*DescribeClusterResponse, error) { + response := new(DescribeClusterResponse) + response.Version = request.Version + + if err := b.sendAndReceive(request, response); err != nil { + return nil, err + } + + return response, nil +} + // GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) { response := new(ConsumerMetadataResponse) diff --git a/describe_cluster_request.go b/describe_cluster_request.go new file mode 100644 index 000000000..abd0ebe68 --- /dev/null +++ b/describe_cluster_request.go @@ -0,0 +1,107 @@ +package sarama + +// DescribeClusterEndpointType enumerates the DescribeCluster endpoint types +// defined by the Kafka protocol. +const ( + DescribeClusterEndpointTypeBrokers int8 = 1 + DescribeClusterEndpointTypeControllers int8 = 2 +) + +type DescribeClusterRequest struct { + Version int16 + + IncludeClusterAuthorizedOperations bool + EndpointType int8 + IncludeFencedBrokers bool +} + +func NewDescribeClusterRequest(version KafkaVersion) *DescribeClusterRequest { + req := &DescribeClusterRequest{} + + switch { + case version.IsAtLeast(V4_0_0_0): + req.Version = 2 + case version.IsAtLeast(V3_7_0_0): + req.Version = 1 + default: + req.Version = 0 + } + + if req.Version >= 1 { + req.EndpointType = DescribeClusterEndpointTypeBrokers + } + if req.Version >= 2 { + req.IncludeFencedBrokers = true + } + + return req +} + +func (r *DescribeClusterRequest) encode(pe packetEncoder) error { + if r.Version < 0 || r.Version > 2 { + return PacketEncodingError{"invalid or unsupported DescribeClusterRequest version"} + } + + pe.putBool(r.IncludeClusterAuthorizedOperations) + if r.Version >= 1 { + pe.putInt8(r.EndpointType) + } + if r.Version >= 2 { + pe.putBool(r.IncludeFencedBrokers) + } + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (r *DescribeClusterRequest) decode(pd packetDecoder, version int16) error { + r.Version = version + if r.Version < 0 || r.Version > 2 { + return PacketDecodingError{"invalid or unsupported DescribeClusterRequest version"} + } + + var err error + if r.IncludeClusterAuthorizedOperations, err = pd.getBool(); err != nil { + return err + } + if r.Version >= 1 { + if r.EndpointType, err = pd.getInt8(); err != nil { + return err + } + } + if r.Version >= 2 { + if r.IncludeFencedBrokers, err = pd.getBool(); err != nil { + return err + } + } + + _, err = pd.getEmptyTaggedFieldArray() + return err +} + +func (r *DescribeClusterRequest) key() int16 { return apiKeyDescribeCluster } + +func (r *DescribeClusterRequest) version() int16 { return r.Version } + +func (r *DescribeClusterRequest) setVersion(v int16) { r.Version = v } + +func (r *DescribeClusterRequest) headerVersion() int16 { return 2 } + +func (r *DescribeClusterRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *DescribeClusterRequest) isFlexible() bool { return true } + +func (r *DescribeClusterRequest) isFlexibleVersion(version int16) bool { return version >= 0 } + +func (r *DescribeClusterRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 2: + return V4_0_0_0 + case 1: + return V3_7_0_0 + default: + return V2_8_0_0 + } +} diff --git a/describe_cluster_request_test.go b/describe_cluster_request_test.go new file mode 100644 index 000000000..6549714a7 --- /dev/null +++ b/describe_cluster_request_test.go @@ -0,0 +1,54 @@ +//go:build !functional + +package sarama + +import "testing" + +func TestDescribeClusterRequestVersions(t *testing.T) { + t.Run("v0", func(t *testing.T) { + req := &DescribeClusterRequest{Version: 0, IncludeClusterAuthorizedOperations: true} + testRequestWithoutByteComparison(t, "DescribeClusterRequest v0", req) + }) + + t.Run("v1", func(t *testing.T) { + req := &DescribeClusterRequest{ + Version: 1, + EndpointType: DescribeClusterEndpointTypeControllers, + } + testRequestWithoutByteComparison(t, "DescribeClusterRequest v1", req) + }) + + t.Run("v2", func(t *testing.T) { + req := &DescribeClusterRequest{ + Version: 2, + EndpointType: DescribeClusterEndpointTypeBrokers, + IncludeFencedBrokers: true, + IncludeClusterAuthorizedOperations: true, + } + testRequestWithoutByteComparison(t, "DescribeClusterRequest v2", req) + }) +} + +func TestNewDescribeClusterRequest(t *testing.T) { + testCases := []struct { + version KafkaVersion + expectedVer int16 + }{ + {V2_8_0_0, 0}, + {V3_7_0_0, 1}, + {V4_0_0_0, 2}, + } + + for _, tc := range testCases { + req := NewDescribeClusterRequest(tc.version) + if req.Version != tc.expectedVer { + t.Fatalf("version mismatch for %v: got %d", tc.version, req.Version) + } + if req.Version >= 1 && req.EndpointType == 0 { + t.Fatalf("endpoint type not set for version %d", req.Version) + } + if req.Version < 2 && req.IncludeFencedBrokers { + t.Fatalf("include fenced brokers unexpectedly enabled for version %d", req.Version) + } + } +} diff --git a/describe_cluster_response.go b/describe_cluster_response.go new file mode 100644 index 000000000..28ae76c0b --- /dev/null +++ b/describe_cluster_response.go @@ -0,0 +1,176 @@ +package sarama + +import "time" + +type DescribeClusterResponse struct { + Version int16 + + ThrottleTimeMs int32 + Err KError + ErrorMessage *string + EndpointType int8 + ClusterID string + ControllerID int32 + Brokers []*DescribeClusterBroker + + ClusterAuthorizedOperations int32 +} + +type DescribeClusterBroker struct { + BrokerID int32 + Host string + Port int32 + Rack *string + IsFenced bool +} + +func (r *DescribeClusterResponse) encode(pe packetEncoder) error { + if r.Version < 0 || r.Version > 2 { + return PacketEncodingError{"invalid or unsupported DescribeClusterResponse version"} + } + + pe.putInt32(r.ThrottleTimeMs) + pe.putInt16(int16(r.Err)) + if err := pe.putNullableString(r.ErrorMessage); err != nil { + return err + } + if r.Version >= 1 { + pe.putInt8(r.EndpointType) + } + if err := pe.putString(r.ClusterID); err != nil { + return err + } + pe.putInt32(r.ControllerID) + + if err := pe.putArrayLength(len(r.Brokers)); err != nil { + return err + } + for _, broker := range r.Brokers { + if err := broker.encode(pe, r.Version); err != nil { + return err + } + } + + pe.putInt32(r.ClusterAuthorizedOperations) + pe.putEmptyTaggedFieldArray() + return nil +} + +func (r *DescribeClusterResponse) decode(pd packetDecoder, version int16) error { + r.Version = version + if r.Version < 0 || r.Version > 2 { + return PacketDecodingError{"invalid or unsupported DescribeClusterResponse version"} + } + + var err error + if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { + return err + } + if r.Err, err = pd.getKError(); err != nil { + return err + } + if r.ErrorMessage, err = pd.getNullableString(); err != nil { + return err + } + if r.Version >= 1 { + if r.EndpointType, err = pd.getInt8(); err != nil { + return err + } + } + if r.ClusterID, err = pd.getString(); err != nil { + return err + } + if r.ControllerID, err = pd.getInt32(); err != nil { + return err + } + + brokerCount, err := pd.getArrayLength() + if err != nil { + return err + } + r.Brokers = make([]*DescribeClusterBroker, brokerCount) + for i := 0; i < brokerCount; i++ { + broker := &DescribeClusterBroker{} + if err := broker.decode(pd, r.Version); err != nil { + return err + } + r.Brokers[i] = broker + } + + if r.ClusterAuthorizedOperations, err = pd.getInt32(); err != nil { + return err + } + + _, err = pd.getEmptyTaggedFieldArray() + return err +} + +func (r *DescribeClusterResponse) key() int16 { return apiKeyDescribeCluster } + +func (r *DescribeClusterResponse) version() int16 { return r.Version } + +func (r *DescribeClusterResponse) setVersion(v int16) { r.Version = v } + +func (r *DescribeClusterResponse) headerVersion() int16 { return 1 } + +func (r *DescribeClusterResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *DescribeClusterResponse) isFlexible() bool { return true } + +func (r *DescribeClusterResponse) isFlexibleVersion(version int16) bool { return version >= 0 } + +func (r *DescribeClusterResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 2: + return V4_0_0_0 + case 1: + return V3_7_0_0 + default: + return V2_8_0_0 + } +} + +func (r *DescribeClusterResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} + +func (b *DescribeClusterBroker) encode(pe packetEncoder, version int16) error { + pe.putInt32(b.BrokerID) + if err := pe.putString(b.Host); err != nil { + return err + } + pe.putInt32(b.Port) + if err := pe.putNullableString(b.Rack); err != nil { + return err + } + if version >= 2 { + pe.putBool(b.IsFenced) + } + pe.putEmptyTaggedFieldArray() + return nil +} + +func (b *DescribeClusterBroker) decode(pd packetDecoder, version int16) error { + var err error + if b.BrokerID, err = pd.getInt32(); err != nil { + return err + } + if b.Host, err = pd.getString(); err != nil { + return err + } + if b.Port, err = pd.getInt32(); err != nil { + return err + } + if b.Rack, err = pd.getNullableString(); err != nil { + return err + } + if version >= 2 { + if b.IsFenced, err = pd.getBool(); err != nil { + return err + } + } + _, err = pd.getEmptyTaggedFieldArray() + return err +} diff --git a/describe_cluster_response_test.go b/describe_cluster_response_test.go new file mode 100644 index 000000000..13fbcc1e4 --- /dev/null +++ b/describe_cluster_response_test.go @@ -0,0 +1,28 @@ +//go:build !functional + +package sarama + +import "testing" + +func TestDescribeClusterResponse(t *testing.T) { + rib := &DescribeClusterBroker{ + BrokerID: 1, + Host: "localhost", + Port: 9092, + Rack: nullString("rack-a"), + IsFenced: true, + } + + resp := &DescribeClusterResponse{ + Version: 2, + ThrottleTimeMs: 10, + Err: ErrNoError, + EndpointType: DescribeClusterEndpointTypeBrokers, + ClusterID: "cluster-1", + ControllerID: 1, + Brokers: []*DescribeClusterBroker{rib}, + ClusterAuthorizedOperations: 7, + } + + testResponse(t, "DescribeClusterResponse", resp, nil) +} diff --git a/examples/consumergroup/go.mod b/examples/consumergroup/go.mod index 723ff7c58..7fa672a9a 100644 --- a/examples/consumergroup/go.mod +++ b/examples/consumergroup/go.mod @@ -2,21 +2,21 @@ module github.com/IBM/sarama/examples/consumer go 1.24.0 -require github.com/IBM/sarama v1.45.0 +require github.com/IBM/sarama v1.46.3 require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/golang/snappy v0.0.4 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect golang.org/x/crypto v0.45.0 // indirect diff --git a/examples/consumergroup/go.sum b/examples/consumergroup/go.sum index da67d3a5e..7486c1530 100644 --- a/examples/consumergroup/go.sum +++ b/examples/consumergroup/go.sum @@ -9,8 +9,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -62,8 +62,8 @@ golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/examples/exactly_once/go.mod b/examples/exactly_once/go.mod index 99fedd3a1..132b26bb0 100644 --- a/examples/exactly_once/go.mod +++ b/examples/exactly_once/go.mod @@ -2,7 +2,7 @@ module github.com/IBM/sarama/examples/exactly_once go 1.24.0 -require github.com/IBM/sarama v1.45.0 +require github.com/IBM/sarama v1.46.3 require ( github.com/davecgh/go-spew v1.1.1 // indirect @@ -16,7 +16,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect golang.org/x/crypto v0.45.0 // indirect diff --git a/examples/exactly_once/go.sum b/examples/exactly_once/go.sum index da67d3a5e..5aa6fda55 100644 --- a/examples/exactly_once/go.sum +++ b/examples/exactly_once/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -62,8 +62,8 @@ golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/examples/http_server/go.mod b/examples/http_server/go.mod index 2f5c2452b..bae92128b 100644 --- a/examples/http_server/go.mod +++ b/examples/http_server/go.mod @@ -2,7 +2,7 @@ module github.com/IBM/sarama/examples/http_server go 1.24.0 -require github.com/IBM/sarama v1.45.0 +require github.com/IBM/sarama v1.46.3 require ( github.com/davecgh/go-spew v1.1.1 // indirect @@ -16,7 +16,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect golang.org/x/crypto v0.45.0 // indirect diff --git a/examples/http_server/go.sum b/examples/http_server/go.sum index da67d3a5e..5aa6fda55 100644 --- a/examples/http_server/go.sum +++ b/examples/http_server/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -62,8 +62,8 @@ golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/examples/interceptors/go.mod b/examples/interceptors/go.mod index e4ec20c81..f10003ae8 100644 --- a/examples/interceptors/go.mod +++ b/examples/interceptors/go.mod @@ -3,7 +3,7 @@ module github.com/IBM/sarama/examples/interceptors go 1.24.0 require ( - github.com/IBM/sarama v1.45.0 + github.com/IBM/sarama v1.46.3 go.opentelemetry.io/otel v1.29.0 go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 go.opentelemetry.io/otel/sdk v1.29.0 @@ -25,7 +25,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect go.opentelemetry.io/otel/metric v1.29.0 // indirect diff --git a/examples/interceptors/go.sum b/examples/interceptors/go.sum index e86869af1..b30ab660b 100644 --- a/examples/interceptors/go.sum +++ b/examples/interceptors/go.sum @@ -37,8 +37,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -83,8 +83,8 @@ golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/examples/sasl_scram_client/go.mod b/examples/sasl_scram_client/go.mod index 471dc1280..1b81aee54 100644 --- a/examples/sasl_scram_client/go.mod +++ b/examples/sasl_scram_client/go.mod @@ -3,7 +3,7 @@ module github.com/IBM/sarama/examples/sasl_scram_client go 1.24.0 require ( - github.com/IBM/sarama v1.45.0 + github.com/IBM/sarama v1.46.3 github.com/xdg-go/scram v1.1.2 ) @@ -19,7 +19,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect diff --git a/examples/sasl_scram_client/go.sum b/examples/sasl_scram_client/go.sum index a42a7a7d4..0d8e35d5d 100644 --- a/examples/sasl_scram_client/go.sum +++ b/examples/sasl_scram_client/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/examples/txn_producer/go.mod b/examples/txn_producer/go.mod index 316dce1c1..ad95d3186 100644 --- a/examples/txn_producer/go.mod +++ b/examples/txn_producer/go.mod @@ -3,7 +3,7 @@ module github.com/IBM/sarama/examples/txn_producer go 1.24.0 require ( - github.com/IBM/sarama v1.45.0 + github.com/IBM/sarama v1.46.3 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 ) @@ -19,7 +19,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/net v0.47.0 // indirect diff --git a/examples/txn_producer/go.sum b/examples/txn_producer/go.sum index da67d3a5e..5aa6fda55 100644 --- a/examples/txn_producer/go.sum +++ b/examples/txn_producer/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -62,8 +62,8 @@ golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/request.go b/request.go index 83a1c466f..861815452 100644 --- a/request.go +++ b/request.go @@ -217,6 +217,8 @@ func allocateBody(key, version int16) protocolBody { return &DescribeUserScramCredentialsRequest{Version: version} case apiKeyAlterUserScramCredentials: return &AlterUserScramCredentialsRequest{Version: version} + case apiKeyDescribeCluster: + return &DescribeClusterRequest{Version: version} // 52: VoteRequest // 53: BeginQuorumEpochRequest // 54: EndQuorumEpochRequest diff --git a/request_test.go b/request_test.go index 581d08232..5dc6e7c08 100644 --- a/request_test.go +++ b/request_test.go @@ -73,7 +73,7 @@ var names = map[int16]string{ 57: "UpdateFeaturesRequest", 58: "EnvelopeRequest", 59: "FetchSnapshotRequest", - 60: "DescribeClusterRequest", + apiKeyDescribeCluster: "DescribeClusterRequest", 61: "DescribeProducersRequest", 62: "BrokerRegistrationRequest", 63: "BrokerHeartbeatRequest", @@ -174,6 +174,8 @@ func allocateResponseBody(req protocolBody) protocolBody { return &DescribeUserScramCredentialsResponse{Version: version} case apiKeyAlterUserScramCredentials: return &AlterUserScramCredentialsResponse{Version: version} + case apiKeyDescribeCluster: + return &DescribeClusterResponse{Version: version} } return nil }