Skip to content

Commit

Permalink
Fix failing tests and change result of DeleteConsumerGroups
Browse files Browse the repository at this point in the history
also contains some documentation, changelog changes (minor).
  • Loading branch information
milindl committed Jan 16, 2023
1 parent 2c4d900 commit 16f671d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 29 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ This is a maintenance release:
Allow listing consumer groups per state.
* [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484)
Partially implemented: support for AlterConsumerGroupOffsets.
* As result of the above KIPs, added (#TODO_ADD_PR)
* As result of the above KIPs, added (#923)
- `ListConsumerGroups` Admin operation. Supports listing by state.
- `DescribeConsumerGroups` Admin operation. Supports multiple groups.
- `DeleteConsumerGroups` Admin operation. Supports multiple groups (@santwanav).
- `DeleteConsumerGroups` Admin operation. Supports multiple groups (@vsantwana).
- `ListConsumerGroupOffsets` Admin operation. Currently, only supports
1 group with multiple partitions. Supports the `requireStable` option.
- `AlterConsumerGroupOffsets` Admin operation. Currently, only supports
Expand Down
40 changes: 26 additions & 14 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ type DescribeConsumerGroupsResult struct {
ConsumerGroupDescriptions []ConsumerGroupDescription
}

// DeleteConsumerGroupResult represents the result of a DeleteConsumerGroups
// call.
type DeleteConsumerGroupResult struct {
// Slice of GroupResult.
GroupResults []GroupResult
}

// ListConsumerGroupOffsetsResult represents the result of a
// ListConsumerGroupOffsets operation.
type ListConsumerGroupOffsetsResult struct {
Expand Down Expand Up @@ -1916,9 +1923,11 @@ func (a *AdminClient) ListConsumerGroups(
// * `groups` - Slice of groups to describe. This should not be nil/empty.
// * `options` - DescribeConsumerGroupsAdminOption options.
//
// Returns a slice of ConsumerGroupDescriptions corresponding to the input
// groups, plus an error that is not `nil` for client level errors. Individual
// ConsumerGroupDescriptions inside the slice should also be checked for errors.
// Returns DescribeConsumerGroupsResult, which contains a slice of
// ConsumerGroupDescriptions corresponding to the input groups, plus an error
// that is not `nil` for client level errors. Individual
// ConsumerGroupDescriptions inside the slice should also be checked for
// errors.
func (a *AdminClient) DescribeConsumerGroups(
ctx context.Context, groups []string,
options ...DescribeConsumerGroupsAdminOption) (result DescribeConsumerGroupsResult, err error) {
Expand Down Expand Up @@ -1987,12 +1996,14 @@ func (a *AdminClient) DescribeConsumerGroups(
// * `groups` - A slice of groupIDs to delete.
// * `options` - DeleteConsumerGroupsAdminOption options.
//
// Returns a slice of GroupResults, with group-level errors, (if any) contained
// inside; and an error that is not nil for client level errors.
// Returns a DeleteConsumerGroupResult containing a slice of GroupResults, with
// group-level errors, (if any) contained inside; and an error that is not nil
// for client level errors.
func (a *AdminClient) DeleteConsumerGroups(
ctx context.Context,
groups []string, options ...DeleteConsumerGroupsAdminOption) (result []GroupResult, err error) {
groups []string, options ...DeleteConsumerGroupsAdminOption) (result DeleteConsumerGroupResult, err error) {
cGroups := make([]*C.rd_kafka_DeleteGroup_t, len(groups))
deleteResult := DeleteConsumerGroupResult{}

// Convert Go DeleteGroups to C DeleteGroups
for i, group := range groups {
Expand All @@ -2001,7 +2012,7 @@ func (a *AdminClient) DeleteConsumerGroups(

cGroups[i] = C.rd_kafka_DeleteGroup_new(cGroupID)
if cGroups[i] == nil {
return nil, newErrorFromString(ErrInvalidArg,
return deleteResult, newErrorFromString(ErrInvalidArg,
fmt.Sprintf("Invalid arguments for group %s", group))
}

Expand All @@ -2016,7 +2027,7 @@ func (a *AdminClient) DeleteConsumerGroups(
cOptions, err := adminOptionsSetup(
a.handle, C.RD_KAFKA_ADMIN_OP_DELETEGROUPS, genericOptions)
if err != nil {
return nil, err
return deleteResult, err
}
defer C.rd_kafka_AdminOptions_destroy(cOptions)

Expand All @@ -2035,7 +2046,7 @@ func (a *AdminClient) DeleteConsumerGroups(
// Wait for result, error or context timeout
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETEGROUPS_RESULT)
if err != nil {
return nil, err
return deleteResult, err
}
defer C.rd_kafka_event_destroy(rkev)

Expand All @@ -2045,7 +2056,8 @@ func (a *AdminClient) DeleteConsumerGroups(
var cCnt C.size_t
cGroupRes := C.rd_kafka_DeleteGroups_result_groups(cRes, &cCnt)

return a.cToGroupResults(cGroupRes, cCnt)
deleteResult.GroupResults, err = a.cToGroupResults(cGroupRes, cCnt)
return deleteResult, err
}

// ListConsumerGroupOffsets fetches the offsets for topic partition(s) for
Expand Down Expand Up @@ -2149,10 +2161,10 @@ func (a *AdminClient) ListConsumerGroupOffsets(
// `groupsPartitions` has to be exactly one.
// * `options` - AlterConsumerGroupOffsetsAdminOption options.
//
// Returns a AlterConsumerGroupOffsetsResult, containng slice of
// ConsumerGroupTopicPartitions corresponding to the input slice, plus an error that is
// not `nil` for client level errors. Individual TopicPartitions inside each of
// the ConsumerGroupTopicPartitions should also be checked for errors.
// Returns a AlterConsumerGroupOffsetsResult, containing a slice of
// ConsumerGroupTopicPartitions corresponding to the input slice, plus an error
// that is not `nil` for client level errors. Individual TopicPartitions inside
// each of the ConsumerGroupTopicPartitions should also be checked for errors.
// This will succeed at the partition level only if the group is not actively
// subscribed to the corresponding topic(s).
func (a *AdminClient) AlterConsumerGroupOffsets(
Expand Down
6 changes: 3 additions & 3 deletions kafka/adminapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func testAdminAPIsDescribeConsumerGroups(
defer cancel()
descres, err := a.DescribeConsumerGroups(
ctx, nil, SetAdminRequestTimeout(time.Second))
if descres != nil || err == nil {
if descres.ConsumerGroupDescriptions != nil || err == nil {
t.Fatalf("Expected DescribeConsumerGroups to fail, but got result: %v, err: %v",
descres, err)
}
Expand All @@ -459,7 +459,7 @@ func testAdminAPIsDescribeConsumerGroups(
defer cancel()
descres, err = a.DescribeConsumerGroups(
ctx, []string{"test"}, SetAdminRequestTimeout(time.Second))
if descres != nil || err == nil {
if descres.ConsumerGroupDescriptions != nil || err == nil {
t.Fatalf("Expected DescribeConsumerGroups to fail, but got result: %v, err: %v",
descres, err)
}
Expand All @@ -474,7 +474,7 @@ func testAdminAPIsDeleteConsumerGroups(
defer cancel()
dgres, err := a.DeleteConsumerGroups(ctx, []string{"group1"},
SetAdminRequestTimeout(time.Second))
if dgres != nil || err == nil {
if dgres.GroupResults != nil || err == nil {
t.Fatalf("Expected DeleteGroups to fail, but got result: %v, err: %v",
dgres, err)
}
Expand Down
25 changes: 15 additions & 10 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,14 +1368,15 @@ func TestAdminClient_DeleteConsumerGroups(t *testing.T) {
t.Errorf("DeleteConsumerGroups() failed: %s", err)
return
}
resultGroups := result.GroupResults

if len(result) != 1 || result[0].Group != groupID {
if len(resultGroups) != 1 || resultGroups[0].Group != groupID {
t.Errorf("Wrong group affected/no group affected")
return
}

if result[0].Error.code != ErrNonEmptyGroup {
t.Errorf("Encountered the wrong error after calling DeleteConsumerGroups %s", result[0].Error)
if resultGroups[0].Error.code != ErrNonEmptyGroup {
t.Errorf("Encountered the wrong error after calling DeleteConsumerGroups %s", resultGroups[0].Error)
return
}

Expand All @@ -1392,14 +1393,15 @@ func TestAdminClient_DeleteConsumerGroups(t *testing.T) {
t.Errorf("DeleteConsumerGroups() failed: %s", err)
return
}
resultGroups = result.GroupResults

if len(result) != 1 || result[0].Group != groupID {
if len(resultGroups) != 1 || resultGroups[0].Group != groupID {
t.Errorf("Wrong group affected/no group affected")
return
}

if result[0].Error.code != ErrNoError {
t.Errorf("Encountered an error after calling DeleteConsumerGroups %s", result[0].Error)
if resultGroups[0].Error.code != ErrNoError {
t.Errorf("Encountered an error after calling DeleteConsumerGroups %s", resultGroups[0].Error)
return
}

Expand Down Expand Up @@ -1525,13 +1527,14 @@ func TestAdminClient_ListAndDescribeConsumerGroups(t *testing.T) {
// Test the description of the consumer group.
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
groupDescs, err := ac.DescribeConsumerGroups(
groupDescResult, err := ac.DescribeConsumerGroups(
ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second))
if err != nil {
t.Errorf("Error describing consumer groups %s\n", err)
return
}

groupDescs := groupDescResult.ConsumerGroupDescriptions
if len(groupDescs) != 1 {
t.Errorf("Describing one group should give exactly one result %s\n", err)
return
Expand Down Expand Up @@ -1583,12 +1586,12 @@ func TestAdminClient_ListAndDescribeConsumerGroups(t *testing.T) {
for !isGroupStable {
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
groupDescs, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second))
groupDescResult, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second))
if err != nil {
t.Errorf("Error describing consumer groups %s\n", err)
return
}

groupDescs = groupDescResult.ConsumerGroupDescriptions
groupDesc = findConsumerGroupDescription(groupDescs, groupID)
if groupDesc == nil {
t.Errorf("Consumer group %s should be present\n", groupID)
Expand Down Expand Up @@ -1626,7 +1629,9 @@ func TestAdminClient_ListAndDescribeConsumerGroups(t *testing.T) {
// Try describing an empty group.
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
groupDescs, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second))
groupDescResult, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second))
groupDescs = groupDescResult.ConsumerGroupDescriptions

if err != nil {
t.Errorf("Error describing consumer groups %s\n", err)
return
Expand Down

0 comments on commit 16f671d

Please sign in to comment.