Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

cleanup carbon metrics for out-of-order vs duplicate data points, cleaner names in sync with prom metrics #1288

Merged
merged 11 commits into from
Apr 23, 2019
Merged
15 changes: 5 additions & 10 deletions dashboards/extra/fakemetrics-discarded-samples.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,25 @@
"targets": [
{
"refCount": 0,
"refId": "A",
"target": "aliasByNode(metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32, 4, 5)"
},
{
"refCount": 0,
"refId": "B",
"target": "aliasByNode(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32, 4, 5)"
"refId": "F",
"target": "aliasByNode(metrictank.stats.$environment.default.tank.discarded.*.counter32, 4, 5, 6)"
},
{
"refCount": 0,
"refId": "C",
"target": "aliasByNode(metrictank.stats.$environment.$instance.input.*.metricdata.invalid.counter32, 4, 5, 6, 7)",
"target": "aliasByNode(metrictank.stats.$environment.$instance.input.*.metricdata.discarded.invalid.counter32, 4, 5, 6, 7, 8)",
"textEditor": false
},
{
"refCount": 0,
"refId": "D",
"target": "aliasByNode(metrictank.stats.$environment.$instance.input.*.metricpoint.invalid.counter32, 4, 5, 6, 7)",
"target": "aliasByNode(metrictank.stats.$environment.$instance.input.*.metricpoint.discarded.invalid.counter32, 4, 5, 6, 7, 8)",
"textEditor": false
},
{
"refCount": 0,
"refId": "E",
"target": "aliasByNode(metrictank.stats.$environment.$instance.input.*.metricpoint.unknown.counter32, 4, 5, 6, 7)",
"target": "aliasByNode(metrictank.stats.$environment.$instance.input.*.metricpoint.discarded.unknown.counter32, 4, 5, 6, 7, 8)",
"textEditor": false
}
],
Expand Down
10 changes: 5 additions & 5 deletions dashboards/main/metrictank.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,19 +155,19 @@
},
{
"refId": "C",
"target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.invalid.counter32), 'sum', 5, 6, 7)"
"target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.discarded.invalid.counter32), 'sum', 5, 6, 7, 8)"
},
{
"refId": "D",
"target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.unknown.counter32), 'sum', 5, 6, 7)"
"target": "groupByNodes(perSecond(metrictank.stats.$environment.$instance.input.*.*.discarded.unknown.counter32), 'sum', 5, 6, 7, 8)"
},
{
"refId": "E",
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32)), 'too old')"
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.discarded.sample-out-of-order.counter32)), 'too old')"
},
{
"refId": "F",
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)), 'add-to-saved')"
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.discarded.received-too-late.counter32)), 'add-to-saved')"
Copy link
Contributor

