diff --git a/admin.go b/admin.go index 08352b734..dda0201f6 100644 --- a/admin.go +++ b/admin.go @@ -1294,10 +1294,11 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU // Contains components: strict = false // Contains only components: strict = true func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) { - request := &DescribeClientQuotasRequest{ - Components: components, - Strict: strict, - } + request := NewDescribeClientQuotasRequest( + ca.conf.Version, + components, + strict, + ) b, err := ca.Controller() if err != nil { diff --git a/describe_client_quotas_request.go b/describe_client_quotas_request.go index af366f465..120ed332e 100644 --- a/describe_client_quotas_request.go +++ b/describe_client_quotas_request.go @@ -6,8 +6,15 @@ package sarama // match_type => INT8 // match => NULLABLE_STRING // strict => BOOLEAN +// DescribeClientQuotas Request (Version: 1) => [components] strict _tagged_fields +// components => entity_type match_type match _tagged_fields +// entity_type => COMPACT_STRING +// match_type => INT8 +// match => COMPACT_NULLABLE_STRING +// strict => BOOLEAN -// A filter to be applied to matching client quotas. +// DescribeClientQuotasRequest contains a filter to be applied to matching +// client quotas. // Components: the components to filter on // Strict: whether the filter only includes specified components type DescribeClientQuotasRequest struct { @@ -16,11 +23,22 @@ type DescribeClientQuotasRequest struct { Strict bool } +func NewDescribeClientQuotasRequest(version KafkaVersion, components []QuotaFilterComponent, strict bool) *DescribeClientQuotasRequest { + d := &DescribeClientQuotasRequest{ + Components: components, + Strict: strict, + } + if version.IsAtLeast(V2_8_0_0) { + d.Version = 1 + } + return d +} + func (d *DescribeClientQuotasRequest) setVersion(v int16) { d.Version = v } -// Describe a component for applying a client quota filter. +// QuotaFilterComponent describes a component for applying a client quota filter. // EntityType: the entity type the filter component applies to ("user", "client-id", "ip") // MatchType: the match type of the filter component (any, exact, default) // Match: the name that's matched exactly (used when MatchType is QuotaMatchExact) @@ -44,6 +62,8 @@ func (d *DescribeClientQuotasRequest) encode(pe packetEncoder) error { // Strict pe.putBool(d.Strict) + pe.putEmptyTaggedFieldArray() + return nil } @@ -73,7 +93,8 @@ func (d *DescribeClientQuotasRequest) decode(pd packetDecoder, version int16) er } d.Strict = strict - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (d *QuotaFilterComponent) encode(pe packetEncoder) error { @@ -100,6 +121,8 @@ func (d *QuotaFilterComponent) encode(pe packetEncoder) error { } } + pe.putEmptyTaggedFieldArray() + return nil } @@ -126,7 +149,9 @@ func (d *QuotaFilterComponent) decode(pd packetDecoder, version int16) error { if match != nil { d.Match = *match } - return nil + + _, err = pd.getEmptyTaggedFieldArray() + return err } func (d *DescribeClientQuotasRequest) key() int16 { @@ -138,13 +163,32 @@ func (d *DescribeClientQuotasRequest) version() int16 { } func (d *DescribeClientQuotasRequest) headerVersion() int16 { + if d.Version >= 1 { + return 2 + } + return 1 } func (d *DescribeClientQuotasRequest) isValidVersion() bool { - return d.Version == 0 + return d.Version >= 0 && d.Version <= 1 +} + +func (d *DescribeClientQuotasRequest) isFlexible() bool { + return d.isFlexibleVersion(d.Version) +} + +func (d *DescribeClientQuotasRequest) isFlexibleVersion(version int16) bool { + return version >= 1 } func (d *DescribeClientQuotasRequest) requiredVersion() KafkaVersion { - return V2_6_0_0 + switch d.Version { + case 1: + return V2_8_0_0 + case 0: + return V2_6_0_0 + default: + return V2_8_0_0 + } } diff --git a/describe_client_quotas_request_test.go b/describe_client_quotas_request_test.go index 4bd57581d..32c814345 100644 --- a/describe_client_quotas_request_test.go +++ b/describe_client_quotas_request_test.go @@ -36,6 +36,16 @@ var ( 255, 255, // match *string 0, // strict } + + describeClientQuotasV1 = []byte{ + 0x02, // components len + 0x05, 'u', 's', 'e', 'r', // entity type + 0x01, // match type (default name) + 0x00, // match (NULL) + 0x00, // empty tagged fields + 0x01, // strict (true) + 0x00, // empty tagged fields, + } ) func TestDescribeClientQuotasRequest(t *testing.T) { @@ -84,3 +94,17 @@ func TestDescribeClientQuotasRequest(t *testing.T) { } testRequest(t, "Match default client-id of any user", req, describeClientQuotasRequestMultiComponents) } + +func TestDescribeClientQuotasRequestV1(t *testing.T) { + req := &DescribeClientQuotasRequest{ + Version: 1, + Components: []QuotaFilterComponent{ + { + EntityType: "user", + MatchType: 1, + }, + }, + Strict: true, + } + testRequest(t, "V1", req, describeClientQuotasV1) +} diff --git a/describe_client_quotas_response.go b/describe_client_quotas_response.go index eab7e58ce..4cf9e5b65 100644 --- a/describe_client_quotas_response.go +++ b/describe_client_quotas_response.go @@ -15,6 +15,17 @@ import ( // values => key value // key => STRING // value => FLOAT64 +// DescribeClientQuotas Response (Version: 1) => throttle_time_ms error_code error_message [entries] _tagged_fields +// throttle_time_ms => INT32 +// error_code => INT16 +// error_message => COMPACT_NULLABLE_STRING +// entries => [entity] [values] _tagged_fields +// entity => entity_type entity_name _tagged_fields +// entity_type => COMPACT_STRING +// entity_name => COMPACT_NULLABLE_STRING +// values => key value _tagged_fields +// key => COMPACT_STRING +// value => FLOAT64 type DescribeClientQuotasResponse struct { Version int16 @@ -61,6 +72,7 @@ func (d *DescribeClientQuotasResponse) encode(pe packetEncoder) error { } } + pe.putEmptyTaggedFieldArray() return nil } @@ -100,7 +112,8 @@ func (d *DescribeClientQuotasResponse) decode(pd packetDecoder, version int16) ( d.Entries = []DescribeClientQuotasEntry{} } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (d *DescribeClientQuotasEntry) encode(pe packetEncoder) error { @@ -125,8 +138,10 @@ func (d *DescribeClientQuotasEntry) encode(pe packetEncoder) error { } // value pe.putFloat64(value) + pe.putEmptyTaggedFieldArray() } + pe.putEmptyTaggedFieldArray() return nil } @@ -168,12 +183,17 @@ func (d *DescribeClientQuotasEntry) decode(pd packetDecoder, version int16) erro return err } d.Values[key] = value + _, err = pd.getEmptyTaggedFieldArray() + if err != nil { + return err + } } } else { d.Values = map[string]float64{} } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (c *QuotaEntityComponent) encode(pe packetEncoder) error { @@ -192,6 +212,7 @@ func (c *QuotaEntityComponent) encode(pe packetEncoder) error { } } + pe.putEmptyTaggedFieldArray() return nil } @@ -216,7 +237,8 @@ func (c *QuotaEntityComponent) decode(pd packetDecoder, version int16) error { c.Name = *entityName } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (d *DescribeClientQuotasResponse) key() int16 { @@ -228,15 +250,34 @@ func (d *DescribeClientQuotasResponse) version() int16 { } func (d *DescribeClientQuotasResponse) headerVersion() int16 { + if d.Version >= 1 { + return 1 + } + return 0 } func (d *DescribeClientQuotasResponse) isValidVersion() bool { - return d.Version == 0 + return d.Version >= 0 && d.Version <= 1 +} + +func (d *DescribeClientQuotasResponse) isFlexible() bool { + return d.isFlexibleVersion(d.Version) +} + +func (d *DescribeClientQuotasResponse) isFlexibleVersion(version int16) bool { + return version >= 1 } func (d *DescribeClientQuotasResponse) requiredVersion() KafkaVersion { - return V2_6_0_0 + switch d.Version { + case 1: + return V2_8_0_0 + case 0: + return V2_6_0_0 + default: + return V2_8_0_0 + } } func (r *DescribeClientQuotasResponse) throttleTime() time.Duration { diff --git a/describe_client_quotas_response_test.go b/describe_client_quotas_response_test.go index 1c9d6d36c..94b941942 100644 --- a/describe_client_quotas_response_test.go +++ b/describe_client_quotas_response_test.go @@ -2,7 +2,12 @@ package sarama -import "testing" +import ( + "testing" + "time" + + assert "github.com/stretchr/testify/require" +) var ( describeClientQuotasResponseError = []byte{ @@ -43,6 +48,24 @@ var ( 0, 18, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', 65, 46, 132, 128, 0, 0, 0, 0, // 1000000 } + + describeClientQuotasResponseV1 = []byte{ + 0x00, 0x00, 0x00, 0x80, // ThrottleTime (128ms) + 0x00, 0x00, // ErrorCode + 0x01, // ErrorMsg (nil) + 0x02, // Entries (2) + 0x02, + 0x05, 'u', 's', 'e', 'r', // Entity Type + 0x00, // Entity Name (nil) + 0x00, // Empty tagged fields + 0x02, + // 0x13, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x5f, 0x72, 0x61, 0x74, 0x65, + 0x13, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', + 0x41, 0x2f, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, // 1024000 + 0x00, // Empty tagged fields + 0x00, // Empty tagged fields + 0x00, // Empty tagged fields + } ) func TestDescribeClientQuotasResponse(t *testing.T) { @@ -95,3 +118,23 @@ func TestDescribeClientQuotasResponse(t *testing.T) { } testResponse(t, "Complex Quota", res, describeClientQuotasResponseComplexEntity) } + +func TestDescribeClientQuotasResponseV1(t *testing.T) { + res := &DescribeClientQuotasResponse{Version: 1} + testVersionDecodable(t, "V1", res, describeClientQuotasResponseV1, 1) + assert.Equal(t, res.ThrottleTime, time.Millisecond*128) + assert.Len(t, res.Entries, 1) + assert.Equal(t, []DescribeClientQuotasEntry{ + { + Entity: []QuotaEntityComponent{ + { + EntityType: "user", + MatchType: 1, + }, + }, + Values: map[string]float64{ + "producer_byte_rate": 1024000, + }, + }, + }, res.Entries) +}