Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: kafka output #942

Merged
merged 3 commits into from
Feb 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
- Add ability to create Elasticsearch mapping on startup {pull}639[639]
- Add option to elasticsearch output to pass http parameters in index operations {issue}805[805]
- Improve logstash and elasticsearch backoff behavior. {pull}927[927]
- Add experimental Kafka output. {pull}942[942]

*Packetbeat*
- Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803]
Expand Down
12 changes: 12 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,15 @@ import:
version: v1.2.0
- package: github.com/miekg/dns
version: 85b661b2a6fc95a5a83e66d7730c4bc0b6e9c99e
- package: github.com/Shopify/sarama
version: v1.8.0
- package: github.com/klauspost/crc32
version: 6973dcf6594efa905c08260fe9120cae92ab4305
- package: github.com/golang/snappy
version: 894fd4616c897c201d223c3c0c128e8c648c96a2
- package: github.com/eapache/queue
version: ded5959c0d4e360646dc9e9908cff48666781367
- package: github.com/eapache/go-resiliency
version: b86b1ec0dd4209a588dc1285cdd471e73525c0b3
subpackages:
- breaker
75 changes: 75 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ You can configure multiple outputs for exporting the correlated transactions. Cu
* <<elasticsearch-output,Elasticsearch>>
* <<logstash-output,Logstash>>
* <<redis-output,Redis (DEPRECATED)>>
* <<kafka-output,Kafka>>
* <<file-output,File>>
* <<console-output,Console>>

Expand Down Expand Up @@ -523,6 +524,80 @@ setting does not affect how events are published.

Setting `bulk_max_size` to values less than or equal to 0 disables buffering in libbeat.

[[kafka-output]]
==== Kafka Output

===== hosts

List of Kafka broker addressed to connect to.

===== topic

The kafka topic used for produced events. If use_type is set to true, topic will not be used.

===== use_type

Set kafka topic by event type. If use_type is false, topic must be configured. The deault is false.

===== client_id

Configurable ClientID used for logging, debugging and auditing purposes. The default is "beats".

===== worker

Number of concurrent load-balanced kafka output workers.

===== max_retries

The number of times to retry publishing an event after a publishing failure.
After the specified number of retries, the events are typically dropped.
Some Beats, such as Filebeat, ignore the `max_retries` setting and retry until all
events are published.

Set `max_retries` to a value less than 0 to retry until all events are published.

The default is 3.

===== bulk_max_size

The maximum number of events to bulk in a single Logstash request. The default is 2048.

===== timeout

The number of seconds to wait for responses from the Kafka brokers before timing
out. The default is 30 (seconds).

===== broker_timeout

Maximum duration a broker will wait for number of required ACKs. The default is 10s.

===== keep_alive

Keep-alive period for an active network connection. If 0s, keep-alives are disabled. The default is 0 seconds.

===== compression

Select output compression codec. Must be one of `none`, `snappy` and `gzip`. The default is `snappy`.

===== max_message_bytes

Max permitted size of json-encoded messages. Bigger messages will be dropped. The default value is 1000000 (bytes). Should be equal or less to the brokers `message.max.bytes`.

===== required_acks

ACK reliability level required from broker. 0=no response, 1=wait for local commit, -1=wait for all replicas to commit. The default is 1.

Note: If set to 0, no ACKs are returned by kafka. Messages might be lost silently on error.

===== flush_interval

The number of seconds to wait for new events between two producer API calls.

===== tls

Configuration options for TLS parameters like the root CA for Kibana connections. See
<<configuration-output-tls>> for more information.

[[file-output]]
==== File Output

Expand Down
188 changes: 188 additions & 0 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package kafka

import (
"encoding/json"
"expvar"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type client struct {
hosts []string
topic string
useType bool
config sarama.Config

producer sarama.AsyncProducer

wg sync.WaitGroup

isConnected int32
}

type msgRef struct {
count int32
err atomic.Value
batch []common.MapStr
cb func([]common.MapStr, error)
}

var (
ackedEvents = expvar.NewInt("libbeatKafkaPublishedAndAckedEvents")
eventsNotAcked = expvar.NewInt("libbeatKafkaPublishedButNotAckedEvents")
publishEventsCallCount = expvar.NewInt("libbeatKafkaPublishEventsCallCount")
)

func newKafkaClient(hosts []string, topic string, useType bool, cfg *sarama.Config) (*client, error) {
c := &client{
hosts: hosts,
useType: useType,
topic: topic,
config: *cfg,
}
return c, nil
}

func (c *client) Connect(timeout time.Duration) error {
debugf("connect: %v", c.hosts)

c.config.Net.DialTimeout = timeout

// try to connect
producer, err := sarama.NewAsyncProducer(c.hosts, &c.config)
if err != nil {
logp.Err("Kafka connect fails with: %v", err)
return err
}

c.producer = producer

c.wg.Add(2)
go c.successWorker(producer.Successes())
go c.errorWorker(producer.Errors())
atomic.StoreInt32(&c.isConnected, 1)

return nil
}

func (c *client) Close() error {
if c.IsConnected() {
debugf("closed kafka client")

c.producer.AsyncClose()
c.wg.Wait()
atomic.StoreInt32(&c.isConnected, 0)
c.producer = nil
}
return nil
}

func (c *client) IsConnected() bool {
return atomic.LoadInt32(&c.isConnected) != 0
}

func (c *client) AsyncPublishEvent(
cb func(error),
event common.MapStr,
) error {
return c.AsyncPublishEvents(func(_ []common.MapStr, err error) {
cb(err)
}, []common.MapStr{event})
}

func (c *client) AsyncPublishEvents(
cb func([]common.MapStr, error),
events []common.MapStr,
) error {
publishEventsCallCount.Add(1)
debugf("publish events")

ref := &msgRef{
count: int32(len(events)),
batch: events,
cb: cb,
}

ch := c.producer.Input()

for _, event := range events {
topic := c.topic
if c.useType {
topic = event["type"].(string)
}

jsonEvent, err := json.Marshal(event)
if err != nil {
ref.done()
continue
}

msg := &sarama.ProducerMessage{
Metadata: ref,
Topic: topic,
Value: sarama.ByteEncoder(jsonEvent),
}

ch <- msg
}

return nil
}

func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
defer c.wg.Done()
defer debugf("Stop kafka ack worker")

for msg := range ch {
ref := msg.Metadata.(*msgRef)
ref.done()
}
}

func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
defer c.wg.Done()
defer debugf("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg
ref := msg.Metadata.(*msgRef)
ref.fail(errMsg.Err)
}
}

func (r *msgRef) done() {
r.dec()
}

func (r *msgRef) fail(err error) {
debugf("Kafka publish failed with: %v", err)

r.err.Store(err)
r.dec()
}

func (r *msgRef) dec() {
i := atomic.AddInt32(&r.count, -1)
if i > 0 {
return
}

debugf("finished kafka batch")

var err error
v := r.err.Load()
if v != nil {
err = v.(error)
eventsNotAcked.Add(int64(len(r.batch)))
r.cb(r.batch, err)
} else {
ackedEvents.Add(int64(len(r.batch)))
r.cb(nil, nil)
}
}
Loading