-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metricset/kafka consumer groups #3240
Metricset/kafka consumer groups #3240
Conversation
4baa87b
to
10939b1
Compare
378a540
to
797f695
Compare
797f695
to
35cdcc3
Compare
queryGroups []string, | ||
) (map[string]GroupDescription, error) { | ||
requ := &sarama.DescribeGroupsRequest{Groups: queryGroups} | ||
resp, err := b.b.DescribeGroups(requ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
b.b. is not very readable ;-)
}, | ||
"kafka":{ | ||
"consumergroup":{ | ||
"example": "consumergroup" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should populate this for the docs.
description: > | ||
Broker address | ||
|
||
- name: group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be id
, as it will be kafka.consumegroup.id
. Alternative is kafka.conumser.group
. Is the partition, topic, offset specific to a consumergroup or a consumer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will rename it to id
Topics []string `config:"topics"` | ||
} | ||
|
||
var defaultConfig = metricsetConfig{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should move this to the kafka module later as I think it is the same for both metricsets.
import ( | ||
"crypto/tls" | ||
|
||
"github.com/Shopify/sarama" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing newline
clientHost string | ||
} | ||
|
||
var debugf = logp.MakeDebug("kafka") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kafka.consumergroup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitions is using kafka
as well. we should clean this in a followup PR. Do we follow a debug selector naming standard in metricbeat?
"fmt" | ||
"math/rand" | ||
|
||
"github.com/Shopify/sarama" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newline
wg := sync.WaitGroup{} | ||
results := make(chan result, len(groups)) | ||
for _, group := range groups { | ||
group := group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as group
is used inside the go-routine. Copy is used to capture the group
in the local scope.
Alternatively one can write:
for group := range groups {
go func(group string) {
...
}(group)
}
which is similar to writing:
for group := range groups {
group := group
go func() {
...
}()
}
|
||
groupLoop: | ||
for groupID, info := range resp { | ||
G := groups[groupID] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
upper case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to use uppercase for local sets by convention (e.g. like N,M,L for length). Here G for group and T for topic. So not to shadow (overuse) topic
and group
used for correlating information. The 'sets' G and T hold results, so using a single upper-case makes them kinda stick out in this 'heavy' (potentially hard to follow) function.
} | ||
|
||
for topic, partitions := range memberDescr.Topics { | ||
T := G[topic] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
upper case?
21ba3f9
to
cba815d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Waiting for CI to go green.
cba815d
to
032d0f5
Compare
- mark kafka module as experimental - add new metricset - includes unit tests only - fix kafka API version settings
032d0f5
to
595fd0a
Compare
Requires #3239
Add kafka consumer group metricset to query kafka consumer groups via kafka 0.9 consumer group API (queries kafka, not zookeeper). Requires consumer groups to be managed by kafka.