diff --git a/CHANGELOG.md b/CHANGELOG.md
index b0f9aff9118..4c650328d14 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 * [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
 * [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
 * [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
+* [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371
 
 ## 1.0.0 / 2020-04-02
 
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 094a5536e8f..20bba2ef046 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -88,6 +88,10 @@ var (
 		Name: "distributor_replication_factor",
 		Help: "The configured replication factor.",
 	})
+	latestSeenSampleTimestampPerUser = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
+		Help: "Unix timestamp of latest received sample per user.",
+	}, []string{"user"})
 
 	emptyPreallocSeries = ingester_client.PreallocTimeseries{}
 )
@@ -359,12 +363,25 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
 		}
 	}
 
+	latestSampleTimestampMs := int64(0)
+	defer func() {
+		// Update this metric even in case of errors.
+		if latestSampleTimestampMs > 0 {
+			latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
+		}
+	}()
+
 	// For each timeseries, compute a hash to distribute across ingesters;
 	// check each sample and discard if outside limits.
 	validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
 	keys := make([]uint32, 0, len(req.Timeseries))
 	validatedSamples := 0
 	for _, ts := range req.Timeseries {
+		// Use the timestamp of the latest sample in the series. If samples within a series are not ordered, the metric for this user may be wrong.
+		if len(ts.Samples) > 0 {
+			latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
+		}
+
 		// If we found both the cluster and replica labels, we only want to include the cluster label when
 		// storing series in Cortex. If we kept the replica label we would end up with another series for the same
 		// series we're trying to dedupe when HA tracking moves over to a different replica.
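The distributor change above combines three things: a per-user `GaugeVec`, a deferred update so the gauge is set even when `Push` returns an error, and a running maximum over the last sample of each series. Below is a minimal, self-contained sketch of that pattern, assuming samples within a series arrive in timestamp order; the `sample` type, the `pushSamples` function, and the metric name are hypothetical stand-ins for illustration, not Cortex APIs.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// sample is a hypothetical stand-in for the distributor's client.Sample.
type sample struct {
	TimestampMs int64
	Value       float64
}

// Per-user gauge, registered on the default registry via promauto.
var latestSeen = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_latest_seen_sample_timestamp_seconds",
	Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"})

// pushSamples mirrors the shape of the Push change: track the newest sample
// timestamp seen in the request, and export it from a deferred function so
// the gauge is updated even if validation or rate limiting fails later on.
func pushSamples(userID string, series [][]sample) error {
	latestMs := int64(0)
	defer func() {
		if latestMs > 0 {
			// Convert milliseconds to fractional Unix seconds, matching the
			// _seconds suffix in the metric name.
			latestSeen.WithLabelValues(userID).Set(float64(latestMs) / 1000)
		}
	}()

	for _, s := range series {
		if len(s) == 0 {
			continue
		}
		// Only the last sample of each series is inspected; if samples within
		// a series are unordered, the exported value may be too low.
		if ts := s[len(s)-1].TimestampMs; ts > latestMs {
			latestMs = ts
		}
	}

	// ... validation, sharding and writes to ingesters would happen here ...
	return nil
}

func main() {
	_ = pushSamples("user", [][]sample{{{TimestampMs: 123456789009, Value: 1}}})
	fmt.Println("metric updated")
}
```

Because the gauge holds fractional Unix seconds, per-tenant ingestion lag can then be read with a query along the lines of `time() - cortex_distributor_latest_seen_sample_timestamp_seconds`.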
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 3ed14d1a607..d969b70f526 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -15,6 +15,7 @@ import (
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/promql"
@@ -46,12 +47,18 @@ var (
 )
 
 func TestDistributor_Push(t *testing.T) {
+	metricNames := []string{
+		"cortex_distributor_latest_seen_sample_timestamp_seconds",
+	}
+
 	for name, tc := range map[string]struct {
 		numIngesters     int
 		happyIngesters   int
 		samples          int
+		startTimestampMs int64
 		expectedResponse *client.WriteResponse
 		expectedError    error
+		expectedMetrics  string
 	}{
 		"A push of no samples shouldn't block or return error, even if ingesters are sad": {
 			numIngesters:   3,
@@ -63,34 +70,66 @@ func TestDistributor_Push(t *testing.T) {
 			happyIngesters:   3,
 			samples:          10,
 			expectedResponse: success,
+			startTimestampMs: 123456789000,
+			expectedMetrics: `
+				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
+				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
+				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
+			`,
 		},
 		"A push to 2 happy ingesters should succeed": {
 			numIngesters:     3,
 			happyIngesters:   2,
 			samples:          10,
 			expectedResponse: success,
+			startTimestampMs: 123456789000,
+			expectedMetrics: `
+				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
+				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
+				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
+			`,
 		},
 		"A push to 1 happy ingesters should fail": {
-			numIngesters:   3,
-			happyIngesters: 1,
-			samples:        10,
-			expectedError:  errFail,
+			numIngesters:     3,
+			happyIngesters:   1,
+			samples:          10,
+			expectedError:    errFail,
+			startTimestampMs: 123456789000,
+			expectedMetrics: `
+				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
+				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
+				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
+			`,
 		},
 		"A push to 0 happy ingesters should fail": {
-			numIngesters:   3,
-			happyIngesters: 0,
-			samples:        10,
-			expectedError:  errFail,
+			numIngesters:     3,
+			happyIngesters:   0,
+			samples:          10,
+			expectedError:    errFail,
+			startTimestampMs: 123456789000,
+			expectedMetrics: `
+				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
+				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
+				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
+			`,
 		},
 		"A push exceeding burst size should fail": {
-			numIngesters:   3,
-			happyIngesters: 3,
-			samples:        30,
-			expectedError:  httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 30 samples"),
+			numIngesters:     3,
+			happyIngesters:   3,
+			samples:          30,
+			expectedError:    httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 30 samples"),
+			startTimestampMs: 123456789000,
+			expectedMetrics: `
+				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
+				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
+				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.029
+			`,
 		},
 	} {
 		for _, shardByAllLabels := range []bool{true, false} {
 			t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v)", name, shardByAllLabels), func(t *testing.T) {
+				latestSeenSampleTimestampPerUser.Reset()
+
 				limits := &validation.Limits{}
 				flagext.DefaultValues(limits)
 				limits.IngestionRate = 20
@@ -99,10 +138,16 @@ func TestDistributor_Push(t *testing.T) {
 				d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, limits, nil)
 				defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
 
-				request := makeWriteRequest(tc.samples)
+				request := makeWriteRequest(tc.startTimestampMs, tc.samples)
 				response, err := d.Push(ctx, request)
 				assert.Equal(t, tc.expectedResponse, response)
 				assert.Equal(t, tc.expectedError, err)
+
+				// Check tracked Prometheus metrics.
+				if tc.expectedMetrics != "" {
+					err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(tc.expectedMetrics), metricNames...)
+					assert.NoError(t, err)
+				}
 			})
 		}
 	}
@@ -189,7 +234,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 
 			// Push samples in multiple requests to the first distributor
 			for _, push := range testData.pushes {
-				request := makeWriteRequest(push.samples)
+				request := makeWriteRequest(0, push.samples)
 				response, err := distributors[0].Push(ctx, request)
 
 				if push.expectedError == nil {
@@ -375,7 +420,7 @@ func TestDistributor_PushQuery(t *testing.T) {
 			d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil, nil)
 			defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
 
-			request := makeWriteRequest(tc.samples)
+			request := makeWriteRequest(0, tc.samples)
 			writeResponse, err := d.Push(ctx, request)
 			assert.Equal(t, &client.WriteResponse{}, writeResponse)
 			assert.Nil(t, err)
@@ -772,7 +817,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
 	return d, ingesters
 }
 
-func makeWriteRequest(samples int) *client.WriteRequest {
+func makeWriteRequest(startTimestampMs int64, samples int) *client.WriteRequest {
 	request := &client.WriteRequest{}
 	for i := 0; i < samples; i++ {
 		ts := client.PreallocTimeseries{
@@ -787,7 +832,7 @@ func makeWriteRequest(startTimestampMs int64, samples int) *client.WriteRequest
 		ts.Samples = []client.Sample{
 			{
 				Value:       float64(i),
-				TimestampMs: int64(i),
+				TimestampMs: startTimestampMs + int64(i),
 			},
 		}
 		request.Timeseries = append(request.Timeseries, ts)
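The tests above verify the new gauge with `testutil.GatherAndCompare`, which scrapes a `prometheus.Gatherer` and diffs the result against expected text in the exposition format, restricted to the listed metric names. As a standalone illustration of that idiom, here is a small runnable sketch outside the test suite; it sets the gauge to the value the 10-sample test cases expect (the last of 10 samples starting at 123456789000 ms, i.e. 123456789.009 s):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var latestSeen = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
	Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"})

func main() {
	// Simulate what a successful push would leave behind.
	latestSeen.WithLabelValues("user").Set(123456789.009)

	expected := `
		# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
		# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
		cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
	`

	// GatherAndCompare returns nil when the gathered metrics match the
	// expected exposition text for the named metric families.
	err := testutil.GatherAndCompare(prometheus.DefaultGatherer,
		strings.NewReader(expected),
		"cortex_distributor_latest_seen_sample_timestamp_seconds")
	fmt.Println("comparison error:", err) // prints: comparison error: <nil>
}
```

Note the `latestSeenSampleTimestampPerUser.Reset()` call added at the start of each subtest: because the gauge lives in a package-level variable registered on the default registry, stale label values from a previous case would otherwise leak into the comparison.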