Kafka module #2969
Conversation
Force-pushed from b0e1154 to 89cabf1
  type: list
  description: >
    List of replicas ids
- name: broker
this field appears twice
removed
- name: brocker
  type: integer
  description: >
    Broker id
Just publish the broker ID, or also the address? Sure, the broker ID is not supposed to change within a cluster, but one might prefer to search by hostname.
This metadata is available from Kafka: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
The address can be found in Broker.Addr in sarama.
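For illustration (not part of the PR), a minimal sketch of listing broker IDs and addresses from the cluster metadata via sarama; the bootstrap address is a placeholder:

// Minimal sketch, assuming a reachable broker at localhost:9092.
package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // Brokers() returns the brokers known from the latest metadata response.
    for _, b := range client.Brokers() {
        fmt.Printf("broker id=%d addr=%s\n", b.ID(), b.Addr())
    }
}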
Good idea. I also added the address.
- name: replicas
  type: list
  description: >
    List of replicas ids
To make it clearer: is this the set of alive nodes (broker IDs?)?
Are you planning to integrate ISR nodes? In Kafka, some follower nodes might not yet be in sync with the leader.
Changed it. I saw that there are some differences between the nodes and was thinking about adding a "status" field, but not for the first version.
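As an illustration of that "status" idea (a sketch only; topic, partition, and client are assumed from the surrounding PR code, and it assumes a sarama version that provides InSyncReplicas), the full replica set can be compared with the in-sync set:

// Sketch: derive a per-replica sync status by comparing the full
// replica set with the in-sync replica set (ISR).
replicas, err := client.Replicas(topic, partition)
if err != nil {
    logp.Err("Fetch replicas for %s/%d: %s", topic, partition, err)
}
isr, err := client.InSyncReplicas(topic, partition)
if err != nil {
    logp.Err("Fetch ISR for %s/%d: %s", topic, partition, err)
}
inSync := make(map[int32]bool, len(isr))
for _, id := range isr {
    inSync[id] = true
}
for _, id := range replicas {
    if !inSync[id] {
        // this replica exists but has not caught up with the leader yet
        logp.Info("replica %d of %s/%d is out of sync", id, topic, partition)
    }
}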
config.Net.ReadTimeout = m.Module().Config().Timeout
config.ClientID = "metricbeat"

client, err := sarama.NewClient([]string{m.Host()}, config)
Error check missing?
Why just one host? Often one configures a set of 'bootstrap' endpoints to query metadata; if one fails, ask the next endpoint.
Error check added.
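Presumably along these lines (a sketch; the exact return values depend on the metricset's Fetch signature):

client, err := sarama.NewClient([]string{m.Host()}, config)
if err != nil {
    return nil, err
}
defer client.Close()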
Having one host per metricset request is the usual pattern in Metricbeat. For simplicity reasons I would not change this now for Kafka.
But this brings up an interesting question: assuming each Kafka host is monitored with a list of hosts, currently lots of data will be duplicated in Elasticsearch, as data is fetched not only from the partitions on the host the client connects to.
In Kafka (in comparison to other services), you do not monitor one host, you monitor one cluster. That is, when configuring the hosts, all hosts MUST belong to the same cluster. Having multiple bootstrapping hosts is for redundancy, to decrease the chance of errors.
For the first version we should recommend using only one host in the docs, but think long term about how we handle these kinds of situations in Metricbeat, as this will not apply only to Kafka.
Hmmm... is it required by the module interface to have a method Host() string? Why not have the module/metricset decide?
If the issue is only Kafka-specific, we could also have a bootstrap_hosts: [...] config option for the Kafka module. Let's handle this as soon as it pops up as an issue.
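For reference, sarama's NewClient already accepts a list of seed addresses and tries them until one responds, so a hypothetical bootstrap_hosts option could map onto it directly (the host names below are placeholders):

// Sketch: multiple bootstrap endpoints for redundancy. sarama walks the
// list when refreshing metadata, so a single dead host is tolerated.
bootstrapHosts := []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"}
client, err := sarama.NewClient(bootstrapHosts, config)
if err != nil {
    return nil, err
}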
for _, topic := range topics {
    partitions, err := client.Partitions(topic)
    if err != nil {
        logp.Err("Fetch partition info for topic %s: %s", topic, err)
Is there a chance of partitions not being empty when err is set? Do we still want to process potentially incomplete data?
Looking at the Sarama code, it looks like partitions is always nil if an error is returned.
I still wonder if we want to return an error here. By returning an error, an error event will be published, right?
It will return an error event. But the problem is that it will abort the execution of the event fetch for all remaining topics.
In this case, we might consider adding an error event to events here.
That would mean that we have an "error" entry in the wrong namespace of the event. Will open a separate issue to discuss this improvement for Metricbeat.
Here is the related issue: #3004
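The log-and-continue alternative discussed above would look roughly like this (a sketch, not the PR's final code; partitionEvent is a hypothetical helper):

for _, topic := range topics {
    partitions, err := client.Partitions(topic)
    if err != nil {
        logp.Err("Fetch partition info for topic %s: %s", topic, err)
        continue // skip this topic but keep fetching the remaining ones
    }
    for _, partition := range partitions {
        // partitionEvent (hypothetical) builds one event per partition
        events = append(events, partitionEvent(client, topic, partition))
    }
}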
replicas, err := client.Replicas(topic, partition)
if err != nil {
    logp.Err("Fetch replicas for partition %s in topic %s: %s", partition, topic, err)
}
It's kinda sad we don't get the replica offsets: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
I'm not even sure whether we should provide the replica IDs here or not.
I'd say yes, as these are part of the cluster state.
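If offsets do get added later, the plain sarama client can at least provide the leader's offsets per partition (a sketch; per-replica offsets would require querying each replica broker directly):

// Sketch: newest/oldest offsets for a partition, as answered by the
// partition leader. sarama.OffsetNewest / OffsetOldest are the special
// timestamp values of Kafka's Offset API.
newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
    logp.Err("Fetch newest offset for %s/%d: %s", topic, partition, err)
}
oldest, err := client.GetOffset(topic, partition, sarama.OffsetOldest)
if err != nil {
    logp.Err("Fetch oldest offset for %s/%d: %s", topic, partition, err)
}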
@urso New version pushed.
Force-pushed from ea68d8f to 9d667c0
jenkins, retest it
"id": brocker.ID(), | ||
"address": brocker.Addr(), | ||
}, | ||
} |
Hmm... I was hoping to get the offset per replica and to create an event per replica. The broker is the current leader of a partition. All other brokers handling the same partition are replicas. Every now and then the leader is re-elected, turning the broker into a replica and another former replica into the leader.
Some event like:
common.MapStr{
    "topic":     topic,
    "partition": partition,
    "broker": common.MapStr{
        "id":      id,
        "address": brokers[id],
    },
    "offset": offset,
    "leader": id == leaderID,
}
The address and ID of every broker are included in the metadata.
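Spelled out a bit further (a sketch under the same assumptions; the per-replica offset is the open part tracked in the follow-up issue below):

leader, err := client.Leader(topic, partition)
if err != nil {
    logp.Err("Fetch leader for %s/%d: %s", topic, partition, err)
}
replicas, err := client.Replicas(topic, partition)
if err != nil {
    logp.Err("Fetch replicas for %s/%d: %s", topic, partition, err)
}
for _, id := range replicas {
    events = append(events, common.MapStr{
        "topic":     topic,
        "partition": partition,
        "broker": common.MapStr{
            "id": id,
            // address lookup from the metadata response would go here
        },
        "leader": leader != nil && id == leader.ID(),
    })
}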
Follow-up GitHub issue: #3005
The first metricset added is partition, which contains stats about each existing partition.