diff --git a/admin.go b/admin.go index 8f2a44d69..fdb111dcd 100644 --- a/admin.go +++ b/admin.go @@ -794,6 +794,10 @@ func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, ValidateOnly: validateOnly, } + if ca.conf.Version.IsAtLeast(V2_4_0_0) { + request.Version = 1 + } + var ( b *Broker err error diff --git a/alter_configs_response.go b/alter_configs_response.go index 4c458ddad..d41bac59e 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -89,6 +89,7 @@ func (a *AlterConfigsResourceResponse) encode(pe packetEncoder) error { if err != nil { return err } + pe.putEmptyTaggedFieldArray() return nil } @@ -99,11 +100,15 @@ func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) e } a.ErrorCode = errCode - e, err := pd.getString() + e, err := pd.getNullableString() if err != nil { return err } - a.ErrorMsg = e + if e == nil { + a.ErrorMsg = "" + } else { + a.ErrorMsg = *e + } t, err := pd.getInt8() if err != nil { @@ -117,7 +122,8 @@ func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) e } a.Name = name - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (a *AlterConfigsResponse) key() int16 { diff --git a/functional_admin_test.go b/functional_admin_test.go index 203764edf..3b93f85a5 100644 --- a/functional_admin_test.go +++ b/functional_admin_test.go @@ -4,8 +4,10 @@ package sarama import ( "context" + "fmt" "maps" "slices" + "strconv" "testing" "time" @@ -440,3 +442,58 @@ func TestFuncAdminDeleteTopic(t *testing.T) { t.Fatal(err) } } + +func TestFuncAdminIncrementalAlterConfigs(t *testing.T) { + checkKafkaVersion(t, "2.3.0.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + config := NewFunctionalTestConfig() + adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, adminClient) + + brokerIDs := make([]int32, len(FunctionalTestEnv.KafkaBrokerAddrs)) + for i := range brokerIDs { + brokerIDs[i] = int32(i + 1) + } + + getConfigValue := func(config string) int { + resource := ConfigResource{ + Type: BrokerResource, + Name: "1", + ConfigNames: []string{config}, + } + res, err := adminClient.DescribeConfig(resource) + if err != nil { + t.Fatal(err) + } + if len(res) != 1 { + t.Fatalf("expected 1 config in response but got %d", len(res)) + } + if res[0].Name != config { + t.Fatalf("expected config in response name to be '%s' but got '%s'", config, res[0].Name) + } + n, err := strconv.Atoi(res[0].Value) + if err != nil { + t.Fatalf("failed to parse config in response value '%s': %v", res[0].Value, err) + } + return n + } + configName := "log.cleaner.backoff.ms" + n := getConfigValue(configName) + n++ + value := fmt.Sprintf("%d", n) + err = adminClient.IncrementalAlterConfig(BrokerResource, "1", + map[string]IncrementalAlterConfigsEntry{ + configName: { + Operation: IncrementalAlterConfigsOperationSet, + Value: &value, + }, + }, false) + if err != nil { + t.Fatalf("failed to alter config: %v", err) + } +} diff --git a/incremental_alter_configs_request.go b/incremental_alter_configs_request.go index 0ad302cfb..e04868431 100644 --- a/incremental_alter_configs_request.go +++ b/incremental_alter_configs_request.go @@ -43,6 +43,8 @@ func (a *IncrementalAlterConfigsRequest) encode(pe packetEncoder) error { } pe.putBool(a.ValidateOnly) + + pe.putEmptyTaggedFieldArray() return nil } @@ -69,7 +71,8 @@ func (a *IncrementalAlterConfigsRequest) decode(pd packetDecoder, version int16) a.ValidateOnly = validateOnly - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (a *IncrementalAlterConfigsResource) encode(pe packetEncoder) error { @@ -93,6 +96,7 @@ func (a *IncrementalAlterConfigsResource) encode(pe packetEncoder) error { } } + pe.putEmptyTaggedFieldArray() return nil } @@ -131,6 +135,7 @@ func (a *IncrementalAlterConfigsResource) decode(pd packetDecoder, version int16 a.ConfigEntries[name] = v } } + _, err = pd.getEmptyTaggedFieldArray() return err } @@ -141,6 +146,7 @@ func (a *IncrementalAlterConfigsEntry) encode(pe packetEncoder) error { return err } + pe.putEmptyTaggedFieldArray() return nil } @@ -158,7 +164,8 @@ func (a *IncrementalAlterConfigsEntry) decode(pd packetDecoder, version int16) e a.Value = s - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (a *IncrementalAlterConfigsRequest) key() int16 { @@ -170,13 +177,29 @@ func (a *IncrementalAlterConfigsRequest) version() int16 { } func (a *IncrementalAlterConfigsRequest) headerVersion() int16 { + if a.Version >= 1 { + return 2 + } return 1 } func (a *IncrementalAlterConfigsRequest) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 1 +} + +func (a *IncrementalAlterConfigsRequest) isFlexible() bool { + return a.isFlexibleVersion(a.Version) +} + +func (a *IncrementalAlterConfigsRequest) isFlexibleVersion(version int16) bool { + return version >= 1 } func (a *IncrementalAlterConfigsRequest) requiredVersion() KafkaVersion { - return V2_3_0_0 + switch a.Version { + case 1: + return V2_4_0_0 + default: + return V2_3_0_0 + } } diff --git a/incremental_alter_configs_request_test.go b/incremental_alter_configs_request_test.go index 1b7d691ad..ab9b735d8 100644 --- a/incremental_alter_configs_request_test.go +++ b/incremental_alter_configs_request_test.go @@ -43,6 +43,54 @@ var ( '1', '0', '0', '0', 0, // don't validate } + + emptyIncrementalAlterConfigsRequestV1 = []byte{ + 1, // 0 configs + 0, // don't Validate + 0, // empty tagged fields + } + + singleIncrementalAlterConfigsRequestV1 = []byte{ + 2, // 1 config + 2, // a topic + 4, 'f', 'o', 'o', // topic name: foo + 2, // 1 config name + 11, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, // OperationSet + 5, + '1', '0', '0', '0', + 0, // empty tagged fields + 0, // empty tagged fields + 0, // don't validate + 0, // empty tagged fields + } + + doubleIncrementalAlterConfigsRequestV1 = []byte{ + 3, // 2 config + 2, // a topic + 4, 'f', 'o', 'o', // topic name: foo + 2, // 1 config name + 11, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, // OperationSet + 5, + '1', '0', '0', '0', + 0, // empty tagged fields + 0, // empty tagged fields + 2, // a topic + 4, 'b', 'a', 'r', // topic name: foo + 2, // 2 config + 13, // 12 chars + 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's', + 1, // OperationDelete + 5, + '1', '0', '0', '0', + 0, // empty tagged fields + 0, // empty tagged fields + 0, // don't validate + 0, // empty tagged fields + } ) func TestIncrementalAlterConfigsRequest(t *testing.T) { @@ -98,3 +146,60 @@ func TestIncrementalAlterConfigsRequest(t *testing.T) { testRequest(t, "two configs", request, doubleIncrementalAlterConfigsRequest) } + +func TestIncrementalAlterConfigsRequestV1(t *testing.T) { + var request *IncrementalAlterConfigsRequest + + request = &IncrementalAlterConfigsRequest{ + Version: 1, + Resources: []*IncrementalAlterConfigsResource{}, + } + testRequest(t, "no requests", request, emptyIncrementalAlterConfigsRequestV1) + + configValue := "1000" + request = &IncrementalAlterConfigsRequest{ + Version: 1, + Resources: []*IncrementalAlterConfigsResource{ + { + Type: TopicResource, + Name: "foo", + ConfigEntries: map[string]IncrementalAlterConfigsEntry{ + "segment.ms": { + Operation: IncrementalAlterConfigsOperationSet, + Value: &configValue, + }, + }, + }, + }, + } + + testRequest(t, "one config", request, singleIncrementalAlterConfigsRequestV1) + + request = &IncrementalAlterConfigsRequest{ + Version: 1, + Resources: []*IncrementalAlterConfigsResource{ + { + Type: TopicResource, + Name: "foo", + ConfigEntries: map[string]IncrementalAlterConfigsEntry{ + "segment.ms": { + Operation: IncrementalAlterConfigsOperationSet, + Value: &configValue, + }, + }, + }, + { + Type: TopicResource, + Name: "bar", + ConfigEntries: map[string]IncrementalAlterConfigsEntry{ + "retention.ms": { + Operation: IncrementalAlterConfigsOperationDelete, + Value: &configValue, + }, + }, + }, + }, + } + + testRequest(t, "two configs", request, doubleIncrementalAlterConfigsRequestV1) +} diff --git a/incremental_alter_configs_response.go b/incremental_alter_configs_response.go index 94a1247b3..8b7fd7b62 100644 --- a/incremental_alter_configs_response.go +++ b/incremental_alter_configs_response.go @@ -51,7 +51,8 @@ func (a *IncrementalAlterConfigsResponse) decode(pd packetDecoder, version int16 } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (a *IncrementalAlterConfigsResponse) key() int16 { @@ -63,15 +64,31 @@ func (a *IncrementalAlterConfigsResponse) version() int16 { } func (a *IncrementalAlterConfigsResponse) headerVersion() int16 { + if a.Version >= 1 { + return 1 + } return 0 } +func (a *IncrementalAlterConfigsResponse) isFlexible() bool { + return a.isFlexibleVersion(a.Version) +} + +func (a *IncrementalAlterConfigsResponse) isFlexibleVersion(version int16) bool { + return version >= 1 +} + func (a *IncrementalAlterConfigsResponse) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 1 } func (a *IncrementalAlterConfigsResponse) requiredVersion() KafkaVersion { - return V2_3_0_0 + switch a.Version { + case 1: + return V2_4_0_0 + default: + return V2_3_0_0 + } } func (r *IncrementalAlterConfigsResponse) throttleTime() time.Duration { diff --git a/incremental_alter_configs_response_test.go b/incremental_alter_configs_response_test.go index 0b7688b0b..5e033b421 100644 --- a/incremental_alter_configs_response_test.go +++ b/incremental_alter_configs_response_test.go @@ -20,6 +20,34 @@ var ( 2, // topic 0, 3, 'f', 'o', 'o', } + + incrementalAlterResponseEmptyV1 = []byte{ + 0, 0, 0, 0, // throttle + 1, // no configs + 0, // empty tagged fields + } + + incrementalAlterResponsePopulatedV1 = []byte{ + 0, 0, 0, 0, // throttle + 2, // response + 0, 0, // errorcode + 1, // empty string + 2, // topic + 4, 'f', 'o', 'o', + 0, // empty tagged fields + 0, // empty tagged fields + } + + incrementalAlterConfigsResponseBrokerV1 = []byte{ + 0, 0, 0, 0, // throttle time + 2, // 1+1 response + 0, 0, // error code + 0, // null string + 4, // broker resource + 2, '1', // broker + 0, // empty tagged fields + 0, // empty tagged fields + } ) func TestIncrementalAlterConfigsResponse(t *testing.T) { @@ -45,3 +73,39 @@ func TestIncrementalAlterConfigsResponse(t *testing.T) { } testResponse(t, "response with error", response, incrementalAlterResponsePopulated) } + +func TestIncrementalAlterConfigsResponseV1(t *testing.T) { + var response *IncrementalAlterConfigsResponse + + response = &IncrementalAlterConfigsResponse{ + Resources: []*AlterConfigsResourceResponse{}, + } + testVersionDecodable(t, "empty", response, incrementalAlterResponseEmptyV1, 1) + if len(response.Resources) != 0 { + t.Error("Expected no groups") + } + + response = &IncrementalAlterConfigsResponse{ + Resources: []*AlterConfigsResourceResponse{ + { + ErrorCode: 0, + ErrorMsg: "", + Type: TopicResource, + Name: "foo", + }, + }, + } + testVersionDecodable(t, "response with error", response, incrementalAlterResponsePopulatedV1, 1) + + response = &IncrementalAlterConfigsResponse{ + Resources: []*AlterConfigsResourceResponse{ + { + ErrorCode: 0, + ErrorMsg: "", + Type: BrokerResource, + Name: "1", + }, + }, + } + testVersionDecodable(t, "response with error", response, incrementalAlterConfigsResponseBrokerV1, 1) +} diff --git a/request_test.go b/request_test.go index e7e78ef25..19c3819af 100644 --- a/request_test.go +++ b/request_test.go @@ -354,15 +354,14 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { // apiKeySyncGroup: 4, // up from 3 // TODO: DescribeGroupsRequest v5 is not supported, but expected for KafkaVersion 2.4.0 // apiKeyDescribeGroups: 5, // up from 3 - apiKeyListGroups: 3, // up from 2 - apiKeyApiVersions: 3, // up from 2 - apiKeyCreateTopics: 5, // up from 3 - apiKeyDeleteTopics: 4, // up from 3 - apiKeyInitProducerId: 2, // up from 1 - apiKeyDeleteGroups: 2, // up from 1 - apiKeyElectLeaders: 2, // up from 0 - // TODO: IncrementalAlterConfigsRequest v1 is not supported, but expected for KafkaVersion 2.4.0 - // apiKeyIncrementalAlterConfigs: 1, // up from 0 + apiKeyListGroups: 3, // up from 2 + apiKeyApiVersions: 3, // up from 2 + apiKeyCreateTopics: 5, // up from 3 + apiKeyDeleteTopics: 4, // up from 3 + apiKeyInitProducerId: 2, // up from 1 + apiKeyDeleteGroups: 2, // up from 1 + apiKeyElectLeaders: 2, // up from 0 + apiKeyIncrementalAlterConfigs: 1, // up from 0 apiKeyAlterPartitionReassignments: 0, // new in 2.4 apiKeyListPartitionReassignments: 0, // new in 2.4 apiKeyOffsetDelete: 0, // new in 2.4