cleanup input handler metrics
* track received and invalid counts on a per-message-type basis
* track unknown metricpoint metrics
* remove unused MsgsAge metric

note: the dashboard now requires a recent graphite version that has groupByNodes
Dieterbe committed Apr 10, 2018
1 parent f05deab commit 01dabf9
Showing 4 changed files with 46 additions and 25 deletions.
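
In short: the old per-plugin `metrics_received` and `metric_invalid` counters are replaced by counters that also encode the message type (metricdata vs metricpoint) and the outcome, and the unused `MsgsAge` meter is dropped. Going by the input/input.go diff below, the reported series now look like this (using `carbon`, the plugin name from the QA script, purely as an example):

    metrictank.stats.$environment.$instance.input.carbon.metricdata.received.counter32
    metrictank.stats.$environment.$instance.input.carbon.metricdata.invalid.counter32
    metrictank.stats.$environment.$instance.input.carbon.metricpoint.received.counter32
    metrictank.stats.$environment.$instance.input.carbon.metricpoint.invalid.counter32
    metrictank.stats.$environment.$instance.input.carbon.metricpoint.unknown.counter32
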
33 changes: 24 additions & 9 deletions dashboard.json
@@ -98,9 +98,16 @@
"yaxis": 2
},
{
"alias": "/ in/",
"alias": "/received/",
"lines": true,
"points": false
"points": false,
"color": "#3f6833"
},
{
"alias": "/metricpoint.received/",
"lines": true,
"points": false,
"color": "#7eb26d"
},
{
"alias": "/invalid/",
@@ -110,6 +117,10 @@
"alias": "/decode err/",
"color": "#890F02"
},
{
"alias": "/unknown/",
"color": "#890F02"
},
{
"alias": "/pressure\\./",
"fill": 0,
@@ -126,34 +137,38 @@
"targets": [
{
"refId": "A",
"target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_received.counter32)), '.*\\.([^\\.]+)\\.metrics_received.*', '\\1 in')"
"target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32)), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')"
},
{
"refId": "B",
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32)), 'too old')"
"target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.received.counter32), 'sum', 5, 6, 7)"
},
{
"refId": "C",
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)), 'add-to-saved')"
"target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.invalid.counter32), 'sum', 5, 6, 7)"
},
{
"refId": "D",
"target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32)), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')"
"target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.unknown.counter32), 'sum', 5, 6, 7)"
},
{
"refId": "E",
"target": "aliasSub(sumSeries(perSecond(metrictank.stats.$environment.$instance.input.*.metric_invalid.counter32)), '.*\\.([^\\.]+)\\.metric_invalid.*', '\\1 metric invalid')"
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32)), 'too old')"
},
{
"refId": "F",
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_reordered.counter32)), 'reordered')"
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)), 'add-to-saved')"
},
{
"refId": "G",
"target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.tank.counter32), 1e-9)), 6, 7)"
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_reordered.counter32)), 'reordered')"
},
{
"refId": "H",
"target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.tank.counter32), 1e-9)), 6, 7)"
},
{
"refId": "I",
"target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.idx.counter32), 1e-9)), 6, 7)"
}
],
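
A note on the reworked targets: they match every input plugin and message type with wildcards and then use graphite's groupByNodes to aggregate per unique combination of nodes 5, 6 and 7 (plugin, message type, outcome), which also yields the short series names that the `/received/`, `/metricpoint.received/`, `/invalid/` and `/unknown/` alias rules above key off for coloring. A sketch of what refId B would return on a deployment with a single carbon input (series names are illustrative):

    groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.received.counter32), 'sum', 5, 6, 7)
    -> carbon.metricdata.received
    -> carbon.metricpoint.received

This is also why the commit message notes that the dashboard now needs a graphite version that ships groupByNodes.
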
1 change: 1 addition & 0 deletions docs/operations.md
@@ -22,6 +22,7 @@ Just make sure to have a properly configured statsd setup (or adjust the dashboa
* `metrictank.stats.$environment.$instance.cluster.primary.gauge1`: assure you have exactly 1 primary node (saving to cassandra) or as many as you have shardgroups, for sharded setups.
* `metrictank.stats.$environment.$instance.store.cassandra.write_queue.*.items.{min,max}.gauge32`: make sure the write queues are able to drain and don't reach capacity, otherwise ingest will block
* `metrictank.stats.$environment.$instance.input.*.pressure.idx.counter32`: index pressure as a ns counter, rise is between 0 and 10^9 each second. Alert if increase is more than 4x10^8 each second: this would signify the index can't keep up with indexing new data and is blocking ingestion pipeline.
* `metrictank.stats.$environment.$instance.input.*.metricpoint.unknown.counter32`: counter of MetricPoint messages for an unknown metric, will be dropped.
* `metrictank.stats.$environment.$instance.api.request_handle.latency.*.gauge32`: shows how fast/slow metrictank responds to http queries
* `metrictank.stats.$environment.$instance.store.cassandra.error.*`: shows erroring queries. Queries that result in errors (or timeouts) will result in missing data in your charts.
* `perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)`: Points dropped due to chunks being closed. Need to tune the chunk-max-stale setting or fix your data stream to not send old points so late.
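
As a concrete example for the new counter (a suggested query, not part of the docs change): graph its per-second rate across all inputs and alert if it stays above zero for a sustained period, since every increment is a dropped MetricPoint:

    perSecond(metrictank.stats.$environment.$instance.input.*.metricpoint.unknown.counter32)
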
35 changes: 20 additions & 15 deletions input/input.go
@@ -23,23 +23,27 @@ type Handler interface {

// Default is a base handler for a metrics packet, aimed to be embedded by concrete implementations
type DefaultHandler struct {
metricsReceived *stats.Counter32
MetricInvalid *stats.Counter32 // metric metric_invalid is a count of times a metric did not validate
MsgsAge *stats.Meter32 // in ms
pressureIdx *stats.Counter32
pressureTank *stats.Counter32
receivedMD *stats.Counter32
receivedMP *stats.Counter32
invalidMD *stats.Counter32
invalidMP *stats.Counter32
unknownMP *stats.Counter32
pressureIdx *stats.Counter32
pressureTank *stats.Counter32

metrics mdata.Metrics
metricIndex idx.MetricIndex
}

func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input string) DefaultHandler {
return DefaultHandler{
metricsReceived: stats.NewCounter32(fmt.Sprintf("input.%s.metrics_received", input)),
MetricInvalid: stats.NewCounter32(fmt.Sprintf("input.%s.metric_invalid", input)),
MsgsAge: stats.NewMeter32(fmt.Sprintf("input.%s.message_age", input), false),
pressureIdx: stats.NewCounter32(fmt.Sprintf("input.%s.pressure.idx", input)),
pressureTank: stats.NewCounter32(fmt.Sprintf("input.%s.pressure.tank", input)),
receivedMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.received", input)),
receivedMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.received", input)),
invalidMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.invalid", input)),
invalidMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.invalid", input)),
unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)),
pressureIdx: stats.NewCounter32(fmt.Sprintf("input.%s.pressure.idx", input)),
pressureTank: stats.NewCounter32(fmt.Sprintf("input.%s.pressure.tank", input)),

metrics: metrics,
metricIndex: metricIndex,
@@ -49,9 +53,9 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
// ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry
// concurrency-safe.
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
in.metricsReceived.Inc()
in.receivedMP.Inc()
if !point.Valid() {
in.MetricInvalid.Inc()
in.invalidMP.Inc()
log.Debug("in: Invalid metric %v", point)
return
}
@@ -61,6 +65,7 @@ func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition
in.pressureIdx.Add(int(time.Since(pre).Nanoseconds()))

if !ok {
in.unknownMP.Inc()
return
}

@@ -74,15 +79,15 @@
// ProcessMetricData assures the data is stored and the metadata is in the index
// concurrency-safe.
func (in DefaultHandler) ProcessMetricData(md *schema.MetricData, partition int32) {
in.metricsReceived.Inc()
in.receivedMD.Inc()
err := md.Validate()
if err != nil {
in.MetricInvalid.Inc()
in.invalidMD.Inc()
log.Debug("in: Invalid metric %v: %s", md, err)
return
}
if md.Time == 0 {
in.MetricInvalid.Inc()
in.invalidMD.Inc()
log.Warn("in: invalid metric. metric.Time is 0. %s", md.Id)
return
}
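
To recap what the reworked handler counts (a summary of the diff above, not new behavior):

* ProcessMetricPoint increments `metricpoint.received` on every call, `metricpoint.invalid` when `point.Valid()` fails, and `metricpoint.unknown` when the index does not know the metric (the `!ok` branch), in which case the point is dropped.
* ProcessMetricData increments `metricdata.received` on every call, and `metricdata.invalid` when `md.Validate()` returns an error or `md.Time` is 0.

The pressure.idx and pressure.tank counters are unchanged.
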
2 changes: 1 addition & 1 deletion scripts/qa/verify_metrics_received.py
@@ -29,7 +29,7 @@ def error(msg):
},
'data': {
'target':
'perSecond(metrictank.stats.docker-env.default.input.carbon.metrics_received.counter32)',
'perSecond(metrictank.stats.docker-env.default.input.carbon.metricdata.received.counter32)',
},
}

