diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index cff924b3bd60..946d1dc8c956 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -129,6 +129,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Fix setting unique registry for non beat receivers {issue}42288[42288] {pull}42292[42292]
 - The Kafka output now drops events when there is an authorisation error {issue}42343[42343] {pull}42401[42401]
 - Fix autodiscovery memory leak related to metadata of start events {pull}41748[41748]
+- All standard queue metrics are now included in metrics monitoring, including: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`. {pull}42439[42439]
+- The following output latency metrics are now included in metrics monitoring: `output.latency.{count, max, median, p99}`. {pull}42439[42439]

 *Auditbeat*

diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go
index 4a1e5ad76a1a..50a32ad13fbe 100644
--- a/libbeat/publisher/pipeline/monitoring.go
+++ b/libbeat/publisher/pipeline/monitoring.go
@@ -74,11 +74,6 @@ type metricsObserverVars struct {
 	eventsTotal, eventsFiltered, eventsPublished, eventsFailed *monitoring.Uint
 	eventsDropped, eventsRetry *monitoring.Uint // (retryer) drop/retry counters
 	activeEvents *monitoring.Uint
-
-	// queue metrics
-	queueACKed *monitoring.Uint
-	queueMaxEvents *monitoring.Uint
-	percentQueueFull *monitoring.Float
 }

 func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
@@ -118,19 +113,6 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
 		// events.dropped counts events that were dropped because errors from
 		// the output workers exceeded the configured maximum retry count.
 		eventsDropped: monitoring.NewUint(reg, "events.dropped"),
-
-		// (Gauge) queue.max_events measures the maximum number of events the
-		// queue will accept, or 0 if there is none.
-		queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"),
-
-		// queue.acked counts events that have been acknowledged by the output
-		// workers. This includes events that were dropped for fatal errors,
-		// which are also reported in events.dropped.
-		queueACKed: monitoring.NewUint(reg, "queue.acked"),
-
-		// (Gauge) queue.filled.pct.events measures the fraction (from 0 to 1)
-		// of the queue's event capacity that is currently filled.
-		percentQueueFull: monitoring.NewFloat(reg, "queue.filled.pct.events"),
 	},
 }
 }
diff --git a/metricbeat/module/beat/stats/data.go b/metricbeat/module/beat/stats/data.go
index 3df496f0a95b..8cb118669690 100644
--- a/metricbeat/module/beat/stats/data.go
+++ b/metricbeat/module/beat/stats/data.go
@@ -71,13 +71,40 @@ var (
 			"write": c.Dict("write", s.Schema{
 				"bytes": c.Int("bytes"),
 				"errors": c.Int("errors"),
+				"latency": c.Dict("latency", s.Schema{
+					"count": c.Int("count"),
+					"max": c.Int("max"),
+					"median": c.Float("median"),
+					"p99": c.Float("p99"),
+				}),
 			}),
 		}),
 		"pipeline": c.Dict("pipeline", s.Schema{
 			"clients": c.Int("clients"),
 			"queue": c.Dict("queue", s.Schema{
-				"acked": c.Int("acked"),
 				"max_events": c.Int("max_events"),
+
+				"added": c.Dict("added", s.Schema{
+					"events": c.Int("events"),
+					"bytes": c.Int("bytes"),
+				}),
+				"consumed": c.Dict("consumed", s.Schema{
+					"events": c.Int("events"),
+					"bytes": c.Int("bytes"),
+				}),
+				"removed": c.Dict("removed", s.Schema{
+					"events": c.Int("events"),
+					"bytes": c.Int("bytes"),
+				}),
+				"filled": c.Dict("filled", s.Schema{
+					"events": c.Int("events"),
+					"bytes": c.Int("bytes"),
+					"pct": c.Float("pct"),
+				}),
+
+				// Backwards compatibility: "acked" is the old name for
+				// "removed.events" and should not be used by new code/dashboards.
+				"acked": c.Int("acked"),
 			}),
 			"events": c.Dict("events", s.Schema{
 				"active": c.Int("active"),
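
Not part of the patch: a minimal, illustrative Go sketch of how gauges and counters like the ones above are registered and read back through libbeat's monitoring package. The standalone registry, the metric names, and the final snapshot call here are assumptions for demonstration only; the pipeline's real wiring lives in monitoring.go above.

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/monitoring"
)

func main() {
	// Hypothetical standalone registry; in the pipeline these metrics hang
	// off the "stats" registry created in newMetricsObserver.
	reg := monitoring.NewRegistry()

	// Counters mirroring the new added.*/removed.* queue metrics
	// (names chosen for illustration).
	addedEvents := monitoring.NewUint(reg, "queue.added.events")
	removedEvents := monitoring.NewUint(reg, "queue.removed.events")

	// Gauge for the fraction of the queue's event capacity currently in use.
	filledPct := monitoring.NewFloat(reg, "queue.filled.pct")

	addedEvents.Add(10)
	removedEvents.Add(4)
	filledPct.Set(0.6)

	// Collect a nested snapshot of the registry, the same mechanism the
	// monitoring endpoint uses to report these values.
	fmt.Println(monitoring.CollectStructSnapshot(reg, monitoring.Full, false))
}

The dotted names produce a nested snapshot (queue.added.events, queue.filled.pct, ...), which is roughly the shape the extended schema in metricbeat/module/beat/stats/data.go consumes.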