From 953519199be789a6469cdf644de0e215660e215d Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 10 Jul 2025 12:05:47 +0530 Subject: [PATCH 1/3] [kafka] Handle configuration errors (#45128) * [kafka] Handle configuration errors (cherry picked from commit 23f4491cca3ede2ca396e03e2bdb8fb954c566a8) # Conflicts: # libbeat/outputs/kafka/kafka_integration_test.go --- libbeat/outputs/kafka/client.go | 7 +- libbeat/outputs/kafka/config.go | 4 + .../outputs/kafka/kafka_integration_test.go | 101 ++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 414d82a597d9..3414adec7840 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -391,6 +391,11 @@ func (r *msgRef) fail(msg *message, err error) { len(msg.key)+len(msg.value)) r.client.observer.PermanentErrors(1) + // drop event if it exceeds size larger than max_message_bytes + case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"): + r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes:", msg.topic) + r.client.observer.PermanentErrors(1) + case isAuthError(err): r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err) r.client.observer.PermanentErrors(1) @@ -403,7 +408,7 @@ func (r *msgRef) fail(msg *message, err error) { default: r.failed = append(r.failed, msg.data) if r.err == nil { - // Don't overwrite an existing error. This way at tne end of the batch + // Don't overwrite an existing error. This way at the end of the batch // we report the first error that we saw, rather than the last one. r.err = err } diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index a082001995d5..17be4f2677b4 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -185,6 +185,10 @@ func (c *kafkaConfig) Validate() error { return errors.New("either 'topic' or 'topics' must be defined") } + if len(c.Headers) != 0 && c.Version < kafka.Version("0.11") { + return errors.New("including headers is not supported for kafka versions < 0.11") + } + // When running under Elastic-Agent we do not support dynamic topic // selection, so `topics` is not supported and `topic` is treated as an // plain string diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index e0f42e2667aa..f9dc47feb101 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -31,6 +31,9 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/elastic/sarama" @@ -42,6 +45,10 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/outest" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" +<<<<<<< HEAD +======= + "github.com/elastic/elastic-agent-libs/logp/logptest" +>>>>>>> 23f4491cc ([kafka] Handle configuration errors (#45128)) "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -347,6 +354,100 @@ func TestKafkaPublish(t *testing.T) { } } +func TestKafkaErrors(t *testing.T) { + id := strconv.Itoa(rand.Int()) + testTopic := fmt.Sprintf("test-libbeat-%s", id) + + tests := []struct { + title string + config map[string]interface{} + topic string + events []eventInfo + errorMessage string + }{ + { + "message of size large than `max_message_bytes` must be dropped", + map[string]interface{}{ + "max_message_bytes": "10", + }, + testTopic, + single(mapstr.M{ + "host": "test-host-random-message-which-is-long-enough", + "message": id, + }), + "dropping message as it exceeds max_mesage_bytes", + }, + } + + defaultConfig := map[string]interface{}{ + "hosts": []string{getTestKafkaHost()}, + "topic": testTopic, + "timeout": "1s", + } + + for _, test := range tests { + + cfg := makeConfig(t, defaultConfig) + if test.config != nil { + err := cfg.Merge(makeConfig(t, test.config)) + if err != nil { + t.Fatal(err) + } + } + + observed, zapLogs := observer.New(zapcore.DebugLevel) + logger, err := logp.ConfigureWithCoreLocal(logp.Config{}, observed) + require.NoError(t, err) + + grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat", Logger: logger}, outputs.NewNilObserver(), cfg) + if err != nil { + t.Fatal(err) + } + + output, ok := grp.Clients[0].(*client) + assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client") + if err := output.Connect(context.Background()); err != nil { + t.Fatal(err) + } + assert.Equal(t, output.index, "testbeat") + defer output.Close() + + // publish test events + var wg sync.WaitGroup + for i := range test.events { + batch := outest.NewBatch(test.events[i].events...) + batch.OnSignal = func(_ outest.BatchSignal) { + wg.Done() + } + + wg.Add(1) + err := output.Publish(context.Background(), batch) + if err != nil { + t.Fatal(err) + } + } + + // wait for all published batches to be ACKed + wg.Wait() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Debug Logs:\n") + for _, log := range zapLogs.TakeAll() { + data, err := json.Marshal(log) + if err != nil { + t.Errorf("failed encoding log as JSON: %s", err) + } + t.Logf("%s", string(data)) + } + return + } + }) + assert.GreaterOrEqual(t, zapLogs.FilterMessageSnippet(test.errorMessage).Len(), 1) + } + +} + func validateJSON(t *testing.T, value []byte, events []beat.Event) string { var decoded map[string]interface{} err := json.Unmarshal(value, &decoded) From 75ca0ddb1057342a08c888d6c8bc956b2ffc67a5 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 10 Jul 2025 12:05:47 +0530 Subject: [PATCH 2/3] [kafka] Handle configuration errors (#45128) * [kafka] Handle configuration errors --- libbeat/outputs/kafka/kafka_integration_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index f9dc47feb101..062b39aa0b1b 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -45,10 +45,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/outest" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" -<<<<<<< HEAD -======= - "github.com/elastic/elastic-agent-libs/logp/logptest" ->>>>>>> 23f4491cc ([kafka] Handle configuration errors (#45128)) + "github.com/elastic/elastic-agent-libs/mapstr" ) From 41ea481e6ac5d6c7c18698d79343e70deaa030d6 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 10 Jul 2025 21:46:36 +0530 Subject: [PATCH 3/3] add kafka test --- libbeat/outputs/kafka/kafka_integration_test.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 062b39aa0b1b..a79b31486aec 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -31,9 +31,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" "github.com/elastic/sarama" @@ -392,11 +389,9 @@ func TestKafkaErrors(t *testing.T) { } } - observed, zapLogs := observer.New(zapcore.DebugLevel) - logger, err := logp.ConfigureWithCoreLocal(logp.Config{}, observed) - require.NoError(t, err) + logp.DevelopmentSetup(logp.ToObserverOutput()) - grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat", Logger: logger}, outputs.NewNilObserver(), cfg) + grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg) if err != nil { t.Fatal(err) } @@ -430,7 +425,7 @@ func TestKafkaErrors(t *testing.T) { t.Cleanup(func() { if t.Failed() { t.Logf("Debug Logs:\n") - for _, log := range zapLogs.TakeAll() { + for _, log := range logp.ObserverLogs().TakeAll() { data, err := json.Marshal(log) if err != nil { t.Errorf("failed encoding log as JSON: %s", err) @@ -440,7 +435,7 @@ func TestKafkaErrors(t *testing.T) { return } }) - assert.GreaterOrEqual(t, zapLogs.FilterMessageSnippet(test.errorMessage).Len(), 1) + assert.GreaterOrEqual(t, logp.ObserverLogs().FilterMessageSnippet(test.errorMessage).Len(), 1) } }