From 595fd0a6ee1210336d8ad185780d9dedbe056ee1 Mon Sep 17 00:00:00 2001
From: urso
Date: Wed, 21 Dec 2016 14:35:38 +0100
Subject: [PATCH] Add kafka consumer groups metricset

- mark kafka module as experimental
- add new metricset
- includes unit tests only
- fix kafka API version settings
---
 CHANGELOG.asciidoc                            |   1 +
 libbeat/outputs/kafka/kafka.go                |   4 +-
 metricbeat/docs/fields.asciidoc               | 101 +++++++
 metricbeat/docs/modules/kafka.asciidoc        |   6 +
 .../docs/modules/kafka/consumergroup.asciidoc |  19 ++
 metricbeat/include/list.go                    |   1 +
 metricbeat/metricbeat.template-es2x.json      |  65 +++++
 metricbeat/metricbeat.template.json           |  56 ++++
 metricbeat/module/kafka/_meta/docs.asciidoc   |   2 +
 metricbeat/module/kafka/broker.go             | 127 ++++++++-
 .../kafka/consumergroup/_meta/data.json       |  36 +++
 .../kafka/consumergroup/_meta/docs.asciidoc   |   3 +
 .../kafka/consumergroup/_meta/fields.yml      |  62 ++++
 .../module/kafka/consumergroup/config.go      |  37 +++
 .../kafka/consumergroup/consumergroup.go      | 101 +++++++
 .../module/kafka/consumergroup/mock_test.go   | 130 +++++++++
 .../module/kafka/consumergroup/nameset.go     |  31 ++
 .../module/kafka/consumergroup/query.go       | 210 ++++++++++++++
 .../module/kafka/consumergroup/query_test.go  | 265 ++++++++++++++++++
 .../module/kafka/partition/partition.go       |   2 +
 metricbeat/module/kafka/version.go            |  62 ++++
 21 files changed, 1305 insertions(+), 16 deletions(-)
 create mode 100644 metricbeat/docs/modules/kafka/consumergroup.asciidoc
 create mode 100644 metricbeat/module/kafka/consumergroup/_meta/data.json
 create mode 100644 metricbeat/module/kafka/consumergroup/_meta/docs.asciidoc
 create mode 100644 metricbeat/module/kafka/consumergroup/_meta/fields.yml
 create mode 100644 metricbeat/module/kafka/consumergroup/config.go
 create mode 100644 metricbeat/module/kafka/consumergroup/consumergroup.go
 create mode 100644 metricbeat/module/kafka/consumergroup/mock_test.go
 create mode 100644 metricbeat/module/kafka/consumergroup/nameset.go
 create mode 100644 metricbeat/module/kafka/consumergroup/query.go
 create mode 100644 metricbeat/module/kafka/consumergroup/query_test.go
 create mode 100644 metricbeat/module/kafka/version.go

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 13f67474c55..94d23aacef4 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
 - Export number of cores for cpu module. {pull}3192[3192]
 - Experimental Prometheus module. {pull}3202[3202]
 - Add system socket module that reports all TCP sockets. {pull}3246[3246]
+- Kafka consumer groups metricset. {pull}3240[3240]
 
 *Packetbeat*
 
diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go
index ca2227cccf8..69795acd872 100644
--- a/libbeat/outputs/kafka/kafka.go
+++ b/libbeat/outputs/kafka/kafka.go
@@ -91,7 +91,9 @@ var (
 		"0.10.0.0": sarama.V0_10_0_0,
 		"0.10.0.1": sarama.V0_10_0_1,
 		"0.10.0":   sarama.V0_10_0_1,
-		"0.10":     sarama.V0_10_0_1,
+		"0.10.1.0": sarama.V0_10_1_0,
+		"0.10.1":   sarama.V0_10_1_0,
+		"0.10":     sarama.V0_10_1_0,
 	}
 )
 
diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc
index 829c5220807..ffe2c2acce7 100644
--- a/metricbeat/docs/fields.asciidoc
+++ b/metricbeat/docs/fields.asciidoc
@@ -2401,6 +2401,107 @@ experimental[]
 
 
 
+[float]
+== consumergroup Fields
+
+Consumer group metrics.
+
+
+
+[float]
+== broker Fields
+
+The broker the consumer group information has been read from (the broker handling the consumer group).
+
+
+
+[float]
+=== kafka.consumergroup.broker.id
+
+type: long
+
+Broker ID
+
+
+[float]
+=== kafka.consumergroup.broker.address
+
+type: keyword
+
+Broker address
+
+
+[float]
+=== kafka.consumergroup.id
+
+type: keyword
+
+Consumer Group ID
+
+[float]
+=== kafka.consumergroup.topic
+
+type: keyword
+
+Topic name
+
+[float]
+=== kafka.consumergroup.partition
+
+type: long
+
+Partition ID
+
+[float]
+=== kafka.consumergroup.offset
+
+type: long
+
+Consumer offset into the partition being read
+
+[float]
+=== kafka.consumergroup.meta
+
+type: text
+
+Custom consumer metadata string
+
+[float]
+=== kafka.consumergroup.error.code
+
+type: long
+
+Kafka consumer/partition error code.
+
+
+[float]
+== client Fields
+
+The client assigned to read events from the partition.
+
+
+
+[float]
+=== kafka.consumergroup.client.id
+
+type: keyword
+
+Client ID (kafka setting client.id)
+
+[float]
+=== kafka.consumergroup.client.host
+
+type: keyword
+
+Client host
+
+[float]
+=== kafka.consumergroup.client.member_id
+
+type: keyword
+
+Internal consumer group member ID
+
 [float]
 == partition Fields
 
diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc
index a8f680419ff..e606f00bae9 100644
--- a/metricbeat/docs/modules/kafka.asciidoc
+++ b/metricbeat/docs/modules/kafka.asciidoc
@@ -5,6 +5,8 @@ This file is generated! See scripts/docs_collector.py
 [[metricbeat-module-kafka]]
 == kafka Module
 
+experimental[]
+
 This is the kafka Module.
 
 
@@ -51,7 +53,11 @@ metricbeat.modules:
 
 The following metricsets are available:
 
+* <<metricbeat-metricset-kafka-consumergroup,consumergroup>>
+
 * <<metricbeat-metricset-kafka-partition,partition>>
 
+include::kafka/consumergroup.asciidoc[]
+
 include::kafka/partition.asciidoc[]
 
diff --git a/metricbeat/docs/modules/kafka/consumergroup.asciidoc b/metricbeat/docs/modules/kafka/consumergroup.asciidoc
new file mode 100644
index 00000000000..5f56fcb52e8
--- /dev/null
+++ b/metricbeat/docs/modules/kafka/consumergroup.asciidoc
@@ -0,0 +1,19 @@
+////
+This file is generated! See scripts/docs_collector.py
+////
+
+[[metricbeat-metricset-kafka-consumergroup]]
+include::../../../module/kafka/consumergroup/_meta/docs.asciidoc[]
+
+
+==== Fields
+
+For a description of each field in the metricset, see the
+<<exported-fields-kafka,exported fields>> section.
+ +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/kafka/consumergroup/_meta/data.json[] +---- diff --git a/metricbeat/include/list.go b/metricbeat/include/list.go index 0524f3c5c28..8d0782555c1 100644 --- a/metricbeat/include/list.go +++ b/metricbeat/include/list.go @@ -25,6 +25,7 @@ import ( _ "github.com/elastic/beats/metricbeat/module/haproxy/info" _ "github.com/elastic/beats/metricbeat/module/haproxy/stat" _ "github.com/elastic/beats/metricbeat/module/kafka" + _ "github.com/elastic/beats/metricbeat/module/kafka/consumergroup" _ "github.com/elastic/beats/metricbeat/module/kafka/partition" _ "github.com/elastic/beats/metricbeat/module/mongodb" _ "github.com/elastic/beats/metricbeat/module/mongodb/status" diff --git a/metricbeat/metricbeat.template-es2x.json b/metricbeat/metricbeat.template-es2x.json index 8deb3bd08b9..7eabc2993a0 100644 --- a/metricbeat/metricbeat.template-es2x.json +++ b/metricbeat/metricbeat.template-es2x.json @@ -1311,6 +1311,71 @@ }, "kafka": { "properties": { + "consumergroup": { + "properties": { + "broker": { + "properties": { + "address": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + }, + "id": { + "type": "long" + } + } + }, + "client": { + "properties": { + "host": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + }, + "id": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + }, + "member_id": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + } + } + }, + "error": { + "properties": { + "code": { + "type": "long" + } + } + }, + "id": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + }, + "meta": { + "index": "analyzed", + "norms": { + "enabled": false + }, + "type": "string" + }, + "offset": { + "type": "long" + }, + "partition": { + "type": "long" + }, + "topic": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + } + } + }, "partition": { "properties": { "broker": { diff --git a/metricbeat/metricbeat.template.json b/metricbeat/metricbeat.template.json index f259e88356a..01aab4c3775 100644 --- a/metricbeat/metricbeat.template.json +++ b/metricbeat/metricbeat.template.json @@ -1318,6 +1318,62 @@ }, "kafka": { "properties": { + "consumergroup": { + "properties": { + "broker": { + "properties": { + "address": { + "ignore_above": 1024, + "type": "keyword" + }, + "id": { + "type": "long" + } + } + }, + "client": { + "properties": { + "host": { + "ignore_above": 1024, + "type": "keyword" + }, + "id": { + "ignore_above": 1024, + "type": "keyword" + }, + "member_id": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, + "error": { + "properties": { + "code": { + "type": "long" + } + } + }, + "id": { + "ignore_above": 1024, + "type": "keyword" + }, + "meta": { + "norms": false, + "type": "text" + }, + "offset": { + "type": "long" + }, + "partition": { + "type": "long" + }, + "topic": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, "partition": { "properties": { "broker": { diff --git a/metricbeat/module/kafka/_meta/docs.asciidoc b/metricbeat/module/kafka/_meta/docs.asciidoc index 13b1f1e6730..b5127454877 100644 --- a/metricbeat/module/kafka/_meta/docs.asciidoc +++ b/metricbeat/module/kafka/_meta/docs.asciidoc @@ -1,4 +1,6 @@ == kafka Module +experimental[] + This is the kafka Module. 
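+
+A minimal module configuration might look like this (an illustrative sketch;
+the host and period are placeholder values):
+
+[source,yaml]
+----
+metricbeat.modules:
+  - module: kafka
+    metricsets: ["partition", "consumergroup"]
+    period: 10s
+    hosts: ["localhost:9092"]
+----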
diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go
index 5334b5931fe..4f918fefbfb 100644
--- a/metricbeat/module/kafka/broker.go
+++ b/metricbeat/module/kafka/broker.go
@@ -16,11 +16,12 @@ import (
 
 // Broker provides functionality for communicating with a single kafka broker
 type Broker struct {
-	b   *sarama.Broker
-	cfg *sarama.Config
+	broker *sarama.Broker
+	cfg    *sarama.Config
 
-	id      int32
-	matchID bool
+	advertisedAddr string
+	id             int32
+	matchID        bool
 }
 
 // BrokerSettings defines common configurations used when connecting to a broker
@@ -32,6 +33,18 @@ type BrokerSettings struct {
 	Backoff            time.Duration
 	TLS                *tls.Config
 	Username, Password string
+	Version            Version
+}
+
+type GroupDescription struct {
+	Members map[string]MemberDescription
+}
+
+type MemberDescription struct {
+	Err        error
+	ClientID   string
+	ClientHost string
+	Topics     map[string][]int32
 }
 
 const noID = -1
@@ -53,9 +66,10 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
 		cfg.Net.SASL.User = user
 		cfg.Net.SASL.Password = settings.Password
 	}
+	cfg.Version = settings.Version.get()
 
 	return &Broker{
-		b:       sarama.NewBroker(host),
+		broker:  sarama.NewBroker(host),
 		cfg:     cfg,
 		id:      noID,
 		matchID: settings.MatchID,
@@ -64,13 +78,13 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
 
 // Close the broker connection
 func (b *Broker) Close() error {
-	closeBroker(b.b)
+	closeBroker(b.broker)
 	return nil
 }
 
 // Connect connects the broker to the configured host
 func (b *Broker) Connect() error {
-	if err := b.b.Open(b.cfg); err != nil {
+	if err := b.broker.Open(b.cfg); err != nil {
 		return err
 	}
 
@@ -79,31 +93,38 @@ func (b *Broker) Connect() error {
 	}
 
 	// current broker is bootstrap only. Get metadata to find id:
-	meta, err := queryMetadataWithRetry(b.b, b.cfg, nil)
+	meta, err := queryMetadataWithRetry(b.broker, b.cfg, nil)
 	if err != nil {
-		closeBroker(b.b)
+		closeBroker(b.broker)
 		return err
 	}
 
-	other := findMatchingBroker(brokerAddress(b.b), meta.Brokers)
+	other := findMatchingBroker(brokerAddress(b.broker), meta.Brokers)
 	if other == nil { // no broker found
-		closeBroker(b.b)
+		closeBroker(b.broker)
 		return fmt.Errorf("No advertised broker with address %v found", b.Addr())
 	}
 
 	debugf("found matching broker %v with id %v", other.Addr(), other.ID())
 	b.id = other.ID()
+	b.advertisedAddr = other.Addr()
 
 	return nil
 }
 
 // Addr returns the configured broker endpoint.
 func (b *Broker) Addr() string {
-	return b.b.Addr()
+	return b.broker.Addr()
+}
+
+// AdvertisedAddr returns the advertised broker address in case a
+// matching broker has been found.
+func (b *Broker) AdvertisedAddr() string {
+	return b.advertisedAddr
 }
 
 // GetMetadata fetches most recent cluster metadata from the broker.
 func (b *Broker) GetMetadata(topics ...string) (*sarama.MetadataResponse, error) {
-	return queryMetadataWithRetry(b.b, b.cfg, topics)
+	return queryMetadataWithRetry(b.broker, b.cfg, topics)
 }
 
 // GetTopicsMetadata fetches most recent topics/partition metadata from the broker.
@@ -127,7 +148,7 @@ func (b *Broker) PartitionOffset(
 		req.SetReplicaID(replicaID)
 	}
 	req.AddBlock(topic, partition, time, 1)
-	resp, err := b.b.GetAvailableOffsets(req)
+	resp, err := b.broker.GetAvailableOffsets(req)
 	if err != nil {
 		return -1, err
 	}
@@ -140,10 +161,86 @@ func (b *Broker) PartitionOffset(
 	return block.Offsets[0], nil
 }
 
+// ListGroups lists all groups managed by the broker. Other consumer
+// groups might be managed by other brokers.
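+//
+// A typical discovery flow built on these helpers might look like this
+// (illustrative sketch, assuming b is a connected *Broker):
+//
+//	groups, _ := b.ListGroups()                  // group names this broker manages
+//	details, _ := b.DescribeGroups(groups)       // members and partition assignments
+//	offsets, _ := b.FetchGroupOffsets(groups[0]) // committed offsets for one group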
+func (b *Broker) ListGroups() ([]string, error) { + resp, err := b.broker.ListGroups(&sarama.ListGroupsRequest{}) + if err != nil { + return nil, err + } + + if resp.Err != sarama.ErrNoError { + return nil, resp.Err + } + + if len(resp.Groups) == 0 { + return nil, nil + } + + groups := make([]string, 0, len(resp.Groups)) + for name := range resp.Groups { + groups = append(groups, name) + } + return groups, nil +} + +// DescribeGroups fetches group details from broker. +func (b *Broker) DescribeGroups( + queryGroups []string, +) (map[string]GroupDescription, error) { + requ := &sarama.DescribeGroupsRequest{Groups: queryGroups} + resp, err := b.broker.DescribeGroups(requ) + if err != nil { + return nil, err + } + + if len(resp.Groups) == 0 { + return nil, nil + } + + groups := map[string]GroupDescription{} + for _, descr := range resp.Groups { + if len(descr.Members) == 0 { + groups[descr.GroupId] = GroupDescription{} + continue + } + + members := map[string]MemberDescription{} + for memberID, memberDescr := range descr.Members { + assignment, err := memberDescr.GetMemberAssignment() + if err != nil { + members[memberID] = MemberDescription{ + ClientID: memberDescr.ClientId, + ClientHost: memberDescr.ClientHost, + Err: err, + } + continue + } + + members[memberID] = MemberDescription{ + ClientID: memberDescr.ClientId, + ClientHost: memberDescr.ClientHost, + Topics: assignment.Topics, + } + } + groups[descr.GroupId] = GroupDescription{Members: members} + } + + return groups, nil +} + +func (b *Broker) FetchGroupOffsets(group string) (*sarama.OffsetFetchResponse, error) { + requ := &sarama.OffsetFetchRequest{ + ConsumerGroup: group, + Version: 1, + } + return b.broker.FetchOffset(requ) +} + // ID returns the broker or -1 if the broker id is unknown. func (b *Broker) ID() int32 { if b.id == noID { - return b.b.ID() + return b.broker.ID() } return b.id } diff --git a/metricbeat/module/kafka/consumergroup/_meta/data.json b/metricbeat/module/kafka/consumergroup/_meta/data.json new file mode 100644 index 00000000000..cf7dcd10f60 --- /dev/null +++ b/metricbeat/module/kafka/consumergroup/_meta/data.json @@ -0,0 +1,36 @@ +{ + "@timestamp":"2016-05-23T08:05:34.853Z", + "type":"metricsets", + "beat":{ + "hostname":"localhost", + "name":"localhost", + "version": "6.0.0-alpha1" + }, + "metricset":{ + "host":"localhost", + "module":"kafka", + "name":"consumergroup", + "rtt":269 + }, + "kafka":{ + "consumergroup":{ + "id": "group", + "topic": "test", + "partition": 0, + "client": { + "host": "127.0.0.1", + "id": "client0", + "member_id": "client0-d20b677a-5740-433e-a7f8-fbdab1f0f150" + }, + "broker": { + "address": "kafka0:9092", + "id": 0 + }, + "error": { + "code": 0 + }, + "meta": "", + "offset": 0 + } + } +} diff --git a/metricbeat/module/kafka/consumergroup/_meta/docs.asciidoc b/metricbeat/module/kafka/consumergroup/_meta/docs.asciidoc new file mode 100644 index 00000000000..fcc5a6d353e --- /dev/null +++ b/metricbeat/module/kafka/consumergroup/_meta/docs.asciidoc @@ -0,0 +1,3 @@ +=== kafka consumergroup MetricSet + +This is the consumergroup metricset of the module kafka. 
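+
+The set of consumer groups and topics to query can be restricted with the
+metricset's `groups` and `topics` settings. A minimal sketch (group, topic,
+and host names below are placeholders):
+
+[source,yaml]
+----
+metricbeat.modules:
+  - module: kafka
+    metricsets: ["consumergroup"]
+    hosts: ["localhost:9092"]
+    groups: ["my-group"]
+    topics: ["my-topic"]
+----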
diff --git a/metricbeat/module/kafka/consumergroup/_meta/fields.yml b/metricbeat/module/kafka/consumergroup/_meta/fields.yml
new file mode 100644
index 00000000000..a69a7b8cd71
--- /dev/null
+++ b/metricbeat/module/kafka/consumergroup/_meta/fields.yml
@@ -0,0 +1,62 @@
+- name: consumergroup
+  type: group
+  description: >
+    Consumer group metrics.
+  fields:
+    - name: broker
+      type: group
+      description: >
+        The broker the consumer group information has been read from
+        (the broker handling the consumer group).
+      fields:
+        - name: id
+          type: long
+          description: >
+            Broker ID
+
+        - name: address
+          type: keyword
+          description: >
+            Broker address
+
+    - name: id
+      type: keyword
+      description: Consumer Group ID
+
+    - name: topic
+      type: keyword
+      description: Topic name
+
+    - name: partition
+      type: long
+      description: Partition ID
+
+    - name: offset
+      type: long
+      description: Consumer offset into the partition being read
+
+    - name: meta
+      type: text
+      description: Custom consumer metadata string
+
+    - name: error.code
+      type: long
+      description: >
+        Kafka consumer/partition error code.
+
+    - name: client
+      type: group
+      description: >
+        The client assigned to read events from the partition.
+      fields:
+        - name: id
+          type: keyword
+          description: Client ID (kafka setting client.id)
+
+        - name: host
+          type: keyword
+          description: Client host
+
+        - name: member_id
+          type: keyword
+          description: Internal consumer group member ID
diff --git a/metricbeat/module/kafka/consumergroup/config.go b/metricbeat/module/kafka/consumergroup/config.go
new file mode 100644
index 00000000000..b06710d279a
--- /dev/null
+++ b/metricbeat/module/kafka/consumergroup/config.go
@@ -0,0 +1,37 @@
+package consumergroup
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/libbeat/outputs"
+)
+
+type metricsetConfig struct {
+	Retries  int                `config:"retries" validate:"min=0"`
+	Backoff  time.Duration      `config:"backoff" validate:"min=0"`
+	TLS      *outputs.TLSConfig `config:"ssl"`
+	Username string             `config:"username"`
+	Password string             `config:"password"`
+	ClientID string             `config:"client_id"`
+
+	Groups []string `config:"groups"`
+	Topics []string `config:"topics"`
+}
+
+var defaultConfig = metricsetConfig{
+	Retries:  3,
+	Backoff:  250 * time.Millisecond,
+	TLS:      nil,
+	Username: "",
+	Password: "",
+	ClientID: "metricbeat",
+}
+
+func (c *metricsetConfig) Validate() error {
+	if c.Username != "" && c.Password == "" {
+		return fmt.Errorf("password must be set when username is configured")
+	}
+
+	return nil
+}
diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go
new file mode 100644
index 00000000000..3fcb81a8546
--- /dev/null
+++ b/metricbeat/module/kafka/consumergroup/consumergroup.go
@@ -0,0 +1,101 @@
+package consumergroup
+
+import (
+	"crypto/tls"
+
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/outputs"
+	"github.com/elastic/beats/metricbeat/mb"
+	"github.com/elastic/beats/metricbeat/module/kafka"
+)
+
+// init registers the MetricSet with the central registry.
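+// The metricset then becomes addressable from a metricbeat configuration as
+// module "kafka" with metricset name "consumergroup".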
+func init() { + if err := mb.Registry.AddMetricSet("kafka", "consumergroup", New); err != nil { + panic(err) + } +} + +// MetricSet type defines all fields of the MetricSet +type MetricSet struct { + mb.BaseMetricSet + + broker *kafka.Broker + topics nameSet + groups nameSet +} + +type groupAssignment struct { + clientID string + memberID string + clientHost string +} + +var debugf = logp.MakeDebug("kafka") + +// New creates a new instance of the MetricSet. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + logp.Warn("EXPERIMENTAL: The kafka consumergroup metricset is experimental") + + config := defaultConfig + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + var tls *tls.Config + tlsCfg, err := outputs.LoadTLSConfig(config.TLS) + if err != nil { + return nil, err + } + if tlsCfg != nil { + tls = tlsCfg.BuildModuleConfig("") + } + + timeout := base.Module().Config().Timeout + + cfg := kafka.BrokerSettings{ + MatchID: true, + DialTimeout: timeout, + ReadTimeout: timeout, + ClientID: config.ClientID, + Retries: config.Retries, + Backoff: config.Backoff, + TLS: tls, + Username: config.Username, + Password: config.Password, + + // consumer groups API requires at least 0.9.0.0 + Version: kafka.Version{"0.9.0.0"}, + } + + return &MetricSet{ + BaseMetricSet: base, + broker: kafka.NewBroker(base.Host(), cfg), + groups: makeNameSet(config.Groups...), + topics: makeNameSet(config.Topics...), + }, nil +} + +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + if err := m.broker.Connect(); err != nil { + logp.Err("broker connect failed: %v", err) + return nil, err + } + + b := m.broker + defer b.Close() + + brokerInfo := common.MapStr{ + "id": b.ID(), + "address": b.AdvertisedAddr(), + } + + var events []common.MapStr + emitEvent := func(event common.MapStr) { + event["broker"] = brokerInfo + events = append(events, event) + } + err := fetchGroupInfo(emitEvent, b, m.groups.pred(), m.topics.pred()) + return events, err +} diff --git a/metricbeat/module/kafka/consumergroup/mock_test.go b/metricbeat/module/kafka/consumergroup/mock_test.go new file mode 100644 index 00000000000..238af427149 --- /dev/null +++ b/metricbeat/module/kafka/consumergroup/mock_test.go @@ -0,0 +1,130 @@ +package consumergroup + +import ( + "fmt" + "math/rand" + + "github.com/Shopify/sarama" + + "github.com/elastic/beats/metricbeat/module/kafka" +) + +type mockClient struct { + listGroups func() ([]string, error) + describeGroups func(group []string) (map[string]kafka.GroupDescription, error) + fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error) +} + +type mockState struct { + // group -> topics -> partitions -> offset + partitions map[string]map[string][]int64 // topics with group partition offsets + + // groups->client->topic->partitions ids + groups map[string][]map[string][]int32 // group/client assignments to topics and partition IDs +} + +func defaultMockClient(state mockState) *mockClient { + return &mockClient{ + listGroups: makeListGroups(state), + describeGroups: makeDescribeGroups(state), + fetchGroupOffsets: makeFetchGroupOffsets(state), + } +} + +func (c *mockClient) with(fn func(*mockClient)) *mockClient { + fn(c) + return c +} + +func makeListGroups(state mockState) func() ([]string, error) { + names := make([]string, 0, len(state.groups)) + for name := range state.groups { + names = append(names, name) + } + + return func() ([]string, error) { + return names, nil + } +} + +func makeDescribeGroups( + state mockState, +) func([]string) 
(map[string]kafka.GroupDescription, error) { + groups := map[string]kafka.GroupDescription{} + for name, st := range state.groups { + members := map[string]kafka.MemberDescription{} + for i, member := range st { + clientID := fmt.Sprintf("consumer-%v", i) + memberID := fmt.Sprintf("%v-%v", clientID, rand.Int()) + members[memberID] = kafka.MemberDescription{ + ClientID: clientID, + ClientHost: "/" + clientID, + Topics: member, + } + } + groups[name] = kafka.GroupDescription{Members: members} + } + + return func(group []string) (map[string]kafka.GroupDescription, error) { + ret := map[string]kafka.GroupDescription{} + for _, name := range group { + if g, found := groups[name]; found { + ret[name] = g + } + } + + if len(ret) == 0 { + ret = nil + } + return ret, nil + } +} + +func makeDescribeGroupsFail( + err error, +) func([]string) (map[string]kafka.GroupDescription, error) { + return func(_ []string) (map[string]kafka.GroupDescription, error) { + return nil, err + } +} + +func makeFetchGroupOffsets( + state mockState, +) func(group string) (*sarama.OffsetFetchResponse, error) { + return func(group string) (*sarama.OffsetFetchResponse, error) { + topics := state.partitions[group] + if topics == nil { + return &sarama.OffsetFetchResponse{}, nil + } + + blocks := map[string]map[int32]*sarama.OffsetFetchResponseBlock{} + for topic, partition := range topics { + T := map[int32]*sarama.OffsetFetchResponseBlock{} + blocks[topic] = T + + for i, offset := range partition { + T[int32(i)] = &sarama.OffsetFetchResponseBlock{ + Offset: int64(offset), + } + } + } + + return &sarama.OffsetFetchResponse{Blocks: blocks}, nil + } +} + +func makeFetchGroupOffsetsFail( + err error, +) func(string) (*sarama.OffsetFetchResponse, error) { + return func(_ string) (*sarama.OffsetFetchResponse, error) { + return nil, err + } +} + +func (c *mockClient) ListGroups() ([]string, error) { return c.listGroups() } +func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDescription, error) { + return c.describeGroups(groups) +} +func (c *mockClient) FetchGroupOffsets(group string) (*sarama.OffsetFetchResponse, error) { + return c.fetchGroupOffsets(group) +} diff --git a/metricbeat/module/kafka/consumergroup/nameset.go b/metricbeat/module/kafka/consumergroup/nameset.go new file mode 100644 index 00000000000..53435766362 --- /dev/null +++ b/metricbeat/module/kafka/consumergroup/nameset.go @@ -0,0 +1,31 @@ +package consumergroup + +type nameSet map[string]struct{} + +func makeNameSet(strings ...string) nameSet { + if len(strings) == 0 { + return nil + } + + set := nameSet{} + for _, s := range strings { + set[s] = struct{}{} + } + return set +} + +func (s nameSet) has(name string) bool { + if s == nil { + return true + } + + _, ok := s[name] + return ok +} + +func (s nameSet) pred() func(string) bool { + if s == nil || len(s) == 0 { + return nil + } + return s.has +} diff --git a/metricbeat/module/kafka/consumergroup/query.go b/metricbeat/module/kafka/consumergroup/query.go new file mode 100644 index 00000000000..f13277f3936 --- /dev/null +++ b/metricbeat/module/kafka/consumergroup/query.go @@ -0,0 +1,210 @@ +package consumergroup + +import ( + "sync" + + "github.com/Shopify/sarama" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/module/kafka" +) + +type client interface { + ListGroups() ([]string, error) + DescribeGroups(group []string) (map[string]kafka.GroupDescription, error) + FetchGroupOffsets(group string) 
(*sarama.OffsetFetchResponse, error)
+}
+
+func fetchGroupInfo(
+	emit func(common.MapStr),
+	b client,
+	groupsFilter, topicsFilter func(string) bool,
+) error {
+	type result struct {
+		err   error
+		group string
+		off   *sarama.OffsetFetchResponse
+	}
+
+	groups, err := listGroups(b, groupsFilter)
+	if err != nil {
+		logp.Err("failed to list known kafka groups: %v", err)
+		return err
+	}
+	if len(groups) == 0 {
+		return nil
+	}
+
+	debugf("known consumer groups: %v", groups)
+
+	wg := sync.WaitGroup{}
+	results := make(chan result, len(groups))
+	for _, group := range groups {
+		group := group
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			resp, err := fetchGroupOffset(b, group, topicsFilter)
+			if err != nil {
+				logp.Err("failed to fetch '%v' group offset: %v", group, err)
+			}
+			results <- result{err, group, resp}
+		}()
+	}
+
+	go func() {
+		wg.Wait()
+		close(results)
+	}()
+
+	assignments, err := fetchGroupAssignments(b, groups)
+	if err != nil {
+		// wait for workers to stop and drop results
+		for range results {
+		}
+
+		return err
+	}
+
+	for ret := range results {
+		if err := ret.err; err != nil {
+			// wait for workers to stop and drop results
+			for range results {
+			}
+			return err
+		}
+
+		asgnGroup := assignments[ret.group]
+		for topic, partitions := range ret.off.Blocks {
+			var asgnTopic map[int32]groupAssignment
+			if asgnGroup != nil {
+				asgnTopic = asgnGroup[topic]
+			}
+
+			for partition, info := range partitions {
+				event := common.MapStr{
+					"id":        ret.group,
+					"topic":     topic,
+					"partition": partition,
+					"offset":    info.Offset,
+					"meta":      info.Metadata,
+					"error": common.MapStr{
+						"code": info.Err,
+					},
+				}
+
+				if asgnTopic != nil {
+					if assignment, found := asgnTopic[partition]; found {
+						event["client"] = common.MapStr{
+							"id":        assignment.clientID,
+							"host":      assignment.clientHost,
+							"member_id": assignment.memberID,
+						}
+					}
+				}
+
+				emit(event)
+			}
+		}
+	}
+
+	return nil
+}
+
+func listGroups(b client, filter func(string) bool) ([]string, error) {
+	groups, err := b.ListGroups()
+	if err != nil {
+		return nil, err
+	}
+
+	if filter == nil {
+		return groups, nil
+	}
+
+	filtered := groups[:0]
+	for _, name := range groups {
+		if filter(name) {
+			filtered = append(filtered, name)
+		}
+	}
+	return filtered, nil
+}
+
+func fetchGroupAssignments(
+	b client,
+	groupIDs []string,
+) (map[string]map[string]map[int32]groupAssignment, error) {
+	resp, err := b.DescribeGroups(groupIDs)
+	if err != nil {
+		return nil, err
+	}
+
+	groups := map[string]map[string]map[int32]groupAssignment{}
+
+groupLoop:
+	for groupID, info := range resp {
+		G := groups[groupID]
+		if G == nil {
+			G = map[string]map[int32]groupAssignment{}
+			groups[groupID] = G
+		}
+
+		for memberID, memberDescr := range info.Members {
+			if memberDescr.Err != nil {
+				// group doesn't seem to use standardized member assignment encoding
+				// => try next group
+				continue groupLoop
+			}
+
+			clientID := memberDescr.ClientID
+			clientHost := memberDescr.ClientHost
+			if len(clientHost) > 1 && clientHost[0] == '/' {
+				clientHost = clientHost[1:]
+			}
+
+			meta := groupAssignment{
+				memberID:   memberID,
+				clientID:   clientID,
+				clientHost: clientHost,
+			}
+
+			for topic, partitions := range memberDescr.Topics {
+				T := G[topic]
+				if T == nil {
+					T = map[int32]groupAssignment{}
+					G[topic] = T
+				}
+
+				for _, partition := range partitions {
+					T[partition] = meta
+				}
+			}
+		}
+	}
+
+	return groups, nil
+}
+
+func fetchGroupOffset(
+	b client,
+	group string,
+	topics func(string) bool,
+) (*sarama.OffsetFetchResponse, error) {
+	resp, err := 
b.FetchGroupOffsets(group) + if err != nil { + return nil, err + } + + if topics == nil { + return resp, err + } + + for topic := range resp.Blocks { + if !topics(topic) { + delete(resp.Blocks, topic) + } + } + + return resp, nil +} diff --git a/metricbeat/module/kafka/consumergroup/query_test.go b/metricbeat/module/kafka/consumergroup/query_test.go new file mode 100644 index 00000000000..ce65e39fb08 --- /dev/null +++ b/metricbeat/module/kafka/consumergroup/query_test.go @@ -0,0 +1,265 @@ +package consumergroup + +import ( + "fmt" + "io" + "reflect" + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/stretchr/testify/assert" +) + +func TestFetchGroupInfo(t *testing.T) { + noEvents := func(events []common.MapStr) { + assert.Len(t, events, 0) + } + + tests := []struct { + name string + client client + groups []string + topics []string + err error + expected []common.MapStr + validate func([]common.MapStr) + }{ + { + name: "Test all groups", + client: defaultMockClient(mockState{ + partitions: map[string]map[string][]int64{ + "group1": { + "topic1": {10, 11, 12}, + "topic3": {6, 7}, + }, + "group2": { + "topic2": {3}, + "topic3": {9, 10}, + }, + }, + groups: map[string][]map[string][]int32{ + "group1": { + {"topic1": {0, 2}, "topic3": {1}}, + {"topic1": {1}, "topic3": {0}}, + }, + "group2": { + {"topic2": {0}, "topic3": {0, 1}}, + }, + }, + }), + expected: []common.MapStr{ + testEvent("group1", "topic1", 0, common.MapStr{ + "client": clientMeta(0), + "offset": int64(10), + }), + testEvent("group1", "topic1", 1, common.MapStr{ + "client": clientMeta(1), + "offset": int64(11), + }), + testEvent("group1", "topic1", 2, common.MapStr{ + "client": clientMeta(0), + "offset": int64(12), + }), + testEvent("group1", "topic3", 0, common.MapStr{ + "client": clientMeta(1), + "offset": int64(6), + }), + testEvent("group1", "topic3", 1, common.MapStr{ + "client": clientMeta(0), + "offset": int64(7), + }), + testEvent("group2", "topic2", 0, common.MapStr{ + "client": clientMeta(0), + "offset": int64(3), + }), + testEvent("group2", "topic3", 0, common.MapStr{ + "client": clientMeta(0), + "offset": int64(9), + }), + testEvent("group2", "topic3", 1, common.MapStr{ + "client": clientMeta(0), + "offset": int64(10), + }), + }, + }, + + { + name: "filter topics and groups", + client: defaultMockClient(mockState{ + partitions: map[string]map[string][]int64{ + "group1": { + "topic1": {1, 2}, + "topic2": {3, 4}, + }, + "group2": { + "topic2": {5, 6}, + "topic3": {7, 8}, + }, + }, + groups: map[string][]map[string][]int32{ + "group1": { + {"topic1": {0, 1}, "topic2": {0, 1}}, + }, + "group2": { + {"topic1": {0, 1}, "topic2": {0, 1}}, + }, + }, + }), + groups: []string{"group1"}, + topics: []string{"topic1"}, + expected: []common.MapStr{ + testEvent("group1", "topic1", 0, common.MapStr{ + "client": clientMeta(0), + "offset": int64(1), + }), + testEvent("group1", "topic1", 1, common.MapStr{ + "client": clientMeta(0), + "offset": int64(2), + }), + }, + }, + + { + name: "no events on empty group", + client: defaultMockClient(mockState{}), + validate: noEvents, + }, + + { + name: "fail to list groups", + client: defaultMockClient(mockState{}).with(func(c *mockClient) { + c.listGroups = func() ([]string, error) { + return nil, io.EOF + } + }), + err: io.EOF, + validate: noEvents, + }, + + { + name: "fail if assignment query fails", + client: defaultMockClient(mockState{ + partitions: map[string]map[string][]int64{ + "group1": {"topic1": {1}}, + }, + groups: map[string][]map[string][]int32{ + "group1": 
{{"topic1": {0}}}, + }, + }).with(func(c *mockClient) { + c.describeGroups = makeDescribeGroupsFail(io.EOF) + }), + err: io.EOF, + validate: noEvents, + }, + + { + name: "fail when fetching group offsets", + client: defaultMockClient(mockState{ + partitions: map[string]map[string][]int64{ + "group1": {"topic1": {1}}, + }, + groups: map[string][]map[string][]int32{ + "group1": {{"topic1": {0}}}, + }, + }).with(func(c *mockClient) { + c.fetchGroupOffsets = makeFetchGroupOffsetsFail(io.EOF) + }), + err: io.EOF, + validate: noEvents, + }, + } + + for i, test := range tests { + t.Logf("run test (%v): %v", i, test.name) + + var events []common.MapStr + collectEvents := func(event common.MapStr) { + t.Logf("new event: %v", event) + events = append(events, event) + } + + indexEvents := func(events []common.MapStr) map[string]common.MapStr { + index := map[string]common.MapStr{} + for _, e := range events { + key := fmt.Sprintf("%v::%v::%v", + e["id"], e["topic"], e["partition"], + ) + index[key] = e + } + return index + } + + groups := makeNameSet(test.groups...).pred() + topics := makeNameSet(test.topics...).pred() + err := fetchGroupInfo(collectEvents, test.client, groups, topics) + if err != nil { + switch { + case test.err == nil: + t.Fatal(err) + case test.err != err: + t.Error(err) + } + continue + } + + indexed := indexEvents(events) + for key, expected := range indexEvents(test.expected) { + event, found := indexed[key] + if !found { + t.Errorf("Missing event: %v", key) + continue + } + assertEvent(t, expected, event) + } + + if test.validate != nil { + test.validate(events) + } + } +} + +func assertEvent(t *testing.T, expected, event common.MapStr) { + for field, exp := range expected { + val, found := event[field] + if !found { + t.Errorf("Missing field: %v", field) + continue + } + + if sub, ok := exp.(common.MapStr); ok { + assertEvent(t, sub, val.(common.MapStr)) + } else { + if !assert.Equal(t, exp, val) { + t.Logf("failed in field: %v", field) + t.Logf("type expected: %v", reflect.TypeOf(exp)) + t.Logf("type event: %v", reflect.TypeOf(val)) + t.Logf("------------------------------") + } + } + } +} + +func testEvent( + group, topic string, + partition int, + fields ...common.MapStr, +) common.MapStr { + event := common.MapStr{ + "id": group, + "topic": topic, + "partition": int32(partition), + } + + for _, extra := range fields { + for k, v := range extra { + event[k] = v + } + } + return event +} + +func clientMeta(id int) common.MapStr { + return common.MapStr{ + "id": fmt.Sprintf("consumer-%v", id), + } +} diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 6637f4d11c5..a29b36547a9 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -37,6 +37,8 @@ var debugf = logp.MakeDebug("kafka") // New creates a new instance of the partition MetricSet. 
 func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
+	logp.Warn("EXPERIMENTAL: The kafka partition metricset is experimental")
+
 	config := defaultConfig
 	if err := base.Module().UnpackConfig(&config); err != nil {
 		return nil, err
diff --git a/metricbeat/module/kafka/version.go b/metricbeat/module/kafka/version.go
new file mode 100644
index 00000000000..16cc94442b4
--- /dev/null
+++ b/metricbeat/module/kafka/version.go
@@ -0,0 +1,62 @@
+package kafka
+
+import (
+	"fmt"
+
+	"github.com/Shopify/sarama"
+)
+
+type Version struct {
+	String string
+}
+
+var (
+	minVersion = sarama.V0_8_2_0
+
+	kafkaVersions = map[string]sarama.KafkaVersion{
+		"": sarama.V0_8_2_0,
+
+		"0.8.2.0": sarama.V0_8_2_0,
+		"0.8.2.1": sarama.V0_8_2_1,
+		"0.8.2.2": sarama.V0_8_2_2,
+		"0.8.2":   sarama.V0_8_2_2,
+		"0.8":     sarama.V0_8_2_2,
+
+		"0.9.0.0": sarama.V0_9_0_0,
+		"0.9.0.1": sarama.V0_9_0_1,
+		"0.9.0":   sarama.V0_9_0_1,
+		"0.9":     sarama.V0_9_0_1,
+
+		"0.10.0.0": sarama.V0_10_0_0,
+		"0.10.0.1": sarama.V0_10_0_1,
+		"0.10.0":   sarama.V0_10_0_1,
+		"0.10.1.0": sarama.V0_10_1_0,
+		"0.10.1":   sarama.V0_10_1_0,
+		"0.10":     sarama.V0_10_1_0,
+	}
+)
+
+func (v *Version) Validate() error {
+	if _, ok := kafkaVersions[v.String]; !ok {
+		return fmt.Errorf("unknown/unsupported kafka version '%v'", v.String)
+	}
+	return nil
+}
+
+func (v *Version) Unpack(s string) error {
+	tmp := Version{s}
+	if err := tmp.Validate(); err != nil {
+		return err
+	}
+
+	*v = tmp
+	return nil
+}
+
+func (v *Version) get() sarama.KafkaVersion {
+	if v, ok := kafkaVersions[v.String]; ok {
+		return v
+	}
+
+	return minVersion
+}
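+
+// Illustrative sketch of how Version is meant to be used: Unpack is the hook
+// invoked by the config loader when decoding a version string, and get()
+// feeds the decoded version into the sarama configuration (the version
+// string below is an example value):
+//
+//	var v Version
+//	if err := v.Unpack("0.10.1"); err != nil {
+//		// unknown/unsupported version string
+//	}
+//	cfg := sarama.NewConfig()
+//	cfg.Version = v.get()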