Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,10 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
request := &DeleteGroupsRequest{
Groups: []string{group},
}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {

if ca.conf.Version.IsAtLeast(V2_4_0_0) {
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}

Expand Down
25 changes: 23 additions & 2 deletions delete_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@ func (r *DeleteGroupsRequest) setVersion(v int16) {
}

func (r *DeleteGroupsRequest) encode(pe packetEncoder) error {
return pe.putStringArray(r.Groups)
if err := pe.putStringArray(r.Groups); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
return nil
}

func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Groups, err = pd.getStringArray()
if err != nil {
return err
}
_, err = pd.getEmptyTaggedFieldArray()
return
}

Expand All @@ -27,15 +35,28 @@ func (r *DeleteGroupsRequest) version() int16 {
}

func (r *DeleteGroupsRequest) headerVersion() int16 {
if r.Version >= 2 {
return 2
}
return 1
}

func (r *DeleteGroupsRequest) isFlexible() bool {
return r.isFlexibleVersion(r.Version)
}

func (r *DeleteGroupsRequest) isFlexibleVersion(version int16) bool {
return version >= 2
}

func (r *DeleteGroupsRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 1
return r.Version >= 0 && r.Version <= 2
}

func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 2:
return V2_4_0_0
case 1:
return V2_0_0_0
case 0:
Expand Down
40 changes: 40 additions & 0 deletions delete_groups_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ var (
0, 3, 'f', 'o', 'o', // group name: foo
0, 3, 'b', 'a', 'r', // group name: foo
}

emptyDeleteGroupsRequestV2 = []byte{
1,
0, // empty tagged fields
}

singleDeleteGroupsRequestV2 = []byte{
2, // 1 group
4, 'f', 'o', 'o', // group name: foo
0, // empty tagged fields
}

doubleDeleteGroupsRequestV2 = []byte{
3, // 2 groups
4, 'f', 'o', 'o', // group name: foo
4, 'b', 'a', 'r', // group name: foo
0, // empty tagged fields
}
)

func TestDeleteGroupsRequest(t *testing.T) {
Expand All @@ -34,3 +52,25 @@ func TestDeleteGroupsRequest(t *testing.T) {
request.AddGroup("bar")
testRequest(t, "two groups", request, doubleDeleteGroupsRequest)
}

func TestDeleteGroupsRequestV2(t *testing.T) {
var request *DeleteGroupsRequest

request = &DeleteGroupsRequest{
Version: 2,
}
testRequest(t, "no groups", request, emptyDeleteGroupsRequestV2)

request = &DeleteGroupsRequest{
Version: 2,
}
request.AddGroup("foo")
testRequest(t, "one group", request, singleDeleteGroupsRequestV2)

request = &DeleteGroupsRequest{
Version: 2,
}
request.AddGroup("foo")
request.AddGroup("bar")
testRequest(t, "two groups", request, doubleDeleteGroupsRequestV2)
}
26 changes: 23 additions & 3 deletions delete_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ func (r *DeleteGroupsResponse) encode(pe packetEncoder) error {
return err
}
pe.putInt16(int16(errorCode))
pe.putEmptyTaggedFieldArray()
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand All @@ -42,7 +44,8 @@ func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error {
return err
}
if n == 0 {
return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

r.GroupErrorCodes = make(map[string]KError, n)
Expand All @@ -57,9 +60,13 @@ func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error {
}

r.GroupErrorCodes[groupID] = KError(errorCode)
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (r *DeleteGroupsResponse) key() int16 {
Expand All @@ -71,15 +78,28 @@ func (r *DeleteGroupsResponse) version() int16 {
}

func (r *DeleteGroupsResponse) headerVersion() int16 {
if r.Version >= 2 {
return 1
}
return 0
}

func (r *DeleteGroupsResponse) isFlexible() bool {
return r.isFlexibleVersion(r.Version)
}

func (r *DeleteGroupsResponse) isFlexibleVersion(version int16) bool {
return version >= 2
}

func (r *DeleteGroupsResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 1
return r.Version >= 0 && r.Version <= 2
}

func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 2:
return V2_4_0_0
case 1:
return V2_0_0_0
case 0:
Expand Down
55 changes: 55 additions & 0 deletions delete_groups_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ var (
0, 3, 'f', 'o', 'o', // group name
0, 0, // no error
}

emptyDeleteGroupsResponseV2 = []byte{
0, 0, 0, 0, // does not violate any quota
1, // no groups
0, // empty tagged fields
}

errorDeleteGroupsResponseV2 = []byte{
0, 0, 0, 0, // does not violate any quota
2, // 1 group
4, 'f', 'o', 'o', // group name
0, 31, // error ErrClusterAuthorizationFailed
0, // empty tagged fields
0, // empty tagged fields
}

noErrorDeleteGroupsResponseV2 = []byte{
0, 0, 0, 0, // does not violate any quota
2, // 1 group
4, 'f', 'o', 'o', // group name
0, 0, // no error
0, // empty tagged fields
0, // empty tagged fields
}
)

func TestDeleteGroupsResponse(t *testing.T) {
Expand Down Expand Up @@ -58,3 +82,34 @@ func TestDeleteGroupsResponse(t *testing.T) {
t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
}
}

func TestDeleteGroupsResponseV2(t *testing.T) {
var response *DeleteGroupsResponse

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponseV2, 2)
if response.ThrottleTime != 0 {
t.Error("Expected no violation")
}
if len(response.GroupErrorCodes) != 0 {
t.Error("Expected no groups")
}

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "error", response, errorDeleteGroupsResponseV2, 2)
if response.ThrottleTime != 0 {
t.Error("Expected no violation")
}
if !errors.Is(response.GroupErrorCodes["foo"], ErrClusterAuthorizationFailed) {
t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
}

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponseV2, 2)
if response.ThrottleTime != 0 {
t.Error("Expected no violation")
}
if !errors.Is(response.GroupErrorCodes["foo"], ErrNoError) {
t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
}
}
50 changes: 50 additions & 0 deletions functional_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,53 @@ func TestFuncAdminDescribeLogDirs(t *testing.T) {
}
}
}

