Skip to content
8 changes: 5 additions & 3 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,15 @@ func (r *msgRef) fail(msg *message, err error) {
case errors.Is(err, sarama.ErrInvalidMessage):
r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic)
r.client.observer.PermanentErrors(1)

Comment thread
khushijain21 marked this conversation as resolved.
case errors.Is(err, sarama.ErrMessageSizeTooLarge) || errors.Is(err, sarama.ErrInvalidMessageSize):
r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.",
msg.topic,
len(msg.key)+len(msg.value))
r.client.observer.PermanentErrors(1)

Comment thread
khushijain21 marked this conversation as resolved.
// drop event if it exceeds size larger than max_message_bytes
case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"):
r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes:", msg.topic)
r.client.observer.PermanentErrors(1)
Comment thread
khushijain21 marked this conversation as resolved.
case isAuthError(err):
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
r.client.observer.PermanentErrors(1)
Expand All @@ -404,7 +406,7 @@ func (r *msgRef) fail(msg *message, err error) {
default:
r.failed = append(r.failed, msg.data)
if r.err == nil {
// Don't overwrite an existing error. This way at tne end of the batch
// Don't overwrite an existing error. This way at the end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@

if c.Compression == "gzip" {
lvl := c.CompressionLevel
if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) {

Check failure on line 179 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1001: could apply De Morgan's law (staticcheck)
return fmt.Errorf("compression_level must be between 0 and 9")
}
}
Expand All @@ -185,6 +185,10 @@
return errors.New("either 'topic' or 'topics' must be defined")
}

if len(c.Headers) != 0 && c.Version < kafka.Version("0.11") {
return errors.New("including headers is not supported for kafka versions < 0.11")
}

// When running under Elastic-Agent we do not support dynamic topic
// selection, so `topics` is not supported and `topic` is treated as an
// plain string
Expand Down Expand Up @@ -238,7 +242,7 @@
k.Net.SASL.Enable = true
k.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
k.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
AuthType: int(config.Kerberos.AuthType),

Check failure on line 245 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

G115: integer overflow conversion uint -> int (gosec)
KeyTabPath: config.Kerberos.KeyTabPath,
KerberosConfigPath: config.Kerberos.ConfigPath,
ServiceName: config.Kerberos.ServiceName,
Expand Down Expand Up @@ -266,12 +270,12 @@
k.Producer.MaxMessageBytes = *config.MaxMessageBytes
}
if config.RequiredACKs != nil {
k.Producer.RequiredAcks = sarama.RequiredAcks(*config.RequiredACKs)

Check failure on line 273 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

G115: integer overflow conversion int -> int16 (gosec)
}

compressionMode, ok := compressionModes[strings.ToLower(config.Compression)]
if !ok {
return nil, fmt.Errorf("Unknown compression mode: '%v'", config.Compression)

Check failure on line 278 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (staticcheck)
}
k.Producer.Compression = compressionMode

Expand Down Expand Up @@ -300,7 +304,7 @@

version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)

Check failure on line 307 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (staticcheck)
}
k.Version = version

Expand Down Expand Up @@ -332,7 +336,7 @@
// compute 'base' duration for exponential backoff
dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))

Check failure on line 339 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

G115: integer overflow conversion int64 -> uint64 (gosec)
}

// apply about equaly distributed jitter in second half of the interval, such that the wait
Expand Down
Loading