diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 414d82a597d9..3414adec7840 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -391,6 +391,11 @@ func (r *msgRef) fail(msg *message, err error) { len(msg.key)+len(msg.value)) r.client.observer.PermanentErrors(1) + // drop event if its size exceeds max_message_bytes + case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"): + r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_message_bytes", msg.topic) + r.client.observer.PermanentErrors(1) + case isAuthError(err): r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err) r.client.observer.PermanentErrors(1) @@ -403,7 +408,7 @@ func (r *msgRef) fail(msg *message, err error) { default: r.failed = append(r.failed, msg.data) if r.err == nil { - // Don't overwrite an existing error. This way at tne end of the batch + // Don't overwrite an existing error. This way at the end of the batch // we report the first error that we saw, rather than the last one. 
r.err = err } diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index bf0b0a86d023..97bffd011fc6 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -185,6 +185,10 @@ func (c *kafkaConfig) Validate() error { return errors.New("either 'topic' or 'topics' must be defined") } + if len(c.Headers) != 0 && c.Version < kafka.Version("0.11") { + return errors.New("including headers is not supported for kafka versions < 0.11") + } + // When running under Elastic-Agent we do not support dynamic topic // selection, so `topics` is not supported and `topic` is treated as an // plain string diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 18b75f1c6895..902cad34c5c2 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -347,6 +347,98 @@ func TestKafkaPublish(t *testing.T) { } } +func TestKafkaErrors(t *testing.T) { + id := strconv.Itoa(rand.Int()) + testTopic := fmt.Sprintf("test-libbeat-%s", id) + + tests := []struct { + title string + config map[string]interface{} + topic string + events []eventInfo + errorMessage string + }{ + { + "message of size larger than `max_message_bytes` must be dropped", + map[string]interface{}{ + "max_message_bytes": "10", + }, + testTopic, + single(mapstr.M{ + "host": "test-host-random-message-which-is-long-enough", + "message": id, + }), + "dropping message as it exceeds max_message_bytes", + }, + } + + defaultConfig := map[string]interface{}{ + "hosts": []string{getTestKafkaHost()}, + "topic": testTopic, + "timeout": "1s", + } + + for _, test := range tests { + + cfg := makeConfig(t, defaultConfig) + if test.config != nil { + err := cfg.Merge(makeConfig(t, test.config)) + if err != nil { + t.Fatal(err) + } + } + + logp.DevelopmentSetup(logp.ToObserverOutput()) + + grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, 
outputs.NewNilObserver(), cfg) + if err != nil { + t.Fatal(err) + } + + output, ok := grp.Clients[0].(*client) + assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client") + if err := output.Connect(context.Background()); err != nil { + t.Fatal(err) + } + assert.Equal(t, "testbeat", output.index) + defer output.Close() + + // publish test events + var wg sync.WaitGroup + for i := range test.events { + batch := outest.NewBatch(test.events[i].events...) + batch.OnSignal = func(_ outest.BatchSignal) { + wg.Done() + } + + wg.Add(1) + err := output.Publish(context.Background(), batch) + if err != nil { + t.Fatal(err) + } + } + + // wait for all published batches to be ACKed + wg.Wait() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Debug Logs:\n") + for _, log := range logp.ObserverLogs().TakeAll() { + data, err := json.Marshal(log) + if err != nil { + t.Errorf("failed encoding log as JSON: %s", err) + } + t.Logf("%s", string(data)) + } + return + } + }) + assert.GreaterOrEqual(t, logp.ObserverLogs().FilterMessageSnippet(test.errorMessage).Len(), 1) + } + +} + func validateJSON(t *testing.T, value []byte, events []beat.Event) string { var decoded map[string]interface{} err := json.Unmarshal(value, &decoded)