Skip to content

Commit 6e46431

Browse files
committed
Add Incremental Config updates API
1 parent 03b4a4f commit 6e46431

7 files changed

+439
-28
lines changed

alter_configs_response.go

+43-28
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,9 @@ func (a *AlterConfigsResponse) encode(pe packetEncoder) error {
2323
return err
2424
}
2525

26-
for i := range a.Resources {
27-
pe.putInt16(a.Resources[i].ErrorCode)
28-
err := pe.putString(a.Resources[i].ErrorMsg)
29-
if err != nil {
30-
return nil
31-
}
32-
pe.putInt8(int8(a.Resources[i].Type))
33-
err = pe.putString(a.Resources[i].Name)
34-
if err != nil {
35-
return nil
26+
for _, v := range a.Resources {
27+
if err := v.encode(pe); err != nil {
28+
return err
3629
}
3730
}
3831

@@ -56,30 +49,52 @@ func (a *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
5649
for i := range a.Resources {
5750
a.Resources[i] = new(AlterConfigsResourceResponse)
5851

59-
errCode, err := pd.getInt16()
60-
if err != nil {
52+
if err := a.Resources[i].decode(pd, version); err != nil {
6153
return err
6254
}
63-
a.Resources[i].ErrorCode = errCode
55+
}
6456

65-
e, err := pd.getString()
66-
if err != nil {
67-
return err
68-
}
69-
a.Resources[i].ErrorMsg = e
57+
return nil
58+
}
7059

71-
t, err := pd.getInt8()
72-
if err != nil {
73-
return err
74-
}
75-
a.Resources[i].Type = ConfigResourceType(t)
60+
func (a *AlterConfigsResourceResponse) encode(pe packetEncoder) error {
61+
pe.putInt16(a.ErrorCode)
62+
err := pe.putString(a.ErrorMsg)
63+
if err != nil {
64+
return nil
65+
}
66+
pe.putInt8(int8(a.Type))
67+
err = pe.putString(a.Name)
68+
if err != nil {
69+
return nil
70+
}
71+
return nil
72+
}
7673

77-
name, err := pd.getString()
78-
if err != nil {
79-
return err
80-
}
81-
a.Resources[i].Name = name
74+
func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) error {
75+
errCode, err := pd.getInt16()
76+
if err != nil {
77+
return err
78+
}
79+
a.ErrorCode = errCode
80+
81+
e, err := pd.getString()
82+
if err != nil {
83+
return err
84+
}
85+
a.ErrorMsg = e
86+
87+
t, err := pd.getInt8()
88+
if err != nil {
89+
return err
90+
}
91+
a.Type = ConfigResourceType(t)
92+
93+
name, err := pd.getString()
94+
if err != nil {
95+
return err
8296
}
97+
a.Name = name
8398

8499
return nil
85100
}

broker.go

+12
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,18 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon
666666
return response, nil
667667
}
668668

669+
// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
670+
func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
671+
response := new(IncrementalAlterConfigsResponse)
672+
673+
err := b.sendAndReceive(request, response)
674+
if err != nil {
675+
return nil, err
676+
}
677+
678+
return response, nil
679+
}
680+
669681
// DeleteGroups sends a request to delete groups and returns a response or error
670682
func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
671683
response := new(DeleteGroupsResponse)

