diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5f8bfe5b372e..506042fb6ca9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -279,6 +279,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Add AWS OwningAccount support for cross account monitoring {issue}40570[40570] {pull}40691[40691] - Use namespace for GetListMetrics when exists in AWS {pull}41022[41022] - Only fetch cluster-level index stats summary {issue}36019[36019] {pull}42901[42901] +- Fix panic in kafka consumergroup member assignment fetching when there are 0 members in consumer group. {pull}44576[44576] - Upgrade `go.mongodb.org/mongo-driver` from `v1.14.0` to `v1.17.4` to fix connection leaks in MongoDB module {pull}44769[44769] *Osquerybeat* diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 2ee059c76693..5b7a78804b18 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -141,7 +141,7 @@ func (b *Broker) Connect() error { other := finder.findBroker(brokerAddress(b.broker), meta.Brokers) if other == nil { // no broker found closeBroker(b.broker) - return fmt.Errorf("No advertised broker with address %v found", b.Addr()) + return fmt.Errorf("no advertised broker with address %v found", b.Addr()) } b.logger.Named("kafka").Debugf("found matching broker %v with id %v", other.Addr(), other.ID()) @@ -247,21 +247,12 @@ func (b *Broker) DescribeGroups( members := map[string]MemberDescription{} for memberID, memberDescr := range descr.Members { - assignment, err := memberDescr.GetMemberAssignment() + memberDescription, err := fromSaramaGroupMemberDescription(memberDescr) if err != nil { - members[memberID] = MemberDescription{ - ClientID: memberDescr.ClientId, - ClientHost: memberDescr.ClientHost, - Err: err, - } + b.logger.Debugf("error converting member description: %v", err) continue } - - members[memberID] = MemberDescription{ - ClientID: memberDescr.ClientId, - ClientHost: memberDescr.ClientHost, - Topics: assignment.Topics, - } + members[memberID] = memberDescription } groups[descr.GroupId] = GroupDescription{Members: members} } @@ -542,6 +533,31 @@ func (m *brokerFinder) lookupHosts(ips []net.IP) []string { return hosts } +func fromSaramaGroupMemberDescription(memberDescr *sarama.GroupMemberDescription) (MemberDescription, error) { + if memberDescr == nil { + return MemberDescription{}, errors.New("nil GroupMemberDescription") + } + + assignment, err := memberDescr.GetMemberAssignment() + if err != nil { + return MemberDescription{ //nolint:nilerr // in this case we should return no error and the error is reported in MemberDescription + ClientID: memberDescr.ClientId, + ClientHost: memberDescr.ClientHost, + Err: err, + }, nil + } + + assignmentTopics := make(map[string][]int32) + if assignment != nil { + assignmentTopics = assignment.Topics + } + return MemberDescription{ + ClientID: memberDescr.ClientId, + ClientHost: memberDescr.ClientHost, + Topics: assignmentTopics, + }, nil +} + func anyIPsMatch(as, bs []net.IP) bool { for _, a := range as { for _, b := range bs { diff --git a/metricbeat/module/kafka/broker_test.go b/metricbeat/module/kafka/broker_test.go index 2c824c057929..01faf62c6aab 100644 --- a/metricbeat/module/kafka/broker_test.go +++ b/metricbeat/module/kafka/broker_test.go @@ -18,14 +18,16 @@ package kafka import ( + "errors" "net" "testing" - "errors" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/assert" "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/sarama" ) type dummyNet struct{} @@ -160,3 +162,84 @@ func TestFindMatchingAddress(t *testing.T) { }) } } + +func Test_getMember(t *testing.T) { + tests := []struct { + name string + inputMemberDescr *sarama.GroupMemberDescription + + expectedErrMsg string + expectedResult MemberDescription + }{ + { + name: "success", + inputMemberDescr: &sarama.GroupMemberDescription{ + ClientId: "test-client", + ClientHost: "test-host", + MemberAssignment: []byte{0, 0, 0, 0, 0, 1, 0, 10, 116, 101, 115, 116, 45, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 255, 255, 255, 255}, + }, + + expectedErrMsg: "", + expectedResult: MemberDescription{ + Err: nil, + ClientID: "test-client", + ClientHost: "test-host", + Topics: map[string][]int32{ + "test-topic": {0}, + }, + }, + }, + { + name: "nil sarama GroupMemberDescription", + inputMemberDescr: nil, + + expectedErrMsg: "nil GroupMemberDescription", + expectedResult: MemberDescription{}, + }, + { + name: "0 members in the group", + inputMemberDescr: &sarama.GroupMemberDescription{ + ClientId: "test-client", + ClientHost: "test-host", + MemberAssignment: nil, + }, + + expectedErrMsg: "", + expectedResult: MemberDescription{ + Err: nil, + ClientID: "test-client", + ClientHost: "test-host", + Topics: map[string][]int32{}, + }, + }, + { + name: "ignore sarama error", + inputMemberDescr: &sarama.GroupMemberDescription{ + ClientId: "test-client", + ClientHost: "test-host", + MemberAssignment: []byte{1, 2, 3}, + }, + + expectedErrMsg: "", + expectedResult: MemberDescription{ + Err: errors.New("kafka: insufficient data to decode packet, more bytes expected"), + ClientID: "test-client", + ClientHost: "test-host", + Topics: nil, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := fromSaramaGroupMemberDescription(tt.inputMemberDescr) + if tt.expectedErrMsg == "" { + require.NoError(t, err) + + assert.Equal(t, tt.expectedResult, result) + } else { + assert.Error(t, err, tt.expectedErrMsg) + } + }) + } +}