[kafka] Handle configuration errors #45128
Conversation
This pull request does not have a backport label.
To fix up this pull request, you need to add the backport labels for the needed branches.
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
belimawr
left a comment
I don't think dropping messages on all configuration errors is the correct approach here.
The goal is to fix #44619 by bringing back the behaviour of dropping messages when they're too large. Other configuration issues should be handled the same way they're handled today.
The problem with dropping messages on any configuration error is that a configuration error can be fixed with user intervention. In that case we do not want inputs that keep track of their progress (like Filestream) to ingest all available data just to have it discarded by the output, effectively creating another instance of #43250.
Even though it's not ideal, I prefer to compare the error string, looking for Attempt to produce message larger than configured Producer.MaxMessageBytes (see the PR introducing the change); if it matches, we then drop the event like we used to. For any other configuration error we still retry.
faec
left a comment
Looks mostly fine, just a couple of small requests. (I'm about to be on PTO for a few days, so feel free to merge in my absence once the other reviewers are happy.)
@tiago can you take a look again? I have ensured we don't drop all configuration-related errors and have also incorporated Fae's remarks.
belimawr
left a comment
The code looks good now!
Sorry for not mentioning it before, but could you add some tests for the code you added? Especially now that we're matching on the error string, we should have a test ensuring the behaviour is not going to change.
belimawr
left a comment
It looks great now!
I added some minor comments about formatting, feel free to ignore them ;)
@Mergifyio backport 8.17 8.18 8.19 9.0 9.1
✅ Backports have been created
* [kafka] Handle configuration errors (cherry picked from commit 23f4491; conflicts: libbeat/outputs/kafka/client.go, libbeat/outputs/kafka/kafka_integration_test.go)
* [kafka] Handle configuration errors (cherry picked from commit 23f4491; conflicts: libbeat/outputs/kafka/kafka_integration_test.go)
* [kafka] Handle configuration errors (cherry picked from commit 23f4491)
* [kafka] Handle configuration errors (cherry picked from commit 23f4491; conflicts: libbeat/outputs/kafka/kafka_integration_test.go)
* [kafka] Handle configuration errors (cherry picked from commit 23f4491)
* [kafka] Handle configuration errors
* [kafka] Handle configuration errors (#45128)
* [kafka] Handle configuration errors (#45128)
Proposed commit message
This PR handles Kafka configuration errors by dropping the message, since configuration errors are permanent and should not be retried.
Checklist
CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.
How to test this PR locally
Bring a Kafka server up using the following command:
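The exact command was not captured above; a minimal single-node setup, assuming Docker and the official apache/kafka image are available (the image tag is an assumption):

```shell
# Start a single-node Kafka broker (KRaft mode) listening on localhost:9092.
docker run --rm -p 9092:9092 apache/kafka:3.7.0
```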
Use the following filebeat.yml:
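The original configuration was not captured above; a minimal sketch that should reproduce the error (the input paths and hosts are assumptions, and max_message_bytes is deliberately tiny to match the 851 > 10 comparison in the log below):

```yaml
filebeat.inputs:
  - type: filestream
    id: test-input
    paths:
      - /tmp/test.log

output.kafka:
  hosts: ["localhost:9092"]
  topic: "beats"
  # Deliberately tiny so that any real event exceeds
  # Producer.MaxMessageBytes and triggers the drop path.
  max_message_bytes: 10
```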
and you should see this error:
{
  "log.level": "error",
  "@timestamp": "2025-07-02T15:28:04.373+0530",
  "log.logger": "kafka",
  "log.origin": {
    "function": "github.com/elastic/beats/v7/libbeat/outputs/kafka.(*msgRef).fail",
    "file.name": "kafka/client.go",
    "file.line": 396
  },
  "message": "Kafka (topic=beats): dropping message due to invalid configuration kafka: invalid configuration (Attempt to produce message larger than configured Producer.MaxMessageBytes: 851 > 10)",
  "service.name": "filebeat",
  "ecs.version": "1.6.0"
}
Related issues
* Kafka output retries an event that is too large (bigger than max_message_bytes) instead of dropping it (>=v8.18.0) #44619