@Dieterbe Dieterbe Apr 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh.. i love a clean dashboard json diff like this done.
was it a lot of work? I suspect you either did a manual json edit or had to do lots of git add -p or git checkout -p if you exported out of grafana.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This time I did manual json edit. Last time I did an export of grafana and managed to get a decent diff.

},
{
"refId": "G",
Expand Down Expand Up @@ -4763,4 +4763,4 @@
"title": "Metrictank",
"uid": "tQW3QShiz",
"version": 5
}
}
33 changes: 21 additions & 12 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,16 @@ the duration of successful memory idx prunes
the duration of (successful) update of a metric to the memory idx
* `idx.metrics_active`:
the number of currently known metrics in the index
* `input.%s.metricdata.invalid`:
* `input.%s.metricdata.discarded.invalid`:
a count of times a metricdata was invalid by input plugin
* `input.%s.metricdata.received`:
the count of metricdata datapoints received by input plugin
* `input.%s.metricpoint.invalid`:
* `input.%s.metricpoint.discarded.invalid`:
a count of times a metricpoint was invalid by input plugin
* `input.%s.metricpoint.discarded.unknown`:
the count of times the ID of a received metricpoint was not in the index, by input plugin
* `input.%s.metricpoint.received`:
the count of metricpoint datapoints received by input plugin
* `input.%s.metricpoint.unknown`:
the count of times the ID of a received metricpoint was not in the index, by input plugin
* `input.%s.metricpoint_no_org.received`:
the count of metricpoint_no_org datapoints received by input plugin
* `input.carbon.metrics_decode_err`:
Expand Down Expand Up @@ -344,15 +344,27 @@ the duration of a put in the wait queue
how many rows come per get response
* `store.cassandra.to_iter`:
the duration of converting chunks to iterators
* `tank.add_to_closed_chunk`:
points received for the most recent chunk
when that chunk is already being "closed", ie the end-of-stream marker has been written to the chunk.
this indicates that your GC is actively sealing chunks and saving them before you have the chance to send
your (infrequent) updates. Any points received for a chunk that has already been closed are discarded.
* `tank.chunk_operations.clear`:
a counter of how many chunks are cleared (replaced by new chunks)
* `tank.chunk_operations.create`:
a counter of how many chunks are created
* `tank.discarded.new-value-for-timestamp`:
points that have timestamps for which we already have data points.
these points are discarded.
data points can be incorrectly classified as the metric `tank.discarded.sample-out-of-order` even when the timestamp
has already been used. This happens in two cases:
- when the reorder buffer is enabled, if the point is older than the reorder buffer retention window
- when the reorder buffer is disabled, if the point is older than the last data point
* `tank.discarded.received-too-late`:
points received for the most recent chunk
when that chunk is already being "closed", ie the end-of-stream marker has been written to the chunk.
this indicates that your GC is actively sealing chunks and saving them before you have the chance to send
your (infrequent) updates. Any points received for a chunk that has already been closed are discarded.
* `tank.discarded.sample-out-of-order`:
points that go back in time beyond the scope of the optional reorder window.
these points will end up being dropped and lost.
* `tank.discarded.unknown`:
points that have been discarded for unknown reasons.
* `tank.gc_metric`:
the number of times the metrics GC is about to inspect a metric (series)
* `tank.metrics_active`:
Expand All @@ -362,9 +374,6 @@ the number of points received that are going back in time, but are still
within the reorder window. in such a case they will be inserted in the correct order.
E.g. if the reorder window is 60 (datapoints) then points may be inserted at random order as long as their
ts is not older than the 60th datapoint counting from the newest.
* `tank.metrics_too_old`:
points that go back in time beyond the scope of the optional reorder window.
these points will end up being dropped and lost.
* `tank.persist`:
how long it takes to persist a chunk (and chunks preceding it)
this is subject to backpressure from the store when the store's queue runs full
Expand Down
10 changes: 5 additions & 5 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ this will give instant insights in all the performance metrics of Metrictank.
* `metrictank.stats.$environment.$instance.cluster.primary.gauge1`: assure you have exactly 1 primary node (saving to cassandra) or as many as you have shardgroups, for sharded setups.
* `metrictank.stats.$environment.$instance.input.kafka-mdm.partition.*.lag.gauge64`: kafka lag, depending on your throughput you can always expect some lag, but it should be in the thousands not millions.
* `metrictank.stats.$environment.$instance.store.cassandra.write_queue.*.items.{min,max}.gauge32`: make sure the write queues are able to drain. For primary nodes that are also used for queries, assert the write queues don't reach capacity, otherwise ingest will block and data will lag behind in queries.
* `metrictank.stats.$environment.$instance.input.*.metricpoint.unknown.counter32`: counter of MetricPoint messages for an unknown metric, will be dropped.
* `metrictank.stats.$environment.$instance.input.*.metricpoint.discarded.unknown.counter32`: counter of MetricPoint messages for an unknown metric, will be dropped.
* `metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32`: counter of incoming data that could not be decoded.
* `metrictank.stats.$environment.$instance.input.*.*.invalid.counter32`: counter of incoming data that could not be decoded.
* `metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32`: counter of points that are too old and can't be added.
* `metrictank.stats.$environment.$instance.input.*.*.discarded.invalid.counter32`: counter of incoming data that could not be decoded.
* `metrictank.stats.$environment.$instance.tank.discarded.sample-out-of-order.counter32`: counter of points that are too old and can't be added.
* `metrictank.stats.$environment.$instance.api.request.render.latency.*.gauge32`: shows how fast/slow metrictank responds to http queries
* `metrictank.stats.$environment.$instance.api.request.render*.status.*.counter32`: counters per status code. make sure most, or all result in http-200's.
* `metrictank.stats.$environment.$instance.store.cassandra.error.*`: shows erroring queries. Queries that result in errors (or timeouts) will result in missing data in your charts.
* `perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32)`: Points dropped due to chunks being closed. Need to tune the chunk-max-stale setting or fix your data stream to not send old points so late.
* `perSecond(metrictank.stats.$environment.$instance.tank.discarded.received-too-late.counter32)`: Points dropped due to chunks being closed. Need to tune the chunk-max-stale setting or fix your data stream to not send old points so late.
* `metrictank.stats.$environment.$instance.recovered_errors.*.*.*` : any internal errors that were recovered from automatically (should be 0. If not, please create an issue)

