Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,11 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU
// Contains components: strict = false
// Contains only components: strict = true
func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
request := &DescribeClientQuotasRequest{
Components: components,
Strict: strict,
}
request := NewDescribeClientQuotasRequest(
ca.conf.Version,
components,
strict,
)

b, err := ca.Controller()
if err != nil {
Expand Down
56 changes: 50 additions & 6 deletions describe_client_quotas_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ package sarama
// match_type => INT8
// match => NULLABLE_STRING
// strict => BOOLEAN
// DescribeClientQuotas Request (Version: 1) => [components] strict _tagged_fields
// components => entity_type match_type match _tagged_fields
// entity_type => COMPACT_STRING
// match_type => INT8
// match => COMPACT_NULLABLE_STRING
// strict => BOOLEAN

// A filter to be applied to matching client quotas.
// DescribeClientQuotasRequest contains a filter to be applied to matching
// client quotas.
// Components: the components to filter on
// Strict: whether the filter only includes specified components
type DescribeClientQuotasRequest struct {
Expand All @@ -16,11 +23,22 @@ type DescribeClientQuotasRequest struct {
Strict bool
}

func NewDescribeClientQuotasRequest(version KafkaVersion, components []QuotaFilterComponent, strict bool) *DescribeClientQuotasRequest {
d := &DescribeClientQuotasRequest{
Components: components,
Strict: strict,
}
if version.IsAtLeast(V2_8_0_0) {
d.Version = 1
}
return d
}

func (d *DescribeClientQuotasRequest) setVersion(v int16) {
d.Version = v
}

// Describe a component for applying a client quota filter.
// QuotaFilterComponent describes a component for applying a client quota filter.
// EntityType: the entity type the filter component applies to ("user", "client-id", "ip")
// MatchType: the match type of the filter component (any, exact, default)
// Match: the name that's matched exactly (used when MatchType is QuotaMatchExact)
Expand All @@ -44,6 +62,8 @@ func (d *DescribeClientQuotasRequest) encode(pe packetEncoder) error {
// Strict
pe.putBool(d.Strict)

pe.putEmptyTaggedFieldArray()

return nil
}

Expand Down Expand Up @@ -73,7 +93,8 @@ func (d *DescribeClientQuotasRequest) decode(pd packetDecoder, version int16) er
}
d.Strict = strict

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (d *QuotaFilterComponent) encode(pe packetEncoder) error {
Expand All @@ -100,6 +121,8 @@ func (d *QuotaFilterComponent) encode(pe packetEncoder) error {
}
}

pe.putEmptyTaggedFieldArray()

return nil
}

