Skip to content
6 changes: 5 additions & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ func (r *msgRef) fail(msg *message, err error) {
len(msg.key)+len(msg.value))
r.client.observer.PermanentErrors(1)

Comment thread
khushijain21 marked this conversation as resolved.
case strings.Contains(err.Error(), "kafka: invalid configuration"):
Comment thread
khushijain21 marked this conversation as resolved.
Outdated
r.client.log.Errorf("Kafka (topic=%v): dropping message due to invalid configuration %v", msg.topic)
Comment thread
khushijain21 marked this conversation as resolved.
Outdated
r.client.observer.PermanentErrors(1)
Comment thread
khushijain21 marked this conversation as resolved.

case isAuthError(err):
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
r.client.observer.PermanentErrors(1)
Expand All @@ -404,7 +408,7 @@ func (r *msgRef) fail(msg *message, err error) {
default:
r.failed = append(r.failed, msg.data)
if r.err == nil {
// Don't overwrite an existing error. This way at tne end of the batch
// Don't overwrite an existing error. This way at the end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
Expand Down