Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `fingerprint` processor. {issue}11173[11173] {pull}14205[14205]
- Add support for API keys in Elasticsearch outputs. {pull}14324[14324]
- Ensure that init containers are no longer tailed after they stop {pull}14394[14394]
- Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822]

*Auditbeat*

Expand Down
9 changes: 9 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15734,6 +15734,15 @@ type: keyword

--

*`kafka.consumergroup.consumer_lag`*::
+
--
consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset

type: long

--

*`kafka.consumergroup.error.code`*::
+
--
Expand Down
29 changes: 28 additions & 1 deletion metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func Version(version string) kafka.Version {
type Broker struct {
broker *sarama.Broker
cfg *sarama.Config
client sarama.Client

advertisedAddr string
id int32
Expand Down Expand Up @@ -96,6 +97,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
return &Broker{
broker: sarama.NewBroker(host),
cfg: cfg,
client: nil,
id: noID,
matchID: settings.MatchID,
}
Expand All @@ -104,6 +106,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
// Close the broker connection
func (b *Broker) Close() error {
closeBroker(b.broker)
b.client.Close()
return nil
}

Expand Down Expand Up @@ -134,6 +137,13 @@ func (b *Broker) Connect() error {
debugf("found matching broker %v with id %v", other.Addr(), other.ID())
b.id = other.ID()
b.advertisedAddr = other.Addr()

c, err := getClusterWideClient(b.Addr(), b.cfg)
if err != nil {
closeBroker(b.broker)
return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr())
}
b.client = c
return nil
}

Expand Down Expand Up @@ -270,7 +280,16 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32)
return b.broker.FetchOffset(requ)
}

// ID returns the broker or -1 if the broker id is unknown.
// FetchPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader.
func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest)
if err != nil {
return -1, err
}
return offset, nil
}

// ID returns the broker ID or -1 if the broker id is unknown.
func (b *Broker) ID() int32 {
if b.id == noID {
return b.broker.ID()
Expand Down Expand Up @@ -524,6 +543,14 @@ func anyIPsMatch(as, bs []net.IP) bool {
return false
}

func getClusterWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) {
client, err := sarama.NewClient([]string{addr}, cfg)
if err != nil {
return nil, err
}
return client, nil
}

func brokerAddresses(brokers []*sarama.Broker) []string {
addresses := make([]string, len(brokers))
for i, b := range brokers {
Expand Down
36 changes: 20 additions & 16 deletions metricbeat/module/kafka/consumergroup/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,45 +1,49 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
"event": {
"dataset": "kafka.consumergroup",
"duration": 115000,
"module": "kafka"
},
"kafka": {
"broker": {
"address": "172.18.0.2:9092",
"address": "localhost:32768",
"id": 0
},
"consumergroup": {
"broker": {
"address": "172.18.0.2:9092",
"address": "localhost:32768",
"id": 0
},
"client": {
"host": "172.18.0.1",
"id": "sarama",
"member_id": "sarama-fcb5a5db-0474-4f3a-a5af-29e2f14549c5"
"host": "127.0.0.1",
"id": "consumer-1",
"member_id": "consumer-1-a12ac7d4-00aa-45a0-8b35-0a9c6e880bf4"
},
"consumer_lag": 1059,
"error": {
"code": 0
},
"id": "test-group",
"id": "console-consumer-50413",
"meta": "",
"offset": -1,
"partition": 0,
"topic": "metricbeat-test"
"topic": "test"
},
"partition": {
"id": 0,
"topic_id": "0-metricbeat-test"
"topic_id": "0-test"
},
"topic": {
"name": "metricbeat-test"
"name": "test"
}
},
"metricset": {
"host": "kafka:9092",
"module": "kafka",
"name": "consumergroup",
"rtt": 115
"period": 10000
},
"service": {
"address": "localhost:32768",
"type": "kafka"
}
}
}
4 changes: 4 additions & 0 deletions metricbeat/module/kafka/consumergroup/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
type: keyword
description: custom consumer meta data string

- name: consumer_lag
type: long
description: consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset

- name: error.code
type: long
description: >
Expand Down
13 changes: 10 additions & 3 deletions metricbeat/module/kafka/consumergroup/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

type mockClient struct {
listGroups func() ([]string, error)
describeGroups func(group []string) (map[string]kafka.GroupDescription, error)
fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error)
listGroups func() ([]string, error)
describeGroups func(group []string) (map[string]kafka.GroupDescription, error)
fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error)
getPartitionOffsetFromTheLeader func(topic string, partitionID int32) (int64, error)
}

