Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@

ref := &msgRef{
client: c,
count: int32(len(events)),

Check failure on line 175 in libbeat/outputs/kafka/client.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

G115: integer overflow conversion int -> int32 (gosec)
total: len(events),
failed: nil,
batch: batch,
Expand Down Expand Up @@ -271,7 +271,7 @@
defer c.log.Debug("Stop kafka ack worker")

for libMsg := range ch {
msg := libMsg.Metadata.(*message)

Check failure on line 274 in libbeat/outputs/kafka/client.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value is not checked (errcheck)
msg.ref.done()
}
}
Expand All @@ -282,7 +282,7 @@
defer c.log.Debug("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg.Metadata.(*message)

Check failure on line 285 in libbeat/outputs/kafka/client.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value is not checked (errcheck)
msg.ref.fail(msg, errMsg.Err)

if errors.Is(errMsg.Err, breaker.ErrBreakerOpen) {
Expand Down Expand Up @@ -382,6 +382,11 @@
len(msg.key)+len(msg.value))
r.client.observer.PermanentErrors(1)

// drop the event if its size exceeds max_message_bytes
case strings.Contains(err.Error(), "Attempt to produce message larger than configured Producer.MaxMessageBytes"):
r.client.log.Errorf("Kafka (topic=%v): dropping message as it exceeds max_mesage_bytes:", msg.topic)
r.client.observer.PermanentErrors(1)

case isAuthError(err):
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
r.client.observer.PermanentErrors(1)
Expand All @@ -394,7 +399,7 @@
default:
r.failed = append(r.failed, msg.data)
if r.err == nil {
// Don't overwrite an existing error. This way at tne end of the batch
// Don't overwrite an existing error. This way at the end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"errors"
"fmt"
"math"
"math/rand"

Check failure on line 24 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

import 'math/rand' is not allowed from list 'main': superseded by math/rand/v2 (depguard)
"strings"
"time"

Expand Down Expand Up @@ -177,7 +177,7 @@

if c.Compression == "gzip" {
lvl := c.CompressionLevel
if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) {

Check failure on line 180 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

QF1001: could apply De Morgan's law (staticcheck)
return fmt.Errorf("compression_level must be between 0 and 9")
}
}
Expand All @@ -186,6 +186,10 @@
return errors.New("either 'topic' or 'topics' must be defined")
}

if len(c.Headers) != 0 && c.Version < kafka.Version("0.11") {
return errors.New("including headers is not supported for kafka versions < 0.11")
}

// When running under Elastic-Agent we do not support dynamic topic
// selection, so `topics` is not supported and `topic` is treated as an
// plain string
Expand Down Expand Up @@ -239,7 +243,7 @@
k.Net.SASL.Enable = true
k.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
k.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
AuthType: int(config.Kerberos.AuthType),

Check failure on line 246 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

G115: integer overflow conversion uint -> int (gosec)
KeyTabPath: config.Kerberos.KeyTabPath,
KerberosConfigPath: config.Kerberos.ConfigPath,
ServiceName: config.Kerberos.ServiceName,
Expand Down Expand Up @@ -267,7 +271,7 @@
k.Producer.MaxMessageBytes = *config.MaxMessageBytes
}
if config.RequiredACKs != nil {
k.Producer.RequiredAcks = sarama.RequiredAcks(*config.RequiredACKs)

Check failure on line 274 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

G115: integer overflow conversion int -> int16 (gosec)
}

compressionMode, ok := compressionModes[strings.ToLower(config.Compression)]
Expand Down Expand Up @@ -333,7 +337,7 @@
// compute 'base' duration for exponential backoff
dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))

Check failure on line 340 in libbeat/outputs/kafka/config.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

G115: integer overflow conversion int64 -> uint64 (gosec)
}

// apply about equaly distributed jitter in second half of the interval, such that the wait
Expand Down
92 changes: 92 additions & 0 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"context"
"encoding/json"
"fmt"
"math/rand"