If you expect consistent or predictable load, you may also want to monitor:
Expand Down Expand Up @@ -164,7 +164,7 @@ For more information on profiling see the excellent [Profiling Go Programs](http
* check if any points are being rejected, using the ingest chart on the dashboard (e.g. out of order, invalid)
* can use debug logging to trace data throughout the pipeline. mt-store-cat to see what's in cassandra, mt-kafka-mdm-sniff, etc.
* if it's old data, make sure you have a primary that can save data to cassandra, that the write queue can drain
* check `metric-max-stale` and `chunk-max-stale` settings, make sure chunks are not being prematurely sealed (happens in some rare cases if you send data very infrequently. see `tank.add_to_closed_chunk` metric)
* check `metric-max-stale` and `chunk-max-stale` settings, make sure chunks are not being prematurely sealed (happens in some rare cases if you send data very infrequently. see `tank.discarded.received-too-late` metric)
* did you restart instances? if so: make sure your instances start replaying data within the allotted "overhead window". E.g. if your kafka retention is 7 hours and your largest chunks are 6 hours, then instances need to start replaying data within an hour after startup. (so make sure processing of metricpersist messages, index loading, etc doesn't take too long). Any subsequent restart (e.g. due to kafka removing a segment currently being consumed) starts the process from zero again, so watch out. Increase kafka retention as needed.

In the below example, we:
Expand Down
12 changes: 6 additions & 6 deletions input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
receivedMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.received", input)),
// metric input.%s.metricpoint_no_org.received is the count of metricpoint_no_org datapoints received by input plugin
receivedMPNO: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint_no_org.received", input)),
// metric input.%s.metricdata.invalid is a count of times a metricdata was invalid by input plugin
invalidMD: stats.NewCounterRate32(fmt.Sprintf("input.%s.metricdata.invalid", input)),
// metric input.%s.metricpoint.invalid is a count of times a metricpoint was invalid by input plugin
invalidMP: stats.NewCounterRate32(fmt.Sprintf("input.%s.metricpoint.invalid", input)),
// metric input.%s.metricpoint.unknown is the count of times the ID of a received metricpoint was not in the index, by input plugin
unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)),
// metric input.%s.metricdata.discarded.invalid is a count of times a metricdata was invalid by input plugin
invalidMD: stats.NewCounterRate32(fmt.Sprintf("input.%s.metricdata.discarded.invalid", input)),
// metric input.%s.metricpoint.discarded.invalid is a count of times a metricpoint was invalid by input plugin
invalidMP: stats.NewCounterRate32(fmt.Sprintf("input.%s.metricpoint.discarded.invalid", input)),
// metric input.%s.metricpoint.discarded.unknown is the count of times the ID of a received metricpoint was not in the index, by input plugin
unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.discarded.unknown", input)),

