Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7988d30
feat(admin): add describe cluster api support
DCjanus Nov 26, 2025
80c0bec
feat(describe-cluster): add example and extend admin support
DCjanus Nov 26, 2025
9dd956b
chore(deps): update dependencies to latest versions
DCjanus Nov 26, 2025
1f85f51
feat(describe-cluster): align example with admin helper
DCjanus Nov 26, 2025
e80c18d
feat(admin): remove debug print from DescribeCluster method
DCjanus Nov 26, 2025
cf965ca
fix(describe-cluster): align min version
DCjanus Nov 26, 2025
61bbd25
fix: document describe cluster gaps and sasl casing
DCjanus Nov 27, 2025
9df3b04
chore(examples): remove describe_cluster sample
DCjanus Dec 1, 2025
1b0a1b2
chore(deps): tidy go.sum
DCjanus Dec 1, 2025
2276139
chore(examples): tidy go.sum in submodules
DCjanus Dec 1, 2025
9efa7b3
chore(examples): bump sarama dep but keep local replace
DCjanus Dec 1, 2025
c73d2a2
fix(client): add nilguards to updateBroker (#3393)
dnwe Nov 27, 2025
01d79f5
chore(ci): bump the actions group across 1 directory with 3 updates (…
dependabot[bot] Nov 27, 2025
29e7bb0
chore(ci): bump github/codeql-action from 4.30.9 to 4.31.0 (#3364)
dependabot[bot] Nov 27, 2025
abdd82c
chore(deps): update docker/bake-action action to v6.10.0 (#3392)
renovate[bot] Nov 27, 2025
b4f0989
chore(deps): update dependency golangci/golangci-lint to v2.6.2 (#3366)
renovate[bot] Nov 27, 2025
4665646
Merge branch 'IBM:main' into feat/describe-cluster-support
DCjanus Dec 8, 2025
ae5f735
Merge remote-tracking branch 'upstream/main' into feat/describe-clust…
DCjanus Dec 19, 2025
4520b6c
chore(examples): tidy modules for compress v1.18.2
DCjanus Dec 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"maps"
"math/rand"
"net"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -309,6 +310,51 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada
}

func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
if ca.conf.Version.IsAtLeast(V2_8_0_0) {
brokers, controllerID, err = ca.describeClusterUsingAPI()
if err == nil {
return brokers, controllerID, nil
}
if !errors.Is(err, ErrUnsupportedVersion) {
return nil, 0, err
}
}
return ca.describeClusterUsingMetadata()
}

func (ca *clusterAdmin) describeClusterUsingAPI() (brokers []*Broker, controllerID int32, err error) {
var response *DescribeClusterResponse
err = ca.retryOnError(isRetriableControllerError, func() error {
controller, err := ca.Controller()
if err != nil {
return err
}

request := NewDescribeClusterRequest(ca.conf.Version)
response, err = controller.DescribeCluster(request)
if err != nil {
return err
}
if !errors.Is(response.Err, ErrNoError) {
if isRetriableControllerError(response.Err) {
_, _ = ca.refreshController()
}
if response.ErrorMessage != nil && *response.ErrorMessage != "" {
return fmt.Errorf("%w: %s", response.Err, *response.ErrorMessage)
}
return response.Err
}
return nil
})
if err != nil {
return nil, 0, err
}

brokers = convertDescribeClusterBrokers(response.Brokers)
return brokers, response.ControllerID, nil
}

func (ca *clusterAdmin) describeClusterUsingMetadata() (brokers []*Broker, controllerID int32, err error) {
var response *MetadataResponse
err = ca.retryOnError(isRetriableControllerError, func() error {
controller, err := ca.Controller()
Expand All @@ -324,12 +370,31 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
return err
})
if err != nil {
return nil, int32(0), err
return nil, 0, err
}

return response.Brokers, response.ControllerID, nil
}

func convertDescribeClusterBrokers(entries []*DescribeClusterBroker) []*Broker {
// TODO: DescribeCluster brokers currently drop DescribeCluster-specific fields
// such as IsFenced (KIP-1073) and ClusterAuthorizedOperations because Broker
// has no equivalents yet. This keeps API parity with MetadataResponse for now,
// but the richer fields need to be surfaced in a future change.
if len(entries) == 0 {
return nil
}
result := make([]*Broker, 0, len(entries))
for _, info := range entries {
addr := net.JoinHostPort(info.Host, strconv.Itoa(int(info.Port)))
b := NewBroker(addr)
b.id = info.BrokerID
b.rack = info.Rack
result = append(result, b)
}
return result
}

func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
brokers := ca.client.Brokers()
for _, b := range brokers {
Expand Down
1 change: 1 addition & 0 deletions api_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@ const (
apiKeyAlterClientQuotas = 49
apiKeyDescribeUserScramCredentials = 50
apiKeyAlterUserScramCredentials = 51
apiKeyDescribeCluster = 60
)
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,17 @@ func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error
return response, nil
}

func (b *Broker) DescribeCluster(request *DescribeClusterRequest) (*DescribeClusterResponse, error) {
response := new(DescribeClusterResponse)
response.Version = request.Version

if err := b.sendAndReceive(request, response); err != nil {
return nil, err
}

return response, nil
}

// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
response := new(ConsumerMetadataResponse)
Expand Down
107 changes: 107 additions & 0 deletions describe_cluster_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package sarama

// DescribeClusterEndpointType enumerates the DescribeCluster endpoint types
// defined by the Kafka protocol.
const (
DescribeClusterEndpointTypeBrokers int8 = 1
DescribeClusterEndpointTypeControllers int8 = 2
)

type DescribeClusterRequest struct {
Version int16

IncludeClusterAuthorizedOperations bool
EndpointType int8
IncludeFencedBrokers bool
}

func NewDescribeClusterRequest(version KafkaVersion) *DescribeClusterRequest {
req := &DescribeClusterRequest{}

switch {
case version.IsAtLeast(V4_0_0_0):
req.Version = 2
case version.IsAtLeast(V3_7_0_0):
req.Version = 1
default:
req.Version = 0
}

if req.Version >= 1 {
req.EndpointType = DescribeClusterEndpointTypeBrokers
}
if req.Version >= 2 {
req.IncludeFencedBrokers = true
}

return req
}

func (r *DescribeClusterRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 2 {
return PacketEncodingError{"invalid or unsupported DescribeClusterRequest version"}
}

pe.putBool(r.IncludeClusterAuthorizedOperations)
if r.Version >= 1 {
pe.putInt8(r.EndpointType)
}
if r.Version >= 2 {
pe.putBool(r.IncludeFencedBrokers)
}
pe.putEmptyTaggedFieldArray()

return nil
}

func (r *DescribeClusterRequest) decode(pd packetDecoder, version int16) error {
r.Version = version
if r.Version < 0 || r.Version > 2 {
return PacketDecodingError{"invalid or unsupported DescribeClusterRequest version"}
}

var err error
if r.IncludeClusterAuthorizedOperations, err = pd.getBool(); err != nil {
return err
}
if r.Version >= 1 {
if r.EndpointType, err = pd.getInt8(); err != nil {
return err
}
}
if r.Version >= 2 {
if r.IncludeFencedBrokers, err = pd.getBool(); err != nil {
return err
}
}

_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (r *DescribeClusterRequest) key() int16 { return apiKeyDescribeCluster }

func (r *DescribeClusterRequest) version() int16 { return r.Version }

func (r *DescribeClusterRequest) setVersion(v int16) { r.Version = v }

func (r *DescribeClusterRequest) headerVersion() int16 { return 2 }

func (r *DescribeClusterRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 2
}

func (r *DescribeClusterRequest) isFlexible() bool { return true }

func (r *DescribeClusterRequest) isFlexibleVersion(version int16) bool { return version >= 0 }

func (r *DescribeClusterRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 2:
return V4_0_0_0
case 1:
return V3_7_0_0
default:
return V2_8_0_0
}
}
54 changes: 54 additions & 0 deletions describe_cluster_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//go:build !functional

package sarama

import "testing"

func TestDescribeClusterRequestVersions(t *testing.T) {
t.Run("v0", func(t *testing.T) {
req := &DescribeClusterRequest{Version: 0, IncludeClusterAuthorizedOperations: true}
testRequestWithoutByteComparison(t, "DescribeClusterRequest v0", req)
})

t.Run("v1", func(t *testing.T) {
req := &DescribeClusterRequest{
Version: 1,
EndpointType: DescribeClusterEndpointTypeControllers,
}
testRequestWithoutByteComparison(t, "DescribeClusterRequest v1", req)
})

t.Run("v2", func(t *testing.T) {
req := &DescribeClusterRequest{
Version: 2,
EndpointType: DescribeClusterEndpointTypeBrokers,
IncludeFencedBrokers: true,
IncludeClusterAuthorizedOperations: true,
}
testRequestWithoutByteComparison(t, "DescribeClusterRequest v2", req)
})
}

func TestNewDescribeClusterRequest(t *testing.T) {
testCases := []struct {
version KafkaVersion
expectedVer int16
}{
{V2_8_0_0, 0},
{V3_7_0_0, 1},
{V4_0_0_0, 2},
}

for _, tc := range testCases {
req := NewDescribeClusterRequest(tc.version)
if req.Version != tc.expectedVer {
t.Fatalf("version mismatch for %v: got %d", tc.version, req.Version)
}
if req.Version >= 1 && req.EndpointType == 0 {
t.Fatalf("endpoint type not set for version %d", req.Version)
}
if req.Version < 2 && req.IncludeFencedBrokers {
t.Fatalf("include fenced brokers unexpectedly enabled for version %d", req.Version)
}
}
}
Loading
Loading