diff --git a/admin.go b/admin.go index 4b8f02995..2a2919b01 100644 --- a/admin.go +++ b/admin.go @@ -1125,7 +1125,10 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { request := &DeleteGroupsRequest{ Groups: []string{group}, } - if ca.conf.Version.IsAtLeast(V2_0_0_0) { + + if ca.conf.Version.IsAtLeast(V2_4_0_0) { + request.Version = 2 + } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { request.Version = 1 } diff --git a/delete_groups_request.go b/delete_groups_request.go index 72bc8bc3a..e6c1db0f1 100644 --- a/delete_groups_request.go +++ b/delete_groups_request.go @@ -10,11 +10,19 @@ func (r *DeleteGroupsRequest) setVersion(v int16) { } func (r *DeleteGroupsRequest) encode(pe packetEncoder) error { - return pe.putStringArray(r.Groups) + if err := pe.putStringArray(r.Groups); err != nil { + return err + } + pe.putEmptyTaggedFieldArray() + return nil } func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) { r.Groups, err = pd.getStringArray() + if err != nil { + return err + } + _, err = pd.getEmptyTaggedFieldArray() return } @@ -27,15 +35,28 @@ func (r *DeleteGroupsRequest) version() int16 { } func (r *DeleteGroupsRequest) headerVersion() int16 { + if r.Version >= 2 { + return 2 + } return 1 } +func (r *DeleteGroupsRequest) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *DeleteGroupsRequest) isFlexibleVersion(version int16) bool { + return version >= 2 +} + func (r *DeleteGroupsRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 1 + return r.Version >= 0 && r.Version <= 2 } func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion { switch r.Version { + case 2: + return V2_4_0_0 case 1: return V2_0_0_0 case 0: diff --git a/delete_groups_request_test.go b/delete_groups_request_test.go index 8381829a0..4001258f8 100644 --- a/delete_groups_request_test.go +++ b/delete_groups_request_test.go @@ -17,6 +17,24 @@ var ( 0, 3, 'f', 'o', 'o', // group name: foo 0, 3, 'b', 'a', 'r', // group name: foo } + + emptyDeleteGroupsRequestV2 = []byte{ + 1, + 0, // empty tagged fields + } + + singleDeleteGroupsRequestV2 = []byte{ + 2, // 1 group + 4, 'f', 'o', 'o', // group name: foo + 0, // empty tagged fields + } + + doubleDeleteGroupsRequestV2 = []byte{ + 3, // 2 groups + 4, 'f', 'o', 'o', // group name: foo + 4, 'b', 'a', 'r', // group name: foo + 0, // empty tagged fields + } ) func TestDeleteGroupsRequest(t *testing.T) { @@ -34,3 +52,25 @@ func TestDeleteGroupsRequest(t *testing.T) { request.AddGroup("bar") testRequest(t, "two groups", request, doubleDeleteGroupsRequest) } + +func TestDeleteGroupsRequestV2(t *testing.T) { + var request *DeleteGroupsRequest + + request = &DeleteGroupsRequest{ + Version: 2, + } + testRequest(t, "no groups", request, emptyDeleteGroupsRequestV2) + + request = &DeleteGroupsRequest{ + Version: 2, + } + request.AddGroup("foo") + testRequest(t, "one group", request, singleDeleteGroupsRequestV2) + + request = &DeleteGroupsRequest{ + Version: 2, + } + request.AddGroup("foo") + request.AddGroup("bar") + testRequest(t, "two groups", request, doubleDeleteGroupsRequestV2) +} diff --git a/delete_groups_response.go b/delete_groups_response.go index 60504a796..508b0f084 100644 --- a/delete_groups_response.go +++ b/delete_groups_response.go @@ -25,8 +25,10 @@ func (r *DeleteGroupsResponse) encode(pe packetEncoder) error { return err } pe.putInt16(int16(errorCode)) + pe.putEmptyTaggedFieldArray() } + pe.putEmptyTaggedFieldArray() return nil } @@ -42,7 +44,8 @@ func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error { return err } if n == 0 { - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } r.GroupErrorCodes = make(map[string]KError, n) @@ -57,9 +60,13 @@ func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error { } r.GroupErrorCodes[groupID] = KError(errorCode) + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *DeleteGroupsResponse) key() int16 { @@ -71,15 +78,28 @@ func (r *DeleteGroupsResponse) version() int16 { } func (r *DeleteGroupsResponse) headerVersion() int16 { + if r.Version >= 2 { + return 1 + } return 0 } +func (r *DeleteGroupsResponse) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *DeleteGroupsResponse) isFlexibleVersion(version int16) bool { + return version >= 2 +} + func (r *DeleteGroupsResponse) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 1 + return r.Version >= 0 && r.Version <= 2 } func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { switch r.Version { + case 2: + return V2_4_0_0 case 1: return V2_0_0_0 case 0: diff --git a/delete_groups_response_test.go b/delete_groups_response_test.go index 824f0217a..29688f521 100644 --- a/delete_groups_response_test.go +++ b/delete_groups_response_test.go @@ -26,6 +26,30 @@ var ( 0, 3, 'f', 'o', 'o', // group name 0, 0, // no error } + + emptyDeleteGroupsResponseV2 = []byte{ + 0, 0, 0, 0, // does not violate any quota + 1, // no groups + 0, // empty tagged fields + } + + errorDeleteGroupsResponseV2 = []byte{ + 0, 0, 0, 0, // does not violate any quota + 2, // 1 group + 4, 'f', 'o', 'o', // group name + 0, 31, // error ErrClusterAuthorizationFailed + 0, // empty tagged fields + 0, // empty tagged fields + } + + noErrorDeleteGroupsResponseV2 = []byte{ + 0, 0, 0, 0, // does not violate any quota + 2, // 1 group + 4, 'f', 'o', 'o', // group name + 0, 0, // no error + 0, // empty tagged fields + 0, // empty tagged fields + } ) func TestDeleteGroupsResponse(t *testing.T) { @@ -58,3 +82,34 @@ func TestDeleteGroupsResponse(t *testing.T) { t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"]) } } + +func TestDeleteGroupsResponseV2(t *testing.T) { + var response *DeleteGroupsResponse + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponseV2, 2) + if response.ThrottleTime != 0 { + t.Error("Expected no violation") + } + if len(response.GroupErrorCodes) != 0 { + t.Error("Expected no groups") + } + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "error", response, errorDeleteGroupsResponseV2, 2) + if response.ThrottleTime != 0 { + t.Error("Expected no violation") + } + if !errors.Is(response.GroupErrorCodes["foo"], ErrClusterAuthorizationFailed) { + t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"]) + } + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponseV2, 2) + if response.ThrottleTime != 0 { + t.Error("Expected no violation") + } + if !errors.Is(response.GroupErrorCodes["foo"], ErrNoError) { + t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"]) + } +} diff --git a/functional_admin_test.go b/functional_admin_test.go index d1fc17876..0553a89a5 100644 --- a/functional_admin_test.go +++ b/functional_admin_test.go @@ -347,3 +347,53 @@ func TestFuncAdminDescribeLogDirs(t *testing.T) { } } } + +func TestFuncAdminDeleteGroup(t *testing.T) { + checkKafkaVersion(t, "2.4.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + config := NewFunctionalTestConfig() + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + defer safeClose(t, client) + if err != nil { + t.Fatal(err) + } + + // create a consumer group + groupID := testFuncConsumerGroupID(t) + consumerGroup, err := NewConsumerGroupFromClient(groupID, client) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, consumerGroup) + + offsetMgr, _ := NewOffsetManagerFromClient(groupID, client) + defer safeClose(t, offsetMgr) + markOffset(t, offsetMgr, "test.1", 0, 1) + offsetMgr.Commit() + + admin, err := NewClusterAdminFromClient(client) + if err != nil { + t.Fatal(err) + } + groups, err := admin.ListConsumerGroups() + if err != nil { + t.Fatal(err) + } + if len(groups) != 1 { + t.Fatal("Expected single group. Found ", len(groups), "groups.") + } + + err = admin.DeleteConsumerGroup(groupID) + if err != nil { + t.Fatal(err) + } + + groups, err = admin.ListConsumerGroups() + if err != nil { + t.Fatal(err) + } + if len(groups) != 0 { + t.Fatal("Expected no group. Found ", len(groups), "groups.") + } +} diff --git a/request_test.go b/request_test.go index a588deaa9..81cc45f87 100644 --- a/request_test.go +++ b/request_test.go @@ -361,9 +361,8 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { // TODO: DeleteTopicsRequest v4 is not supported, but expected for KafkaVersion 2.4.0 // apiKeyDeleteTopics: 4, // up from 3 apiKeyInitProducerId: 2, // up from 1 - // TODO: DeleteGroupsRequest v2 is not supported, but expected for KafkaVersion 2.4.0 - // apiKeyDeleteGroups: 2, // up from 1 - apiKeyElectLeaders: 2, // up from 0 + apiKeyDeleteGroups: 2, // up from 1 + apiKeyElectLeaders: 2, // up from 0 // TODO: IncrementalAlterConfigsRequest v1 is not supported, but expected for KafkaVersion 2.4.0 // apiKeyIncrementalAlterConfigs: 1, // up from 0 apiKeyAlterPartitionReassignments: 0, // new in 2.4