From 95b69b9e8ce6c4b553c5d76d7ef8f540e9850ecd Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 1 Jul 2025 15:36:50 +0530 Subject: [PATCH 1/8] [kafka] Handle configuration errors --- libbeat/outputs/kafka/client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 9944c5878671..8d5f065d903e 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -392,6 +392,10 @@ func (r *msgRef) fail(msg *message, err error) { len(msg.key)+len(msg.value)) r.client.observer.PermanentErrors(1) + case strings.Contains(err.Error(), "kafka: invalid configuration"): + r.client.log.Errorf("Kafka (topic=%v): dropping message due to invalid configuration %v", msg.topic) + r.client.observer.PermanentErrors(1) + case isAuthError(err): r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err) r.client.observer.PermanentErrors(1) @@ -404,7 +408,7 @@ func (r *msgRef) fail(msg *message, err error) { default: r.failed = append(r.failed, msg.data) if r.err == nil { - // Don't overwrite an existing error. This way at tne end of the batch + // Don't overwrite an existing error. This way at the end of the batch // we report the first error that we saw, rather than the last one. r.err = err } From 56a457d43d79c519300cab3d9150d2c871ff81ff Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 2 Jul 2025 15:43:34 +0530 Subject: [PATCH 2/8] address comments --- libbeat/outputs/kafka/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 8d5f065d903e..01d67d8561da 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -393,7 +393,7 @@ func (r *msgRef) fail(msg *message, err error) { r.client.observer.PermanentErrors(1) case strings.Contains(err.Error(), "kafka: invalid configuration"): - r.client.log.Errorf("Kafka (topic=%v): dropping message due to invalid configuration %v", msg.topic) + r.client.log.Errorf("Kafka (topic=%v): dropping message due to invalid configuration: %v", msg.topic, err) r.client.observer.PermanentErrors(1) case isAuthError(err): From 6c4a57207a105c25930a9e098feacde9877ca924 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 12:35:39 +0530 Subject: [PATCH 3/8] address comments --- libbeat/outputs/kafka/client.go | 13 +++---------- libbeat/outputs/kafka/config.go | 4 ++++ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 01d67d8561da..c5737bc0eefd 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -385,17 +385,10 @@ func (r *msgRef) fail(msg *message, err error) { case errors.Is(err, sarama.ErrInvalidMessage): r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic) r.client.observer.PermanentErrors(1) - - case errors.Is(err, sarama.ErrMessageSizeTooLarge) || errors.Is(err, sarama.ErrInvalidMessageSize): - r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.", - msg.topic, - len(msg.key)+len(msg.value)) - r.client.observer.PermanentErrors(1) - - case strings.Contains(err.Error(), "kafka: invalid configuration"): - r.client.log.Errorf("Kafka (topic=%v): dropping message due to invalid configuration: %v", msg.topic, err) + // drop event if it exceeds size larger than max_message_bytes + case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"): + r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes: configure `output.kafka.max_message_bytes` to change this behavior", msg.topic) r.client.observer.PermanentErrors(1) - case isAuthError(err): r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err) r.client.observer.PermanentErrors(1) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index bf0b0a86d023..97bffd011fc6 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -185,6 +185,10 @@ func (c *kafkaConfig) Validate() error { return errors.New("either 'topic' or 'topics' must be defined") } + if len(c.Headers) != 0 && c.Version < kafka.Version("0.11") { + return errors.New("including headers is not supported for kafka versions < 0.11") + } + // When running under Elastic-Agent we do not support dynamic topic // selection, so `topics` is not supported and `topic` is treated as an // plain string From f85c6b500feea3092eba5813fb12c7103a2e4c9e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 12:37:47 +0530 Subject: [PATCH 4/8] address comments --- libbeat/outputs/kafka/client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index c5737bc0eefd..5aa7d661e75f 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -385,6 +385,11 @@ func (r *msgRef) fail(msg *message, err error) { case errors.Is(err, sarama.ErrInvalidMessage): r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic) r.client.observer.PermanentErrors(1) + case errors.Is(err, sarama.ErrMessageSizeTooLarge) || errors.Is(err, sarama.ErrInvalidMessageSize): + r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.", + msg.topic, + len(msg.key)+len(msg.value)) + r.client.observer.PermanentErrors(1) // drop event if it exceeds size larger than max_message_bytes case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"): r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes: configure `output.kafka.max_message_bytes` to change this behavior", msg.topic) From cb1215a822bf9ab74d6942f3ae6a2c92d91f9e9f Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 12:38:47 +0530 Subject: [PATCH 5/8] address comments --- libbeat/outputs/kafka/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 5aa7d661e75f..aadce7bb355c 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -392,7 +392,7 @@ func (r *msgRef) fail(msg *message, err error) { r.client.observer.PermanentErrors(1) // drop event if it exceeds size larger than max_message_bytes case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"): - r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes: configure `output.kafka.max_message_bytes` to change this behavior", msg.topic) + r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes: configure `output.kafka.max_message_bytes` to a larger value", msg.topic) r.client.observer.PermanentErrors(1) case isAuthError(err): r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err) From 58f6e2a09293aa80389ffd5040bece835d580ae2 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 7 Jul 2025 11:27:35 +0530 Subject: [PATCH 6/8] chnage message --- libbeat/outputs/kafka/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index aadce7bb355c..8faf9112e206 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -392,7 +392,7 @@ func (r *msgRef) fail(msg *message, err error) { r.client.observer.PermanentErrors(1) // drop event if it exceeds size larger than max_message_bytes case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"): - r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes: configure `output.kafka.max_message_bytes` to a larger value", msg.topic) + r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes:", msg.topic) r.client.observer.PermanentErrors(1) case isAuthError(err): r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err) From 850b67853a5f8965317c1a8ce3984eccd6df92f4 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 9 Jul 2025 11:38:59 +0530 Subject: [PATCH 7/8] add test --- .../outputs/kafka/kafka_integration_test.go | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index dbf55cf5b40c..91f88eb6d326 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -31,6 +31,9 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/elastic/sarama" @@ -41,6 +44,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/codec/json" "github.com/elastic/beats/v7/libbeat/outputs/outest" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -347,6 +351,100 @@ func TestKafkaPublish(t *testing.T) { } } +func TestKafkaErrors(t *testing.T) { + id := strconv.Itoa(rand.Int()) + testTopic := fmt.Sprintf("test-libbeat-%s", id) + + tests := []struct { + title string + config map[string]interface{} + topic string + events []eventInfo + errorMessage string + }{ + { + "message of size large than `max_message_bytes` must be dropped", + map[string]interface{}{ + "max_message_bytes": "10", + }, + testTopic, + single(mapstr.M{ + "host": "test-host-random-message-which-is-long-enough", + "message": id, + }), + "dropping message as it exceeds max_mesage_bytes", + }, + } + + defaultConfig := map[string]interface{}{ + "hosts": []string{getTestKafkaHost()}, + "topic": testTopic, + "timeout": "1s", + } + + for _, test := range tests { + + cfg := makeConfig(t, defaultConfig) + if test.config != nil { + err := cfg.Merge(makeConfig(t, test.config)) + if err != nil { + t.Fatal(err) + } + } + + observed, zapLogs := observer.New(zapcore.DebugLevel) + logger, err := logp.ConfigureWithCoreLocal(logp.Config{}, observed) + require.NoError(t, err) + + grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat", Logger: logger}, outputs.NewNilObserver(), cfg) + if err != nil { + t.Fatal(err) + } + + output, ok := grp.Clients[0].(*client) + assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client") + if err := output.Connect(context.Background()); err != nil { + t.Fatal(err) + } + assert.Equal(t, output.index, "testbeat") + defer output.Close() + + // publish test events + var wg sync.WaitGroup + for i := range test.events { + batch := outest.NewBatch(test.events[i].events...) + batch.OnSignal = func(_ outest.BatchSignal) { + wg.Done() + } + + wg.Add(1) + err := output.Publish(context.Background(), batch) + if err != nil { + t.Fatal(err) + } + } + + // wait for all published batches to be ACKed + wg.Wait() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Debug Logs:\n") + for _, log := range zapLogs.TakeAll() { + data, err := json.Marshal(log) + if err != nil { + t.Errorf("failed encoding log as JSON: %s", err) + } + t.Logf("%s", string(data)) + } + return + } + }) + assert.GreaterOrEqual(t, zapLogs.FilterMessageSnippet(test.errorMessage).Len(), 1) + } + +} + func validateJSON(t *testing.T, value []byte, events []beat.Event) string { var decoded map[string]interface{} err := json.Unmarshal(value, &decoded) From 2e3fdb700ee0acb87ce35ff46601459ba89aaa83 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 10 Jul 2025 09:48:06 +0530 Subject: [PATCH 8/8] formatting --- libbeat/outputs/kafka/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 8faf9112e206..e4eac456c620 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -385,15 +385,18 @@ func (r *msgRef) fail(msg *message, err error) { case errors.Is(err, sarama.ErrInvalidMessage): r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic) r.client.observer.PermanentErrors(1) + case errors.Is(err, sarama.ErrMessageSizeTooLarge) || errors.Is(err, sarama.ErrInvalidMessageSize): r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.", msg.topic, len(msg.key)+len(msg.value)) r.client.observer.PermanentErrors(1) + // drop event if it exceeds size larger than max_message_bytes case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"): r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes:", msg.topic) r.client.observer.PermanentErrors(1) + case isAuthError(err): r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err) r.client.observer.PermanentErrors(1)