Check failure on line 26 in libbeat/outputs/kafka/kafka_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

import 'math/rand' is not allowed from list 'main': superseded by math/rand/v2 (depguard)
"os"
"strconv"
"sync"
Expand Down Expand Up @@ -318,7 +318,7 @@

validate := validateJSON
if fmt, exists := test.config["codec.format.string"]; exists {
validate = makeValidateFmtStr(fmt.(string))

Check failure on line 321 in libbeat/outputs/kafka/kafka_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value is not checked (errcheck)
}

cfgHeaders, headersSet := test.config["headers"]
Expand Down Expand Up @@ -346,6 +346,98 @@
}
}

// TestKafkaErrors publishes events that are expected to fail to produce
// (e.g. a message exceeding max_message_bytes) and asserts that the client
// logged the matching error message for each case.
func TestKafkaErrors(t *testing.T) {
	id := strconv.Itoa(rand.Int())
	testTopic := fmt.Sprintf("test-libbeat-%s", id)

	tests := []struct {
		title        string
		config       map[string]interface{}
		topic        string
		events       []eventInfo
		errorMessage string
	}{
		{
			"message of size larger than `max_message_bytes` must be dropped",
			map[string]interface{}{
				"max_message_bytes": "10",
			},
			testTopic,
			single(mapstr.M{
				"host":    "test-host-random-message-which-is-long-enough",
				"message": id,
			}),
			// NOTE(review): this snippet intentionally matches the misspelled
			// "max_mesage_bytes" emitted by msgRef.fail in client.go; keep the
			// two in sync if the log message is ever corrected.
			"dropping message as it exceeds max_mesage_bytes",
		},
	}

	defaultConfig := map[string]interface{}{
		"hosts":   []string{getTestKafkaHost()},
		"topic":   testTopic,
		"timeout": "1s",
	}

	for _, test := range tests {
		t.Run(test.title, func(t *testing.T) {
			cfg := makeConfig(t, defaultConfig)
			if test.config != nil {
				if err := cfg.Merge(makeConfig(t, test.config)); err != nil {
					t.Fatal(err)
				}
			}

			// Route logs to the observer so the assertion below can inspect them.
			if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil {
				t.Fatalf("cannot set up the development logger: %s", err)
			}

			grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg)
			if err != nil {
				t.Fatal(err)
			}

			output, ok := grp.Clients[0].(*client)
			assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
			if err := output.Connect(context.Background()); err != nil {
				t.Fatal(err)
			}
			assert.Equal(t, "testbeat", output.index)
			// Close when this sub-test finishes instead of deferring inside the
			// surrounding loop, so connections are not held for the whole test.
			t.Cleanup(func() {
				if err := output.Close(); err != nil {
					t.Logf("error closing kafka client: %s", err)
				}
			})

			// On failure, dump the captured logs to aid debugging.
			t.Cleanup(func() {
				if !t.Failed() {
					return
				}
				t.Logf("Debug Logs:\n")
				for _, log := range logp.ObserverLogs().TakeAll() {
					data, err := json.Marshal(log)
					if err != nil {
						t.Errorf("failed encoding log as JSON: %s", err)
						continue
					}
					t.Logf("%s", string(data))
				}
			})

			// Publish the test events and wait until every batch is ACKed.
			var wg sync.WaitGroup
			for i := range test.events {
				batch := outest.NewBatch(test.events[i].events...)
				batch.OnSignal = func(_ outest.BatchSignal) {
					wg.Done()
				}

				wg.Add(1)
				if err := output.Publish(context.Background(), batch); err != nil {
					t.Fatal(err)
				}
			}
			wg.Wait()

			assert.GreaterOrEqual(t, logp.ObserverLogs().FilterMessageSnippet(test.errorMessage).Len(), 1)
		})
	}
}

func validateJSON(t *testing.T, value []byte, events []beat.Event) string {
var decoded map[string]interface{}
err := json.Unmarshal(value, &decoded)
Expand Down
Loading