1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
* [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371

## 1.0.0 / 2020-04-02

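The CHANGELOG entry above is the user-facing summary: the new gauge records, per tenant, the Unix timestamp (in seconds) of the newest sample the distributor has received. As a hedged illustration that is not part of the diff (the helper name and the sample value are invented), the sketch below shows how that gauge value translates into remote-write lag; the equivalent PromQL expression is noted in the comment.

```go
// Illustrative only: the gauge stores the newest sample timestamp seen for a
// tenant as Unix seconds, so "how far behind" a sender is comes out as now
// minus the gauge value. In PromQL this is typically expressed as
//   time() - cortex_distributor_latest_seen_sample_timestamp_seconds
package main

import (
	"fmt"
	"time"
)

// ingestLag converts a scraped gauge value (Unix seconds of the latest
// received sample) into a lag duration relative to the local clock.
func ingestLag(latestSeenUnixSeconds float64) time.Duration {
	lagSeconds := float64(time.Now().UnixNano())/1e9 - latestSeenUnixSeconds
	return time.Duration(lagSeconds * float64(time.Second))
}

func main() {
	// Hypothetical value scraped for one tenant.
	fmt.Println(ingestLag(1585872000.123))
}
```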
17 changes: 17 additions & 0 deletions pkg/distributor/distributor.go
@@ -88,6 +88,10 @@ var (
Name: "distributor_replication_factor",
Help: "The configured replication factor.",
})
latestSeenSampleTimestampPerUser = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"})
emptyPreallocSeries = ingester_client.PreallocTimeseries{}
)

@@ -359,12 +363,25 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}
}

latestSampleTimestampMs := int64(0)
defer func() {
// Update this metric even in case of errors.
if latestSampleTimestampMs > 0 {
latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
}
}()

// For each timeseries, compute a hash to distribute across ingesters;
// check each sample and discard if outside limits.
validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
keys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
for _, ts := range req.Timeseries {
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
if len(ts.Samples) > 0 {
latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}

// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
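The hunk above adds two things to `Push`: it tracks the newest sample timestamp across all series in the request, and it updates the per-user gauge in a `defer` so the value is recorded even when validation or ingestion later fails, converting the wire-format milliseconds to seconds. A minimal, self-contained sketch of that pattern follows; the package, type, and function names are simplified stand-ins, not the actual Cortex types.

```go
// Simplified stand-ins for the Cortex types; this only isolates the pattern
// the hunk above adds to Push.
package sketch

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var latestSeenSampleTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_latest_seen_sample_timestamp_seconds",
	Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"})

type sample struct{ TimestampMs int64 }

type series struct{ Samples []sample }

// push tracks the newest sample timestamp while walking the request and
// updates the per-user gauge in a defer, so the metric is set even if a
// later validation or ingestion step returns an error.
func push(userID string, timeseries []series) error {
	latestSampleTimestampMs := int64(0)
	defer func() {
		if latestSampleTimestampMs > 0 {
			// The wire format carries milliseconds; the gauge is in seconds.
			latestSeenSampleTimestamp.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
		}
	}()

	for _, ts := range timeseries {
		// Samples within a series are assumed ordered by time, so the last
		// one is the newest; out-of-order samples would skew the gauge.
		if n := len(ts.Samples); n > 0 && ts.Samples[n-1].TimestampMs > latestSampleTimestampMs {
			latestSampleTimestampMs = ts.Samples[n-1].TimestampMs
		}
	}

	// ... validation and forwarding to ingesters would happen here ...
	return nil
}
```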
79 changes: 62 additions & 17 deletions pkg/distributor/distributor_test.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
@@ -46,12 +47,18 @@ var (
)

func TestDistributor_Push(t *testing.T) {
metricNames := []string{
"cortex_distributor_latest_seen_sample_timestamp_seconds",
}

for name, tc := range map[string]struct {
numIngesters int
happyIngesters int
samples int
startTimestampMs int64
expectedResponse *client.WriteResponse
expectedError error
expectedMetrics string
}{
"A push of no samples shouldn't block or return error, even if ingesters are sad": {
numIngesters: 3,
@@ -63,34 +70,66 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: 3,
samples: 10,
expectedResponse: success,
startTimestampMs: 123456789000,
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push to 2 happy ingesters should succeed": {
numIngesters: 3,
happyIngesters: 2,
samples: 10,
expectedResponse: success,
startTimestampMs: 123456789000,
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push to 1 happy ingesters should fail": {
numIngesters: 3,
happyIngesters: 1,
samples: 10,
expectedError: errFail,
numIngesters: 3,
happyIngesters: 1,
samples: 10,
expectedError: errFail,
startTimestampMs: 123456789000,
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push to 0 happy ingesters should fail": {
numIngesters: 3,
happyIngesters: 0,
samples: 10,
expectedError: errFail,
numIngesters: 3,
happyIngesters: 0,
samples: 10,
expectedError: errFail,
startTimestampMs: 123456789000,
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push exceeding burst size should fail": {
numIngesters: 3,
happyIngesters: 3,
samples: 30,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 30 samples"),
numIngesters: 3,
happyIngesters: 3,
samples: 30,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 30 samples"),
startTimestampMs: 123456789000,
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.029
`,
},
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v)", name, shardByAllLabels), func(t *testing.T) {
latestSeenSampleTimestampPerUser.Reset()

limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRate = 20
@@ -99,10 +138,16 @@ func TestDistributor_Push(t *testing.T) {
d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, limits, nil)
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(tc.samples)
request := makeWriteRequest(tc.startTimestampMs, tc.samples)
response, err := d.Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.expectedError, err)

// Check tracked Prometheus metrics.
if tc.expectedMetrics != "" {
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(tc.expectedMetrics), metricNames...)
assert.NoError(t, err)
}
})
}
}
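The test hunk above asserts the new metric with `testutil.GatherAndCompare`, comparing the default registry's output for the named metric against an exposition-format string. A standalone sketch of that assertion style is shown below; the metric name and value are placeholders, not the ones from the Cortex test.

```go
// Standalone sketch of the GatherAndCompare assertion style used above.
package sketch_test

import (
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
)

var exampleTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_latest_seen_sample_timestamp_seconds",
	Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"})

func TestExampleTimestampGauge(t *testing.T) {
	exampleTimestamp.WithLabelValues("user").Set(123456789.009)

	// GatherAndCompare gathers from the given registry, keeps only the named
	// metrics, and diffs them against the expected exposition-format text.
	expected := `
# HELP example_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE example_latest_seen_sample_timestamp_seconds gauge
example_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`
	assert.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer,
		strings.NewReader(expected),
		"example_latest_seen_sample_timestamp_seconds"))
}
```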
@@ -189,7 +234,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {

// Push samples in multiple requests to the first distributor
for _, push := range testData.pushes {
request := makeWriteRequest(push.samples)
request := makeWriteRequest(0, push.samples)
response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
@@ -375,7 +420,7 @@ func TestDistributor_PushQuery(t *testing.T) {
d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil, nil)
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(tc.samples)
request := makeWriteRequest(0, tc.samples)
writeResponse, err := d.Push(ctx, request)
assert.Equal(t, &client.WriteResponse{}, writeResponse)
assert.Nil(t, err)
@@ -772,7 +817,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
return d, ingesters
}

func makeWriteRequest(samples int) *client.WriteRequest {
func makeWriteRequest(startTimestampMs int64, samples int) *client.WriteRequest {
request := &client.WriteRequest{}
for i := 0; i < samples; i++ {
ts := client.PreallocTimeseries{
@@ -787,7 +832,7 @@ func makeWriteRequest(samples int) *client.WriteRequest {
ts.Samples = []client.Sample{
{
Value: float64(i),
TimestampMs: int64(i),
TimestampMs: startTimestampMs + int64(i),
},
}
request.Timeseries = append(request.Timeseries, ts)