From 13584b5bd6de6959469852b70df73cfcc712a821 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 20 Jan 2025 16:15:53 -0700 Subject: [PATCH] kadm: always request authorized operations --- pkg/kadm/groups.go | 16 ++++++++++------ pkg/kadm/metadata.go | 27 +++++++++++++++++---------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/pkg/kadm/groups.go b/pkg/kadm/groups.go index 9f1fc60b..ffad3556 100644 --- a/pkg/kadm/groups.go +++ b/pkg/kadm/groups.go @@ -104,6 +104,8 @@ type DescribedGroup struct { Protocol string // Protocol is the partition assignor strategy this group is using. Members []DescribedGroupMember // Members contains the members of this group sorted first by InstanceID, or if nil, by MemberID. + AuthorizedOperations []ACLOperation // AuthorizedOperations contains operations the requesting client is allowed to perform on this group. + Err error // Err is non-nil if the group could not be described. } @@ -275,6 +277,7 @@ func (cl *Client) DescribeGroups(ctx context.Context, groups ...string) (Describ req := kmsg.NewPtrDescribeGroupsRequest() req.Groups = groups + req.IncludeAuthorizedOperations = true shards := cl.cl.RequestSharded(ctx, req) described := make(DescribedGroups) @@ -285,12 +288,13 @@ func (cl *Client) DescribeGroups(ctx context.Context, groups ...string) (Describ return err } g := DescribedGroup{ - Group: rg.Group, - Coordinator: b, - State: rg.State, - ProtocolType: rg.ProtocolType, - Protocol: rg.Protocol, - Err: kerr.ErrorForCode(rg.ErrorCode), + Group: rg.Group, + Coordinator: b, + State: rg.State, + ProtocolType: rg.ProtocolType, + Protocol: rg.Protocol, + AuthorizedOperations: DecodeACLOperations(rg.AuthorizedOperations), + Err: kerr.ErrorForCode(rg.ErrorCode), } for _, rm := range rg.Members { gm := DescribedGroupMember{ diff --git a/pkg/kadm/metadata.go b/pkg/kadm/metadata.go index f2797186..6b3f5b27 100644 --- a/pkg/kadm/metadata.go +++ b/pkg/kadm/metadata.go @@ -85,6 +85,8 @@ type TopicDetail struct { IsInternal bool // IsInternal is whether the topic is an internal topic. Partitions PartitionDetails // Partitions contains details about the topic's partitions. + AuthorizedOperations []ACLOperation // AuthorizedOperations contains operations the requesting client is allowed to perform on this topic. + Err error // Err is non-nil if the topic could not be loaded. } @@ -183,10 +185,11 @@ func (ds TopicDetails) TopicsList() TopicsList { // Metadata is the data from a metadata response. type Metadata struct { - Cluster string // Cluster is the cluster name, if any. - Controller int32 // Controller is the node ID of the controller broker, if available, otherwise -1. - Brokers BrokerDetails // Brokers contains broker details, sorted by default. - Topics TopicDetails // Topics contains topic details. + Cluster string // Cluster is the cluster name, if any. + Controller int32 // Controller is the node ID of the controller broker, if available, otherwise -1. + Brokers BrokerDetails // Brokers contains broker details, sorted by default. + Topics TopicDetails // Topics contains topic details. + AuthorizedOperations []ACLOperation // AuthorizedOperations contains operations the requesting client is allowed to perform, if the client has Describe permissions on the cluster. } func int32s(is []int32) []int32 { @@ -226,6 +229,8 @@ func (cl *Client) Metadata( func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string) (Metadata, error) { req := kmsg.NewPtrMetadataRequest() + req.IncludeClusterAuthorizedOperations = true + req.IncludeTopicAuthorizedOperations = true for _, t := range topics { rt := kmsg.NewMetadataRequestTopic() rt.Topic = kmsg.StringPtr(t) @@ -245,10 +250,11 @@ func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string) return Metadata{}, err } td := TopicDetail{ - ID: t.TopicID, - Partitions: make(map[int32]PartitionDetail), - IsInternal: t.IsInternal, - Err: kerr.ErrorForCode(t.ErrorCode), + ID: t.TopicID, + Partitions: make(map[int32]PartitionDetail), + IsInternal: t.IsInternal, + AuthorizedOperations: DecodeACLOperations(t.AuthorizedOperations), + Err: kerr.ErrorForCode(t.ErrorCode), } if t.Topic != nil { td.Topic = *t.Topic @@ -271,8 +277,9 @@ func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string) } m := Metadata{ - Controller: resp.ControllerID, - Topics: tds, + Controller: resp.ControllerID, + Topics: tds, + AuthorizedOperations: DecodeACLOperations(resp.AuthorizedOperations), } if resp.ClusterID != nil { m.Cluster = *resp.ClusterID