Skip to content

Commit 5ce40d4

Browse files
committed
Rework and removed useless pointers
1 parent 839d956 commit 5ce40d4

17 files changed

+1280
-0
lines changed

admin.go

+67
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,14 @@ type ClusterAdmin interface {
113113
// Upsert SCRAM users
114114
UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
115115

116+
// Get client quota configurations corresponding to the specified filter.
117+
// This operation is supported by brokers with version 2.6.0.0 or higher.
118+
DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
119+
120+
// Alters client quota configurations with the specified alterations.
121+
// This operation is supported by brokers with version 2.6.0.0 or higher.
122+
AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
123+
116124
// Close shuts down the admin and closes underlying client.
117125
Close() error
118126
}
@@ -1004,3 +1012,62 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU
10041012

10051013
return rsp.Results, nil
10061014
}
1015+
1016+
// Describe All : use an empty/nil components slice + strict = false
1017+
// Contains components: strict = false
1018+
// Contains only components: strict = true
1019+
func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
1020+
request := &DescribeClientQuotasRequest{
1021+
Components: components,
1022+
Strict: strict,
1023+
}
1024+
1025+
b, err := ca.Controller()
1026+
if err != nil {
1027+
return nil, err
1028+
}
1029+
1030+
rsp, err := b.DescribeClientQuotas(request)
1031+
if err != nil {
1032+
return nil, err
1033+
}
1034+
1035+
if rsp.ErrorMsg != nil {
1036+
return nil, errors.New(*rsp.ErrorMsg)
1037+
}
1038+
if rsp.ErrorCode != ErrNoError {
1039+
return nil, rsp.ErrorCode
1040+
}
1041+
1042+
return rsp.Entries, nil
1043+
}
1044+
1045+
func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
1046+
entry := AlterClientQuotasEntry{
1047+
Entity: entity,
1048+
Ops: []ClientQuotasOp{op},
1049+
}
1050+
1051+
request := &AlterClientQuotasRequest{
1052+
Entries: []AlterClientQuotasEntry{entry},
1053+
ValidateOnly: validateOnly,
1054+
}
1055+
1056+
b, err := ca.Controller()
1057+
if err != nil {
1058+
return err
1059+
}
1060+
1061+
rsp, err := b.AlterClientQuotas(request)
1062+
if err != nil {
1063+
return err
1064+
}
1065+
1066+
for _, entry := range rsp.Entries {
1067+
if entry.ErrorCode != ErrNoError {
1068+
return entry.ErrorCode
1069+
}
1070+
}
1071+
1072+
return nil
1073+
}

alter_client_quotas_request.go

