Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ func (r *msgRef) fail(msg *message, err error) {
len(msg.key)+len(msg.value))
r.client.observer.PermanentErrors(1)

// drop event if it exceeds size larger than max_message_bytes
case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"):
r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes:", msg.topic)
r.client.observer.PermanentErrors(1)

case isAuthError(err):
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
r.client.observer.PermanentErrors(1)
Expand All @@ -403,7 +408,7 @@ func (r *msgRef) fail(msg *message, err error) {
default:
r.failed = append(r.failed, msg.data)
if r.err == nil {
// Don't overwrite an existing error. This way at tne end of the batch
// Don't overwrite an existing error. This way at the end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ func (c *kafkaConfig) Validate() error {
return errors.New("either 'topic' or 'topics' must be defined")
}

if len(c.Headers) != 0 && c.Version < kafka.Version("0.11") {
return errors.New("including headers is not supported for kafka versions < 0.11")
}

// When running under Elastic-Agent we do not support dynamic topic
// selection, so `topics` is not supported and `topic` is treated as an
// plain string
Expand Down
101 changes: 101 additions & 0 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/sarama"

Expand All @@ -42,6 +45,10 @@
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
<<<<<<< HEAD

Check failure on line 48 in libbeat/outputs/kafka/kafka_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

missing import path (typecheck)
=======

Check failure on line 49 in libbeat/outputs/kafka/kafka_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

missing import path (typecheck)
"github.com/elastic/elastic-agent-libs/logp/logptest"
>>>>>>> 23f4491cc ([kafka] Handle configuration errors (#45128))

Check failure on line 51 in libbeat/outputs/kafka/kafka_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

missing import path (typecheck)
"github.com/elastic/elastic-agent-libs/mapstr"
)

Expand Down Expand Up @@ -347,6 +354,100 @@
}
}

func TestKafkaErrors(t *testing.T) {
id := strconv.Itoa(rand.Int())
testTopic := fmt.Sprintf("test-libbeat-%s", id)

tests := []struct {
title string
config map[string]interface{}
topic string
events []eventInfo
errorMessage string
}{
{
"message of size large than `max_message_bytes` must be dropped",
map[string]interface{}{
"max_message_bytes": "10",
},
testTopic,
single(mapstr.M{
"host": "test-host-random-message-which-is-long-enough",
"message": id,
}),
"dropping message as it exceeds max_mesage_bytes",
},
}

defaultConfig := map[string]interface{}{
"hosts": []string{getTestKafkaHost()},
"topic": testTopic,
"timeout": "1s",
}

for _, test := range tests {

cfg := makeConfig(t, defaultConfig)
if test.config != nil {
err := cfg.Merge(makeConfig(t, test.config))
if err != nil {
t.Fatal(err)
}
}

observed, zapLogs := observer.New(zapcore.DebugLevel)
logger, err := logp.ConfigureWithCoreLocal(logp.Config{}, observed)
require.NoError(t, err)

grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat", Logger: logger}, outputs.NewNilObserver(), cfg)
if err != nil {
t.Fatal(err)
}

output, ok := grp.Clients[0].(*client)
assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
if err := output.Connect(context.Background()); err != nil {
t.Fatal(err)
}
assert.Equal(t, output.index, "testbeat")
defer output.Close()

// publish test events
var wg sync.WaitGroup
for i := range test.events {
batch := outest.NewBatch(test.events[i].events...)
batch.OnSignal = func(_ outest.BatchSignal) {
wg.Done()
}

wg.Add(1)
err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}
}

// wait for all published batches to be ACKed
wg.Wait()

t.Cleanup(func() {
if t.Failed() {
t.Logf("Debug Logs:\n")
for _, log := range zapLogs.TakeAll() {
data, err := json.Marshal(log)
if err != nil {
t.Errorf("failed encoding log as JSON: %s", err)
}
t.Logf("%s", string(data))
}
return
}
})
assert.GreaterOrEqual(t, zapLogs.FilterMessageSnippet(test.errorMessage).Len(), 1)
}

}

func validateJSON(t *testing.T, value []byte, events []beat.Event) string {
var decoded map[string]interface{}
err := json.Unmarshal(value, &decoded)
Expand Down
Loading