func TestFuncAdminDeleteGroup(t *testing.T) {
checkKafkaVersion(t, "2.4.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
config := NewFunctionalTestConfig()
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
defer safeClose(t, client)
if err != nil {
t.Fatal(err)
}

// create a consumer group
groupID := testFuncConsumerGroupID(t)
consumerGroup, err := NewConsumerGroupFromClient(groupID, client)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, consumerGroup)

offsetMgr, _ := NewOffsetManagerFromClient(groupID, client)
defer safeClose(t, offsetMgr)
markOffset(t, offsetMgr, "test.1", 0, 1)
offsetMgr.Commit()

admin, err := NewClusterAdminFromClient(client)
if err != nil {
t.Fatal(err)
}
groups, err := admin.ListConsumerGroups()
if err != nil {
t.Fatal(err)
}
if len(groups) != 1 {
t.Fatal("Expected single group. Found ", len(groups), "groups.")
}

err = admin.DeleteConsumerGroup(groupID)
if err != nil {
t.Fatal(err)
}

groups, err = admin.ListConsumerGroups()
if err != nil {
t.Fatal(err)
}
if len(groups) != 0 {
t.Fatal("Expected no group. Found ", len(groups), "groups.")
}
}
5 changes: 2 additions & 3 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,8 @@ func TestAllocateBodyProtocolVersions(t *testing.T) {
// TODO: DeleteTopicsRequest v4 is not supported, but expected for KafkaVersion 2.4.0
// apiKeyDeleteTopics: 4, // up from 3
apiKeyInitProducerId: 2, // up from 1
// TODO: DeleteGroupsRequest v2 is not supported, but expected for KafkaVersion 2.4.0
// apiKeyDeleteGroups: 2, // up from 1
apiKeyElectLeaders: 2, // up from 0
apiKeyDeleteGroups: 2, // up from 1
apiKeyElectLeaders: 2, // up from 0
// TODO: IncrementalAlterConfigsRequest v1 is not supported, but expected for KafkaVersion 2.4.0
// apiKeyIncrementalAlterConfigs: 1, // up from 0
apiKeyAlterPartitionReassignments: 0, // new in 2.4
Expand Down
Loading