incremental_alter_configs_request.go

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package sarama
2+
3+
type IncrementalAlterConfigsOperation int8
4+
5+
const (
6+
IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota
7+
IncrementalAlterConfigsOperationDelete
8+
IncrementalAlterConfigsOperationAppend
9+
IncrementalAlterConfigsOperationSubtract
10+
)
11+
12+
// IncrementalAlterConfigsRequest is an incremental alter config request type
13+
type IncrementalAlterConfigsRequest struct {
14+
Resources []*IncrementalAlterConfigsResource
15+
ValidateOnly bool
16+
}
17+
18+
type IncrementalAlterConfigsResource struct {
19+
Type ConfigResourceType
20+
Name string
21+
ConfigEntries map[string]IncrementalAlterConfigsEntry
22+
}
23+
24+
type IncrementalAlterConfigsEntry struct {
25+
Operation IncrementalAlterConfigsOperation
26+
Value *string
27+
}
28+
29+
func (a *IncrementalAlterConfigsRequest) encode(pe packetEncoder) error {
30+
if err := pe.putArrayLength(len(a.Resources)); err != nil {
31+
return err
32+
}
33+
34+
for _, r := range a.Resources {
35+
if err := r.encode(pe); err != nil {
36+
return err
37+
}
38+
}
39+
40+
pe.putBool(a.ValidateOnly)
41+
return nil
42+
}
43+
44+
func (a *IncrementalAlterConfigsRequest) decode(pd packetDecoder, version int16) error {
45+
resourceCount, err := pd.getArrayLength()
46+
if err != nil {
47+
return err
48+
}
49+
50+
a.Resources = make([]*IncrementalAlterConfigsResource, resourceCount)
51+
for i := range a.Resources {
52+
r := &IncrementalAlterConfigsResource{}
53+
err = r.decode(pd, version)
54+
if err != nil {
55+
return err
56+
}
57+
a.Resources[i] = r
58+
}
59+
60+
validateOnly, err := pd.getBool()
61+
if err != nil {
62+
return err
63+
}
64+
65+
a.ValidateOnly = validateOnly
66+
67+
return nil
68+
}
69+
70+
func (a *IncrementalAlterConfigsResource) encode(pe packetEncoder) error {
71+
pe.putInt8(int8(a.Type))
72+
73+
if err := pe.putString(a.Name); err != nil {
74+
return err
75+
}
76+
77+
if err := pe.putArrayLength(len(a.ConfigEntries)); err != nil {
78+
return err
79+
}
80+
81+
for name, e := range a.ConfigEntries {
82+
if err := pe.putString(name); err != nil {
83+
return err
84+
}
85+
86+
if err := e.encode(pe); err != nil {
87+
return err
88+
}
89+
}
90+
91+
return nil
92+
}
93+
94+
func (a *IncrementalAlterConfigsResource) decode(pd packetDecoder, version int16) error {
95+
t, err := pd.getInt8()
96+
if err != nil {
97+
return err
98+
}
99+
a.Type = ConfigResourceType(t)
100+
101+
name, err := pd.getString()
102+
if err != nil {
103+
return err
104+
}
105+
a.Name = name
106+
107+
n, err := pd.getArrayLength()
108+
if err != nil {
109+
return err
110+
}
111+
112+
if n > 0 {
113+
a.ConfigEntries = make(map[string]IncrementalAlterConfigsEntry, n)
114+
for i := 0; i < n; i++ {
115+
name, err := pd.getString()
116+
if err != nil {
117+
return err
118+
}
119+
120+
var v IncrementalAlterConfigsEntry
121+
122+
if err := v.decode(pd, version); err != nil {
123+
return err
124+
}
125+
126+
a.ConfigEntries[name] = v
127+
}
128+
}
129+
return err
130+
}
131+
132+
func (a *IncrementalAlterConfigsEntry) encode(pe packetEncoder) error {
133+
pe.putInt8(int8(a.Operation))
134+
135+
if err := pe.putNullableString(a.Value); err != nil {
136+
return err
137+
}
138+
139+
return nil
140+
}
141+
142+
func (a *IncrementalAlterConfigsEntry) decode(pd packetDecoder, version int16) error {
143+
t, err := pd.getInt8()
144+
if err != nil {
145+
return err
146+
}
147+
a.Operation = IncrementalAlterConfigsOperation(t)
148+
149+
s, err := pd.getNullableString()
150+
if err != nil {
151+
return err
152+
}
153+
154+
a.Value = s
155+
156+
return nil
157+
}
158+
159+
func (a *IncrementalAlterConfigsRequest) key() int16 {
160+
return 44
161+
}
162+
163+
func (a *IncrementalAlterConfigsRequest) version() int16 {
164+
return 0
165+
}
166+
167+
func (a *IncrementalAlterConfigsRequest) headerVersion() int16 {
168+
return 1
169+
}
170+
171+
func (a *IncrementalAlterConfigsRequest) requiredVersion() KafkaVersion {
172+
return V2_3_0_0
173+
}
+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package sarama
2+
3+
import "testing"
4+
5+
var (
6+
emptyIncrementalAlterConfigsRequest = []byte{
7+
0, 0, 0, 0, // 0 configs
8+
0, // don't Validate
9+
}
10+
11+
singleIncrementalAlterConfigsRequest = []byte{
12+
0, 0, 0, 1, // 1 config
13+
2, // a topic
14+
0, 3, 'f', 'o', 'o', // topic name: foo
15+
0, 0, 0, 1, // 1 config name
16+
0, 10, // 10 chars
17+
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
18+
0, // OperationSet
19+
0, 4,
20+
'1', '0', '0', '0',
21+
0, // don't validate
22+
}
23+
24+
doubleIncrementalAlterConfigsRequest = []byte{
25+
0, 0, 0, 2, // 2 config
26+
2, // a topic
27+
0, 3, 'f', 'o', 'o', // topic name: foo
28+
0, 0, 0, 1, // 1 config name
29+
0, 10, // 10 chars
30+
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
31+
0, // OperationSet
32+
0, 4,
33+
'1', '0', '0', '0',
34+
2, // a topic
35+
0, 3, 'b', 'a', 'r', // topic name: foo
36+
0, 0, 0, 1, // 2 config
37+
0, 12, // 12 chars
38+
'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
39+
1, // OperationDelete
40+
0, 4,
41+
'1', '0', '0', '0',
42+
0, // don't validate
43+
}
44+
)
45+
46+
func TestIncrementalAlterConfigsRequest(t *testing.T) {
47+
var request *IncrementalAlterConfigsRequest
48+
49+
request = &IncrementalAlterConfigsRequest{
50+
Resources: []*IncrementalAlterConfigsResource{},
51+
}
52+
testRequest(t, "no requests", request, emptyIncrementalAlterConfigsRequest)
53+
54+
configValue := "1000"
55+
request = &IncrementalAlterConfigsRequest{
56+
Resources: []*IncrementalAlterConfigsResource{
57+
{
58+
Type: TopicResource,
59+
Name: "foo",
60+
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
61+
"segment.ms": {
62+
Operation: IncrementalAlterConfigsOperationSet,
63+
Value: &configValue,
64+
},
65+
},
66+
},
67+
},
68+
}
69+
70+
testRequest(t, "one config", request, singleIncrementalAlterConfigsRequest)
71+
72+
request = &IncrementalAlterConfigsRequest{
73+
Resources: []*IncrementalAlterConfigsResource{
74+
{
75+
Type: TopicResource,
76+
Name: "foo",
77+
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
78+
"segment.ms": {
79+
Operation: IncrementalAlterConfigsOperationSet,
80+
Value: &configValue,
81+
},
82+
},
83+
},
84+
{
85+
Type: TopicResource,
86+
Name: "bar",
87+
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
88+
"retention.ms": {
89+
Operation: IncrementalAlterConfigsOperationDelete,
90+
Value: &configValue,
91+
},
92+
},
93+
},
94+
},
95+
}
96+
97+
testRequest(t, "two configs", request, doubleIncrementalAlterConfigsRequest)
98+
}

0 commit comments

Comments
 (0)