Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012]
- Add `wineventlog` schema to `decode_xml` processor. {issue}23910[23910] {pull}24726[24726]
- Add new ECS 1.9 field `cloud.service.name` to `add_cloud_metadata` processor. {pull}24993[24993]
- Libbeat: report queue capacity, output batch size, and output client count to monitoring. {pull}24700[24700]

*Auditbeat*

Expand Down
54 changes: 54 additions & 0 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/beats/v7/libbeat/tests/resources"
Expand Down Expand Up @@ -205,3 +211,51 @@ func TestClientWaitClose(t *testing.T) {
}
})
}

// TestMonitoring verifies that loading a pipeline populates the metrics
// registry with the configured queue capacity, and the telemetry registry
// with the output's name, batch size, and client count.
func TestMonitoring(t *testing.T) {
	const (
		maxEvents  = 123
		batchSize  = 456
		numClients = 42
	)

	var config Config
	err := common.MustNewConfigFrom(map[string]interface{}{
		"queue.mem.events":           maxEvents,
		"queue.mem.flush.min_events": 1,
	}).Unpack(&config)
	require.NoError(t, err)

	// The output factory returns a fixed-size group of no-op clients so the
	// telemetry snapshot has known values to compare against.
	makeOutput := func(outputs.Observer) (string, outputs.Group, error) {
		group := outputs.Group{BatchSize: batchSize}
		for i := 0; i < numClients; i++ {
			group.Clients = append(group.Clients, newMockClient(func(publisher.Batch) error {
				return nil
			}))
		}
		return "output_name", group, nil
	}

	metrics := monitoring.NewRegistry()
	telemetry := monitoring.NewRegistry()
	pipeline, err := Load(
		beat.Info{},
		Monitors{
			Metrics:   metrics,
			Telemetry: telemetry,
		},
		config,
		processing.Supporter(nil),
		makeOutput,
	)
	require.NoError(t, err)
	defer pipeline.Close()

	metricsSnapshot := monitoring.CollectFlatSnapshot(metrics, monitoring.Full, true)
	assert.Equal(t, int64(maxEvents), metricsSnapshot.Ints["pipeline.queue.max_events"])

	telemetrySnapshot := monitoring.CollectFlatSnapshot(telemetry, monitoring.Full, true)
	assert.Equal(t, "output_name", telemetrySnapshot.Strings["output.name"])
	assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
	assert.Equal(t, int64(numClients), telemetrySnapshot.Ints["output.clients"])
}
2 changes: 2 additions & 0 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ func loadOutput(
telemetry = monitors.Telemetry.NewRegistry("output")
}
monitoring.NewString(telemetry, "name").Set(outName)
monitoring.NewInt(telemetry, "batch_size").Set(int64(out.BatchSize))
monitoring.NewInt(telemetry, "clients").Set(int64(len(out.Clients)))
}

