
Merge pull request #2284 from urso/enh/kafka-partitioner
configurable kafka partitioner
tsg authored Aug 24, 2016
2 parents 8e46617 + 3597027 commit a025e36
Showing 16 changed files with 1,119 additions and 146 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
@@ -68,6 +68,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Make Kafka metadata update configurable. {pull}2190[2190]
- Add kafka version setting (optional) enabling kafka broker version support. {pull}2190[2190]
- Add kafka message timestamp if at least version 0.10 is configured. {pull}2190[2190]
- Add configurable kafka event key setting. {pull}2284[2284]
- Add settings for configuring the kafka partitioning strategy. {pull}2284[2284]
- Add partitioner setting `reachable_only` to ignore partitions not reachable over the network. {pull}2284[2284]
- Enhance contains condition to work on fields that are arrays of strings. {issue}2237[2237]
- Lookup the configuration file relative to the `-path.config` CLI flag. {pull}2245[2245]
- Re-write import_dashboards.sh in Golang. {pull}2155[2155]
17 changes: 17 additions & 0 deletions filebeat/filebeat.full.yml
@@ -481,6 +481,23 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# The Kafka event key setting. Use a format string to create a unique event
# key. By default, no event key is generated.
#key: ''

# The Kafka event partitioning strategy. The default strategy is `hash`,
# which hashes the `output.kafka.key` setting, or distributes events
# randomly if `output.kafka.key` is not configured.
#partition.hash:
# If enabled, events are published only to partitions with reachable
# leaders. The default is false.
#reachable_only: false

# Alternative event field names used to compute the hash value. If empty,
# the `output.kafka.key` setting is used. The default is an empty list.
#hash: []

# Authentication details. Password is required if username is set.
#username: ''
#password: ''
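For reference, a hypothetical uncommented configuration combining the new settings; the key format string and hash field names below are illustrative choices, not defaults:

output.kafka:
  hosts: ["localhost:9092"]
  topic: beats
  # Derive the event key from an event field (illustrative).
  key: '%{[beat.hostname]}'
  partition.hash:
    # Publish only to partitions whose leader is reachable.
    reachable_only: true
    # Hash these fields instead of the key (illustrative field names).
    hash: ["beat.hostname", "type"]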
17 changes: 17 additions & 0 deletions libbeat/_meta/config.full.yml
@@ -261,6 +261,23 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# The Kafka event key setting. Use a format string to create a unique event
# key. By default, no event key is generated.
#key: ''

# The Kafka event partitioning strategy. The default strategy is `hash`,
# which hashes the `output.kafka.key` setting, or distributes events
# randomly if `output.kafka.key` is not configured.
#partition.hash:
# If enabled, events are published only to partitions with reachable
# leaders. The default is false.
#reachable_only: false

# Alternative event field names used to compute the hash value. If empty,
# the `output.kafka.key` setting is used. The default is an empty list.
#hash: []

# Authentication details. Password is required if username is set.
#username: ''
#password: ''
7 changes: 7 additions & 0 deletions libbeat/common/datetime.go
@@ -1,8 +1,10 @@
package common

import (
"encoding/binary"
"encoding/json"
"errors"
"hash"
"time"
)

@@ -30,6 +32,11 @@ func (t *Time) UnmarshalJSON(data []byte) (err error) {
return
}

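// Hash32 writes the time's UnixNano value into h in little-endian order,
// allowing timestamps to be folded into a partition hash.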
func (t Time) Hash32(h hash.Hash32) error {
err := binary.Write(h, binary.LittleEndian, time.Time(t).UnixNano())
return err
}

// ParseTime parses a time in the TsLayout format.
func ParseTime(timespec string) (Time, error) {
t, err := time.Parse(TsLayout, timespec)
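A minimal sketch of how the new Hash32 helper can be consumed, assuming a 32-bit FNV-1a hash and an illustrative partition count:

package main

import (
	"fmt"
	"hash/fnv"
	"time"

	"github.com/elastic/beats/libbeat/common"
)

func main() {
	// Feed the timestamp into any hash.Hash32 implementation.
	h := fnv.New32a()
	t := common.Time(time.Now())
	if err := t.Hash32(h); err != nil {
		panic(err)
	}
	// Map the hash onto an illustrative 8-partition topic.
	fmt.Println("partition:", h.Sum32()%8)
}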
17 changes: 17 additions & 0 deletions libbeat/common/fmtstr/formatevents.go
@@ -190,6 +190,23 @@ func (fs *EventFormatString) Run(event common.MapStr) (string, error) {
return ctx.buf.String(), nil
}

// RunBytes executes the format string, returning the expanded result as a
// `[]byte`, or an error if execution or event field expansion fails.
func (fs *EventFormatString) RunBytes(event common.MapStr) ([]byte, error) {
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

buf := bytes.NewBuffer(nil)
if err := fs.collectFields(ctx, event); err != nil {
return nil, err
}
err := fs.formatter.Eval(ctx, buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// Eval executes the format string, writing the resulting string into the provided output buffer. Returns error if execution or event field expansion fails.
func (fs *EventFormatString) Eval(out *bytes.Buffer, event common.MapStr) error {
ctx := newEventCtx(len(fs.fields))
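A short usage sketch for RunBytes, assuming the package's MustCompileEvent constructor; the format string and event fields are illustrative:

package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/common/fmtstr"
)

func main() {
	// Compile once, expand per event; RunBytes returns []byte, which is
	// the type the Kafka output needs for a message key.
	fs := fmtstr.MustCompileEvent("%{[beat.name]}-%{[type]}")
	event := common.MapStr{
		"beat": common.MapStr{"name": "filebeat"},
		"type": "log",
	}
	key, err := fs.RunBytes(event)
	if err != nil {
		panic(err)
	}
	fmt.Printf("key=%s\n", key) // key=filebeat-log
}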
88 changes: 59 additions & 29 deletions libbeat/outputs/kafka/client.go
@@ -3,13 +3,15 @@ package kafka
import (
"encoding/json"
"expvar"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
@@ -18,6 +20,7 @@ import (
type client struct {
hosts []string
topic outil.Selector
key *fmtstr.EventFormatString
config sarama.Config

producer sarama.AsyncProducer
@@ -41,12 +44,14 @@

func newKafkaClient(
hosts []string,
key *fmtstr.EventFormatString,
topic outil.Selector,
cfg *sarama.Config,
) (*client, error) {
c := &client{
hosts: hosts,
topic: topic,
key: key,
config: *cfg,
}
return c, nil
@@ -106,48 +111,74 @@ func (c *client) AsyncPublishEvents(

ch := c.producer.Input()

for _, d := range data {
event := d.Event
topic, err := c.topic.Select(event)
var ts time.Time

// message timestamps have been added to kafka with version 0.10.0.0
if c.config.Version.IsAtLeast(sarama.V0_10_0_0) {
if tsRaw, ok := event["@timestamp"]; ok {
if tmp, ok := tsRaw.(common.Time); ok {
ts = time.Time(tmp)
} else if tmp, ok := tsRaw.(time.Time); ok {
ts = tmp
}
}
}
for i := range data {
d := &data[i]

jsonEvent, err := json.Marshal(event)
msg, err := c.getEventMessage(d)
if err != nil {
logp.Err("Dropping event: %v", err)
ref.done()
continue
}
msg.ref = ref

msg.initProducerMessage()
ch <- &msg.msg
}

return nil
}

func (c *client) getEventMessage(data *outputs.Data) (*message, error) {
event := data.Event
msg := messageFromData(data)
if msg.topic != "" {
return msg, nil
}

msg.event = event

msg := &sarama.ProducerMessage{
Metadata: ref,
Topic: topic,
Value: sarama.ByteEncoder(jsonEvent),
Timestamp: ts,
topic, err := c.topic.Select(event)
if err != nil {
return nil, fmt.Errorf("setting kafka topic failed with %v", err)
}
msg.topic = topic

jsonEvent, err := json.Marshal(event)
if err != nil {
return nil, fmt.Errorf("json encoding failed with %v", err)
}
msg.value = jsonEvent

// message timestamps have been added to kafka with version 0.10.0.0
var ts time.Time
if c.config.Version.IsAtLeast(sarama.V0_10_0_0) {
if tsRaw, ok := event["@timestamp"]; ok {
if tmp, ok := tsRaw.(common.Time); ok {
ts = time.Time(tmp)
} else if tmp, ok := tsRaw.(time.Time); ok {
ts = tmp
}
}
}
msg.ts = ts

ch <- msg
if c.key != nil {
if key, err := c.key.RunBytes(event); err == nil {
msg.key = key
}
}

return nil
return msg, nil
}

func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
defer c.wg.Done()
defer debugf("Stop kafka ack worker")

for msg := range ch {
ref := msg.Metadata.(*msgRef)
ref.done()
for libMsg := range ch {
msg := libMsg.Metadata.(*message)
msg.ref.done()
}
}

@@ -156,9 +187,8 @@ func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
defer debugf("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg
ref := msg.Metadata.(*msgRef)
ref.fail(errMsg.Err)
msg := errMsg.Msg.Metadata.(*message)
msg.ref.fail(errMsg.Err)
}
}

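For context, a minimal sketch of the sarama Metadata round trip the reworked ack workers rely on; the topic name and state type are illustrative stand-ins for the PR's *message wrapper:

package sketch

import "github.com/Shopify/sarama"

// publishOne shows that sarama hands the same Metadata pointer back on the
// Successes and Errors channels, so per-message state survives the async
// publish (Producer.Return.Successes must be enabled for the ack path).
func publishOne(producer sarama.AsyncProducer, payload []byte) {
	type state struct{ acked bool } // illustrative stand-in for *message
	st := &state{}

	producer.Input() <- &sarama.ProducerMessage{
		Topic:    "beats",
		Value:    sarama.ByteEncoder(payload),
		Metadata: st,
	}

	select {
	case ok := <-producer.Successes():
		ok.Metadata.(*state).acked = true
	case errMsg := <-producer.Errors():
		_ = errMsg.Msg.Metadata.(*state) // same pointer on the failure path
	}
}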
25 changes: 25 additions & 0 deletions libbeat/outputs/kafka/common_test.go
@@ -0,0 +1,25 @@
package kafka

import "math/rand"

// common helpers used by unit+integration tests

func randString(length int) string {
return string(randASCIIBytes(length))
}

func randASCIIBytes(length int) []byte {
b := make([]byte, length)
for i := range b {
b[i] = randChar()
}
return b
}

func randChar() byte {
start, end := 'a', 'z'
if rand.Int31n(2) == 1 {
start, end = 'A', 'Z'
}
return byte(rand.Int31n(end-start+1) + start)
}
36 changes: 20 additions & 16 deletions libbeat/outputs/kafka/config.go
@@ -6,26 +6,30 @@ import (
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/outputs"
)

type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *outputs.TLSConfig `config:"tls"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Worker int `config:"worker" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
Version string `config:"version"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
Hosts []string `config:"hosts" validate:"required"`
TLS *outputs.TLSConfig `config:"tls"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Worker int `config:"worker" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
Key *fmtstr.EventFormatString `config:"key"`
Partition map[string]*common.Config `config:"partition"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
Version string `config:"version"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
}

type metaConfig struct {
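The new Partition map feeds strategy selection in partition.go, which this page does not show; a hypothetical dispatch over the configured section, using sarama's stock partitioners, might look like:

package kafka

import (
	"fmt"

	"github.com/Shopify/sarama"

	"github.com/elastic/beats/libbeat/common"
)

// makePartitionerSketch is a hypothetical reading of how a `partition.<name>`
// section could select a sarama partitioner; the PR's real constructor also
// honors sub-settings such as `reachable_only`.
func makePartitionerSketch(
	partition map[string]*common.Config,
) (sarama.PartitionerConstructor, error) {
	for name := range partition {
		switch name {
		case "random":
			return sarama.NewRandomPartitioner, nil
		case "round_robin":
			return sarama.NewRoundRobinPartitioner, nil
		case "hash":
			return sarama.NewHashPartitioner, nil
		default:
			return nil, fmt.Errorf("unknown kafka partition strategy %v", name)
		}
	}
	// Nothing configured: fall back to hashing, matching the documented default.
	return sarama.NewHashPartitioner, nil
}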