metrics: metrics,
metricIndex: metricIndex,
Expand Down
35 changes: 22 additions & 13 deletions mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/mdata/cache"
"github.com/grafana/metrictank/mdata/chunk"
mdataerrors "github.com/grafana/metrictank/mdata/errors"
"github.com/raintank/schema"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -421,15 +422,8 @@ func (a *AggMetric) Add(ts uint32, val float64) {
}
}
} else {
var reason string
switch err {
case errMetricTooOld:
reason = sampleOutOfOrder
metricsTooOld.Inc()
default:
reason = "unknown"
}
PromDiscardedSamples.WithLabelValues(reason, strconv.Itoa(int(a.key.MKey.Org))).Inc()
log.Debugf("AM: failed to add metric to reorder buffer for %s. %s", a.key, err)
a.discardedMetricsInc(err)
}
}
}
Expand Down Expand Up @@ -469,15 +463,14 @@ func (a *AggMetric) add(ts uint32, val float64) {
if currentChunk.Series.Finished {
// if we've already 'finished' the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator
// you should monitor this metric closely, it indicates that maybe your GC settings don't match how you actually send data (too late)
addToClosedChunk.Inc()
discardedReceivedTooLate.Inc()
PromDiscardedSamples.WithLabelValues(receivedTooLate, strconv.Itoa(int(a.key.MKey.Org))).Inc()
return
}

if err := currentChunk.Push(ts, val); err != nil {
log.Debugf("AM: failed to add metric to chunk for %s. %s", a.key, err)
metricsTooOld.Inc()
PromDiscardedSamples.WithLabelValues(sampleOutOfOrder, strconv.Itoa(int(a.key.MKey.Org))).Inc()
a.discardedMetricsInc(err)
return
}
totalPoints.Inc()
Expand All @@ -488,7 +481,7 @@ func (a *AggMetric) add(ts uint32, val float64) {
}
} else if t0 < currentChunk.Series.T0 {
log.Debugf("AM: Point at %d has t0 %d, goes back into previous chunk. CurrentChunk t0: %d, LastTs: %d", ts, t0, currentChunk.Series.T0, currentChunk.Series.T)
metricsTooOld.Inc()
discardedSampleOutOfOrder.Inc()
PromDiscardedSamples.WithLabelValues(sampleOutOfOrder, strconv.Itoa(int(a.key.MKey.Org))).Inc()
return
} else {
Expand Down Expand Up @@ -631,3 +624,19 @@ func (a *AggMetric) gcAggregators(now, chunkMinTs, metricMinTs uint32) (uint32,
}
return points, stale
}

