Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Add AWS OwningAccount support for cross account monitoring {issue}40570[40570] {pull}40691[40691]
- Use namespace for GetListMetrics when exists in AWS {pull}41022[41022]
- Only fetch cluster-level index stats summary {issue}36019[36019] {pull}42901[42901]
- Fix panic in kafka consumergroup member assignment fetching when there are 0 members in consumer group. {pull}44576[44576]
- Upgrade `go.mongodb.org/mongo-driver` from `v1.14.0` to `v1.17.4` to fix connection leaks in MongoDB module {pull}44769[44769]

*Osquerybeat*
Expand Down
42 changes: 29 additions & 13 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (b *Broker) Connect() error {
other := finder.findBroker(brokerAddress(b.broker), meta.Brokers)
if other == nil { // no broker found
closeBroker(b.broker)
return fmt.Errorf("No advertised broker with address %v found", b.Addr())
return fmt.Errorf("no advertised broker with address %v found", b.Addr())
}

b.logger.Named("kafka").Debugf("found matching broker %v with id %v", other.Addr(), other.ID())
Expand Down Expand Up @@ -247,21 +247,12 @@ func (b *Broker) DescribeGroups(

members := map[string]MemberDescription{}
for memberID, memberDescr := range descr.Members {
assignment, err := memberDescr.GetMemberAssignment()
memberDescription, err := fromSaramaGroupMemberDescription(memberDescr)
if err != nil {
members[memberID] = MemberDescription{
ClientID: memberDescr.ClientId,
ClientHost: memberDescr.ClientHost,
Err: err,
}
b.logger.Debugf("error converting member description: %v", err)
continue
}

members[memberID] = MemberDescription{
ClientID: memberDescr.ClientId,
ClientHost: memberDescr.ClientHost,
Topics: assignment.Topics,
}
members[memberID] = memberDescription
}
groups[descr.GroupId] = GroupDescription{Members: members}
}
Expand Down Expand Up @@ -542,6 +533,31 @@ func (m *brokerFinder) lookupHosts(ips []net.IP) []string {
return hosts
}

func fromSaramaGroupMemberDescription(memberDescr *sarama.GroupMemberDescription) (MemberDescription, error) {
if memberDescr == nil {
return MemberDescription{}, errors.New("nil GroupMemberDescription")
}

assignment, err := memberDescr.GetMemberAssignment()
if err != nil {
return MemberDescription{ //nolint:nilerr // in this case we should return no error and the error is reported in MemberDescription
ClientID: memberDescr.ClientId,
ClientHost: memberDescr.ClientHost,
Err: err,
}, nil
}

assignmentTopics := make(map[string][]int32)
if assignment != nil {
assignmentTopics = assignment.Topics
}
return MemberDescription{
ClientID: memberDescr.ClientId,
ClientHost: memberDescr.ClientHost,
Topics: assignmentTopics,
}, nil
}

func anyIPsMatch(as, bs []net.IP) bool {
for _, a := range as {
for _, b := range bs {
Expand Down
85 changes: 84 additions & 1 deletion metricbeat/module/kafka/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
package kafka

import (
"errors"
"net"
"testing"

"errors"
"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/sarama"
)

type dummyNet struct{}
Expand Down Expand Up @@ -160,3 +162,84 @@ func TestFindMatchingAddress(t *testing.T) {
})
}
}

func Test_getMember(t *testing.T) {
tests := []struct {
name string
inputMemberDescr *sarama.GroupMemberDescription

expectedErrMsg string
expectedResult MemberDescription
}{
{
name: "success",
inputMemberDescr: &sarama.GroupMemberDescription{
ClientId: "test-client",
ClientHost: "test-host",
MemberAssignment: []byte{0, 0, 0, 0, 0, 1, 0, 10, 116, 101, 115, 116, 45, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 255, 255, 255, 255},
},

expectedErrMsg: "",
expectedResult: MemberDescription{
Err: nil,
ClientID: "test-client",
ClientHost: "test-host",
Topics: map[string][]int32{
"test-topic": {0},
},
},
},
{
name: "nil sarama GroupMemberDescription",
inputMemberDescr: nil,

expectedErrMsg: "nil GroupMemberDescription",
expectedResult: MemberDescription{},
},
{
name: "0 members in the group",
inputMemberDescr: &sarama.GroupMemberDescription{
ClientId: "test-client",
ClientHost: "test-host",
MemberAssignment: nil,
},

expectedErrMsg: "",
expectedResult: MemberDescription{
Err: nil,
ClientID: "test-client",
ClientHost: "test-host",
Topics: map[string][]int32{},
},
},
{
name: "ignore sarama error",
inputMemberDescr: &sarama.GroupMemberDescription{
ClientId: "test-client",
ClientHost: "test-host",
MemberAssignment: []byte{1, 2, 3},
},

expectedErrMsg: "",
expectedResult: MemberDescription{
Err: errors.New("kafka: insufficient data to decode packet, more bytes expected"),
ClientID: "test-client",
ClientHost: "test-host",
Topics: nil,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := fromSaramaGroupMemberDescription(tt.inputMemberDescr)
if tt.expectedErrMsg == "" {
require.NoError(t, err)

assert.Equal(t, tt.expectedResult, result)
} else {
assert.Error(t, err, tt.expectedErrMsg)
}
})
}
}