kafka input plugin #5

Merged: 9 commits, Nov 24, 2022

zhao-kun

A Kafka input plugin that collects consumer lag for topics as an embedded plugin, instead of depending on Burrow.

@zhao-kun
Author

The configuration used for the local test:

[agent]
    interval = "10s"
    debug = false
    # Override default hostname, if empty use os.Hostname()
    hostname = ""
    round_interval = true
    flush_interval = "10s"
    flush_jitter = "0s"
    collection_jitter = "0s"
    metric_batch_size = 1000
    metric_buffer_limit = 10000
    quiet = false
    logfile = "/home/zhaokun/tmp/telegraf.log"
[[inputs.kafka_topic_consumer]]
    ## Kafka brokers.
    brokers = ["localhost:9092"]

    ## Topics to collect offset and lag for
    ## (regexp; the default ".*" matches all topics)
    topics = "to_cloud"

    ## Consumer groups to collect offset and lag for
    ## (regexp; the default ".*" matches all consumer groups)
    consumer_groups = ".*"

    exclude_topics = "__.*"

    [inputs.kafka_topic_consumer.tags]
        cluster_name = "kafka_3e189edb8ad149a4"
        instance = "kafka_3e189edb8ad149a4"
        system = "megacloud-kafka"
        service = "kafka_3e189edb8ad149a4"
        host_ipv4 = "172.16.0.28"
        deploy_mode = "host"
        type = "kafkaserver"
        index = "kafkaserver"
        tags = "_metrics"
        category = "infrastructure"
        node_name = "node_03a703046f"
        host_name = "VM-0-28-ubuntu"
    [inputs.kafka_topic_consumer.tagdrop]
        topic = ["__consumer_offsets"]
[[outputs.file]]
    ## Files to write to, "stdout" is a specially handled file.
    files = ["stdout"]

    ## Use batch serialization format instead of line based delimiting.  The
    ## batch format allows for the production of non line based output formats and
    ## may more efficiently encode and write metrics.
    # use_batch_format = false

    ## The file will be rotated after the time interval specified.  When set
    ## to 0 no time based rotation is performed.
    # rotation_interval = "0h"

    ## The logfile will be rotated when it becomes larger than the specified
    ## size.  When set to 0 no size based rotation is performed.
    # rotation_max_size = "0MB"

    ## Maximum number of rotated archives to keep, any older logs are deleted.
    ## If set to -1, no archives are removed.
    # rotation_max_archives = 5

    ## Data format to output.
    ## Each data format has its own unique set of configuration options, read
    ## more about them here:
    ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
    data_format = "json"
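
The `topics` and `exclude_topics` options above are regular expressions. How the plugin applies them is not shown in this thread, so the following is only a minimal sketch, assuming unanchored matching with exclusion taking precedence. `topicFilter`, `newTopicFilter`, and `match` are hypothetical names, not the plugin's API.

```go
package main

import (
	"fmt"
	"regexp"
)

// topicFilter is a hypothetical helper; the plugin's real type is not shown here.
type topicFilter struct {
	include *regexp.Regexp
	exclude *regexp.Regexp // nil when no exclusion pattern is configured
}

// newTopicFilter compiles both patterns. An empty include pattern
// falls back to ".*", mirroring the documented default.
func newTopicFilter(include, exclude string) (*topicFilter, error) {
	if include == "" {
		include = ".*"
	}
	inc, err := regexp.Compile(include)
	if err != nil {
		return nil, fmt.Errorf("invalid topics pattern: %w", err)
	}
	f := &topicFilter{include: inc}
	if exclude != "" {
		exc, err := regexp.Compile(exclude)
		if err != nil {
			return nil, fmt.Errorf("invalid exclude_topics pattern: %w", err)
		}
		f.exclude = exc
	}
	return f, nil
}

// match reports whether a topic should be collected.
// Assumption: an exclusion match always wins over an inclusion match.
func (f *topicFilter) match(topic string) bool {
	if f.exclude != nil && f.exclude.MatchString(topic) {
		return false
	}
	return f.include.MatchString(topic)
}

func main() {
	f, err := newTopicFilter("to_cloud", "__.*")
	if err != nil {
		panic(err)
	}
	for _, topic := range []string{"to_cloud", "__consumer_offsets", "other"} {
		fmt.Printf("%s collected=%v\n", topic, f.match(topic))
	}
}
```

Because Go's `MatchString` is unanchored, a pattern such as `__.*` would also exclude a topic like `my__topic`; anchoring with `^` and `$` avoids that if it is not intended.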

@zhao-kun zhao-kun marked this pull request as ready for review November 24, 2022 02:05
@zhao-kun
Author

The three original metric names were renamed as follows:
burrow_topic -> kafka_topic_offset
burrow_partition -> kafka_topic_partition
burrow_group -> kafka_group_topic

@zhao-kun
Author

kafka_group_topic

Sums the offset and lag of a consumer group for the specified topic.

The lag value is always equal to total_lag; the total_lag field is retained for backward compatibility.

{
  "fields":{
    "lag":1,
    "offset_sum":388,
    "partition_count":3,
    "timestamp":1669255740005,
    "total_lag":1
  },
  "name":"kafka_group_topic",
  "tags":{
    "category":"infrastructure",
    "cluster_name":"kafka_3e189edb8ad149a4",
    "deploy_mode":"host",
    "group":"things-data-listener",
    "host":"zhaokun-tm1801",
    "host_ipv4":"172.16.0.28",
    "host_name":"VM-0-28-ubuntu",
    "index":"kafkaserver",
    "instance":"kafka_3e189edb8ad149a4",
    "node_name":"node_03a703046f",
    "service":"kafka_3e189edb8ad149a4",
    "system":"megacloud-kafka",
    "tags":"_metrics",
    "topic":"to_cloud",
    "type":"kafkaserver"
  },
  "timestamp":1669255740
}
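
As a rough illustration of how the `kafka_group_topic` fields above relate to each other, the sketch below sums per-partition values into one set of fields. `partitionStat` and `groupTopicFields` are hypothetical names, the per-partition numbers are made up so that they add up to the sample output, and the `timestamp` field is left out.

```go
package main

import "fmt"

// partitionStat is a hypothetical per-partition sample for one
// consumer group and topic.
type partitionStat struct {
	Offset int64 // offset reported for the group on this partition
	Lag    int64 // lag of the group on this partition
}

// groupTopicFields sums offset and lag over all partitions.
// "lag" and "total_lag" carry the same value; the second one is kept
// only for backward compatibility, as described in this thread.
func groupTopicFields(parts []partitionStat) map[string]interface{} {
	var offsetSum, lag int64
	for _, p := range parts {
		offsetSum += p.Offset
		lag += p.Lag
	}
	return map[string]interface{}{
		"lag":             lag,
		"offset_sum":      offsetSum,
		"partition_count": len(parts),
		"total_lag":       lag,
	}
}

func main() {
	// Illustrative values only; they are not taken from a real cluster.
	parts := []partitionStat{
		{Offset: 137, Lag: 1},
		{Offset: 125, Lag: 0},
		{Offset: 126, Lag: 0},
	}
	fmt.Println(groupTopicFields(parts))
}
```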

kafka_topic_partition

Collects lag and offset per topic partition and consumer group.

{
  "fields":{
    "lag":1,
    "offset":137,
    "timestamp":1669255840001
  },
  "name":"kafka_topic_partition",
  "tags":{
    "category":"infrastructure",
    "cluster_name":"kafka_3e189edb8ad149a4",
    "deploy_mode":"host",
    "group":"things-data-listener",
    "host":"zhaokun-tm1801",
    "host_ipv4":"172.16.0.28",
    "host_name":"VM-0-28-ubuntu",
    "index":"kafkaserver",
    "instance":"kafka_3e189edb8ad149a4",
    "node_name":"node_03a703046f",
    "partition":"0",
    "service":"kafka_3e189edb8ad149a4",
    "system":"megacloud-kafka",
    "tags":"_metrics",
    "topic":"to_cloud",
    "type":"kafkaserver"
  },
  "timestamp":1669255840
}
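
Consumer lag for a partition is conventionally the partition's latest (log end) offset minus the group's committed offset. The sketch below shows that arithmetic under this assumption; `partitionLag` is a hypothetical helper, and clamping negative results to zero is a design choice of this sketch, not something confirmed by the plugin.

```go
package main

import "fmt"

// partitionLag returns how far a consumer group is behind on one partition.
// Assumption: a negative difference (possible when the two offsets are read
// at slightly different times) is reported as zero.
func partitionLag(logEndOffset, committedOffset int64) int64 {
	lag := logEndOffset - committedOffset
	if lag < 0 {
		return 0
	}
	return lag
}

func main() {
	// Illustrative values: a committed offset of 137 with a log end
	// offset of 138 gives a lag of one message.
	fmt.Println(partitionLag(138, 137))
}
```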

kafka_topic_offset

Collects the topic offset per partition.

{
  "fields":{
    "offset":140,
    "timestamp":1669255890001
  },
  "name":"kafka_topic_offset",
  "tags":{
    "category":"infrastructure",
    "cluster_name":"kafka_3e189edb8ad149a4",
    "deploy_mode":"host",
    "host":"zhaokun-tm1801",
    "host_ipv4":"172.16.0.28",
    "host_name":"VM-0-28-ubuntu",
    "index":"kafkaserver",
    "instance":"kafka_3e189edb8ad149a4",
    "node_name":"node_03a703046f",
    "partition":"1",
    "service":"kafka_3e189edb8ad149a4",
    "system":"megacloud-kafka",
    "tags":"_metrics",
    "topic":"to_cloud",
    "type":"kafkaserver"
  },
  "timestamp":1669255890
}

@observeralone
Collaborator

The Metrics and Example Output sections can be written in the README, like the mongodb plugin's metrics.

@zhao-kun
Author

The Metrics and Example Output sections can be written in the README, like the mongodb plugin's metrics.

Updated

continue
}
currentOffset := offsetResponseBlock.Offset
currentOffsetSum += currentOffset
@landyking landyking Nov 24, 2022

Do we need to check the case when currentOffset is -1?

Author

Updated
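
For context on the review question: Kafka reports an offset of -1 for a partition on which the group has no committed offset, so adding it to a running sum would silently subtract one. The actual fix is not shown in this thread; the following is only a sketch of one way to guard the sum, with `sumCommittedOffsets` and `noCommittedOffset` as hypothetical names.

```go
package main

import "fmt"

// noCommittedOffset is the sentinel returned when a group has not
// committed an offset for a partition.
const noCommittedOffset int64 = -1

// sumCommittedOffsets adds up the committed offsets and skips partitions
// without a commit. It also returns how many partitions were counted.
func sumCommittedOffsets(offsets []int64) (sum int64, counted int) {
	for _, off := range offsets {
		if off == noCommittedOffset {
			continue // nothing committed on this partition yet
		}
		sum += off
		counted++
	}
	return sum, counted
}

func main() {
	// Illustrative values: the second partition has no committed offset.
	sum, counted := sumCommittedOffsets([]int64{137, -1, 126})
	fmt.Println(sum, counted)
}
```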

@landyking landyking merged commit 8ed2f26 into megaease:master Nov 24, 2022
@zhao-kun zhao-kun deleted the kafka-topic-consumer-plugin branch November 24, 2022 06:06
3 participants