Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ import (
)

// client connects a beat with the processors and pipeline queue.
//
// TODO: All ackers currently drop any late incoming ACK. Some beats still might
// be interested in handling/waiting for event ACKs more globally
// -> add support for not dropping pending ACKs
type client struct {
pipeline *Pipeline
processors beat.Processor
Expand Down Expand Up @@ -101,8 +97,8 @@ func (c *client) publish(e beat.Event) {
event, err = c.processors.Run(event)
publish = event != nil
if err != nil {
// TODO: introduce dead-letter queue?

// If we introduce a dead-letter queue, this is where we should
// route the event to it.
log.Errorf("Failed to publish event: %v", err)
}
}
Expand All @@ -124,7 +120,7 @@ func (c *client) publish(e beat.Event) {
}

if c.reportEvents {
c.pipeline.waitCloser.inc()
c.pipeline.waitCloseGroup.Add(1)
}

var published bool
Expand All @@ -139,7 +135,7 @@ func (c *client) publish(e beat.Event) {
} else {
c.onDroppedOnPublish(e)
if c.reportEvents {
c.pipeline.waitCloser.dec(1)
c.pipeline.waitCloseGroup.Add(-1)
}
}
}
Expand Down Expand Up @@ -178,7 +174,7 @@ func (c *client) Close() error {
return nil
}

// unlink is the final step of closing a client. It cancells the connect of the
// unlink is the final step of closing a client. It cancels the connect of the
// client as producer to the queue.
func (c *client) unlink() {
log := c.logger()
Expand All @@ -189,7 +185,7 @@ func (c *client) unlink() {
if c.reportEvents {
log.Debugf("client: remove client events")
if n > 0 {
c.pipeline.waitCloser.dec(n)
c.pipeline.waitCloseGroup.Add(-n)
}
}

Expand Down Expand Up @@ -271,7 +267,7 @@ func (w *clientCloseWaiter) ACKEvents(n int) {
}

// The Close signal from the pipeline is ignored. Instead the client
// explicitely uses `signalClose` and `wait` before it continues with the
// explicitly uses `signalClose` and `wait` before it continues with the
// closing sequence.
func (w *clientCloseWaiter) Close() {}

Expand Down
82 changes: 18 additions & 64 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ type Pipeline struct {

observer observer

eventer pipelineEventer

// wait close support
waitCloseMode WaitCloseMode
waitOnClose bool
waitCloseTimeout time.Duration
waitCloser *waitCloser
waitCloseGroup sync.WaitGroup

// closeRef signal propagation support
guardStartSigPropagation sync.Once
Expand Down Expand Up @@ -102,10 +100,6 @@ const (
// to ACK any outstanding events. This is independent of Clients asking for
// ACK and/or WaitClose. Clients can still optionally configure WaitClose themselves.
WaitOnPipelineClose

// WaitOnClientClose applies WaitClose timeout to each client connecting to
// the pipeline. Clients are still allowed to overwrite WaitClose with a timeout > 0s.
WaitOnClientClose
)

// OutputReloader interface, that can be queried from an active publisher pipeline.
Expand All @@ -117,16 +111,6 @@ type OutputReloader interface {
) error
}

type pipelineEventer struct {
observer queueObserver
waitClose *waitCloser
}

type waitCloser struct {
// keep track of total number of active events (minus dropped by processors)
events sync.WaitGroup
}

type queueFactory func(queue.ACKListener) (queue.Queue, error)

// New create a new Pipeline instance from a queue instance and a set of outputs.
Expand All @@ -149,24 +133,16 @@ func New(
beatInfo: beat,
monitors: monitors,
observer: nilObserver,
waitCloseMode: settings.WaitCloseMode,
waitOnClose: settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0,
waitCloseTimeout: settings.WaitClose,
processors: settings.Processors,
}

if monitors.Metrics != nil {
p.observer = newMetricsObserver(monitors.Metrics)
}
p.eventer.observer = p.observer

if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
p.waitCloser = &waitCloser{}

// waitCloser decrements counter on queue ACK (not per client)
p.eventer.waitClose = p.waitCloser
}

p.queue, err = queueFactory(&p.eventer)
p.queue, err = queueFactory(p)
if err != nil {
return nil, err
}
Expand All @@ -185,6 +161,16 @@ func New(
return p, nil
}

// OnACK implements the queue.ACKListener interface, so the queue can notify the
// Pipeline when events are acknowledged.
func (p *Pipeline) OnACK(n int) {
p.observer.queueACKed(n)

if p.waitOnClose {
p.waitCloseGroup.Add(-n)
}
}

// Close stops the pipeline, outputs and queue.
// If WaitClose with WaitOnPipelineClose mode is configured, Close will block
// for a duration of WaitClose, if there are still active events in the pipeline.
Expand All @@ -194,10 +180,10 @@ func (p *Pipeline) Close() error {

log.Debug("close pipeline")

if p.waitCloser != nil {
if p.waitOnClose {
ch := make(chan struct{})
go func() {
p.waitCloser.wait()
p.waitCloseGroup.Wait()
ch <- struct{}{}
}()

Expand All @@ -208,7 +194,6 @@ func (p *Pipeline) Close() error {
case <-time.After(p.waitCloseTimeout):
// timeout -> close pipeline with pending events
}

}

// Note: active clients are not closed / disconnected.
Expand Down Expand Up @@ -258,16 +243,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

waitClose := cfg.WaitClose
reportEvents := p.waitCloser != nil

switch p.waitCloseMode {
case NoWaitOnClose:

case WaitOnClientClose:
if waitClose <= 0 {
waitClose = p.waitCloseTimeout
}
}
reportEvents := p.waitOnClose

processors, err := p.createEventProcessing(cfg.Processing, publishDisabled)
if err != nil {
Expand Down Expand Up @@ -296,7 +272,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
cfg.Events.DroppedOnPublish(event)
}
if reportEvents {
p.waitCloser.dec(1)
p.waitCloseGroup.Add(-1)
}
}
}
Expand Down Expand Up @@ -420,28 +396,6 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo
return p.processors.Create(cfg, noPublish)
}

func (e *pipelineEventer) OnACK(n int) {
e.observer.queueACKed(n)

if wc := e.waitClose; wc != nil {
wc.dec(n)
}
}

func (e *waitCloser) inc() {
e.events.Add(1)
}

func (e *waitCloser) dec(n int) {
for i := 0; i < n; i++ {
e.events.Done()
}
}

func (e *waitCloser) wait() {
e.events.Wait()
}

// OutputReloader returns a reloadable object for the output section of this pipeline
func (p *Pipeline) OutputReloader() OutputReloader {
return p.output
Expand Down