return out, nil
Expand Down
60 changes: 37 additions & 23 deletions libbeat/publisher/pipeline/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type clientObserver interface {

// queueObserver is the subset of observer callbacks invoked on behalf of the
// event queue.
type queueObserver interface {
	// queueACKed reports the number of events ACKed by the queue/broker.
	queueACKed(n int)
	// queueMaxEvents reports the queue's configured maximum event capacity.
	queueMaxEvents(n int)
}

type outputObserver interface {
Expand All @@ -62,7 +63,10 @@ type outputObserver interface {
// event-handlers only (e.g. the client centric events callbacks)
type metricsObserver struct {
	metrics *monitoring.Registry // registry the observer's variables are registered under
	vars    metricsObserverVars  // pre-created metric variables updated by the observer callbacks
}

type metricsObserverVars struct {
// clients metrics
clients *monitoring.Uint

Expand All @@ -72,7 +76,8 @@ type metricsObserver struct {
activeEvents *monitoring.Uint

// queue metrics
ackedQueue *monitoring.Uint
queueACKed *monitoring.Uint
queueMaxEvents *monitoring.Uint
}

func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
Expand All @@ -83,18 +88,21 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {

return &metricsObserver{
metrics: metrics,
clients: monitoring.NewUint(reg, "clients"),
vars: metricsObserverVars{
clients: monitoring.NewUint(reg, "clients"),

events: monitoring.NewUint(reg, "events.total"),
filtered: monitoring.NewUint(reg, "events.filtered"),
published: monitoring.NewUint(reg, "events.published"),
failed: monitoring.NewUint(reg, "events.failed"),
dropped: monitoring.NewUint(reg, "events.dropped"),
retry: monitoring.NewUint(reg, "events.retry"),
events: monitoring.NewUint(reg, "events.total"),
filtered: monitoring.NewUint(reg, "events.filtered"),
published: monitoring.NewUint(reg, "events.published"),
failed: monitoring.NewUint(reg, "events.failed"),
dropped: monitoring.NewUint(reg, "events.dropped"),
retry: monitoring.NewUint(reg, "events.retry"),

ackedQueue: monitoring.NewUint(reg, "queue.acked"),
queueACKed: monitoring.NewUint(reg, "queue.acked"),
queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"),

activeEvents: monitoring.NewUint(reg, "events.active"),
activeEvents: monitoring.NewUint(reg, "events.active"),
},
}
}

Expand All @@ -109,39 +117,39 @@ func (o *metricsObserver) cleanup() {
//

// (pipeline) pipeline did finish creating a new client instance
func (o *metricsObserver) clientConnected() { o.clients.Inc() }
func (o *metricsObserver) clientConnected() { o.vars.clients.Inc() }

// (client) close being called on client
func (o *metricsObserver) clientClosing() {}

// (client) client finished processing close
func (o *metricsObserver) clientClosed() { o.clients.Dec() }
func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() }

//
// client publish events
//

// (client) client is trying to publish a new event
func (o *metricsObserver) newEvent() {
o.events.Inc()
o.activeEvents.Inc()
o.vars.events.Inc()
o.vars.activeEvents.Inc()
}

// (client) event is filtered out (on purpose or failed)
func (o *metricsObserver) filteredEvent() {
o.filtered.Inc()
o.activeEvents.Dec()
o.vars.filtered.Inc()
o.vars.activeEvents.Dec()
}

// (client) managed to push an event into the publisher pipeline
func (o *metricsObserver) publishedEvent() {
o.published.Inc()
o.vars.published.Inc()
}

// (client) client closing down or DropIfFull is set
func (o *metricsObserver) failedPublishEvent() {
o.failed.Inc()
o.activeEvents.Dec()
o.vars.failed.Inc()
o.vars.activeEvents.Dec()
}

//
Expand All @@ -150,8 +158,13 @@ func (o *metricsObserver) failedPublishEvent() {

// (queue) number of events ACKed by the queue/broker in use
func (o *metricsObserver) queueACKed(n int) {
o.ackedQueue.Add(uint64(n))
o.activeEvents.Sub(uint64(n))
o.vars.queueACKed.Add(uint64(n))
o.vars.activeEvents.Sub(uint64(n))
}

// (queue) maximum queue event capacity. Uses Set rather than Add: this is a
// gauge reflecting the configured capacity, not a cumulative counter.
func (o *metricsObserver) queueMaxEvents(n int) {
	o.vars.queueMaxEvents.Set(uint64(n))
}

//
Expand All @@ -166,12 +179,12 @@ func (o *metricsObserver) eventsFailed(int) {}

// (retryer) number of events dropped by retryer
func (o *metricsObserver) eventsDropped(n int) {
o.dropped.Add(uint64(n))
o.vars.dropped.Add(uint64(n))
}

// (retryer) number of events pushed to the output worker queue
func (o *metricsObserver) eventsRetry(n int) {
o.retry.Add(uint64(n))
o.vars.retry.Add(uint64(n))
}

// (output) number of events to be forwarded to the output client
Expand All @@ -193,6 +206,7 @@ func (*emptyObserver) filteredEvent() {}
func (*emptyObserver) publishedEvent() {}
func (*emptyObserver) failedPublishEvent() {}
func (*emptyObserver) queueACKed(n int) {}
func (*emptyObserver) queueMaxEvents(int) {}
func (*emptyObserver) updateOutputGroup() {}
func (*emptyObserver) eventsFailed(int) {}
func (*emptyObserver) eventsDropped(int) {}
Expand Down
1 change: 1 addition & 0 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func New(
// Only active if pipeline can drop events.
maxEvents = 64000
}
p.observer.queueMaxEvents(maxEvents)
p.eventSema = newSema(maxEvents)

p.output = newOutputController(beat, monitors, p.observer, p.queue)
Expand Down