Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,11 @@ func (r *msgRef) fail(msg *message, err error) {
len(msg.key)+len(msg.value))
r.client.observer.PermanentErrors(1)

// drop the event if its size exceeds max_message_bytes
case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"):
r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes:", msg.topic)
r.client.observer.PermanentErrors(1)

case isAuthError(err):
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
r.client.observer.PermanentErrors(1)
Expand All @@ -404,7 +409,7 @@ func (r *msgRef) fail(msg *message, err error) {
default:
r.failed = append(r.failed, msg.data)
if r.err == nil {
// Don't overwrite an existing error. This way at tne end of the batch
// Don't overwrite an existing error. This way at the end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@

if c.Compression == "gzip" {
lvl := c.CompressionLevel
if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) {

Check failure on line 179 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1001: could apply De Morgan's law (staticcheck)
return fmt.Errorf("compression_level must be between 0 and 9")
}
}
Expand All @@ -185,6 +185,10 @@
return errors.New("either 'topic' or 'topics' must be defined")
}

if len(c.Headers) != 0 && c.Version < kafka.Version("0.11") {
return errors.New("including headers is not supported for kafka versions < 0.11")
}

// When running under Elastic-Agent we do not support dynamic topic
// selection, so `topics` is not supported and `topic` is treated as an
// plain string
Expand Down Expand Up @@ -238,7 +242,7 @@
k.Net.SASL.Enable = true
k.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
k.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
AuthType: int(config.Kerberos.AuthType),

Check failure on line 245 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

G115: integer overflow conversion uint -> int (gosec)
KeyTabPath: config.Kerberos.KeyTabPath,
KerberosConfigPath: config.Kerberos.ConfigPath,
ServiceName: config.Kerberos.ServiceName,
Expand Down Expand Up @@ -266,12 +270,12 @@
k.Producer.MaxMessageBytes = *config.MaxMessageBytes
}
if config.RequiredACKs != nil {
k.Producer.RequiredAcks = sarama.RequiredAcks(*config.RequiredACKs)

Check failure on line 273 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

G115: integer overflow conversion int -> int16 (gosec)
}

compressionMode, ok := compressionModes[strings.ToLower(config.Compression)]
if !ok {
return nil, fmt.Errorf("Unknown compression mode: '%v'", config.Compression)

Check failure on line 278 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (staticcheck)
}
k.Producer.Compression = compressionMode

Expand Down Expand Up @@ -300,7 +304,7 @@

version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)

Check failure on line 307 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (staticcheck)
}
k.Version = version

Expand Down Expand Up @@ -332,7 +336,7 @@
// compute 'base' duration for exponential backoff
dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))

Check failure on line 339 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

G115: integer overflow conversion int64 -> uint64 (gosec)
}

// apply about equaly distributed jitter in second half of the interval, such that the wait
Expand Down
98 changes: 98 additions & 0 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/sarama"

Expand All @@ -41,6 +44,7 @@ import (
_ "github.com/elastic/beats/v7/libbeat/outputs/codec/json"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -347,6 +351,100 @@ func TestKafkaPublish(t *testing.T) {
}
}

// TestKafkaErrors verifies that permanent publish failures — here a message
// larger than max_message_bytes — are dropped by the Kafka output and surface
// in its logs. Each table entry publishes events with a config expected to
// trigger the error and asserts that the expected log snippet was emitted.
func TestKafkaErrors(t *testing.T) {
	id := strconv.Itoa(rand.Int())
	testTopic := fmt.Sprintf("test-libbeat-%s", id)

	tests := []struct {
		title        string
		config       map[string]interface{}
		topic        string
		events       []eventInfo
		errorMessage string
	}{
		{
			"message of size larger than `max_message_bytes` must be dropped",
			map[string]interface{}{
				"max_message_bytes": "10",
			},
			testTopic,
			single(mapstr.M{
				"host":    "test-host-random-message-which-is-long-enough",
				"message": id,
			}),
			// NOTE(review): the misspelling "max_mesage_bytes" intentionally
			// matches the error string emitted by client.go — fix both together.
			"dropping message as it exceeds max_mesage_bytes",
		},
	}

	defaultConfig := map[string]interface{}{
		"hosts":   []string{getTestKafkaHost()},
		"topic":   testTopic,
		"timeout": "1s",
	}

	for _, test := range tests {
		test := test
		// Run each table entry as a subtest so per-case defers (Close) fire at
		// the end of the case, not when the whole table is done. The original
		// deferred Close inside the loop body kept every connection open until
		// TestKafkaErrors returned.
		t.Run(test.title, func(t *testing.T) {
			cfg := makeConfig(t, defaultConfig)
			if test.config != nil {
				if err := cfg.Merge(makeConfig(t, test.config)); err != nil {
					t.Fatal(err)
				}
			}

			// Capture the output's logs so we can assert on them below and
			// dump them on failure.
			observed, zapLogs := observer.New(zapcore.DebugLevel)
			logger, err := logp.ConfigureWithCoreLocal(logp.Config{}, observed)
			require.NoError(t, err)

			// Register the debug-log dump up front so it also fires when the
			// test aborts early via t.Fatal — previously it was registered
			// only after all batches were ACKed, so early failures lost the
			// diagnostics.
			t.Cleanup(func() {
				if !t.Failed() {
					return
				}
				t.Logf("Debug Logs:\n")
				for _, log := range zapLogs.TakeAll() {
					data, err := json.Marshal(log)
					if err != nil {
						t.Errorf("failed encoding log as JSON: %s", err)
						continue
					}
					t.Logf("%s", string(data))
				}
			})

			grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat", Logger: logger}, outputs.NewNilObserver(), cfg)
			if err != nil {
				t.Fatal(err)
			}

			output, ok := grp.Clients[0].(*client)
			assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
			if err := output.Connect(context.Background()); err != nil {
				t.Fatal(err)
			}
			defer output.Close()
			assert.Equal(t, output.index, "testbeat")

			// Publish the test events and wait for every batch to be
			// ACKed/failed before asserting on the captured logs.
			var wg sync.WaitGroup
			for i := range test.events {
				batch := outest.NewBatch(test.events[i].events...)
				batch.OnSignal = func(_ outest.BatchSignal) {
					wg.Done()
				}

				wg.Add(1)
				if err := output.Publish(context.Background(), batch); err != nil {
					t.Fatal(err)
				}
			}
			wg.Wait()

			assert.GreaterOrEqual(t, zapLogs.FilterMessageSnippet(test.errorMessage).Len(), 1)
		})
	}
}

func validateJSON(t *testing.T, value []byte, events []beat.Event) string {
var decoded map[string]interface{}
err := json.Unmarshal(value, &decoded)
Expand Down
Loading