97 changes: 33 additions & 64 deletions pkg/distributor/distributor.go
@@ -8,7 +8,6 @@ import (
"strings"
"time"

"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -354,10 +353,6 @@ func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, user
nil
}

func (d *Distributor) validateMetadata(m *ingester_client.MetricMetadata, userID string) error {
return validation.ValidateMetadata(d.limits, userID, m)
}

// Push implements client.IngesterServer
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
userID, err := user.ExtractOrgID(ctx)
@@ -470,7 +465,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}

for _, m := range req.Metadata {
err := d.validateMetadata(m, userID)
err := validation.ValidateMetadata(d.limits, userID, m)

if err != nil {
if firstPartialErr == nil {
@@ -519,49 +514,32 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}
}

if len(metadataKeys) > 0 {
err = ring.DoBatch(ctx, subRing, metadataKeys, func(ingester ring.IngesterDesc, indexes []int) error {
metadata := make([]*client.MetricMetadata, 0, len(indexes))
for _, i := range indexes {
metadata = append(metadata, validatedMetadata[i])
}
keys := append(seriesKeys, metadataKeys...)
initialMetadataIndex := len(seriesKeys)

localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectUserID(localCtx, userID)

if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}

return d.sendMetadata(localCtx, ingester, metadata)
}, func() {})
if err != nil {
// Metadata is a best-effort approach so we consider failures non-fatal, log them, and move on.
logger := util.WithContext(ctx, util.Logger)
level.Error(logger).Log("msg", "Failed to send metadata to ingesters", "err", err)
}
}
err = ring.DoBatch(ctx, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
var metadata []*client.MetricMetadata

if len(seriesKeys) > 0 {
err = ring.DoBatch(ctx, subRing, seriesKeys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
for _, i := range indexes {
for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, validatedTimeseries[i])
}
}

// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
return d.sendSamples(localCtx, ingester, timeseries)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
return d.send(localCtx, ingester, timeseries, metadata)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
}
return &client.WriteResponse{}, firstPartialErr
}
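
The core of this change is visible in the hunk above: rather than issuing two DoBatch calls, the distributor now concatenates the series and metadata keys into a single slice and, inside the callback, maps each index back to its source slice by comparing it against initialMetadataIndex. A minimal sketch of that demultiplexing step, as a hypothetical standalone helper independent of the Cortex ring package:

package sketch

// demux splits indexes into a combined key slice (series keys followed by
// metadata keys) back into positions within the two source slices.
// Indexes below boundary refer to series; the rest refer to metadata,
// offset by boundary.
func demux(indexes []int, boundary int) (seriesIdx, metadataIdx []int) {
	for _, i := range indexes {
		if i >= boundary {
			metadataIdx = append(metadataIdx, i-boundary)
		} else {
			seriesIdx = append(seriesIdx, i)
		}
	}
	return seriesIdx, metadataIdx
}

With 3 series keys and 2 metadata keys, boundary is 3, so indexes {0, 2, 3} split into series positions {0, 2} and metadata position {0}, which is exactly how the callback rebuilds its timeseries and metadata arguments.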
@@ -588,7 +566,7 @@ func sortLabelsIfNeeded(labels []client.LabelAdapter) {
})
}

func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries) error {
func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
@@ -597,32 +575,23 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes

req := client.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
}
_, err = c.Push(ctx, &req)

ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
}
return err
}

func (d *Distributor) sendMetadata(ctx context.Context, ingester ring.IngesterDesc, metadata []*client.MetricMetadata) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
if len(metadata) > 0 {
ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc()
}
}
c := h.(ingester_client.IngesterClient)

req := client.WriteRequest{
Metadata: metadata,
if len(timeseries) > 0 {
ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
}
}
_, err = c.Push(ctx, &req)

ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc()
}
return err
}

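Note also how localCtx is built inside the batch callback: it derives from context.Background() rather than the incoming request context, so pushes to the remaining ingesters are not cancelled if Push returns early; only the org ID and the tracing span are carried over, and a fresh RemoteTimeout is applied. A sketch of this detach-but-keep-span pattern (hypothetical helper, assuming only the opentracing-go API):

package sketch

import (
	"context"
	"time"

	opentracing "github.com/opentracing/opentracing-go"
)

// detach returns a context that survives cancellation of reqCtx but
// carries over its tracing span and enforces a fresh timeout of its own.
func detach(reqCtx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	if sp := opentracing.SpanFromContext(reqCtx); sp != nil {
		ctx = opentracing.ContextWithSpan(ctx, sp)
	}
	return ctx, cancel
}
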
97 changes: 70 additions & 27 deletions pkg/distributor/distributor_test.go
@@ -47,15 +47,20 @@ var (
)

func TestDistributor_Push(t *testing.T) {
metricNames := []string{
"cortex_distributor_latest_seen_sample_timestamp_seconds",
}
// Metrics to assert on.
lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds"
distributorAppend := "cortex_distributor_ingester_appends_total"
distributorAppendFailure := "cortex_distributor_ingester_append_failures_total"

type samplesIn struct {
num int
startTimestampMs int64
}
for name, tc := range map[string]struct {
metricNames []string
numIngesters int
happyIngesters int
samples int
startTimestampMs int64
samples samplesIn
metadata int
expectedResponse *client.WriteResponse
expectedError error
@@ -69,10 +74,10 @@
"A push to 3 happy ingesters should succeed": {
numIngesters: 3,
happyIngesters: 3,
samples: 5,
samples: samplesIn{num: 5, startTimestampMs: 123456789000},
metadata: 5,
expectedResponse: success,
startTimestampMs: 123456789000,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
@@ -82,57 +87,95 @@
"A push to 2 happy ingesters should succeed": {
numIngesters: 3,
happyIngesters: 2,
samples: 5,
samples: samplesIn{num: 5, startTimestampMs: 123456789000},
metadata: 5,
expectedResponse: success,
startTimestampMs: 123456789000,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.004
`,
},
"A push to 1 happy ingesters should fail": {
numIngesters: 3,
happyIngesters: 1,
samples: 10,
expectedError: errFail,
startTimestampMs: 123456789000,
numIngesters: 3,
happyIngesters: 1,
samples: samplesIn{num: 10, startTimestampMs: 123456789000},
expectedError: errFail,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push to 0 happy ingesters should fail": {
numIngesters: 3,
happyIngesters: 0,
samples: 10,
expectedError: errFail,
startTimestampMs: 123456789000,
numIngesters: 3,
happyIngesters: 0,
samples: samplesIn{num: 10, startTimestampMs: 123456789000},
expectedError: errFail,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push exceeding burst size should fail": {
numIngesters: 3,
happyIngesters: 3,
samples: 25,
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
startTimestampMs: 123456789000,
numIngesters: 3,
happyIngesters: 3,
samples: samplesIn{num: 25, startTimestampMs: 123456789000},
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.024
`,
},
"A push to ingesters should report the correct metrics with no metadata": {
numIngesters: 3,
happyIngesters: 2,
samples: samplesIn{num: 1, startTimestampMs: 123456789000},
metadata: 0,
metricNames: []string{distributorAppend, distributorAppendFailure},
expectedResponse: success,
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",type="samples"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="samples"} 1
`,
},
"A push to ingesters should report the correct metrics with no samples": {
numIngesters: 3,
happyIngesters: 2,
samples: samplesIn{num: 0, startTimestampMs: 123456789000},
metadata: 1,
metricNames: []string{distributorAppend, distributorAppendFailure},
expectedResponse: success,
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
`,
},
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v)", name, shardByAllLabels), func(t *testing.T) {
latestSeenSampleTimestampPerUser.Reset()
ingesterAppends.Reset()
ingesterAppendFailures.Reset()

limits := &validation.Limits{}
flagext.DefaultValues(limits)
@@ -142,14 +185,14 @@ func TestDistributor_Push(t *testing.T) {
d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, limits, nil)
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(tc.startTimestampMs, tc.samples, tc.metadata)
request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata)
response, err := d.Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.expectedError, err)

// Check tracked Prometheus metrics.
if tc.expectedMetrics != "" {
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(tc.expectedMetrics), metricNames...)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(tc.expectedMetrics), tc.metricNames...)
assert.NoError(t, err)
}
})
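The metric assertions in these tests go through testutil.GatherAndCompare, which gathers from a registry and diffs the text exposition format against the expected string, restricted to the named metric families; this is why each test case now lists its own metricNames. A standalone sketch with a hypothetical counter (the tests above use prometheus.DefaultGatherer instead of a private registry):

package sketch

import (
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func checkAppendsMetric() error {
	reg := prometheus.NewRegistry()
	appends := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "demo_appends_total", Help: "Demo counter."},
		[]string{"type"},
	)
	reg.MustRegister(appends)
	appends.WithLabelValues("samples").Inc()

	expected := `
# HELP demo_appends_total Demo counter.
# TYPE demo_appends_total counter
demo_appends_total{type="samples"} 1
`
	// Returns a descriptive error on any mismatch, nil when equal.
	return testutil.GatherAndCompare(reg, strings.NewReader(expected), "demo_appends_total")
}
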
2 changes: 0 additions & 2 deletions pkg/ingester/ingester.go
@@ -373,10 +373,8 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
}

if len(req.Metadata) > 0 {
// Given requests can only contain either metadata or samples, no-op if there is metadata for now.
Review comment (Contributor Author):

Technically, this is "no longer true" from an implementation perspective. The remote write protocol only sends metadata or samples (for now), but IMO that comment/responsibility belongs in the distributor. The ingester should operate under no assumption that you'd receive one or the other if we can (e.g. no increase of complexity).

Obviously, this is my own personal opinion with what I know about the system, but others feel free to suggest any alternatives.

Reply (Contributor):

If we're going to handle samples and metadata in the same push, then I would suggest that we return errors for them separately. As it is now, errors reported by ingesters will be counted for both samples and metadata (provided both were present in the request).

Reply (Contributor Author):

Sorry Peter, I'm not following your comment. With this change, we're no longer making assumptions that a WriteRequest can come with one or the other.

The error handling in send is purely for metrics reporting. In the ingesters, we simply log when we receive metadata, so erroring out on that is not an option at the moment.

The comment I originally left in the distributors is an operational concern, but for the code itself, it wouldn't make a difference - that is intended.
logger := util.WithContext(ctx, util.Logger)
level.Debug(logger).Log("msg", "metadata received in the ingester", "count", len(req.Metadata))
return &client.WriteResponse{}, nil
}

for _, ts := range req.Timeseries {