1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [CHANGE] The metrics `cortex_distributor_ingester_appends_total` and `cortex_distributor_ingester_append_failures_total` now include a `type` label to differentiate between `samples` and `metadata`.

* [CHANGE] Experimental TSDB: renamed blocks meta fetcher metrics: #2375
* `cortex_querier_bucket_store_blocks_meta_syncs_total` => `cortex_querier_blocks_meta_syncs_total`
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2022,6 +2022,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -validation.max-label-names-per-series
[max_label_names_per_series: <int> | default = 30]

# Maximum length accepted for metric metadata. Metadata refers to Metric Name,
# HELP and UNIT.
# CLI flag: -validation.max-metadata-length
[max_metadata_length: <int> | default = 1024]

# Reject old samples.
# CLI flag: -validation.reject-old-samples
[reject_old_samples: <boolean> | default = false]
@@ -2035,6 +2040,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -validation.create-grace-period
[creation_grace_period: <duration> | default = 10m0s]

# Enforce every metadata has a metric name.
# CLI flag: -validation.enforce-metadata-metric-name
[enforce_metadata_metric_name: <boolean> | default = true]

# Enforce every sample has a metric name.
# CLI flag: -validation.enforce-metric-name
[enforce_metric_name: <boolean> | default = true]
162 changes: 133 additions & 29 deletions pkg/distributor/distributor.go
@@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -42,11 +43,21 @@ var (
Name: "distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user"})
receivedMetadata = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_metadata_total",
Help: "The total number of received metadata, excluding rejected.",
}, []string{"user"})
incomingSamples = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_samples_in_total",
Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.",
}, []string{"user"})
incomingMetadata = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_metadata_in_total",
Help: "The total number of metadata the have come in to the distributor, including rejected.",
}, []string{"user"})
nonHASamples = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_non_ha_samples_received_total",
@@ -67,12 +78,12 @@ var (
Namespace: "cortex",
Name: "distributor_ingester_appends_total",
Help: "The total number of batch appends sent to ingesters.",
}, []string{"ingester"})
}, []string{"ingester", "type"})
ingesterAppendFailures = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_ingester_append_failures_total",
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester"})
}, []string{"ingester", "type"})
ingesterQueries = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_ingester_queries_total",
@@ -95,6 +106,11 @@ var (
emptyPreallocSeries = ingester_client.PreallocTimeseries{}
)

const (
typeSamples = "samples"
typeMetadata = "metadata"
)

// Distributor is a storage.SampleAppender and a client.Querier which
// forwards appends and queries to individual ingesters.
type Distributor struct {
@@ -250,17 +266,29 @@ func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter
return shardByMetricName(userID, metricName), nil
}

func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32 {
if d.cfg.ShardByAllLabels {
return shardByMetricName(userID, metricName)
}

return shardByUser(userID)
}

func shardByMetricName(userID string, metricName string) uint32 {
h := shardByUser(userID)
h = client.HashAdd32(h, metricName)
return h
}

func shardByUser(userID string) uint32 {
h := client.HashNew32()
h = client.HashAdd32(h, userID)
h = client.HashAdd32(h, metricName)
return h
}
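
A minimal, self-contained sketch of the sharding rule that `tokenForMetadata` encodes: with `ShardByAllLabels` enabled, metadata is distributed across ingesters by (user, metric name); otherwise every piece of a tenant's metadata maps to a single per-user token. The FNV-1a `hashAdd32` helper below is an assumption standing in for `client.HashNew32`/`client.HashAdd32`, and the tenant and metric names are made up.

```go
package main

import "fmt"

// Assumed stand-ins for client.HashNew32/HashAdd32: an incremental 32-bit FNV-1a hash.
const (
	offset32 = 2166136261
	prime32  = 16777619
)

func hashNew32() uint32 { return offset32 }

func hashAdd32(h uint32, s string) uint32 {
	for i := 0; i < len(s); i++ {
		h ^= uint32(s[i])
		h *= prime32
	}
	return h
}

// Mirrors the helpers in the diff above.
func shardByUser(userID string) uint32 {
	return hashAdd32(hashNew32(), userID)
}

func shardByMetricName(userID, metricName string) uint32 {
	return hashAdd32(shardByUser(userID), metricName)
}

// tokenForMetadata: shard metadata by metric name only when shard-by-all-labels is enabled.
func tokenForMetadata(shardByAllLabels bool, userID, metricName string) uint32 {
	if shardByAllLabels {
		return shardByMetricName(userID, metricName)
	}
	return shardByUser(userID)
}

func main() {
	// With shard-by-all-labels, metadata for different metrics spreads across the ring...
	fmt.Println(tokenForMetadata(true, "tenant-1", "http_requests_total"))
	fmt.Println(tokenForMetadata(true, "tenant-1", "http_request_duration_seconds"))
	// ...without it, all of tenant-1's metadata lands on the same token.
	fmt.Println(tokenForMetadata(false, "tenant-1", "http_requests_total"))
	fmt.Println(tokenForMetadata(false, "tenant-1", "http_request_duration_seconds"))
}
```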

// This function generates different values for different order of same labels.
func shardByAllLabels(userID string, labels []client.LabelAdapter) uint32 {
h := client.HashNew32()
h = client.HashAdd32(h, userID)
h := shardByUser(userID)
for _, label := range labels {
h = client.HashAdd32(h, label.Name)
h = client.HashAdd32(h, label.Value)
@@ -326,6 +354,10 @@ func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, user
nil
}

func (d *Distributor) validateMetadata(m *ingester_client.MetricMetadata, userID string) error {
return validation.ValidateMetadata(d.limits, userID, m)
}

Comment on lines +357 to +360

Contributor: I'm not sure this method brings anything at the moment. Should we get rid of it?

Contributor: As you're going to work on this, let's move it to the next PR.

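`ValidateMetadata` itself is not part of this diff; purely as an illustration, here is a hypothetical sketch of the checks implied by the two limits documented above (`enforce_metadata_metric_name` and `max_metadata_length`). The `Limits` interface, field names, and error texts below are placeholders, not the real Cortex types.

```go
package validation

import (
	"net/http"

	"github.com/weaveworks/common/httpgrpc"
)

// MetricMetadata stands in for client.MetricMetadata: metric name, HELP and UNIT.
type MetricMetadata struct {
	MetricName string
	Help       string
	Unit       string
}

// Limits is a placeholder for the per-tenant limits; only the two new fields are shown.
type Limits interface {
	EnforceMetadataMetricName(userID string) bool
	MaxMetadataLength(userID string) int
}

// ValidateMetadata sketches the kind of checks the distributor's validateMetadata wrapper delegates to.
func ValidateMetadata(limits Limits, userID string, m *MetricMetadata) error {
	if limits.EnforceMetadataMetricName(userID) && m.MetricName == "" {
		return httpgrpc.Errorf(http.StatusBadRequest, "metadata missing metric name")
	}

	maxLen := limits.MaxMetadataLength(userID)
	for _, field := range []string{m.MetricName, m.Help, m.Unit} {
		if len(field) > maxLen {
			return httpgrpc.Errorf(http.StatusBadRequest,
				"metadata for %q has a field that is too long: %d > %d", m.MetricName, len(field), maxLen)
		}
	}
	return nil
}
```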
// Push implements client.IngesterServer
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
Contributor: This method has changed significantly, and mixes handling of samples and metadata. Would it make sense to split Push into two, to handle these two cases separately? (Push calling pushMetadata and pushSamples or something similar.)

Contributor Author: I plan to add another change that merges the two. Since ingesters receive a WriteRequest, they can just go together. I just wanted to avoid a big refactor upfront.

Contributor: Hmm, ok. Push is quite big at the moment and could use some trimming.

userID, err := user.ExtractOrgID(ctx)
@@ -342,6 +374,17 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}
// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
// Count the total number of metadata in.
incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
// For each timeseries or samples, we compute a hash to distribute across ingesters;
// check each sample/metadata and discard if outside limits.
validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
validatedMetadata := make([]*client.MetricMetadata, 0, len(req.Metadata))
metadataKeys := make([]uint32, 0, len(req.Metadata))
seriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0

if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
@@ -373,9 +416,6 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie

// For each timeseries, compute a hash to distribute across ingesters;
// check each sample and discard if outside limits.
validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
keys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
for _, ts := range req.Timeseries {
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
if len(ts.Samples) > 0 {
@@ -424,28 +464,47 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
continue
}

keys = append(keys, key)
seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
validatedSamples += len(ts.Samples)
}

for _, m := range req.Metadata {
err := d.validateMetadata(m, userID)

if err != nil {
if firstPartialErr == nil {
firstPartialErr = err
}

continue
}

metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricName))
validatedMetadata = append(validatedMetadata, m)
}

receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

if len(keys) == 0 {
// Ensure the request slice is reused if there's no series passing the validation.
if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
// Ensure the request slice is reused if there's no series or metadata passing the validation.
client.ReuseSlice(req.Timeseries)

return &client.WriteResponse{}, firstPartialErr
}

now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamples) {
totalN := validatedSamples + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
// Ensure the request slice is reused if the request is rate limited.
client.ReuseSlice(req.Timeseries)

// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", d.ingestionRateLimiter.Limit(now, userID), numSamples)
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
}

var subRing ring.ReadRing
@@ -460,23 +519,49 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}
}

err = ring.DoBatch(ctx, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
for _, i := range indexes {
timeseries = append(timeseries, validatedTimeseries[i])
if len(metadataKeys) > 0 {
err = ring.DoBatch(ctx, subRing, metadataKeys, func(ingester ring.IngesterDesc, indexes []int) error {
metadata := make([]*client.MetricMetadata, 0, len(indexes))
for _, i := range indexes {
metadata = append(metadata, validatedMetadata[i])
}

localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectUserID(localCtx, userID)

if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}

return d.sendMetadata(localCtx, ingester, metadata)
}, func() {})
if err != nil {
// Metadata is a best-effort approach so we consider failures non-fatal, log them, and move on.
logger := util.WithContext(ctx, util.Logger)
level.Error(logger).Log("msg", "Failed to send metadata to ingesters", "err", err)
}
}

if len(seriesKeys) > 0 {
err = ring.DoBatch(ctx, subRing, seriesKeys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
for _, i := range indexes {
timeseries = append(timeseries, validatedTimeseries[i])
}

// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
return d.sendSamples(localCtx, ingester, timeseries)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
}
return d.sendSamples(localCtx, ingester, timeseries)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
}
return &client.WriteResponse{}, firstPartialErr
}
@@ -515,9 +600,28 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes
}
_, err = c.Push(ctx, &req)

ingesterAppends.WithLabelValues(ingester.Addr).Inc()
ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
}
return err
}

func (d *Distributor) sendMetadata(ctx context.Context, ingester ring.IngesterDesc, metadata []*client.MetricMetadata) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
}
c := h.(ingester_client.IngesterClient)

req := client.WriteRequest{
Metadata: metadata,
}
_, err = c.Push(ctx, &req)

ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc()
}
return err
}
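
As a final illustration (not part of the PR), a tiny self-contained example of what the new `type` label enables: appends of samples and metadata to the same ingester can now be counted separately. The ingester address and the counts are invented, and prometheus/testutil is used here only to read the counters back.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as the distributor's counter, but created locally for the example.
	ingesterAppends := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "distributor_ingester_appends_total",
		Help:      "The total number of batch appends sent to ingesters.",
	}, []string{"ingester", "type"})

	// What sendSamples and sendMetadata each do once per push:
	ingesterAppends.WithLabelValues("ingester-1:9095", "samples").Inc()
	ingesterAppends.WithLabelValues("ingester-1:9095", "metadata").Inc()
	ingesterAppends.WithLabelValues("ingester-1:9095", "metadata").Inc()

	fmt.Println(testutil.ToFloat64(ingesterAppends.WithLabelValues("ingester-1:9095", "samples")))  // 1
	fmt.Println(testutil.ToFloat64(ingesterAppends.WithLabelValues("ingester-1:9095", "metadata"))) // 2
}
```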