Skip to content

Commit

Permalink
Merge pull request #772 from urso/enh/pub-simpl
Browse files Browse the repository at this point in the history
Simplify publisher pipeline
  • Loading branch information
ruflin committed Jan 21, 2016
2 parents 8048e3b + 17c12ea commit fec24e0
Show file tree
Hide file tree
Showing 38 changed files with 431 additions and 362 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Affecting all Beats*
- Some publisher options refactoring in libbeat {pull}684[684]
- Run function to start a beat no returns an error instead of directly exiting. {pull}771[771]
- Move event preprocessor applying GeoIP to packetbeat {pull}772[772]

*Packetbeat*

Expand Down
1 change: 1 addition & 0 deletions filebeat/input/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (f *FileEvent) ToMapStr() common.MapStr {
"message": f.Text,
"type": f.DocumentType,
"input_type": f.InputType,
"count": 1,
}

if f.Fields != nil {
Expand Down
18 changes: 10 additions & 8 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import (
"runtime"
"sync"

"github.com/satori/go.uuid"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/service"

"github.com/satori/go.uuid"
)

// Beater interface that every beat must use
Expand All @@ -56,12 +56,13 @@ type FlagsHandler interface {

// Basic beat information
type Beat struct {
Name string
Version string
Config *BeatConfig
BT Beater
Events publisher.Client
UUID uuid.UUID
Name string
Version string
Config *BeatConfig
BT Beater
Publisher *publisher.PublisherType
Events publisher.Client
UUID uuid.UUID

exit chan struct{}
error error
Expand Down Expand Up @@ -212,6 +213,7 @@ func (b *Beat) LoadConfig() error {
return fmt.Errorf("error Initialising publisher: %v\n", err)
}

b.Publisher = pub
b.Events = pub.Client()

logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version)
Expand Down
36 changes: 19 additions & 17 deletions libbeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

type asyncPublisher struct {
messageWorker
outputs []worker
pub *PublisherType
ws workerSignal
Expand All @@ -30,28 +29,12 @@ func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int) *asyncPublisher {
}

p.outputs = outputs
p.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, p))
return p
}

// onStop will send stop signal to message batching workers
func (p *asyncPublisher) onStop() { p.ws.stop() }

func (p *asyncPublisher) onMessage(m message) {
debug("async forward to outputers (%v)", len(p.outputs))

// m.signal is not set yet. But a async client type supporting signals might
// be implemented in the furute.
// If m.signal is nil, NewSplitSignaler will return nil -> signaler will
// only set if client did send one
if m.context.signal != nil && len(p.outputs) > 1 {
m.context.signal = outputs.NewSplitSignaler(m.context.signal, len(p.outputs))
}
for _, o := range p.outputs {
o.send(m)
}
}

func (p *asyncPublisher) client() eventPublisher {
return p
}
Expand All @@ -66,6 +49,25 @@ func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool
return true
}

func (p *asyncPublisher) send(m message) {
if p.pub.disabled {
debug("publisher disabled")
outputs.SignalCompleted(m.context.signal)
return
}

// m.signal is not set yet. But a async client type supporting signals might
// be implemented in the future.
// If m.signal is nil, NewSplitSignaler will return nil -> signaler will
// only set if client did send one
if m.context.signal != nil && len(p.outputs) > 1 {
m.context.signal = outputs.NewSplitSignaler(m.context.signal, len(p.outputs))
}
for _, o := range p.outputs {
o.send(m)
}
}

func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker {
config := worker.config

Expand Down
32 changes: 32 additions & 0 deletions libbeat/publisher/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"expvar"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
)

Expand Down Expand Up @@ -32,6 +33,9 @@ type ChanClient struct {

type client struct {
publisher *PublisherType

beatMeta common.MapStr
tags []string
}

// ClientOption allows API users to set additional options when publishing events.
Expand Down Expand Up @@ -62,18 +66,46 @@ func Signal(signaler outputs.Signaler) ClientOption {
}
}

func newClient(pub *PublisherType) *client {
return &client{
publisher: pub,
beatMeta: common.MapStr{
"name": pub.name,
"hostname": pub.hostname,
},
tags: pub.tags,
}
}

func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool {
c.annotateEvent(event)

ctx, client := c.getClient(opts)
publishedEvents.Add(1)
return client.PublishEvent(ctx, event)
}

func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool {
for _, event := range events {
c.annotateEvent(event)
}

ctx, client := c.getClient(opts)
publishedEvents.Add(int64(len(events)))
return client.PublishEvents(ctx, events)
}

func (c *client) annotateEvent(event common.MapStr) {
event["beat"] = c.beatMeta
if len(c.tags) > 0 {
event["tags"] = c.tags
}

if logp.IsDebug("publish") {
PrintPublishEvent(event)
}
}

func (c *client) getClient(opts []ClientOption) (context, eventPublisher) {
var ctx context
for _, opt := range opts {
Expand Down
195 changes: 0 additions & 195 deletions libbeat/publisher/preprocess.go

This file was deleted.

Loading

0 comments on commit fec24e0

Please sign in to comment.