Expand All @@ -126,7 +149,9 @@ func (d *QuotaFilterComponent) decode(pd packetDecoder, version int16) error {
if match != nil {
d.Match = *match
}
return nil

_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (d *DescribeClientQuotasRequest) key() int16 {
Expand All @@ -138,13 +163,32 @@ func (d *DescribeClientQuotasRequest) version() int16 {
}

func (d *DescribeClientQuotasRequest) headerVersion() int16 {
if d.Version >= 1 {
return 2
}

return 1
}

func (d *DescribeClientQuotasRequest) isValidVersion() bool {
return d.Version == 0
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeClientQuotasRequest) isFlexible() bool {
return d.isFlexibleVersion(d.Version)
}

func (d *DescribeClientQuotasRequest) isFlexibleVersion(version int16) bool {
return version >= 1
}

func (d *DescribeClientQuotasRequest) requiredVersion() KafkaVersion {
return V2_6_0_0
switch d.Version {
case 1:
return V2_8_0_0
case 0:
return V2_6_0_0
default:
return V2_8_0_0
}
}
24 changes: 24 additions & 0 deletions describe_client_quotas_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ var (
255, 255, // match *string
0, // strict
}

describeClientQuotasV1 = []byte{
0x02, // components len
0x05, 'u', 's', 'e', 'r', // entity type
0x01, // match type (default name)
0x00, // match (NULL)
0x00, // empty tagged fields
0x01, // strict (true)
0x00, // empty tagged fields,
}
)

func TestDescribeClientQuotasRequest(t *testing.T) {
Expand Down Expand Up @@ -84,3 +94,17 @@ func TestDescribeClientQuotasRequest(t *testing.T) {
}
testRequest(t, "Match default client-id of any user", req, describeClientQuotasRequestMultiComponents)
}

func TestDescribeClientQuotasRequestV1(t *testing.T) {
req := &DescribeClientQuotasRequest{
Version: 1,
Components: []QuotaFilterComponent{
{
EntityType: "user",
MatchType: 1,
},
},
Strict: true,
}
testRequest(t, "V1", req, describeClientQuotasV1)
}
51 changes: 46 additions & 5 deletions describe_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ import (
// values => key value
// key => STRING
// value => FLOAT64
// DescribeClientQuotas Response (Version: 1) => throttle_time_ms error_code error_message [entries] _tagged_fields
// throttle_time_ms => INT32
// error_code => INT16
// error_message => COMPACT_NULLABLE_STRING
// entries => [entity] [values] _tagged_fields
// entity => entity_type entity_name _tagged_fields
// entity_type => COMPACT_STRING
// entity_name => COMPACT_NULLABLE_STRING
// values => key value _tagged_fields
// key => COMPACT_STRING
// value => FLOAT64

type DescribeClientQuotasResponse struct {
Version int16
Expand Down Expand Up @@ -61,6 +72,7 @@ func (d *DescribeClientQuotasResponse) encode(pe packetEncoder) error {
}
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand Down Expand Up @@ -100,7 +112,8 @@ func (d *DescribeClientQuotasResponse) decode(pd packetDecoder, version int16) (
d.Entries = []DescribeClientQuotasEntry{}
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (d *DescribeClientQuotasEntry) encode(pe packetEncoder) error {
Expand All @@ -125,8 +138,10 @@ func (d *DescribeClientQuotasEntry) encode(pe packetEncoder) error {
}
// value
pe.putFloat64(value)
pe.putEmptyTaggedFieldArray()
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand Down Expand Up @@ -168,12 +183,17 @@ func (d *DescribeClientQuotasEntry) decode(pd packetDecoder, version int16) erro
return err
}
d.Values[key] = value
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}
} else {
d.Values = map[string]float64{}
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (c *QuotaEntityComponent) encode(pe packetEncoder) error {
Expand All @@ -192,6 +212,7 @@ func (c *QuotaEntityComponent) encode(pe packetEncoder) error {
}
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand All @@ -216,7 +237,8 @@ func (c *QuotaEntityComponent) decode(pd packetDecoder, version int16) error {
c.Name = *entityName
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (d *DescribeClientQuotasResponse) key() int16 {
Expand All @@ -228,15 +250,34 @@ func (d *DescribeClientQuotasResponse) version() int16 {
}

func (d *DescribeClientQuotasResponse) headerVersion() int16 {
if d.Version >= 1 {
return 1
}

return 0
}

func (d *DescribeClientQuotasResponse) isValidVersion() bool {
return d.Version == 0
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeClientQuotasResponse) isFlexible() bool {
return d.isFlexibleVersion(d.Version)
}

func (d *DescribeClientQuotasResponse) isFlexibleVersion(version int16) bool {
return version >= 1
}

func (d *DescribeClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
switch d.Version {
case 1:
return V2_8_0_0
case 0:
return V2_6_0_0
default:
return V2_8_0_0
}
}

func (r *DescribeClientQuotasResponse) throttleTime() time.Duration {
Expand Down
45 changes: 44 additions & 1 deletion describe_client_quotas_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

package sarama

import "testing"
import (
"testing"
"time"

assert "github.com/stretchr/testify/require"
)

var (
describeClientQuotasResponseError = []byte{
Expand Down Expand Up @@ -43,6 +48,24 @@ var (
0, 18, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e',
65, 46, 132, 128, 0, 0, 0, 0, // 1000000
}

describeClientQuotasResponseV1 = []byte{
0x00, 0x00, 0x00, 0x80, // ThrottleTime (128ms)
0x00, 0x00, // ErrorCode
0x01, // ErrorMsg (nil)
0x02, // Entries (2)
0x02,
0x05, 'u', 's', 'e', 'r', // Entity Type
0x00, // Entity Name (nil)
0x00, // Empty tagged fields
0x02,
// 0x13, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x5f, 0x72, 0x61, 0x74, 0x65,
0x13, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e',
0x41, 0x2f, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, // 1024000
0x00, // Empty tagged fields
0x00, // Empty tagged fields
0x00, // Empty tagged fields
}
)

func TestDescribeClientQuotasResponse(t *testing.T) {
Expand Down Expand Up @@ -95,3 +118,23 @@ func TestDescribeClientQuotasResponse(t *testing.T) {
}
testResponse(t, "Complex Quota", res, describeClientQuotasResponseComplexEntity)
}

func TestDescribeClientQuotasResponseV1(t *testing.T) {
res := &DescribeClientQuotasResponse{Version: 1}
testVersionDecodable(t, "V1", res, describeClientQuotasResponseV1, 1)
assert.Equal(t, res.ThrottleTime, time.Millisecond*128)
assert.Len(t, res.Entries, 1)
assert.Equal(t, []DescribeClientQuotasEntry{
{
Entity: []QuotaEntityComponent{
{
EntityType: "user",
MatchType: 1,
},
},
Values: map[string]float64{
"producer_byte_rate": 1024000,
},
},
}, res.Entries)
}
Loading