// discardedMetricsInc records one discarded data point: it maps the push
// error to a discard reason, increments the matching carbon counter, and
// (below) increments the prometheus counter labeled by reason and org.
func (a *AggMetric) discardedMetricsInc(err error) {
	var reason string
	switch err {
	case mdataerrors.ErrMetricTooOld:
		// point is older than what the chunk/reorder buffer can still accept
		reason = sampleOutOfOrder
		discardedSampleOutOfOrder.Inc()
	case mdataerrors.ErrMetricNewValueForTimestamp:
		// point carries a timestamp for which a value was already stored
		reason = newValueForTimestamp
		discardedNewValueForTimestamp.Inc()
	default:
		// catch-all so that every discarded point increments some counter
		discardedUnknown.Inc()
		reason = "unknown"
	}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from #1201 :

Every point we receive should either be persisted or we should increment a counter to say we discarded

thus in the unknown case, we should also increment a carbon counter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we create a new catch-all sort of counter? tank.discarded.unknown perhaps?
(note that this case does not happen atm)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

	PromDiscardedSamples.WithLabelValues(reason, strconv.Itoa(int(a.key.MKey.Org))).Inc()
}
8 changes: 4 additions & 4 deletions mdata/aggmetric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ func TestAggMetricWithReorderBuffer(t *testing.T) {
c.Add(375, 375)
c.Verify(true, 120, 479, 121, 375)

metricsTooOld.SetUint32(0)
discardedSampleOutOfOrder.SetUint32(0)

// adds 10 entries that are out of order and the reorder buffer should order the first 9
// the last item (365) will be too old, so it increases metricsTooOld counter
// the last item (365) will be too old, so it increases discardedSampleOutOfOrder counter
for i := uint32(374); i > 364; i-- {
c.Add(i, float64(i))
}
Expand All @@ -270,8 +270,8 @@ func TestAggMetricWithReorderBuffer(t *testing.T) {
c.Verify(true, 120, 380, 121, 375)

// one point has been added out of order and too old for the buffer to reorder
if metricsTooOld.Peek() != 1 {
t.Fatalf("Expected the out of order count to be 1, not %d", metricsTooOld.Peek())
if discardedSampleOutOfOrder.Peek() != 1 {
t.Fatalf("Expected the out of order count to be 1, not %d", discardedSampleOutOfOrder.Peek())
}
}

Expand Down
7 changes: 5 additions & 2 deletions mdata/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/grafana/metrictank/mdata/chunk/tsz"
"github.com/grafana/metrictank/mdata/errors"
)

// Chunk is a chunk of data. not concurrency safe.
Expand Down Expand Up @@ -39,8 +40,10 @@ func (c *Chunk) String() string {
}

func (c *Chunk) Push(t uint32, v float64) error {
if t <= c.Series.T {
return fmt.Errorf("Point must be newer than already added points. t:%d lastTs: %d", t, c.Series.T)
if t == c.Series.T {
return errors.ErrMetricNewValueForTimestamp
} else if t < c.Series.T {
return errors.ErrMetricTooOld
}
c.Series.Push(t, v)
c.NumPoints++
Expand Down
38 changes: 38 additions & 0 deletions mdata/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"testing"

"github.com/grafana/metrictank/mdata/errors"
"github.com/raintank/schema"
)

Expand Down Expand Up @@ -81,3 +82,40 @@ func equal(exp, got []schema.Point) bool {
}
return true
}

// expectedData describes the state expected after a single Chunk.Push call.
type expectedData struct {
	NumPoints uint32 // expected Chunk.NumPoints after the push
	Err       error  // expected error returned by Push (nil on success)
}

// testPush pushes each point into a fresh chunk and, after every push,
// verifies both the chunk's resulting point count and the returned error
// against the parallel expected slice.
// Fix: include the case index in failure messages so a table-driven
// failure identifies which point triggered it.
func testPush(t *testing.T, points []schema.Point, expected []expectedData) {
	chunk := New(0)
	for i := range points {
		err := chunk.Push(points[i].Ts, points[i].Val)
		if chunk.NumPoints != expected[i].NumPoints {
			t.Fatalf("point %d: expected %d points pushed but had %d", i, expected[i].NumPoints, chunk.NumPoints)
		}
		if err != expected[i].Err {
			t.Fatalf("point %d: expected error %v but received %v", i, expected[i].Err, err)
		}
	}
}

// TestChunkPush exercises Chunk.Push with in-order points, a duplicate
// timestamp, and an out-of-order point, checking the error and the point
// count after each push.
func TestChunkPush(t *testing.T) {
	cases := []struct {
		point    schema.Point
		expected expectedData
	}{
		{schema.Point{Ts: 1001, Val: 100}, expectedData{1, nil}},
		{schema.Point{Ts: 1002, Val: 100}, expectedData{2, nil}},
		{schema.Point{Ts: 1002, Val: 100}, expectedData{2, errors.ErrMetricNewValueForTimestamp}},
		{schema.Point{Ts: 1003, Val: 100}, expectedData{3, nil}},
		{schema.Point{Ts: 999, Val: 100}, expectedData{3, errors.ErrMetricTooOld}},
	}

	points := make([]schema.Point, 0, len(cases))
	expected := make([]expectedData, 0, len(cases))
	for _, c := range cases {
		points = append(points, c.point)
		expected = append(expected, c.expected)
	}

	testPush(t, points, expected)
}
10 changes: 10 additions & 0 deletions mdata/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Package errors defines sentinel errors shared by the mdata packages so
// callers can distinguish why a data point was rejected.
package errors

import (
	"errors"
)

var (
	// ErrMetricTooOld is returned when a pushed point's timestamp is older
	// than the newest point already stored.
	ErrMetricTooOld = errors.New("metric too old")
	// ErrMetricNewValueForTimestamp is returned when a pushed point's
	// timestamp equals one for which a value was already stored.
	ErrMetricNewValueForTimestamp = errors.New("new value for existing timestamp")
)
Loading