From 0f680022f1b949c04cdaca8b87cb8cd92bc3c2f8 Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Wed, 23 Nov 2022 15:50:00 +0800 Subject: [PATCH 1/9] add kafka topic consumer plugin --- plugins/inputs/kafka_topic_consumer/README.md | 108 +++++++ .../kafka_topic_consumer.go | 293 ++++++++++++++++++ 2 files changed, 401 insertions(+) create mode 100644 plugins/inputs/kafka_topic_consumer/README.md create mode 100644 plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go diff --git a/plugins/inputs/kafka_topic_consumer/README.md b/plugins/inputs/kafka_topic_consumer/README.md new file mode 100644 index 0000000000000..42185738cc792 --- /dev/null +++ b/plugins/inputs/kafka_topic_consumer/README.md @@ -0,0 +1,108 @@ +# Kafka Topic Consumer Input Plugin + +The [Kafka][kafka] topic consumer plugin is an alternative burrow plugin. + +It fetches consumer group offset and lag information from the kafka instead of the burrow. + +Support the kafka's verson must greater than 0.10.2. + +## Configuration + +```toml @sample.conf +# Read metrics from Kafka topics +[[inputs.kafka_topic_consumer]] + ## Kafka brokers. + brokers = ["localhost:9092"] + + ## Topics interested in offset and lag + ## (regexp, default is ".*" represents all topics, ) + topics = .* + + ## ConsumerGroups interested in offset and lag + ## (regexp, default is ".*" represents all consumer groups) + consumer_groups = .* + + ## Optional Client id + # client_id = "Telegraf" + + ## Set the minimal supported Kafka version. Setting this enables the use of new + ## Kafka features and APIs. Must be 0.10.2.0 or greater. + ## ex: version = "1.1.0" + # version = "" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## SASL authentication credentials. These settings should typically be used + ## with TLS encryption enabled + # sasl_username = "kafka" + # sasl_password = "secret" + + ## Optional SASL: + ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI + ## (defaults to PLAIN) + # sasl_mechanism = "" + + ## used if sasl_mechanism is GSSAPI (experimental) + # sasl_gssapi_service_name = "" + # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH + # sasl_gssapi_auth_type = "KRB5_USER_AUTH" + # sasl_gssapi_kerberos_config_path = "/" + # sasl_gssapi_realm = "realm" + # sasl_gssapi_key_tab_path = "" + # sasl_gssapi_disable_pafxfast = false + + ## used if sasl_mechanism is OAUTHBEARER (experimental) + # sasl_access_token = "" + + ## SASL protocol version. When connecting to Azure EventHub set to 0. + # sasl_version = 1 + + # Disable Kafka metadata full fetch + # metadata_full = false + + ## Name of the consumer group. + # consumer_group = "telegraf_metrics_consumers" + + ## Compression codec represents the various compression codecs recognized by + ## Kafka in messages. + ## 0 : None + ## 1 : Gzip + ## 2 : Snappy + ## 3 : LZ4 + ## 4 : ZSTD + # compression_codec = 0 + + + ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky". + # balance_strategy = "range" + + ## Maximum length of a message to consume, in bytes (default 0/unlimited); + ## larger messages are dropped + max_message_len = 1000000 + + ## Maximum messages to read from the broker that have not been written by an + ## output. For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. 
+ ## + ## For example, if each message from the queue contains 10 metrics and the + ## output metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## Maximum amount of time the consumer should take to process messages. If + ## the debug log prints messages from sarama about 'abandoning subscription + ## to [topic] because consuming was taking too long', increase this value to + ## longer than the time taken by the output plugin(s). + ## + ## Note that the effective timeout could be between 'max_processing_time' and + ## '2 * max_processing_time'. + # max_processing_time = "100ms" +``` + +[kafka]: https://kafka.apache.org \ No newline at end of file diff --git a/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go new file mode 100644 index 0000000000000..f133e37b991f7 --- /dev/null +++ b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go @@ -0,0 +1,293 @@ +//go:generate ../../../tools/readme_config_includer/generator +package kafka_topic_consumer + +import ( + _ "embed" + "regexp" + "strconv" + "sync" + "time" + + "github.com/Shopify/sarama" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/kafka" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data. +//go:embed sample.conf +var sampleConfig string + +const ( + defaultMaxUndeliveredMessages = 1000 + defaultMaxProcessingTime = config.Duration(100 * time.Millisecond) + defaultConsumerGroup = "telegraf_metrics_consumers" + reconnectDelay = 5 * time.Second +) + +type empty struct{} +type semaphore chan empty + +type kafkaTopicConsumer struct { + Brokers []string `toml:"brokers"` + ConsumerGroups string `toml:"consumer_groups"` + Topics string `toml:"topics"` + TopicTag string `toml:"topic_tag"` + Log telegraf.Logger `toml:"-"` + + kafka.ReadConfig + config *sarama.Config `toml:"-"` + + client sarama.Client `toml:"-"` + + topicFilter *regexp.Regexp `toml:"-"` + groupFilter *regexp.Regexp `toml:"-"` +} + +type groupLag struct { + groupID string + topic string + lagSum int64 + offsetSum int64 + items []groupLagItem +} + +type groupLagItem struct { + partition int32 + lag int64 + offset int64 +} + +func init() { + inputs.Add("kafka_topic_consumer", func() telegraf.Input { + return &kafkaTopicConsumer{} + }) +} + +// Gather takes in an accumulator and adds the metrics that the Input +// gathers. 
This is called every agent.interval +func (k *kafkaTopicConsumer) Gather(acc telegraf.Accumulator) error { + var err error + var wg = sync.WaitGroup{} + + topics, err := k.client.Topics() + if err != nil { + k.Log.Warn("fetch topics info error: %s", err) + return err + } + + topicPartitionsOffsets := make(map[string]map[int32]int64) + for _, topic := range topics { + if topic == "" || !k.topicFilter.MatchString(topic) { + continue + } + topicPartitionsOffsets[topic], err = k.fetchTopicPartitionOffset(topic) + if err != nil { + k.Log.Warnf("fetch topic: %s, partition infomation error: %s", topic, err) + continue + } + } + + brokers := k.client.Brokers() + if len(brokers) == 0 { + k.Log.Warnf("the kafka has't any brokers") + return nil + } + groupLags := make([]groupLag, 0) + for _, broker := range brokers { + wg.Add(1) + go func() { + defer wg.Done() + if err := broker.Open(k.client.Config()); err != nil && err != sarama.ErrAlreadyConnected { + k.Log.Errorf("cannot connect to broker %d: %v", broker.ID(), err) + return + } + + groupLag := k.fetchLagData(broker, topicPartitionsOffsets) + groupLags = append(groupLags, groupLag...) + }() + } + wg.Wait() + + k.writeToAccumulator(groupLags, topicPartitionsOffsets, acc) + return nil +} + +func (k *kafkaTopicConsumer) fetchTopicPartitionOffset(topic string) (map[int32]int64, error) { + partitions, err := k.client.Partitions(topic) + if err != nil { + return nil, err + } + result := make(map[int32]int64, len(partitions)) + for _, partition := range partitions { + currentOffset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest) + if err != nil { + k.Log.Warnf("fetch topic: %s, partition: %d newest offset error: %s", topic, partition, err) + continue + } + result[partition] = currentOffset + } + return result, nil + +} + +func (k *kafkaTopicConsumer) fetchLagData(broker *sarama.Broker, topicPatitionsOffsets map[string]map[int32]int64) []groupLag { + groups, err := broker.ListGroups(&sarama.ListGroupsRequest{}) + if err != nil { + k.Log.Errorf("cannot get consumer group: %v", err) + return nil + } + groupIds := make([]string, 0) + for groupId := range groups.Groups { + if k.groupFilter.MatchString(groupId) { + groupIds = append(groupIds, groupId) + } + } + + result := make([]groupLag, 0) + for _, group := range groupIds { + offsetFetchResponse := k.getGroupAllTopicOffset(broker, group, topicPatitionsOffsets) + for topic, partitions := range offsetFetchResponse.Blocks { + var currentOffsetSum int64 + var lagSum int64 + partitionConsumed := false + items := make([]groupLagItem, 0) + for partition, offsetResponseBlock := range partitions { + err := offsetResponseBlock.Err + if err != sarama.ErrNoError { + k.Log.Errorf("partition %d offset of the topic: %s and consumer group: %s error :%v", partition, topic, group, err.Error()) + continue + } + currentOffset := offsetResponseBlock.Offset + currentOffsetSum += currentOffset + // consume offset + + if offset, ok := topicPatitionsOffsets[topic][partition]; ok { + var lag int64 + if offsetResponseBlock.Offset == -1 { + lag = offset + } else { + partitionConsumed = true + lag = offset - offsetResponseBlock.Offset + } + lagSum += lag // no matter what partition + // lag + items = append(items, groupLagItem{lag: lag, partition: partition, offset: offset}) + } else { + k.Log.Warnf("no offset of topic %s partition %d, cannot get consumer group lag", topic, partition) + } + } + if !partitionConsumed { + k.Log.Debugf("the current group: %s hasn't insterest with the topic: %s , skipd", group, topic) + 
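				// A group that never committed an offset for any partition of this
				// topic is not reported, so idle groups do not emit zero-lag series.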
continue + } + // currentOffsetSum, lagSum (all topic) + result = append(result, groupLag{topic: topic, groupID: group, lagSum: lagSum, offsetSum: currentOffsetSum, items: items}) + } + } + return result + +} + +func (k *kafkaTopicConsumer) getGroupAllTopicOffset(broker *sarama.Broker, group string, topicPatitionsOffsets map[string]map[int32]int64) *sarama.OffsetFetchResponse { + offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group, Version: 1} + for topic, partitions := range topicPatitionsOffsets { + for partition := range partitions { + offsetFetchRequest.AddPartition(topic, partition) + } + } + // len(group.members), group.groupId + offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest) + if err != nil { + k.Log.Errorf("Cannot get offset of group %s: %v", group, err) + return nil + } + return offsetFetchResponse +} + +func (k *kafkaTopicConsumer) writeToAccumulator(groupLags []groupLag, topicPatitionsOffsets map[string]map[int32]int64, acc telegraf.Accumulator) { + timestamp := time.Now().UnixMilli() + for topic, partitions := range topicPatitionsOffsets { + for partition, offset := range partitions { + acc.AddFields("burrow_topic", + map[string]interface{}{ + "offset": offset, + "timestamp": timestamp, + }, + map[string]string{ + "topic": topic, + "partition": strconv.Itoa(int(partition)), + }) + } + } + + for _, groupLag := range groupLags { + for _, item := range groupLag.items { + acc.AddFields("burrow_partition", // + map[string]interface{}{ + "lag": item.lag, + "offset": item.offset, + "timestamp": timestamp, + }, // + map[string]string{ + "group": groupLag.groupID, + "topic": groupLag.topic, + "partition": strconv.FormatInt(int64(item.partition), 10), + }, // + ) + } + + acc.AddFields("burrow_group", + map[string]interface{}{ + "total_lag": groupLag.lagSum, + "lag": groupLag.lagSum, + "offset_sum": groupLag.offsetSum, + "timestamp": timestamp, + "partition_count": len(groupLag.items), + }, + map[string]string{ + "group": groupLag.groupID, + }) + } + +} + +func (k *kafkaTopicConsumer) SampleConfig() string { + return sampleConfig +} + +func (k *kafkaTopicConsumer) Init() error { + + var err error + + cfg := sarama.NewConfig() + // Kafka version 0.10.2.0 is required for consumer groups. 
+ cfg.Version = sarama.V0_10_2_0 + + if err := k.SetConfig(cfg); err != nil { + return err + } + + k.config = cfg + k.client, err = sarama.NewClient(k.Brokers, k.config) + if err != nil { + k.Log.Errorf("create a kafka client for %+v failed: %s", k.Brokers, err) + return err + } + + if k.ConsumerGroups == "" { + k.ConsumerGroups = ".*" + } + + if k.Topics == "" { + k.Topics = ".*" + } + + k.groupFilter = regexp.MustCompile(k.ConsumerGroups) + k.topicFilter = regexp.MustCompile(k.Topics) + + k.Log.Debugf("initialize kafka_topic_consumer input plugin succeed") + return nil +} From 88259bcb363744d2b2f5641a279894de9321895f Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Wed, 23 Nov 2022 15:53:22 +0800 Subject: [PATCH 2/9] Add sample conf --- plugins/inputs/kafka_topic_consumer/README.md | 2 +- .../inputs/kafka_topic_consumer/sample.conf | 94 +++++++++++++++++++ plugins/inputs/mongodb/README.md | 5 +- 3 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 plugins/inputs/kafka_topic_consumer/sample.conf diff --git a/plugins/inputs/kafka_topic_consumer/README.md b/plugins/inputs/kafka_topic_consumer/README.md index 42185738cc792..0590648cd2935 100644 --- a/plugins/inputs/kafka_topic_consumer/README.md +++ b/plugins/inputs/kafka_topic_consumer/README.md @@ -9,7 +9,7 @@ Support the kafka's verson must greater than 0.10.2. ## Configuration ```toml @sample.conf -# Read metrics from Kafka topics +# Read consumer group offset and lag from Kafka. [[inputs.kafka_topic_consumer]] ## Kafka brokers. brokers = ["localhost:9092"] diff --git a/plugins/inputs/kafka_topic_consumer/sample.conf b/plugins/inputs/kafka_topic_consumer/sample.conf new file mode 100644 index 0000000000000..668c15b8441fc --- /dev/null +++ b/plugins/inputs/kafka_topic_consumer/sample.conf @@ -0,0 +1,94 @@ +# Read consumer group offset and lag from Kafka. +[[inputs.kafka_topic_consumer]] + ## Kafka brokers. + brokers = ["localhost:9092"] + + ## Topics interested in offset and lag + ## (regexp, default is ".*" represents all topics, ) + topics = .* + + ## ConsumerGroups interested in offset and lag + ## (regexp, default is ".*" represents all consumer groups) + consumer_groups = .* + + ## Optional Client id + # client_id = "Telegraf" + + ## Set the minimal supported Kafka version. Setting this enables the use of new + ## Kafka features and APIs. Must be 0.10.2.0 or greater. + ## ex: version = "1.1.0" + # version = "" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## SASL authentication credentials. These settings should typically be used + ## with TLS encryption enabled + # sasl_username = "kafka" + # sasl_password = "secret" + + ## Optional SASL: + ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI + ## (defaults to PLAIN) + # sasl_mechanism = "" + + ## used if sasl_mechanism is GSSAPI (experimental) + # sasl_gssapi_service_name = "" + # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH + # sasl_gssapi_auth_type = "KRB5_USER_AUTH" + # sasl_gssapi_kerberos_config_path = "/" + # sasl_gssapi_realm = "realm" + # sasl_gssapi_key_tab_path = "" + # sasl_gssapi_disable_pafxfast = false + + ## used if sasl_mechanism is OAUTHBEARER (experimental) + # sasl_access_token = "" + + ## SASL protocol version. When connecting to Azure EventHub set to 0. 
+ # sasl_version = 1 + + # Disable Kafka metadata full fetch + # metadata_full = false + + ## Name of the consumer group. + # consumer_group = "telegraf_metrics_consumers" + + ## Compression codec represents the various compression codecs recognized by + ## Kafka in messages. + ## 0 : None + ## 1 : Gzip + ## 2 : Snappy + ## 3 : LZ4 + ## 4 : ZSTD + # compression_codec = 0 + + + ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky". + # balance_strategy = "range" + + ## Maximum length of a message to consume, in bytes (default 0/unlimited); + ## larger messages are dropped + max_message_len = 1000000 + + ## Maximum messages to read from the broker that have not been written by an + ## output. For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. + ## + ## For example, if each message from the queue contains 10 metrics and the + ## output metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## Maximum amount of time the consumer should take to process messages. If + ## the debug log prints messages from sarama about 'abandoning subscription + ## to [topic] because consuming was taking too long', increase this value to + ## longer than the time taken by the output plugin(s). + ## + ## Note that the effective timeout could be between 'max_processing_time' and + ## '2 * max_processing_time'. + # max_processing_time = "100ms" \ No newline at end of file diff --git a/plugins/inputs/mongodb/README.md b/plugins/inputs/mongodb/README.md index d32f019c69abc..8be31c7ba16e3 100644 --- a/plugins/inputs/mongodb/README.md +++ b/plugins/inputs/mongodb/README.md @@ -41,8 +41,7 @@ All MongoDB server versions from 2.6 and higher are supported. 
# insecure_skip_verify = false ## Skip Ping at initialization - ## If you are not sure that your mongodb is already running, skipping the ping operation will allow you to run Telegraf normally and mongodb will try to reconnect by itself - # skip_ping_at_init = false + # skip_ping_at_init = true ``` ### Permissions @@ -315,3 +314,5 @@ mongodb_col_stats,collection=foo,db_name=local,hostname=127.0.0.1:27017 size=375 mongodb_shard_stats,hostname=127.0.0.1:27017,in_use=3i,available=3i,created=4i,refreshing=0i 1522799074000000000 mongodb_top_stats,collection=foo,total_time=1471,total_count=158,read_lock_time=49614,read_lock_count=657,write_lock_time=49125456,write_lock_count=9841,queries_time=174,queries_count=495,get_more_time=498,get_more_count=46,insert_time=2651,insert_count=1265,update_time=0,update_count=0,remove_time=0,remove_count=0,commands_time=498611,commands_count=4615 ``` +5,get_more_time=498,get_more_count=46,insert_time=2651,insert_count=1265,update_time=0,update_count=0,remove_time=0,remove_count=0,commands_time=498611,commands_count=4615 +``` From 2fc48010deb83de83e8c9a865827d0c48ce0b481 Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Wed, 23 Nov 2022 16:35:56 +0800 Subject: [PATCH 3/9] Register kafka topic consumer plugin --- plugins/inputs/all/all.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 549397d24d877..7a051750e299e 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -89,6 +89,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy" + _ "github.com/influxdata/telegraf/plugins/inputs/kafka_topic_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/kapacitor" _ "github.com/influxdata/telegraf/plugins/inputs/kernel" _ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat" From eec0b528df619b12276844f0475d8bd0bb02861c Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Wed, 23 Nov 2022 17:39:01 +0800 Subject: [PATCH 4/9] Add excludes topics --- plugins/inputs/kafka_topic_consumer/README.md | 8 ++- .../kafka_topic_consumer.go | 72 +++++++++---------- .../inputs/kafka_topic_consumer/sample.conf | 8 ++- 3 files changed, 46 insertions(+), 42 deletions(-) diff --git a/plugins/inputs/kafka_topic_consumer/README.md b/plugins/inputs/kafka_topic_consumer/README.md index 0590648cd2935..1a474469b71dc 100644 --- a/plugins/inputs/kafka_topic_consumer/README.md +++ b/plugins/inputs/kafka_topic_consumer/README.md @@ -16,11 +16,15 @@ Support the kafka's verson must greater than 0.10.2. 
## Topics interested in offset and lag ## (regexp, default is ".*" represents all topics, ) - topics = .* + topics = ".*" ## ConsumerGroups interested in offset and lag ## (regexp, default is ".*" represents all consumer groups) - consumer_groups = .* + consumer_groups = ".*" + + ## ExcludeTopics exclude uninterested topics + ## (regexp, default is "", represents excluding none topics) + exclude_topics = "__.*" ## Optional Client id # client_id = "Telegraf" diff --git a/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go index f133e37b991f7..2981d1b285484 100644 --- a/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go +++ b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go @@ -11,7 +11,6 @@ import ( "github.com/Shopify/sarama" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -20,45 +19,38 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - defaultMaxUndeliveredMessages = 1000 - defaultMaxProcessingTime = config.Duration(100 * time.Millisecond) - defaultConsumerGroup = "telegraf_metrics_consumers" - reconnectDelay = 5 * time.Second -) - -type empty struct{} -type semaphore chan empty - -type kafkaTopicConsumer struct { - Brokers []string `toml:"brokers"` - ConsumerGroups string `toml:"consumer_groups"` - Topics string `toml:"topics"` - TopicTag string `toml:"topic_tag"` - Log telegraf.Logger `toml:"-"` +type ( + kafkaTopicConsumer struct { + Brokers []string `toml:"brokers"` + ConsumerGroups string `toml:"consumer_groups"` + Topics string `toml:"topics"` + ExcludeTopics string `toml:"exclude_topics"` + Log telegraf.Logger `toml:"-"` - kafka.ReadConfig - config *sarama.Config `toml:"-"` + kafka.ReadConfig + config *sarama.Config `toml:"-"` - client sarama.Client `toml:"-"` + client sarama.Client `toml:"-"` - topicFilter *regexp.Regexp `toml:"-"` - groupFilter *regexp.Regexp `toml:"-"` -} + topicFilter *regexp.Regexp `toml:"-"` + groupFilter *regexp.Regexp `toml:"-"` + excludeTopics *regexp.Regexp `toml:"-"` + } -type groupLag struct { - groupID string - topic string - lagSum int64 - offsetSum int64 - items []groupLagItem -} + groupLag struct { + groupID string + topic string + lagSum int64 + offsetSum int64 + items []groupLagItem + } -type groupLagItem struct { - partition int32 - lag int64 - offset int64 -} + groupLagItem struct { + partition int32 + lag int64 + offset int64 + } +) func init() { inputs.Add("kafka_topic_consumer", func() telegraf.Input { @@ -80,7 +72,7 @@ func (k *kafkaTopicConsumer) Gather(acc telegraf.Accumulator) error { topicPartitionsOffsets := make(map[string]map[int32]int64) for _, topic := range topics { - if topic == "" || !k.topicFilter.MatchString(topic) { + if topic == "" || !k.topicFilter.MatchString(topic) || (k.excludeTopics != nil && k.excludeTopics.MatchString(topic)) { continue } topicPartitionsOffsets[topic], err = k.fetchTopicPartitionOffset(topic) @@ -98,7 +90,7 @@ func (k *kafkaTopicConsumer) Gather(acc telegraf.Accumulator) error { groupLags := make([]groupLag, 0) for _, broker := range brokers { wg.Add(1) - go func() { + go func(broker *sarama.Broker) { defer wg.Done() if err := broker.Open(k.client.Config()); err != nil && err != sarama.ErrAlreadyConnected { k.Log.Errorf("cannot connect to broker %d: %v", broker.ID(), err) @@ -107,7 +99,7 @@ func (k *kafkaTopicConsumer) Gather(acc telegraf.Accumulator) error { 
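 			// ListGroups only returns the consumer groups coordinated by this
 			// broker, so every broker is queried and its results are appended
 			// to the shared groupLags slice.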
groupLag := k.fetchLagData(broker, topicPartitionsOffsets) groupLags = append(groupLags, groupLag...) - }() + }(broker) } wg.Wait() @@ -288,6 +280,10 @@ func (k *kafkaTopicConsumer) Init() error { k.groupFilter = regexp.MustCompile(k.ConsumerGroups) k.topicFilter = regexp.MustCompile(k.Topics) + if k.ExcludeTopics != "" { + k.excludeTopics = regexp.MustCompile(k.ExcludeTopics) + } + k.Log.Debugf("initialize kafka_topic_consumer input plugin succeed") return nil } diff --git a/plugins/inputs/kafka_topic_consumer/sample.conf b/plugins/inputs/kafka_topic_consumer/sample.conf index 668c15b8441fc..0d353f75a32e8 100644 --- a/plugins/inputs/kafka_topic_consumer/sample.conf +++ b/plugins/inputs/kafka_topic_consumer/sample.conf @@ -5,11 +5,15 @@ ## Topics interested in offset and lag ## (regexp, default is ".*" represents all topics, ) - topics = .* + topics = ".*" ## ConsumerGroups interested in offset and lag ## (regexp, default is ".*" represents all consumer groups) - consumer_groups = .* + consumer_groups = ".*" + + ## ExcludeTopics exclude uninterested topics + ## (regexp, default is "", represents excluding none topics) + exclude_topics = "__.*" ## Optional Client id # client_id = "Telegraf" From b1453d0328bfe97862cdd1aa2cc7c6caf66c98c1 Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Thu, 24 Nov 2022 10:02:58 +0800 Subject: [PATCH 5/9] Change name --- .../kafka_topic_consumer.go | 7 ++-- .../inputs/kafka_topic_consumer/sample.conf | 41 +------------------ 2 files changed, 5 insertions(+), 43 deletions(-) diff --git a/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go index 2981d1b285484..d6d68538f1c7a 100644 --- a/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go +++ b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go @@ -203,7 +203,7 @@ func (k *kafkaTopicConsumer) writeToAccumulator(groupLags []groupLag, topicPatit timestamp := time.Now().UnixMilli() for topic, partitions := range topicPatitionsOffsets { for partition, offset := range partitions { - acc.AddFields("burrow_topic", + acc.AddFields("kafka_topic_offset", map[string]interface{}{ "offset": offset, "timestamp": timestamp, @@ -217,7 +217,7 @@ func (k *kafkaTopicConsumer) writeToAccumulator(groupLags []groupLag, topicPatit for _, groupLag := range groupLags { for _, item := range groupLag.items { - acc.AddFields("burrow_partition", // + acc.AddFields("kafka_topic_partition", // map[string]interface{}{ "lag": item.lag, "offset": item.offset, @@ -231,7 +231,7 @@ func (k *kafkaTopicConsumer) writeToAccumulator(groupLags []groupLag, topicPatit ) } - acc.AddFields("burrow_group", + acc.AddFields("kafka_group_topic", map[string]interface{}{ "total_lag": groupLag.lagSum, "lag": groupLag.lagSum, @@ -241,6 +241,7 @@ func (k *kafkaTopicConsumer) writeToAccumulator(groupLags []groupLag, topicPatit }, map[string]string{ "group": groupLag.groupID, + "topic": groupLag.topic, }) } diff --git a/plugins/inputs/kafka_topic_consumer/sample.conf b/plugins/inputs/kafka_topic_consumer/sample.conf index 0d353f75a32e8..185d0b378a872 100644 --- a/plugins/inputs/kafka_topic_consumer/sample.conf +++ b/plugins/inputs/kafka_topic_consumer/sample.conf @@ -56,43 +56,4 @@ # sasl_version = 1 # Disable Kafka metadata full fetch - # metadata_full = false - - ## Name of the consumer group. - # consumer_group = "telegraf_metrics_consumers" - - ## Compression codec represents the various compression codecs recognized by - ## Kafka in messages. 
- ## 0 : None - ## 1 : Gzip - ## 2 : Snappy - ## 3 : LZ4 - ## 4 : ZSTD - # compression_codec = 0 - - - ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky". - # balance_strategy = "range" - - ## Maximum length of a message to consume, in bytes (default 0/unlimited); - ## larger messages are dropped - max_message_len = 1000000 - - ## Maximum messages to read from the broker that have not been written by an - ## output. For best throughput set based on the number of metrics within - ## each message and the size of the output's metric_batch_size. - ## - ## For example, if each message from the queue contains 10 metrics and the - ## output metric_batch_size is 1000, setting this to 100 will ensure that a - ## full batch is collected and the write is triggered immediately without - ## waiting until the next flush_interval. - # max_undelivered_messages = 1000 - - ## Maximum amount of time the consumer should take to process messages. If - ## the debug log prints messages from sarama about 'abandoning subscription - ## to [topic] because consuming was taking too long', increase this value to - ## longer than the time taken by the output plugin(s). - ## - ## Note that the effective timeout could be between 'max_processing_time' and - ## '2 * max_processing_time'. - # max_processing_time = "100ms" \ No newline at end of file + # metadata_full = false \ No newline at end of file From 2c26e528bc39f17eed06058bef611893ffe1502d Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Thu, 24 Nov 2022 10:28:21 +0800 Subject: [PATCH 6/9] Update README --- plugins/inputs/kafka_topic_consumer/README.md | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/plugins/inputs/kafka_topic_consumer/README.md b/plugins/inputs/kafka_topic_consumer/README.md index 1a474469b71dc..0576bdc607a0b 100644 --- a/plugins/inputs/kafka_topic_consumer/README.md +++ b/plugins/inputs/kafka_topic_consumer/README.md @@ -68,45 +68,5 @@ Support the kafka's verson must greater than 0.10.2. # Disable Kafka metadata full fetch # metadata_full = false - - ## Name of the consumer group. - # consumer_group = "telegraf_metrics_consumers" - - ## Compression codec represents the various compression codecs recognized by - ## Kafka in messages. - ## 0 : None - ## 1 : Gzip - ## 2 : Snappy - ## 3 : LZ4 - ## 4 : ZSTD - # compression_codec = 0 - - - ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky". - # balance_strategy = "range" - - ## Maximum length of a message to consume, in bytes (default 0/unlimited); - ## larger messages are dropped - max_message_len = 1000000 - - ## Maximum messages to read from the broker that have not been written by an - ## output. For best throughput set based on the number of metrics within - ## each message and the size of the output's metric_batch_size. - ## - ## For example, if each message from the queue contains 10 metrics and the - ## output metric_batch_size is 1000, setting this to 100 will ensure that a - ## full batch is collected and the write is triggered immediately without - ## waiting until the next flush_interval. - # max_undelivered_messages = 1000 - - ## Maximum amount of time the consumer should take to process messages. If - ## the debug log prints messages from sarama about 'abandoning subscription - ## to [topic] because consuming was taking too long', increase this value to - ## longer than the time taken by the output plugin(s). 
- ## - ## Note that the effective timeout could be between 'max_processing_time' and - ## '2 * max_processing_time'. - # max_processing_time = "100ms" ``` - [kafka]: https://kafka.apache.org \ No newline at end of file From 13b16f3805cc60a665b20c3bd4786547dbc69720 Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Thu, 24 Nov 2022 10:49:10 +0800 Subject: [PATCH 7/9] Update README --- plugins/inputs/kafka_topic_consumer/README.md | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/plugins/inputs/kafka_topic_consumer/README.md b/plugins/inputs/kafka_topic_consumer/README.md index 0576bdc607a0b..c7d126d3f2b84 100644 --- a/plugins/inputs/kafka_topic_consumer/README.md +++ b/plugins/inputs/kafka_topic_consumer/README.md @@ -69,4 +69,140 @@ Support the kafka's verson must greater than 0.10.2. # Disable Kafka metadata full fetch # metadata_full = false ``` + + +## Metrics + +### Fields + +* `kafka_group_topic` (one event per each consumer group) + * partition_count (int, `number of partitions`) + * offset_sum (int64, `total offset of all partitions`) + * total_lag (int64, `totallag`) + * lag (int64, `always equal to total_lag || 0`) + * timestamp (int64) + +* `kafka_topic_partition` (one event per each topic partition) + * lag (int64, `current_lag || 0`) + * offset (int64) + * timestamp (int64) + +* `kafka_topic_offset` (one event per topic offset) + * offset (int64) + * timestamp (int64) + +### Tags + +* `kafka_group_topic` + * group (string) + * topic (string) + +* `kafka_topic_partition` + * group (string) + * topic (string) + * partition (int) + * owner (string) + +* `kafka_topic_offset` + * topic (string) + * partition (int) + +### Example + +#### kafka_group_topic + +```json +{ + "fields":{ + "lag":1, + "offset_sum":388, + "partition_count":3, + "timestamp":1669255740005, + "total_lag":1 + }, + "name":"kafka_group_topic", + "tags":{ + "category":"infrastructure", + "cluster_name":"kafka_3e189edb8ad149a4", + "deploy_mode":"host", + "group":"things-data-listener", + "host":"zhaokun-tm1801", + "host_ipv4":"172.16.0.28", + "host_name":"VM-0-28-ubuntu", + "index":"kafkaserver", + "instance":"kafka_3e189edb8ad149a4", + "node_name":"node_03a703046f", + "service":"kafka_3e189edb8ad149a4", + "system":"megacloud-kafka", + "tags":"_metrics", + "topic":"to_cloud", + "type":"kafkaserver" + }, + "timestamp":1669255740 +} +``` + +#### kafka_topic_partition + +```json +{ + "fields":{ + "lag":1, + "offset":137, + "timestamp":1669255840001 + }, + "name":"kafka_topic_partition", + "tags":{ + "category":"infrastructure", + "cluster_name":"kafka_3e189edb8ad149a4", + "deploy_mode":"host", + "group":"things-data-listener", + "host":"zhaokun-tm1801", + "host_ipv4":"172.16.0.28", + "host_name":"VM-0-28-ubuntu", + "index":"kafkaserver", + "instance":"kafka_3e189edb8ad149a4", + "node_name":"node_03a703046f", + "partition":"0", + "service":"kafka_3e189edb8ad149a4", + "system":"megacloud-kafka", + "tags":"_metrics", + "topic":"to_cloud", + "type":"kafkaserver" + }, + "timestamp":1669255840 +} +``` + +#### kafka_topic_offset + +```json +{ + "fields":{ + "offset":140, + "timestamp":1669255890001 + }, + "name":"kafka_topic_offset", + "tags":{ + "category":"infrastructure", + "cluster_name":"kafka_3e189edb8ad149a4", + "deploy_mode":"host", + "host":"zhaokun-tm1801", + "host_ipv4":"172.16.0.28", + "host_name":"VM-0-28-ubuntu", + "index":"kafkaserver", + "instance":"kafka_3e189edb8ad149a4", + "node_name":"node_03a703046f", + "partition":"1", + "service":"kafka_3e189edb8ad149a4", + "system":"megacloud-kafka", + 
"tags":"_metrics", + "topic":"to_cloud", + "type":"kafkaserver" + }, + "timestamp":1669255890 +} + +``` + [kafka]: https://kafka.apache.org \ No newline at end of file From fd98442d25552637c850cc70febb9fcf19d559b1 Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Thu, 24 Nov 2022 11:25:31 +0800 Subject: [PATCH 8/9] Update plugins/inputs/mongodb/README.md Co-authored-by: yufu.deng <954068039@qq.com> --- plugins/inputs/mongodb/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/inputs/mongodb/README.md b/plugins/inputs/mongodb/README.md index 8be31c7ba16e3..9ee3852e0dd16 100644 --- a/plugins/inputs/mongodb/README.md +++ b/plugins/inputs/mongodb/README.md @@ -314,5 +314,3 @@ mongodb_col_stats,collection=foo,db_name=local,hostname=127.0.0.1:27017 size=375 mongodb_shard_stats,hostname=127.0.0.1:27017,in_use=3i,available=3i,created=4i,refreshing=0i 1522799074000000000 mongodb_top_stats,collection=foo,total_time=1471,total_count=158,read_lock_time=49614,read_lock_count=657,write_lock_time=49125456,write_lock_count=9841,queries_time=174,queries_count=495,get_more_time=498,get_more_count=46,insert_time=2651,insert_count=1265,update_time=0,update_count=0,remove_time=0,remove_count=0,commands_time=498611,commands_count=4615 ``` -5,get_more_time=498,get_more_count=46,insert_time=2651,insert_count=1265,update_time=0,update_count=0,remove_time=0,remove_count=0,commands_time=498611,commands_count=4615 -``` From 0d2b968f580a746f01b424d3276feeee93cce57b Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Thu, 24 Nov 2022 12:20:46 +0800 Subject: [PATCH 9/9] Add total consume update --- plugins/inputs/kafka_topic_consumer/README.md | 6 +-- .../kafka_topic_consumer.go | 37 ++++++++++++------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/plugins/inputs/kafka_topic_consumer/README.md b/plugins/inputs/kafka_topic_consumer/README.md index c7d126d3f2b84..f841eece6e61f 100644 --- a/plugins/inputs/kafka_topic_consumer/README.md +++ b/plugins/inputs/kafka_topic_consumer/README.md @@ -69,8 +69,6 @@ Support the kafka's verson must greater than 0.10.2. # Disable Kafka metadata full fetch # metadata_full = false ``` - - ## Metrics ### Fields @@ -78,7 +76,8 @@ Support the kafka's verson must greater than 0.10.2. * `kafka_group_topic` (one event per each consumer group) * partition_count (int, `number of partitions`) * offset_sum (int64, `total offset of all partitions`) - * total_lag (int64, `totallag`) + * current_offset_sum (int64 `total offset of consumers to consume message`) + * total_lag (int64, `total lag`) * lag (int64, `always equal to total_lag || 0`) * timestamp (int64) @@ -101,7 +100,6 @@ Support the kafka's verson must greater than 0.10.2. * group (string) * topic (string) * partition (int) - * owner (string) * `kafka_topic_offset` * topic (string) diff --git a/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go index d6d68538f1c7a..9a69548e62a56 100644 --- a/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go +++ b/plugins/inputs/kafka_topic_consumer/kafka_topic_consumer.go @@ -38,11 +38,14 @@ type ( } groupLag struct { - groupID string - topic string - lagSum int64 + groupID string + topic string + lagSum int64 + // topic total offset of produced messages. offsetSum int64 - items []groupLagItem + // consumer consumes message at currrent offset of a topic. 
+ currentOffsetSum int64 + items []groupLagItem } groupLagItem struct { @@ -143,6 +146,7 @@ func (k *kafkaTopicConsumer) fetchLagData(broker *sarama.Broker, topicPatitionsO offsetFetchResponse := k.getGroupAllTopicOffset(broker, group, topicPatitionsOffsets) for topic, partitions := range offsetFetchResponse.Blocks { var currentOffsetSum int64 + var offsetSum int64 var lagSum int64 partitionConsumed := false items := make([]groupLagItem, 0) @@ -152,8 +156,9 @@ func (k *kafkaTopicConsumer) fetchLagData(broker *sarama.Broker, topicPatitionsO k.Log.Errorf("partition %d offset of the topic: %s and consumer group: %s error :%v", partition, topic, group, err.Error()) continue } - currentOffset := offsetResponseBlock.Offset - currentOffsetSum += currentOffset + if offsetResponseBlock.Offset != -1 { + currentOffsetSum += offsetResponseBlock.Offset + } // consume offset if offset, ok := topicPatitionsOffsets[topic][partition]; ok { @@ -165,6 +170,7 @@ func (k *kafkaTopicConsumer) fetchLagData(broker *sarama.Broker, topicPatitionsO lag = offset - offsetResponseBlock.Offset } lagSum += lag // no matter what partition + offsetSum += offset // lag items = append(items, groupLagItem{lag: lag, partition: partition, offset: offset}) } else { @@ -172,11 +178,15 @@ func (k *kafkaTopicConsumer) fetchLagData(broker *sarama.Broker, topicPatitionsO } } if !partitionConsumed { - k.Log.Debugf("the current group: %s hasn't insterest with the topic: %s , skipd", group, topic) + k.Log.Debugf("the current group: %s hasn't insterest with the topic: %s , ignored", group, topic) continue } // currentOffsetSum, lagSum (all topic) - result = append(result, groupLag{topic: topic, groupID: group, lagSum: lagSum, offsetSum: currentOffsetSum, items: items}) + result = append(result, groupLag{topic: topic, groupID: group, + lagSum: lagSum, + offsetSum: offsetSum, + currentOffsetSum: currentOffsetSum, + items: items}) } } return result @@ -233,11 +243,12 @@ func (k *kafkaTopicConsumer) writeToAccumulator(groupLags []groupLag, topicPatit acc.AddFields("kafka_group_topic", map[string]interface{}{ - "total_lag": groupLag.lagSum, - "lag": groupLag.lagSum, - "offset_sum": groupLag.offsetSum, - "timestamp": timestamp, - "partition_count": len(groupLag.items), + "total_lag": groupLag.lagSum, + "lag": groupLag.lagSum, + "offset_sum": groupLag.offsetSum, + "current_offset_sum": groupLag.currentOffsetSum, + "timestamp": timestamp, + "partition_count": len(groupLag.items), }, map[string]string{ "group": groupLag.groupID,
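No test file is included in the series. As a supplementary illustration only, the sketch below shows how the metric layout introduced by this last patch could be exercised with Telegraf's `testutil` package; the group, topic, and offset values are invented for the example and are not taken from the patches.

```go
package kafka_topic_consumer

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf/testutil"
)

// Sketch of a unit test for writeToAccumulator: it feeds one topic offset and
// one consumer-group lag entry through the function and checks that all three
// measurements are produced.
func TestWriteToAccumulator(t *testing.T) {
	k := &kafkaTopicConsumer{Log: testutil.Logger{}}
	acc := &testutil.Accumulator{}

	// Newest offset per topic/partition, as fetchTopicPartitionOffset would return it.
	offsets := map[string]map[int32]int64{
		"to_cloud": {0: 140},
	}

	// One group that has consumed partition 0 up to offset 139 (lag 1).
	lags := []groupLag{{
		groupID:          "things-data-listener",
		topic:            "to_cloud",
		lagSum:           1,
		offsetSum:        140,
		currentOffsetSum: 139,
		items:            []groupLagItem{{partition: 0, lag: 1, offset: 140}},
	}}

	k.writeToAccumulator(lags, offsets, acc)

	require.True(t, acc.HasMeasurement("kafka_topic_offset"))
	require.True(t, acc.HasMeasurement("kafka_topic_partition"))
	require.True(t, acc.HasMeasurement("kafka_group_topic"))
	require.True(t, acc.HasField("kafka_group_topic", "current_offset_sum"))
}
```

Because `writeToAccumulator` and the `groupLag` types are unexported, a test along these lines would live in the plugin package itself, e.g. in a `kafka_topic_consumer_test.go` next to the plugin source.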