Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *client) publish(e beat.Event) {
}

if c.reportEvents {
c.pipeline.waitCloser.inc()
c.pipeline.waitCloseGroup.Add(1)
}

var published bool
Expand All @@ -139,7 +139,7 @@ func (c *client) publish(e beat.Event) {
} else {
c.onDroppedOnPublish(e)
if c.reportEvents {
c.pipeline.waitCloser.dec(1)
c.pipeline.waitCloseGroup.Add(-1)
}
}
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func (c *client) unlink() {
if c.reportEvents {
log.Debugf("client: remove client events")
if n > 0 {
c.pipeline.waitCloser.dec(n)
c.pipeline.waitCloseGroup.Add(-n)
}
}

Expand Down
82 changes: 18 additions & 64 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ type Pipeline struct {

observer observer

eventer pipelineEventer

// wait close support
waitCloseMode WaitCloseMode
waitOnClose bool
waitCloseTimeout time.Duration
waitCloser *waitCloser
waitCloseGroup sync.WaitGroup

// closeRef signal propagation support
guardStartSigPropagation sync.Once
Expand Down Expand Up @@ -102,10 +100,6 @@ const (
// to ACK any outstanding events. This is independent of Clients asking for
// ACK and/or WaitClose. Clients can still optionally configure WaitClose themselves.
WaitOnPipelineClose

// WaitOnClientClose applies WaitClose timeout to each client connecting to
// the pipeline. Clients are still allowed to overwrite WaitClose with a timeout > 0s.
WaitOnClientClose
)

// OutputReloader interface, that can be queried from an active publisher pipeline.
Expand All @@ -117,16 +111,6 @@ type OutputReloader interface {
) error
}

type pipelineEventer struct {
observer queueObserver
waitClose *waitCloser
}

type waitCloser struct {
// keep track of total number of active events (minus dropped by processors)
events sync.WaitGroup
}

type queueFactory func(queue.ACKListener) (queue.Queue, error)

// New create a new Pipeline instance from a queue instance and a set of outputs.
Expand All @@ -149,24 +133,16 @@ func New(
beatInfo: beat,
monitors: monitors,
observer: nilObserver,
waitCloseMode: settings.WaitCloseMode,
waitOnClose: settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0,
waitCloseTimeout: settings.WaitClose,
processors: settings.Processors,
}

if monitors.Metrics != nil {
p.observer = newMetricsObserver(monitors.Metrics)
}
p.eventer.observer = p.observer

if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
p.waitCloser = &waitCloser{}

// waitCloser decrements counter on queue ACK (not per client)
p.eventer.waitClose = p.waitCloser
}

p.queue, err = queueFactory(&p.eventer)
p.queue, err = queueFactory(p)
if err != nil {
return nil, err
}
Expand All @@ -185,6 +161,16 @@ func New(
return p, nil
}

// OnACK implements the queue.ACKListener interface, so the queue can notify the
// Pipeline when events are acknowledged.
func (p *Pipeline) OnACK(n int) {
p.observer.queueACKed(n)

if p.waitOnClose {
p.waitCloseGroup.Add(-n)
}
}

// Close stops the pipeline, outputs and queue.
// If WaitClose with WaitOnPipelineClose mode is configured, Close will block
// for a duration of WaitClose, if there are still active events in the pipeline.
Expand All @@ -194,10 +180,10 @@ func (p *Pipeline) Close() error {

log.Debug("close pipeline")

if p.waitCloser != nil {
if p.waitOnClose {
ch := make(chan struct{})
go func() {
p.waitCloser.wait()
p.waitCloseGroup.Wait()
ch <- struct{}{}
}()

Expand All @@ -208,7 +194,6 @@ func (p *Pipeline) Close() error {
case <-time.After(p.waitCloseTimeout):
// timeout -> close pipeline with pending events
}

}

// Note: active clients are not closed / disconnected.
Expand Down Expand Up @@ -258,16 +243,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

waitClose := cfg.WaitClose
reportEvents := p.waitCloser != nil

switch p.waitCloseMode {
case NoWaitOnClose:

case WaitOnClientClose:
if waitClose <= 0 {
waitClose = p.waitCloseTimeout
}
}
reportEvents := p.waitOnClose

processors, err := p.createEventProcessing(cfg.Processing, publishDisabled)
if err != nil {
Expand Down Expand Up @@ -296,7 +272,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
cfg.Events.DroppedOnPublish(event)
}
if reportEvents {
p.waitCloser.dec(1)
p.waitCloseGroup.Add(-1)
}
}
}
Expand Down Expand Up @@ -420,28 +396,6 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo
return p.processors.Create(cfg, noPublish)
}

func (e *pipelineEventer) OnACK(n int) {
e.observer.queueACKed(n)

if wc := e.waitClose; wc != nil {
wc.dec(n)
}
}

func (e *waitCloser) inc() {
e.events.Add(1)
}

func (e *waitCloser) dec(n int) {
for i := 0; i < n; i++ {
e.events.Done()
}
}

func (e *waitCloser) wait() {
e.events.Wait()
}

// OutputReloader returns a reloadable object for the output section of this pipeline
func (p *Pipeline) OutputReloader() OutputReloader {
return p.output
Expand Down