+194
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package sarama
2+
3+
// AlterClientQuotas Request (Version: 0) => [entries] validate_only
4+
// entries => [entity] [ops]
5+
// entity => entity_type entity_name
6+
// entity_type => STRING
7+
// entity_name => NULLABLE_STRING
8+
// ops => key value remove
9+
// key => STRING
10+
// value => FLOAT64
11+
// remove => BOOLEAN
12+
// validate_only => BOOLEAN
13+
14+
type AlterClientQuotasRequest struct {
15+
Entries []AlterClientQuotasEntry // The quota configuration entries to alter.
16+
ValidateOnly bool // Whether the alteration should be validated, but not performed.
17+
}
18+
19+
type AlterClientQuotasEntry struct {
20+
Entity []QuotaEntityComponent // The quota entity to alter.
21+
Ops []ClientQuotasOp // An individual quota configuration entry to alter.
22+
}
23+
24+
type ClientQuotasOp struct {
25+
Key string // The quota configuration key.
26+
Value float64 // The value to set, otherwise ignored if the value is to be removed.
27+
Remove bool // Whether the quota configuration value should be removed, otherwise set.
28+
}
29+
30+
func (a *AlterClientQuotasRequest) encode(pe packetEncoder) error {
31+
// Entries
32+
if err := pe.putArrayLength(len(a.Entries)); err != nil {
33+
return err
34+
}
35+
for _, e := range a.Entries {
36+
if err := e.encode(pe); err != nil {
37+
return err
38+
}
39+
}
40+
41+
// ValidateOnly
42+
pe.putBool(a.ValidateOnly)
43+
44+
return nil
45+
}
46+
47+
func (a *AlterClientQuotasRequest) decode(pd packetDecoder, version int16) error {
48+
// Entries
49+
entryCount, err := pd.getArrayLength()
50+
if err != nil {
51+
return err
52+
}
53+
if entryCount > 0 {
54+
a.Entries = make([]AlterClientQuotasEntry, entryCount)
55+
for i := range a.Entries {
56+
e := AlterClientQuotasEntry{}
57+
if err = e.decode(pd, version); err != nil {
58+
return err
59+
}
60+
a.Entries[i] = e
61+
}
62+
} else {
63+
a.Entries = []AlterClientQuotasEntry{}
64+
}
65+
66+
// ValidateOnly
67+
validateOnly, err := pd.getBool()
68+
if err != nil {
69+
return err
70+
}
71+
a.ValidateOnly = validateOnly
72+
73+
return nil
74+
}
75+
76+
func (a *AlterClientQuotasEntry) encode(pe packetEncoder) error {
77+
// Entity
78+
if err := pe.putArrayLength(len(a.Entity)); err != nil {
79+
return err
80+
}
81+
for _, component := range a.Entity {
82+
if err := component.encode(pe); err != nil {
83+
return err
84+
}
85+
}
86+
87+
// Ops
88+
if err := pe.putArrayLength(len(a.Ops)); err != nil {
89+
return err
90+
}
91+
for _, o := range a.Ops {
92+
if err := o.encode(pe); err != nil {
93+
return err
94+
}
95+
}
96+
97+
return nil
98+
}
99+
100+
func (a *AlterClientQuotasEntry) decode(pd packetDecoder, version int16) error {
101+
// Entity
102+
componentCount, err := pd.getArrayLength()
103+
if err != nil {
104+
return err
105+
}
106+
if componentCount > 0 {
107+
a.Entity = make([]QuotaEntityComponent, componentCount)
108+
for i := 0; i < componentCount; i++ {
109+
component := QuotaEntityComponent{}
110+
if err := component.decode(pd, version); err != nil {
111+
return err
112+
}
113+
a.Entity[i] = component
114+
}
115+
} else {
116+
a.Entity = []QuotaEntityComponent{}
117+
}
118+
119+
// Ops
120+
opCount, err := pd.getArrayLength()
121+
if err != nil {
122+
return err
123+
}
124+
if opCount > 0 {
125+
a.Ops = make([]ClientQuotasOp, opCount)
126+
for i := range a.Ops {
127+
c := ClientQuotasOp{}
128+
if err = c.decode(pd, version); err != nil {
129+
return err
130+
}
131+
a.Ops[i] = c
132+
}
133+
} else {
134+
a.Ops = []ClientQuotasOp{}
135+
}
136+
137+
return nil
138+
}
139+
140+
func (c *ClientQuotasOp) encode(pe packetEncoder) error {
141+
// Key
142+
if err := pe.putString(c.Key); err != nil {
143+
return err
144+
}
145+
146+
// Value
147+
pe.putFloat64(c.Value)
148+
149+
// Remove
150+
pe.putBool(c.Remove)
151+
152+
return nil
153+
}
154+
155+
func (c *ClientQuotasOp) decode(pd packetDecoder, version int16) error {
156+
// Key
157+
key, err := pd.getString()
158+
if err != nil {
159+
return err
160+
}
161+
c.Key = key
162+
163+
// Value
164+
value, err := pd.getFloat64()
165+
if err != nil {
166+
return err
167+
}
168+
c.Value = value
169+
170+
// Remove
171+
remove, err := pd.getBool()
172+
if err != nil {
173+
return err
174+
}
175+
c.Remove = remove
176+
177+
return nil
178+
}
179+
180+
func (a *AlterClientQuotasRequest) key() int16 {
181+
return 49
182+
}
183+
184+
func (a *AlterClientQuotasRequest) version() int16 {
185+
return 0
186+
}
187+
188+
func (a *AlterClientQuotasRequest) headerVersion() int16 {
189+
return 1
190+
}
191+
192+
func (a *AlterClientQuotasRequest) requiredVersion() KafkaVersion {
193+
return V2_6_0_0
194+
}

0 commit comments

Comments
 (0)