type mockState struct {
Expand All @@ -45,6 +46,9 @@ func defaultMockClient(state mockState) *mockClient {
listGroups: makeListGroups(state),
describeGroups: makeDescribeGroups(state),
fetchGroupOffsets: makeFetchGroupOffsets(state),
getPartitionOffsetFromTheLeader: func(topic string, partitionID int32) (int64, error) {
return 42, nil
},
}
}

Expand Down Expand Up @@ -145,3 +149,6 @@ func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDesc
func (c *mockClient) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
return c.fetchGroupOffsets(group)
}
func (c *mockClient) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
return c.getPartitionOffsetFromTheLeader(topic, partitionID)
}
27 changes: 21 additions & 6 deletions metricbeat/module/kafka/consumergroup/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type client interface {
ListGroups() ([]string, error)
DescribeGroups(group []string) (map[string]kafka.GroupDescription, error)
FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error)
FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error)
}

func fetchGroupInfo(
Expand Down Expand Up @@ -113,12 +114,19 @@ func fetchGroupInfo(

for topic, partitions := range ret.off.Blocks {
for partition, info := range partitions {
partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition)
if err != nil {
logp.Err("failed to fetch offset for (topic, partition): ('%v', %v)", topic, partition)
continue
}
consumerLag := partitionOffset - info.Offset
event := common.MapStr{
"id": ret.group,
"topic": topic,
"partition": partition,
"offset": info.Offset,
"meta": info.Metadata,
"id": ret.group,
"topic": topic,
"partition": partition,
"offset": info.Offset,
"meta": info.Metadata,
"consumer_lag": consumerLag,
"error": common.MapStr{
"code": info.Err,
},
Expand All @@ -133,7 +141,6 @@ func fetchGroupInfo(
}
}
}

emit(event)
}
}
Expand All @@ -145,6 +152,14 @@ func fetchGroupInfo(
return err
}

func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32) (int64, error) {
offset, err := b.FetchPartitionOffsetFromTheLeader(topic, partitionID)
if err != nil {
return -1, err
}
return offset, nil
}

func listGroups(b client, filter func(string) bool) ([]string, error) {
groups, err := b.ListGroups()
if err != nil {
Expand Down
52 changes: 31 additions & 21 deletions metricbeat/module/kafka/consumergroup/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,44 @@ func TestFetchGroupInfo(t *testing.T) {
}),
expected: []common.MapStr{
testEvent("group1", "topic1", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(10),
"client": clientMeta(0),
"offset": int64(10),
"consumer_lag": int64(42) - int64(10),
}),
testEvent("group1", "topic1", 1, common.MapStr{
"client": clientMeta(1),
"offset": int64(11),
"client": clientMeta(1),
"offset": int64(11),
"consumer_lag": int64(42) - int64(11),
}),
testEvent("group1", "topic1", 2, common.MapStr{
"client": clientMeta(0),
"offset": int64(12),
"client": clientMeta(0),
"offset": int64(12),
"consumer_lag": int64(42) - int64(12),
}),
testEvent("group1", "topic3", 0, common.MapStr{
"client": clientMeta(1),
"offset": int64(6),
"client": clientMeta(1),
"offset": int64(6),
"consumer_lag": int64(42) - int64(6),
}),
testEvent("group1", "topic3", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(7),
"client": clientMeta(0),
"offset": int64(7),
"consumer_lag": int64(42) - int64(7),
}),
testEvent("group2", "topic2", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(3),
"client": clientMeta(0),
"offset": int64(3),
"consumer_lag": int64(42) - int64(3),
}),
testEvent("group2", "topic3", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(9),
"client": clientMeta(0),
"offset": int64(9),
"consumer_lag": int64(42) - int64(9),
}),
testEvent("group2", "topic3", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(10),
"client": clientMeta(0),
"offset": int64(10),
"consumer_lag": int64(42) - int64(10),
}),
},
},
Expand Down Expand Up @@ -127,12 +135,14 @@ func TestFetchGroupInfo(t *testing.T) {
topics: []string{"topic1"},
expected: []common.MapStr{
testEvent("group1", "topic1", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(1),
"client": clientMeta(0),
"offset": int64(1),
"consumer_lag": int64(42) - int64(1),
}),
testEvent("group1", "topic1", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(2),
"client": clientMeta(0),
"offset": int64(2),
"consumer_lag": int64(42) - int64(2),
}),
},
},
Expand Down Expand Up @@ -224,7 +234,7 @@ func TestFetchGroupInfo(t *testing.T) {
for key, expected := range indexEvents(test.expected) {
event, found := indexed[key]
if !found {
t.Errorf("Missing event: %v", key)
t.Errorf("Missing key %v from events: %v", key, events)
continue
}
assertEvent(t, expected, event)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.