From 01dabf9804971ebc8afb0d7186d4894a5e217c66 Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Tue, 10 Apr 2018 11:10:19 +0300
Subject: [PATCH] cleanup input handler metrics

* received and invalid on a per-messagetype basis
* track unknown metricpoint metrics
* remove unused MsgsAge metric

note: dashboard now requires recent graphite which has groupByNodes
---
 dashboard.json                        | 33 ++++++++++++++++++-------
 docs/operations.md                    |  1 +
 input/input.go                        | 35 +++++++++++++++------------
 scripts/qa/verify_metrics_received.py |  2 +-
 4 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/dashboard.json b/dashboard.json
index 9154fa4207..719b7928b8 100644
--- a/dashboard.json
+++ b/dashboard.json
@@ -98,9 +98,16 @@
         "yaxis": 2
       },
       {
-        "alias": "/ in/",
+        "alias": "/received/",
         "lines": true,
-        "points": false
+        "points": false,
+        "color": "#3f6833"
+      },
+      {
+        "alias": "/metricpoint.received/",
+        "lines": true,
+        "points": false,
+        "color": "#7eb26d"
       },
       {
         "alias": "/invalid/",
@@ -110,6 +117,10 @@
         "alias": "/decode err/",
         "color": "#890F02"
       },
+      {
+        "alias": "/unknown/",
+        "color": "#890F02"
+      },
       {
         "alias": "/pressure\\./",
         "fill": 0,
@@ -126,34 +137,38 @@
       "targets": [
         {
           "refId": "A",
-          "target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_received.counter32)), '.*\\.([^\\.]+)\\.metrics_received.*', '\\1 in')"
+          "target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32)), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')"
         },
         {
           "refId": "B",
-          "target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32)), 'too old')"
+          "target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.received.counter32), 'sum', 5, 6, 7)"
         },
         {
           "refId": "C",
-          "target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)), 'add-to-saved')"
+          "target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.invalid.counter32), 'sum', 5, 6, 7)"
         },
         {
           "refId": "D",
-          "target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32)), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')"
+          "target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.unknown.counter32), 'sum', 5, 6, 7)"
         },
         {
           "refId": "E",
-          "target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metric_invalid.counter32)), '.*\\.([^\\.]+)\\.metric_invalid.*', '\\1 metric invalid')"
+          "target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32)), 'too old')"
         },
         {
           "refId": "F",
-          "target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_reordered.counter32)), 'reordered')"
+          "target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)), 'add-to-saved')"
         },
         {
           "refId": "G",
-          "target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.tank.counter32), 1e-9)), 6, 7)"
+          "target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_reordered.counter32)), 'reordered')"
         },
         {
           "refId": "H",
+          "target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.tank.counter32), 1e-9)), 6, 7)"
+        },
+        {
+          "refId": "I",
           "target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.idx.counter32), 1e-9)), 6, 7)"
         }
       ],
diff --git a/docs/operations.md b/docs/operations.md
index 6f60ea8f45..334b5dbad7 100644
--- a/docs/operations.md
+++ b/docs/operations.md
@@ -22,6 +22,7 @@ Just make sure to have a properly configured statsd setup (or adjust the dashboa
 * `metrictank.stats.$environment.$instance.cluster.primary.gauge1`: assure you have exactly 1 primary node (saving to cassandra) or as many as you have shardgroups, for sharded setups.
 * `metrictank.stats.$environment.$instance.store.cassandra.write_queue.*.items.{min,max}.gauge32`: make sure the write queues are able to drain and don't reach capacity, otherwise ingest will block
 * `metrictank.stats.$environment.$instance.input.*.pressure.idx.counter32`: index pressure as a ns counter, rise is between 0 and 10^9 each second. Alert if increase is more than 4x10^8 each second: this would signify the index can't keep up with indexing new data and is blocking ingestion pipeline.
+* `metrictank.stats.$environment.$instance.input.*.metricpoint.unknown.counter32`: counter of MetricPoint messages for an unknown metric, will be dropped.
 * `metrictank.stats.$environment.$instance.api.request_handle.latency.*.gauge32`: shows how fast/slow metrictank responds to http queries
 * `metrictank.stats.$environment.$instance.store.cassandra.error.*`: shows erroring queries. Queries that result in errors (or timeouts) will result in missing data in your charts.
 * `perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)`: Points dropped due to chunks being closed. Need to tune the chunk-max-stale setting or fix your data stream to not send old points so late.
diff --git a/input/input.go b/input/input.go
index 79adf1623e..e544845858 100644
--- a/input/input.go
+++ b/input/input.go
@@ -23,11 +23,13 @@ type Handler interface {
 
 // Default is a base handler for a metrics packet, aimed to be embedded by concrete implementations
 type DefaultHandler struct {
-	metricsReceived *stats.Counter32
-	MetricInvalid   *stats.Counter32 // metric metric_invalid is a count of times a metric did not validate
-	MsgsAge         *stats.Meter32   // in ms
-	pressureIdx     *stats.Counter32
-	pressureTank    *stats.Counter32
+	receivedMD   *stats.Counter32
+	receivedMP   *stats.Counter32
+	invalidMD    *stats.Counter32
+	invalidMP    *stats.Counter32
+	unknownMP    *stats.Counter32
+	pressureIdx  *stats.Counter32
+	pressureTank *stats.Counter32
 
 	metrics     mdata.Metrics
 	metricIndex idx.MetricIndex
@@ -35,11 +37,13 @@ type DefaultHandler struct {
 
 func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input string) DefaultHandler {
 	return DefaultHandler{
-		metricsReceived: stats.NewCounter32(fmt.Sprintf("input.%s.metrics_received", input)),
-		MetricInvalid:   stats.NewCounter32(fmt.Sprintf("input.%s.metric_invalid", input)),
-		MsgsAge:         stats.NewMeter32(fmt.Sprintf("input.%s.message_age", input), false),
-		pressureIdx:     stats.NewCounter32(fmt.Sprintf("input.%s.pressure.idx", input)),
-		pressureTank:    stats.NewCounter32(fmt.Sprintf("input.%s.pressure.tank", input)),
+		receivedMD:   stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.received", input)),
+		receivedMP:   stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.received", input)),
+		invalidMD:    stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.invalid", input)),
+		invalidMP:    stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.invalid", input)),
+		unknownMP:    stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)),
+		pressureIdx:  stats.NewCounter32(fmt.Sprintf("input.%s.pressure.idx", input)),
+		pressureTank: stats.NewCounter32(fmt.Sprintf("input.%s.pressure.tank", input)),
 
 		metrics:     metrics,
 		metricIndex: metricIndex,
@@ -49,9 +53,9 @@
 // ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry
 // concurrency-safe.
 func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
-	in.metricsReceived.Inc()
+	in.receivedMP.Inc()
 	if !point.Valid() {
-		in.MetricInvalid.Inc()
+		in.invalidMP.Inc()
 		log.Debug("in: Invalid metric %v", point)
 		return
 	}
@@ -61,6 +65,7 @@ func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition
 	in.pressureIdx.Add(int(time.Since(pre).Nanoseconds()))
 
 	if !ok {
+		in.unknownMP.Inc()
 		return
 	}
 
@@ -74,15 +79,15 @@
 // ProcessMetricData assures the data is stored and the metadata is in the index
 // concurrency-safe.
 func (in DefaultHandler) ProcessMetricData(md *schema.MetricData, partition int32) {
-	in.metricsReceived.Inc()
+	in.receivedMD.Inc()
 	err := md.Validate()
 	if err != nil {
-		in.MetricInvalid.Inc()
+		in.invalidMD.Inc()
 		log.Debug("in: Invalid metric %v: %s", md, err)
 		return
 	}
 	if md.Time == 0 {
-		in.MetricInvalid.Inc()
+		in.invalidMD.Inc()
 		log.Warn("in: invalid metric. metric.Time is 0. %s", md.Id)
 		return
 	}
diff --git a/scripts/qa/verify_metrics_received.py b/scripts/qa/verify_metrics_received.py
index c0a69f1151..2646415d8b 100755
--- a/scripts/qa/verify_metrics_received.py
+++ b/scripts/qa/verify_metrics_received.py
@@ -29,7 +29,7 @@ def error(msg):
     },
     'data': {
         'target':
-        'perSecond(metrictank.stats.docker-env.default.input.carbon.metrics_received.counter32)',
+        'perSecond(metrictank.stats.docker-env.default.input.carbon.metricdata.received.counter32)',
     },
 }
 
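
For reference, the new per-messagetype ingest counters can be read back out of Graphite the same way the updated dashboard panel does: perSecond on the raw counters, then groupByNodes summing across instances while keeping the plugin and message-type name nodes. The sketch below is a minimal illustration in the spirit of scripts/qa/verify_metrics_received.py; the Graphite endpoint URL is an assumption to adjust for your deployment, the docker-env.default prefix matches the QA script, and it needs a Graphite version that has groupByNodes, as noted above.

#!/usr/bin/env python
# Minimal sketch: query the new per-messagetype "received" counters via the
# Graphite render API, aggregated like the updated dashboard panel.
# Assumes a Graphite endpoint at http://localhost:8080 (adjust as needed) and
# the docker-env.default stats prefix used by the QA scripts.
import requests

GRAPHITE = 'http://localhost:8080'  # assumed endpoint, adjust for your setup

# nodes 5, 6, 7 are <plugin>.<messagetype>.received, e.g. carbon.metricdata.received
TARGET = ("groupByNodes(perSecond("
          "metrictank.stats.docker-env.default.input.*.*.received.counter32"
          "), 'sum', 5, 6, 7)")

resp = requests.get(GRAPHITE + '/render',
                    params={'target': TARGET, 'from': '-5min', 'format': 'json'})
resp.raise_for_status()

for series in resp.json():
    # datapoints are [value, timestamp] pairs; report the latest non-null rate
    values = [v for v, _ in series['datapoints'] if v is not None]
    print('%-40s %8.1f/s' % (series['target'], values[-1] if values else 0.0))

Swapping the trailing received node for invalid or unknown checks the other new counters the same way.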