diff --git a/CHANGELOG.md b/CHANGELOG.md index c326dd20162..4bc61a02d6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [CHANGE] The metrics `cortex_distributor_ingester_appends_total` and `distributor_ingester_append_failures_total` now includes a `type` label to differentiate between `samples` and `metadata`. #2336 * [CHANGE] Experimental TSDB: renamed blocks meta fetcher metrics: #2375 * `cortex_querier_bucket_store_blocks_meta_syncs_total` > `cortex_querier_blocks_meta_syncs_total` * `cortex_querier_bucket_store_blocks_meta_sync_failures_total` > `cortex_querier_blocks_meta_sync_failures_total` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 72dd13d75b8..0c68fb87978 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2022,6 +2022,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -validation.max-label-names-per-series [max_label_names_per_series: | default = 30] +# Maximum length accepted for metric metadata. Metadata refers to Metric Name, +# HELP and UNIT. +# CLI flag: -validation.max-metadata-length +[max_metadata_length: | default = 1024] + # Reject old samples. # CLI flag: -validation.reject-old-samples [reject_old_samples: | default = false] @@ -2035,6 +2040,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -validation.create-grace-period [creation_grace_period: | default = 10m0s] +# Enforce every metadata has a metric name. +# CLI flag: -validation.enforce-metadata-metric-name +[enforce_metadata_metric_name: | default = true] + # Enforce every sample has a metric name. # CLI flag: -validation.enforce-metric-name [enforce_metric_name: | default = true] diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 20bba2ef046..3b14a7fa64e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/go-kit/kit/log/level" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -42,11 +43,21 @@ var ( Name: "distributor_received_samples_total", Help: "The total number of received samples, excluding rejected and deduped samples.", }, []string{"user"}) + receivedMetadata = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_received_metadata_total", + Help: "The total number of received metadata, excluding rejected.", + }, []string{"user"}) incomingSamples = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_samples_in_total", Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.", }, []string{"user"}) + incomingMetadata = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_metadata_in_total", + Help: "The total number of metadata the have come in to the distributor, including rejected.", + }, []string{"user"}) nonHASamples = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_non_ha_samples_received_total", @@ -67,12 +78,12 @@ var ( Namespace: "cortex", Name: "distributor_ingester_appends_total", Help: "The total number of batch appends sent to ingesters.", - }, []string{"ingester"}) + }, []string{"ingester", "type"}) ingesterAppendFailures = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_ingester_append_failures_total", Help: "The total number of failed batch appends sent to ingesters.", - }, []string{"ingester"}) + }, []string{"ingester", "type"}) ingesterQueries = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_ingester_queries_total", @@ -95,6 +106,11 @@ var ( emptyPreallocSeries = ingester_client.PreallocTimeseries{} ) +const ( + typeSamples = "samples" + typeMetadata = "metadata" +) + // Distributor is a storage.SampleAppender and a client.Querier which // forwards appends and queries to individual ingesters. type Distributor struct { @@ -250,17 +266,29 @@ func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter return shardByMetricName(userID, metricName), nil } +func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32 { + if d.cfg.ShardByAllLabels { + return shardByMetricName(userID, metricName) + } + + return shardByUser(userID) +} + func shardByMetricName(userID string, metricName string) uint32 { + h := shardByUser(userID) + h = client.HashAdd32(h, metricName) + return h +} + +func shardByUser(userID string) uint32 { h := client.HashNew32() h = client.HashAdd32(h, userID) - h = client.HashAdd32(h, metricName) return h } // This function generates different values for different order of same labels. func shardByAllLabels(userID string, labels []client.LabelAdapter) uint32 { - h := client.HashNew32() - h = client.HashAdd32(h, userID) + h := shardByUser(userID) for _, label := range labels { h = client.HashAdd32(h, label.Name) h = client.HashAdd32(h, label.Value) @@ -326,6 +354,10 @@ func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, user nil } +func (d *Distributor) validateMetadata(m *ingester_client.MetricMetadata, userID string) error { + return validation.ValidateMetadata(d.limits, userID, m) +} + // Push implements client.IngesterServer func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) { userID, err := user.ExtractOrgID(ctx) @@ -342,6 +374,17 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie } // Count the total samples in, prior to validation or deduplication, for comparison with other metrics. incomingSamples.WithLabelValues(userID).Add(float64(numSamples)) + // Count the total number of metadata in. + incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata))) + + // A WriteRequest can only contain series or metadata but not both. This might change in the future. + // For each timeseries or samples, we compute a hash to distribute across ingesters; + // check each sample/metadata and discard if outside limits. + validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries)) + validatedMetadata := make([]*client.MetricMetadata, 0, len(req.Metadata)) + metadataKeys := make([]uint32, 0, len(req.Metadata)) + seriesKeys := make([]uint32, 0, len(req.Timeseries)) + validatedSamples := 0 if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 { cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels) @@ -373,9 +416,6 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie // For each timeseries, compute a hash to distribute across ingesters; // check each sample and discard if outside limits. - validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries)) - keys := make([]uint32, 0, len(req.Timeseries)) - validatedSamples := 0 for _, ts := range req.Timeseries { // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong. if len(ts.Samples) > 0 { @@ -424,28 +464,47 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie continue } - keys = append(keys, key) + seriesKeys = append(seriesKeys, key) validatedTimeseries = append(validatedTimeseries, validatedSeries) validatedSamples += len(ts.Samples) } + + for _, m := range req.Metadata { + err := d.validateMetadata(m, userID) + + if err != nil { + if firstPartialErr == nil { + firstPartialErr = err + } + + continue + } + + metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricName)) + validatedMetadata = append(validatedMetadata, m) + } + receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples)) + receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) - if len(keys) == 0 { - // Ensure the request slice is reused if there's no series passing the validation. + if len(seriesKeys) == 0 && len(metadataKeys) == 0 { + // Ensure the request slice is reused if there's no series or metadata passing the validation. client.ReuseSlice(req.Timeseries) return &client.WriteResponse{}, firstPartialErr } now := time.Now() - if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamples) { + totalN := validatedSamples + len(validatedMetadata) + if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { // Ensure the request slice is reused if the request is rate limited. client.ReuseSlice(req.Timeseries) // Return a 4xx here to have the client discard the data and not retry. If a client // is sending too much data consistently we will unlikely ever catch up otherwise. validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples)) - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", d.ingestionRateLimiter.Limit(now, userID), numSamples) + validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata))) + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata)) } var subRing ring.ReadRing @@ -460,23 +519,49 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie } } - err = ring.DoBatch(ctx, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error { - timeseries := make([]client.PreallocTimeseries, 0, len(indexes)) - for _, i := range indexes { - timeseries = append(timeseries, validatedTimeseries[i]) + if len(metadataKeys) > 0 { + err = ring.DoBatch(ctx, subRing, metadataKeys, func(ingester ring.IngesterDesc, indexes []int) error { + metadata := make([]*client.MetricMetadata, 0, len(indexes)) + for _, i := range indexes { + metadata = append(metadata, validatedMetadata[i]) + } + + localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) + defer cancel() + localCtx = user.InjectUserID(localCtx, userID) + + if sp := opentracing.SpanFromContext(ctx); sp != nil { + localCtx = opentracing.ContextWithSpan(localCtx, sp) + } + + return d.sendMetadata(localCtx, ingester, metadata) + }, func() {}) + if err != nil { + // Metadata is a best-effort approach so we consider failures non-fatal, log them, and move on. + logger := util.WithContext(ctx, util.Logger) + level.Error(logger).Log("msg", "Failed to send metadata to ingesters", "err", err) } + } + + if len(seriesKeys) > 0 { + err = ring.DoBatch(ctx, subRing, seriesKeys, func(ingester ring.IngesterDesc, indexes []int) error { + timeseries := make([]client.PreallocTimeseries, 0, len(indexes)) + for _, i := range indexes { + timeseries = append(timeseries, validatedTimeseries[i]) + } - // Use a background context to make sure all ingesters get samples even if we return early - localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) - defer cancel() - localCtx = user.InjectOrgID(localCtx, userID) - if sp := opentracing.SpanFromContext(ctx); sp != nil { - localCtx = opentracing.ContextWithSpan(localCtx, sp) + // Use a background context to make sure all ingesters get samples even if we return early + localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) + defer cancel() + localCtx = user.InjectOrgID(localCtx, userID) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + localCtx = opentracing.ContextWithSpan(localCtx, sp) + } + return d.sendSamples(localCtx, ingester, timeseries) + }, func() { client.ReuseSlice(req.Timeseries) }) + if err != nil { + return nil, err } - return d.sendSamples(localCtx, ingester, timeseries) - }, func() { client.ReuseSlice(req.Timeseries) }) - if err != nil { - return nil, err } return &client.WriteResponse{}, firstPartialErr } @@ -515,9 +600,28 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes } _, err = c.Push(ctx, &req) - ingesterAppends.WithLabelValues(ingester.Addr).Inc() + ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc() + if err != nil { + ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc() + } + return err +} + +func (d *Distributor) sendMetadata(ctx context.Context, ingester ring.IngesterDesc, metadata []*client.MetricMetadata) error { + h, err := d.ingesterPool.GetClientFor(ingester.Addr) + if err != nil { + return err + } + c := h.(ingester_client.IngesterClient) + + req := client.WriteRequest{ + Metadata: metadata, + } + _, err = c.Push(ctx, &req) + + ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc() if err != nil { - ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc() + ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc() } return err } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index d969b70f526..b76ffcd1de5 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -56,6 +56,7 @@ func TestDistributor_Push(t *testing.T) { happyIngesters int samples int startTimestampMs int64 + metadata int expectedResponse *client.WriteResponse expectedError error expectedMetrics string @@ -68,25 +69,27 @@ func TestDistributor_Push(t *testing.T) { "A push to 3 happy ingesters should succeed": { numIngesters: 3, happyIngesters: 3, - samples: 10, + samples: 5, + metadata: 5, expectedResponse: success, startTimestampMs: 123456789000, expectedMetrics: ` # HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user. # TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge - cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009 + cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.004 `, }, "A push to 2 happy ingesters should succeed": { numIngesters: 3, happyIngesters: 2, - samples: 10, + samples: 5, + metadata: 5, expectedResponse: success, startTimestampMs: 123456789000, expectedMetrics: ` # HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user. # TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge - cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009 + cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.004 `, }, "A push to 1 happy ingesters should fail": { @@ -116,13 +119,14 @@ func TestDistributor_Push(t *testing.T) { "A push exceeding burst size should fail": { numIngesters: 3, happyIngesters: 3, - samples: 30, - expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 30 samples"), + samples: 25, + metadata: 5, + expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"), startTimestampMs: 123456789000, expectedMetrics: ` # HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user. # TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge - cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.029 + cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.024 `, }, } { @@ -138,7 +142,7 @@ func TestDistributor_Push(t *testing.T) { d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, limits, nil) defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck - request := makeWriteRequest(tc.startTimestampMs, tc.samples) + request := makeWriteRequest(tc.startTimestampMs, tc.samples, tc.metadata) response, err := d.Push(ctx, request) assert.Equal(t, tc.expectedResponse, response) assert.Equal(t, tc.expectedError, err) @@ -156,6 +160,7 @@ func TestDistributor_Push(t *testing.T) { func TestDistributor_PushIngestionRateLimiter(t *testing.T) { type testPush struct { samples int + metadata int expectedError error } @@ -172,10 +177,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ingestionRate: 10, ingestionBurstSize: 10, pushes: []testPush{ - {samples: 5, expectedError: nil}, - {samples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 6 samples")}, - {samples: 5, expectedError: nil}, - {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 1 samples")}, + {samples: 4, expectedError: nil}, + {metadata: 1, expectedError: nil}, + {samples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 6 samples and 0 metadata")}, + {samples: 4, metadata: 1, expectedError: nil}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 1 samples and 0 metadata")}, + {metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")}, }, }, "global strategy: limit should be evenly shared across distributors": { @@ -184,10 +191,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ingestionRate: 10, ingestionBurstSize: 5, pushes: []testPush{ - {samples: 3, expectedError: nil}, - {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 3 samples")}, {samples: 2, expectedError: nil}, - {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples")}, + {samples: 1, expectedError: nil}, + {samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 2 samples and 1 metadata")}, + {samples: 2, expectedError: nil}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")}, + {metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")}, }, }, "global strategy: burst should set to each distributor": { @@ -196,10 +205,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ingestionRate: 10, ingestionBurstSize: 20, pushes: []testPush{ - {samples: 15, expectedError: nil}, - {samples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 6 samples")}, + {samples: 10, expectedError: nil}, {samples: 5, expectedError: nil}, - {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples")}, + {samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 5 samples and 1 metadata")}, + {samples: 5, expectedError: nil}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")}, + {metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")}, }, }, } @@ -234,7 +245,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { // Push samples in multiple requests to the first distributor for _, push := range testData.pushes { - request := makeWriteRequest(0, push.samples) + request := makeWriteRequest(0, push.samples, push.metadata) response, err := distributors[0].Push(ctx, request) if push.expectedError == nil { @@ -337,6 +348,7 @@ func TestDistributor_PushQuery(t *testing.T) { numIngesters int happyIngesters int samples int + metadata int matchers []*labels.Matcher expectedResponse model.Matrix expectedError error @@ -420,7 +432,7 @@ func TestDistributor_PushQuery(t *testing.T) { d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil, nil) defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck - request := makeWriteRequest(0, tc.samples) + request := makeWriteRequest(0, tc.samples, tc.metadata) writeResponse, err := d.Push(ctx, request) assert.Equal(t, &client.WriteResponse{}, writeResponse) assert.Nil(t, err) @@ -750,7 +762,7 @@ func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cli }, } - return client.ToWriteRequest([]labels.Labels{lbls}, samples, client.API) + return client.ToWriteRequest([]labels.Labels{lbls}, samples, nil, client.API) } func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool, limits *validation.Limits, kvStore kv.Client) (*Distributor, []mockIngester) { @@ -817,7 +829,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur return d, ingesters } -func makeWriteRequest(startTimestampMs int64, samples int) *client.WriteRequest { +func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *client.WriteRequest { request := &client.WriteRequest{} for i := 0; i < samples; i++ { ts := client.PreallocTimeseries{ @@ -837,6 +849,16 @@ func makeWriteRequest(startTimestampMs int64, samples int) *client.WriteRequest } request.Timeseries = append(request.Timeseries, ts) } + + for i := 0; i < metadata; i++ { + m := &client.MetricMetadata{ + MetricName: fmt.Sprintf("metric_%d", i), + Type: client.COUNTER, + Help: fmt.Sprintf("a help for metric_%d", i), + } + request.Metadata = append(request.Metadata, m) + } + return request } @@ -1142,19 +1164,20 @@ func TestDistributorValidation(t *testing.T) { future, past := now.Add(5*time.Hour), now.Add(-25*time.Hour) for i, tc := range []struct { - labels []labels.Labels - samples []client.Sample - err error + metadata []*client.MetricMetadata + labels []labels.Labels + samples []client.Sample + err error }{ // Test validation passes. { - labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, + metadata: []*client.MetricMetadata{{MetricName: "testmetric", Help: "a test metric.", Unit: "", Type: client.COUNTER}}, + labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, samples: []client.Sample{{ TimestampMs: int64(now), Value: 1, }}, }, - // Test validation fails for very old samples. { labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, @@ -1196,6 +1219,16 @@ func TestDistributorValidation(t *testing.T) { }, err: httpgrpc.Errorf(http.StatusBadRequest, `sample for 'testmetric{foo2="bar2", foo="bar"}' has 3 label names; limit 2`), }, + // Test metadata validation fails + { + metadata: []*client.MetricMetadata{{MetricName: "", Help: "a test metric.", Unit: "", Type: client.COUNTER}}, + labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, + samples: []client.Sample{{ + TimestampMs: int64(now), + Value: 1, + }}, + err: httpgrpc.Errorf(http.StatusBadRequest, `metadata missing metric name`), + }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { var limits validation.Limits @@ -1209,7 +1242,7 @@ func TestDistributorValidation(t *testing.T) { d, _ := prepare(t, 3, 3, 0, true, &limits, nil) defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck - _, err := d.Push(ctx, client.ToWriteRequest(tc.labels, tc.samples, client.API)) + _, err := d.Push(ctx, client.ToWriteRequest(tc.labels, tc.samples, tc.metadata, client.API)) require.Equal(t, tc.err, err) }) } diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index 3e308f1ded7..bacf98117fb 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -18,11 +18,12 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary -// ToWriteRequest converts matched slices of Labels and Samples into a WriteRequest proto. +// ToWriteRequest converts matched slices of Labels, Samples and Metadata into a WriteRequest proto. // It gets timeseries from the pool, so ReuseSlice() should be called when done. -func ToWriteRequest(lbls []labels.Labels, samples []Sample, source WriteRequest_SourceEnum) *WriteRequest { +func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMetadata, source WriteRequest_SourceEnum) *WriteRequest { req := &WriteRequest{ Timeseries: slicePool.Get().([]PreallocTimeseries), + Metadata: metadata, Source: source, } diff --git a/pkg/ingester/client/cortex.pb.go b/pkg/ingester/client/cortex.pb.go index d87bd4ced45..7441a4a7244 100644 --- a/pkg/ingester/client/cortex.pb.go +++ b/pkg/ingester/client/cortex.pb.go @@ -80,9 +80,49 @@ func (WriteRequest_SourceEnum) EnumDescriptor() ([]byte, []int) { return fileDescriptor_893a47d0a749d749, []int{0, 0} } +type MetricMetadata_MetricType int32 + +const ( + UNKNOWN MetricMetadata_MetricType = 0 + COUNTER MetricMetadata_MetricType = 1 + GAUGE MetricMetadata_MetricType = 2 + HISTOGRAM MetricMetadata_MetricType = 3 + GAUGEHISTOGRAM MetricMetadata_MetricType = 4 + SUMMARY MetricMetadata_MetricType = 5 + INFO MetricMetadata_MetricType = 6 + STATESET MetricMetadata_MetricType = 7 +) + +var MetricMetadata_MetricType_name = map[int32]string{ + 0: "UNKNOWN", + 1: "COUNTER", + 2: "GAUGE", + 3: "HISTOGRAM", + 4: "GAUGEHISTOGRAM", + 5: "SUMMARY", + 6: "INFO", + 7: "STATESET", +} + +var MetricMetadata_MetricType_value = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGEHISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, +} + +func (MetricMetadata_MetricType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_893a47d0a749d749, []int{24, 0} +} + type WriteRequest struct { Timeseries []PreallocTimeseries `protobuf:"bytes,1,rep,name=timeseries,proto3,customtype=PreallocTimeseries" json:"timeseries"` Source WriteRequest_SourceEnum `protobuf:"varint,2,opt,name=Source,json=source,proto3,enum=cortex.WriteRequest_SourceEnum" json:"Source,omitempty"` + Metadata []*MetricMetadata `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty"` } func (m *WriteRequest) Reset() { *m = WriteRequest{} } @@ -124,6 +164,13 @@ func (m *WriteRequest) GetSource() WriteRequest_SourceEnum { return API } +func (m *WriteRequest) GetMetadata() []*MetricMetadata { + if m != nil { + return m.Metadata + } + return nil +} + type WriteResponse struct { } @@ -1213,6 +1260,73 @@ func (m *LabelMatchers) GetMatchers() []*LabelMatcher { return nil } +type MetricMetadata struct { + Type MetricMetadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=cortex.MetricMetadata_MetricType" json:"type,omitempty"` + MetricName string `protobuf:"bytes,2,opt,name=metric_name,json=metricName,proto3" json:"metric_name,omitempty"` + Help string `protobuf:"bytes,4,opt,name=help,proto3" json:"help,omitempty"` + Unit string `protobuf:"bytes,5,opt,name=unit,proto3" json:"unit,omitempty"` +} + +func (m *MetricMetadata) Reset() { *m = MetricMetadata{} } +func (*MetricMetadata) ProtoMessage() {} +func (*MetricMetadata) Descriptor() ([]byte, []int) { + return fileDescriptor_893a47d0a749d749, []int{24} +} +func (m *MetricMetadata) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetricMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetricMetadata.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MetricMetadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetricMetadata.Merge(m, src) +} +func (m *MetricMetadata) XXX_Size() int { + return m.Size() +} +func (m *MetricMetadata) XXX_DiscardUnknown() { + xxx_messageInfo_MetricMetadata.DiscardUnknown(m) +} + +var xxx_messageInfo_MetricMetadata proto.InternalMessageInfo + +func (m *MetricMetadata) GetType() MetricMetadata_MetricType { + if m != nil { + return m.Type + } + return UNKNOWN +} + +func (m *MetricMetadata) GetMetricName() string { + if m != nil { + return m.MetricName + } + return "" +} + +func (m *MetricMetadata) GetHelp() string { + if m != nil { + return m.Help + } + return "" +} + +func (m *MetricMetadata) GetUnit() string { + if m != nil { + return m.Unit + } + return "" +} + type Metric struct { Labels []LabelAdapter `protobuf:"bytes,1,rep,name=labels,proto3,customtype=LabelAdapter" json:"labels"` } @@ -1220,7 +1334,7 @@ type Metric struct { func (m *Metric) Reset() { *m = Metric{} } func (*Metric) ProtoMessage() {} func (*Metric) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{24} + return fileDescriptor_893a47d0a749d749, []int{25} } func (m *Metric) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1258,7 +1372,7 @@ type LabelMatcher struct { func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{25} + return fileDescriptor_893a47d0a749d749, []int{26} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1318,7 +1432,7 @@ type TimeSeriesFile struct { func (m *TimeSeriesFile) Reset() { *m = TimeSeriesFile{} } func (*TimeSeriesFile) ProtoMessage() {} func (*TimeSeriesFile) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{26} + return fileDescriptor_893a47d0a749d749, []int{27} } func (m *TimeSeriesFile) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1381,7 +1495,7 @@ type TransferTSDBResponse struct { func (m *TransferTSDBResponse) Reset() { *m = TransferTSDBResponse{} } func (*TransferTSDBResponse) ProtoMessage() {} func (*TransferTSDBResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{27} + return fileDescriptor_893a47d0a749d749, []int{28} } func (m *TransferTSDBResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1413,6 +1527,7 @@ var xxx_messageInfo_TransferTSDBResponse proto.InternalMessageInfo func init() { proto.RegisterEnum("cortex.MatchType", MatchType_name, MatchType_value) proto.RegisterEnum("cortex.WriteRequest_SourceEnum", WriteRequest_SourceEnum_name, WriteRequest_SourceEnum_value) + proto.RegisterEnum("cortex.MetricMetadata_MetricType", MetricMetadata_MetricType_name, MetricMetadata_MetricType_value) proto.RegisterType((*WriteRequest)(nil), "cortex.WriteRequest") proto.RegisterType((*WriteResponse)(nil), "cortex.WriteResponse") proto.RegisterType((*ReadRequest)(nil), "cortex.ReadRequest") @@ -1437,6 +1552,7 @@ func init() { proto.RegisterType((*LabelPair)(nil), "cortex.LabelPair") proto.RegisterType((*Sample)(nil), "cortex.Sample") proto.RegisterType((*LabelMatchers)(nil), "cortex.LabelMatchers") + proto.RegisterType((*MetricMetadata)(nil), "cortex.MetricMetadata") proto.RegisterType((*Metric)(nil), "cortex.Metric") proto.RegisterType((*LabelMatcher)(nil), "cortex.LabelMatcher") proto.RegisterType((*TimeSeriesFile)(nil), "cortex.TimeSeriesFile") @@ -1446,87 +1562,98 @@ func init() { func init() { proto.RegisterFile("cortex.proto", fileDescriptor_893a47d0a749d749) } var fileDescriptor_893a47d0a749d749 = []byte{ - // 1275 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0xcd, 0x6f, 0x1b, 0x45, - 0x14, 0xdf, 0x89, 0x3f, 0x12, 0x3f, 0x6f, 0x5c, 0x67, 0x92, 0xb6, 0xe9, 0x16, 0x36, 0x65, 0xa4, - 0x16, 0x0b, 0xa8, 0x5b, 0x52, 0x15, 0x7a, 0x00, 0x55, 0x4e, 0xeb, 0xb6, 0x46, 0x49, 0x9a, 0xae, - 0x5d, 0x40, 0x48, 0xc8, 0xda, 0xd8, 0x93, 0x64, 0xc5, 0x7e, 0xb8, 0x3b, 0xb3, 0x88, 0x1e, 0x90, - 0x90, 0xf8, 0x03, 0xe8, 0x91, 0x3f, 0x81, 0x33, 0x17, 0x38, 0x73, 0xea, 0xb1, 0xc7, 0x8a, 0x43, - 0x45, 0x9d, 0x0b, 0xc7, 0xfe, 0x09, 0x68, 0x67, 0x66, 0xd7, 0xbb, 0xae, 0x2d, 0xca, 0x47, 0x6f, - 0x9e, 0xf7, 0x7e, 0xef, 0xb7, 0xef, 0x73, 0xe6, 0x19, 0xf4, 0x41, 0x10, 0x72, 0xfa, 0x4d, 0x73, - 0x14, 0x06, 0x3c, 0xc0, 0x65, 0x79, 0x32, 0x2e, 0x1e, 0x3a, 0xfc, 0x28, 0xda, 0x6f, 0x0e, 0x02, - 0xef, 0xd2, 0x61, 0x70, 0x18, 0x5c, 0x12, 0xea, 0xfd, 0xe8, 0x40, 0x9c, 0xc4, 0x41, 0xfc, 0x92, - 0x66, 0xe4, 0x57, 0x04, 0xfa, 0x67, 0xa1, 0xc3, 0xa9, 0x45, 0x1f, 0x44, 0x94, 0x71, 0xbc, 0x0b, - 0xc0, 0x1d, 0x8f, 0x32, 0x1a, 0x3a, 0x94, 0xad, 0xa3, 0x73, 0x85, 0x46, 0x75, 0x13, 0x37, 0xd5, - 0xa7, 0x7a, 0x8e, 0x47, 0xbb, 0x42, 0xb3, 0x65, 0x3c, 0x7e, 0xb6, 0xa1, 0xfd, 0xfe, 0x6c, 0x03, - 0xef, 0x85, 0xd4, 0x76, 0xdd, 0x60, 0xd0, 0x4b, 0xad, 0xac, 0x0c, 0x03, 0xfe, 0x10, 0xca, 0xdd, - 0x20, 0x0a, 0x07, 0x74, 0x7d, 0xe1, 0x1c, 0x6a, 0xd4, 0x36, 0x37, 0x12, 0xae, 0xec, 0x57, 0x9b, - 0x12, 0xd2, 0xf6, 0x23, 0xcf, 0x2a, 0x33, 0xf1, 0x9b, 0x6c, 0x00, 0x4c, 0xa4, 0x78, 0x11, 0x0a, - 0xad, 0xbd, 0x4e, 0x5d, 0xc3, 0x4b, 0x50, 0xb4, 0xee, 0x6f, 0xb7, 0xeb, 0x88, 0x9c, 0x80, 0x65, - 0xc5, 0xc1, 0x46, 0x81, 0xcf, 0x28, 0xf9, 0x18, 0xaa, 0x16, 0xb5, 0x87, 0x49, 0x24, 0x4d, 0x58, - 0x7c, 0x10, 0x65, 0xc3, 0x58, 0x4b, 0x3e, 0x7d, 0x2f, 0xa2, 0xe1, 0x43, 0x05, 0xb3, 0x12, 0x10, - 0xb9, 0x0e, 0xba, 0x34, 0x97, 0x74, 0xf8, 0x12, 0x2c, 0x86, 0x94, 0x45, 0x2e, 0x4f, 0xec, 0x4f, - 0x4e, 0xd9, 0x4b, 0x9c, 0x95, 0xa0, 0xc8, 0x8f, 0x08, 0xf4, 0x2c, 0x35, 0x7e, 0x0f, 0x30, 0xe3, - 0x76, 0xc8, 0xfb, 0x22, 0x1f, 0xdc, 0xf6, 0x46, 0x7d, 0x2f, 0x26, 0x43, 0x8d, 0x82, 0x55, 0x17, - 0x9a, 0x5e, 0xa2, 0xd8, 0x61, 0xb8, 0x01, 0x75, 0xea, 0x0f, 0xf3, 0xd8, 0x05, 0x81, 0xad, 0x51, - 0x7f, 0x98, 0x45, 0x5e, 0x86, 0x25, 0xcf, 0xe6, 0x83, 0x23, 0x1a, 0xb2, 0xf5, 0x42, 0x3e, 0xb4, - 0x6d, 0x7b, 0x9f, 0xba, 0x3b, 0x52, 0x69, 0xa5, 0x28, 0xd2, 0x81, 0xe5, 0x9c, 0xd3, 0xf8, 0xda, - 0x2b, 0x96, 0xb9, 0x18, 0x97, 0x39, 0x5b, 0x50, 0xf2, 0x08, 0xc1, 0xaa, 0xe0, 0xea, 0xf2, 0x90, - 0xda, 0x5e, 0xca, 0x78, 0x1d, 0xaa, 0x83, 0xa3, 0xc8, 0xff, 0x2a, 0x47, 0x79, 0xfa, 0x65, 0xca, - 0x1b, 0x31, 0x48, 0xf1, 0x66, 0x2d, 0xa6, 0x5c, 0x5a, 0xf8, 0x07, 0x2e, 0x5d, 0x01, 0x2c, 0xe2, - 0xfe, 0xd4, 0x76, 0x23, 0xca, 0x92, 0xec, 0xbf, 0x09, 0xe0, 0xc6, 0xd2, 0xbe, 0x6f, 0x7b, 0x54, - 0x64, 0xbd, 0x62, 0x55, 0x84, 0x64, 0xd7, 0xf6, 0x28, 0xb9, 0x06, 0xab, 0x39, 0x23, 0x15, 0xc6, - 0x5b, 0xa0, 0x4b, 0xab, 0xaf, 0x85, 0x5c, 0xc4, 0x51, 0xb1, 0xaa, 0xee, 0x04, 0x4a, 0x56, 0x61, - 0x65, 0x3b, 0xa1, 0x49, 0xbe, 0x46, 0xae, 0x2a, 0x1f, 0x94, 0x50, 0xb1, 0x6d, 0x40, 0x75, 0xe2, - 0x43, 0x42, 0x06, 0xa9, 0x13, 0x8c, 0x60, 0xa8, 0xdf, 0x67, 0x34, 0xec, 0x72, 0x9b, 0xa7, 0x54, - 0xbf, 0x20, 0x58, 0xc9, 0x08, 0x15, 0xd5, 0x79, 0xa8, 0x39, 0xfe, 0x21, 0x65, 0xdc, 0x09, 0xfc, - 0x7e, 0x68, 0x73, 0x19, 0x12, 0xb2, 0x96, 0x53, 0xa9, 0x65, 0x73, 0x1a, 0x47, 0xed, 0x47, 0x5e, - 0x3f, 0xcd, 0x22, 0x6a, 0x14, 0xad, 0x8a, 0x1f, 0x79, 0x32, 0x79, 0x71, 0x4b, 0xda, 0x23, 0xa7, - 0x3f, 0xc5, 0x54, 0x10, 0x4c, 0x75, 0x7b, 0xe4, 0x74, 0x72, 0x64, 0x4d, 0x58, 0x0d, 0x23, 0x97, - 0x4e, 0xc3, 0x8b, 0x02, 0xbe, 0x12, 0xab, 0x72, 0x78, 0xf2, 0x25, 0xac, 0xc6, 0x8e, 0x77, 0x6e, - 0xe6, 0x5d, 0x3f, 0x0d, 0x8b, 0x11, 0xa3, 0x61, 0xdf, 0x19, 0xaa, 0x32, 0x94, 0xe3, 0x63, 0x67, - 0x88, 0x2f, 0x42, 0x71, 0x68, 0x73, 0x5b, 0xb8, 0x59, 0xdd, 0x3c, 0x93, 0x14, 0xfb, 0xa5, 0xe0, - 0x2d, 0x01, 0x23, 0xb7, 0x01, 0xc7, 0x2a, 0x96, 0x67, 0x7f, 0x1f, 0x4a, 0x2c, 0x16, 0xa8, 0x96, - 0x3b, 0x9b, 0x65, 0x99, 0xf2, 0xc4, 0x92, 0x48, 0xf2, 0x33, 0x02, 0x73, 0x87, 0xf2, 0xd0, 0x19, - 0xb0, 0x5b, 0x41, 0x98, 0x9d, 0x19, 0xf6, 0xba, 0x67, 0xf7, 0x1a, 0xe8, 0xc9, 0x54, 0xf6, 0x19, - 0xe5, 0x6a, 0x7e, 0x4f, 0xce, 0x9a, 0x5f, 0x66, 0x55, 0x13, 0x68, 0x97, 0x72, 0xd2, 0x81, 0x8d, - 0xb9, 0x3e, 0xab, 0x54, 0x5c, 0x80, 0xb2, 0x27, 0x20, 0x2a, 0x17, 0xb5, 0x84, 0x56, 0x1a, 0x5a, - 0x4a, 0x4b, 0x7e, 0x43, 0x70, 0x62, 0x6a, 0x22, 0xe3, 0x10, 0x0e, 0xc2, 0xc0, 0x53, 0xb5, 0xce, - 0x56, 0xab, 0x16, 0xcb, 0x3b, 0x4a, 0xdc, 0x19, 0x66, 0xcb, 0xb9, 0x90, 0x2b, 0xe7, 0x75, 0x28, - 0x8b, 0xd6, 0x4e, 0x6e, 0xa5, 0x95, 0x5c, 0x54, 0x7b, 0xb6, 0x13, 0x6e, 0xad, 0xa9, 0x67, 0x43, - 0x17, 0xa2, 0xd6, 0xd0, 0x1e, 0x71, 0x1a, 0x5a, 0xca, 0x0c, 0xbf, 0x0b, 0x65, 0x79, 0x23, 0xac, - 0x17, 0x05, 0xc1, 0x72, 0x42, 0x90, 0xbd, 0x34, 0x14, 0x84, 0xfc, 0x80, 0xa0, 0x24, 0x5d, 0x7f, - 0x5d, 0xb5, 0x32, 0x60, 0x89, 0xfa, 0x83, 0x60, 0xe8, 0xf8, 0x87, 0x62, 0x44, 0x4a, 0x56, 0x7a, - 0xc6, 0x58, 0xb5, 0x6e, 0x3c, 0x0b, 0xba, 0xea, 0xcf, 0x75, 0x38, 0xd5, 0x0b, 0x6d, 0x9f, 0x1d, - 0xd0, 0x50, 0x38, 0x96, 0x16, 0x86, 0x7c, 0x0b, 0x30, 0xc9, 0x77, 0x26, 0x4f, 0xe8, 0xdf, 0xe5, - 0xa9, 0x09, 0x8b, 0xcc, 0xf6, 0x46, 0x6e, 0x7a, 0x4f, 0xa6, 0x85, 0xee, 0x0a, 0xb1, 0xca, 0x54, - 0x02, 0x22, 0x57, 0xa1, 0x92, 0x52, 0xc7, 0x9e, 0xa7, 0x37, 0xa2, 0x6e, 0x89, 0xdf, 0x78, 0x0d, - 0x4a, 0xe2, 0xbe, 0x13, 0x89, 0xd0, 0x2d, 0x79, 0x20, 0x2d, 0x28, 0x4b, 0xbe, 0x89, 0x5e, 0xde, - 0x39, 0xf2, 0x10, 0xdf, 0x95, 0x33, 0xb2, 0x58, 0xe5, 0x93, 0x14, 0x92, 0x16, 0x2c, 0xe7, 0x5a, - 0x35, 0xf7, 0x76, 0xa1, 0x57, 0x7c, 0xbb, 0xca, 0xb2, 0x7d, 0xff, 0x73, 0xde, 0x48, 0x1f, 0xf4, - 0xec, 0x47, 0xf0, 0x79, 0x28, 0xf2, 0x87, 0x23, 0x19, 0x55, 0x6d, 0x42, 0x27, 0xd4, 0xbd, 0x87, - 0x23, 0x6a, 0x09, 0x75, 0x9a, 0x31, 0xd9, 0xed, 0x53, 0x19, 0x2b, 0x08, 0xa1, 0xca, 0xd8, 0xf7, - 0x08, 0x6a, 0x93, 0x42, 0xdf, 0x72, 0x5c, 0xfa, 0x7f, 0xcc, 0x95, 0x01, 0x4b, 0x07, 0x8e, 0x4b, - 0x85, 0x0f, 0xf2, 0x73, 0xe9, 0x79, 0x66, 0x1f, 0x9e, 0x82, 0xb5, 0xa4, 0x0f, 0x7b, 0xdd, 0x9b, - 0x5b, 0x49, 0x17, 0xbe, 0xf3, 0x09, 0x54, 0xd2, 0xd0, 0x70, 0x05, 0x4a, 0xed, 0x7b, 0xf7, 0x5b, - 0xdb, 0x75, 0x0d, 0x2f, 0x43, 0x65, 0xf7, 0x6e, 0xaf, 0x2f, 0x8f, 0x08, 0x9f, 0x80, 0xaa, 0xd5, - 0xbe, 0xdd, 0xfe, 0xbc, 0xbf, 0xd3, 0xea, 0xdd, 0xb8, 0x53, 0x5f, 0xc0, 0x18, 0x6a, 0x52, 0xb0, - 0x7b, 0x57, 0xc9, 0x0a, 0x9b, 0xc7, 0x25, 0x58, 0x4a, 0x7c, 0xc7, 0x57, 0xa1, 0xb8, 0x17, 0xb1, - 0x23, 0xbc, 0x36, 0x6b, 0xb9, 0x33, 0x4e, 0x4e, 0x49, 0xd5, 0x4c, 0x68, 0xf8, 0x03, 0x28, 0x89, - 0x4d, 0x02, 0xcf, 0xdc, 0xcc, 0x8c, 0xd9, 0xfb, 0x16, 0xd1, 0xf0, 0x4d, 0xa8, 0x66, 0x36, 0x90, - 0x39, 0xd6, 0x67, 0x73, 0xd2, 0xfc, 0xb2, 0x42, 0xb4, 0xcb, 0x08, 0xdf, 0x81, 0x6a, 0x66, 0x01, - 0xc0, 0x46, 0xae, 0x99, 0x72, 0xab, 0xc4, 0x84, 0x6b, 0xc6, 0xc6, 0x40, 0x34, 0xdc, 0x06, 0x98, - 0xbc, 0xfd, 0xf8, 0x4c, 0x0e, 0x9c, 0x5d, 0x12, 0x0c, 0x63, 0x96, 0x2a, 0xa5, 0xd9, 0x82, 0x4a, - 0xfa, 0xf2, 0xe1, 0xf5, 0x19, 0x8f, 0xa1, 0x24, 0x99, 0xff, 0x4c, 0x12, 0x0d, 0xdf, 0x02, 0xbd, - 0xe5, 0xba, 0xaf, 0x42, 0x63, 0x64, 0x35, 0x6c, 0x9a, 0xc7, 0x85, 0xd3, 0x73, 0x1e, 0x1b, 0x7c, - 0x21, 0xff, 0xa8, 0xcc, 0x7b, 0x41, 0x8d, 0xb7, 0xff, 0x16, 0x97, 0x7e, 0x6d, 0x07, 0x6a, 0xf9, - 0x8b, 0x13, 0xcf, 0x5b, 0x1c, 0x0d, 0x33, 0x55, 0xcc, 0xbe, 0x69, 0xb5, 0x46, 0x5c, 0x59, 0x3d, - 0xdb, 0xff, 0xf8, 0xd4, 0xcb, 0x64, 0xf1, 0x68, 0x1a, 0x6f, 0x4c, 0x73, 0x65, 0xa7, 0x25, 0x66, - 0xda, 0xfa, 0xe8, 0xc9, 0x73, 0x53, 0x7b, 0xfa, 0xdc, 0xd4, 0x5e, 0x3c, 0x37, 0xd1, 0x77, 0x63, - 0x13, 0xfd, 0x34, 0x36, 0xd1, 0xe3, 0xb1, 0x89, 0x9e, 0x8c, 0x4d, 0xf4, 0xc7, 0xd8, 0x44, 0x7f, - 0x8e, 0x4d, 0xed, 0xc5, 0xd8, 0x44, 0x8f, 0x8e, 0x4d, 0xed, 0xc9, 0xb1, 0xa9, 0x3d, 0x3d, 0x36, - 0xb5, 0x2f, 0xca, 0x03, 0xd7, 0xa1, 0x3e, 0xdf, 0x2f, 0x8b, 0xff, 0x58, 0x57, 0xfe, 0x0a, 0x00, - 0x00, 0xff, 0xff, 0x82, 0xee, 0x79, 0x78, 0xaa, 0x0d, 0x00, 0x00, + // 1444 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4b, 0x6f, 0xdb, 0xc6, + 0x16, 0x26, 0xad, 0x87, 0xad, 0x23, 0x5a, 0xa1, 0xc7, 0x8e, 0xe3, 0x28, 0xf7, 0x52, 0xc9, 0x00, + 0xc9, 0x35, 0xee, 0xbd, 0x51, 0x52, 0x07, 0x69, 0xbd, 0x68, 0x11, 0xc8, 0x89, 0xec, 0xa8, 0xb5, + 0x64, 0x67, 0x24, 0x35, 0x6d, 0x81, 0x42, 0xa0, 0xa5, 0xb1, 0x4d, 0x94, 0xa4, 0x14, 0x3e, 0x8a, + 0x7a, 0x51, 0xa0, 0x40, 0x97, 0x5d, 0x34, 0xcb, 0xfe, 0x84, 0xae, 0xbb, 0xe9, 0xbe, 0xab, 0x2c, + 0xb3, 0x0c, 0xba, 0x08, 0x1a, 0x79, 0xd3, 0x65, 0xd0, 0x5f, 0x50, 0xcc, 0x83, 0x14, 0xa9, 0xc8, + 0x68, 0xfa, 0xc8, 0x8e, 0x73, 0xce, 0x37, 0xdf, 0x9c, 0x39, 0xaf, 0x39, 0x04, 0xad, 0x3f, 0xf4, + 0x02, 0xfa, 0x45, 0x75, 0xe4, 0x0d, 0x83, 0x21, 0xca, 0x8b, 0x55, 0xf9, 0xfa, 0x91, 0x15, 0x1c, + 0x87, 0x07, 0xd5, 0xfe, 0xd0, 0xb9, 0x71, 0x34, 0x3c, 0x1a, 0xde, 0xe0, 0xea, 0x83, 0xf0, 0x90, + 0xaf, 0xf8, 0x82, 0x7f, 0x89, 0x6d, 0xf8, 0x37, 0x15, 0xb4, 0x87, 0x9e, 0x15, 0x50, 0x42, 0x1f, + 0x85, 0xd4, 0x0f, 0x50, 0x0b, 0x20, 0xb0, 0x1c, 0xea, 0x53, 0xcf, 0xa2, 0xfe, 0x9a, 0x7a, 0x39, + 0xb3, 0x5e, 0xdc, 0x40, 0x55, 0x79, 0x54, 0xc7, 0x72, 0x68, 0x9b, 0x6b, 0xb6, 0xca, 0x4f, 0x9e, + 0x57, 0x94, 0x9f, 0x9f, 0x57, 0xd0, 0xbe, 0x47, 0x4d, 0xdb, 0x1e, 0xf6, 0x3b, 0xf1, 0x2e, 0x92, + 0x60, 0x40, 0xef, 0x40, 0xbe, 0x3d, 0x0c, 0xbd, 0x3e, 0x5d, 0x9b, 0xbb, 0xac, 0xae, 0x97, 0x36, + 0x2a, 0x11, 0x57, 0xf2, 0xd4, 0xaa, 0x80, 0xd4, 0xdd, 0xd0, 0x21, 0x79, 0x9f, 0x7f, 0xa3, 0x4d, + 0x58, 0x70, 0x68, 0x60, 0x0e, 0xcc, 0xc0, 0x5c, 0xcb, 0x70, 0x33, 0x56, 0xa3, 0xad, 0x4d, 0x1a, + 0x78, 0x56, 0xbf, 0x29, 0xb5, 0x5b, 0xd9, 0x27, 0xcf, 0x2b, 0x2a, 0x89, 0xd1, 0xb8, 0x02, 0x30, + 0xe1, 0x43, 0xf3, 0x90, 0xa9, 0xed, 0x37, 0x74, 0x05, 0x2d, 0x40, 0x96, 0x74, 0x77, 0xeb, 0xba, + 0x8a, 0xcf, 0xc1, 0xa2, 0x3c, 0xdd, 0x1f, 0x0d, 0x5d, 0x9f, 0xe2, 0xf7, 0xa0, 0x48, 0xa8, 0x39, + 0x88, 0x7c, 0x50, 0x85, 0xf9, 0x47, 0x61, 0xd2, 0x01, 0x2b, 0xd1, 0xc9, 0x0f, 0x42, 0xea, 0x9d, + 0x48, 0x18, 0x89, 0x40, 0xf8, 0x0e, 0x68, 0x62, 0xbb, 0xa0, 0x43, 0x37, 0x60, 0xde, 0xa3, 0x7e, + 0x68, 0x07, 0xd1, 0xfe, 0xf3, 0x53, 0xfb, 0x05, 0x8e, 0x44, 0x28, 0xfc, 0x9d, 0x0a, 0x5a, 0x92, + 0x1a, 0xfd, 0x1f, 0x90, 0x1f, 0x98, 0x5e, 0xd0, 0xe3, 0x9e, 0x0c, 0x4c, 0x67, 0xd4, 0x73, 0x18, + 0x99, 0xba, 0x9e, 0x21, 0x3a, 0xd7, 0x74, 0x22, 0x45, 0xd3, 0x47, 0xeb, 0xa0, 0x53, 0x77, 0x90, + 0xc6, 0xce, 0x71, 0x6c, 0x89, 0xba, 0x83, 0x24, 0xf2, 0x26, 0x2c, 0x38, 0x66, 0xd0, 0x3f, 0xa6, + 0x9e, 0x2f, 0x9d, 0x1a, 0x5f, 0x6d, 0xd7, 0x3c, 0xa0, 0x76, 0x53, 0x28, 0x49, 0x8c, 0xc2, 0x0d, + 0x58, 0x4c, 0x19, 0x8d, 0x36, 0x5f, 0x33, 0x41, 0x58, 0x54, 0x94, 0x64, 0x2a, 0xe0, 0xc7, 0x2a, + 0x2c, 0x73, 0xae, 0x76, 0xe0, 0x51, 0xd3, 0x89, 0x19, 0xef, 0x40, 0xb1, 0x7f, 0x1c, 0xba, 0x9f, + 0xa5, 0x28, 0x2f, 0xbc, 0x4a, 0x79, 0x97, 0x81, 0x24, 0x6f, 0x72, 0xc7, 0x94, 0x49, 0x73, 0x7f, + 0xc2, 0xa4, 0x5b, 0x80, 0xf8, 0xbd, 0x3f, 0x34, 0xed, 0x90, 0xfa, 0x91, 0xf7, 0xff, 0x0d, 0x60, + 0x33, 0x69, 0xcf, 0x35, 0x1d, 0xca, 0xbd, 0x5e, 0x20, 0x05, 0x2e, 0x69, 0x99, 0x0e, 0xc5, 0x9b, + 0xb0, 0x9c, 0xda, 0x24, 0xaf, 0x71, 0x05, 0x34, 0xb1, 0xeb, 0x73, 0x2e, 0xe7, 0xf7, 0x28, 0x90, + 0xa2, 0x3d, 0x81, 0xe2, 0x65, 0x58, 0xda, 0x8d, 0x68, 0xa2, 0xd3, 0xf0, 0x6d, 0x69, 0x83, 0x14, + 0x4a, 0xb6, 0x0a, 0x14, 0x27, 0x36, 0x44, 0x64, 0x10, 0x1b, 0xe1, 0x63, 0x04, 0x7a, 0xd7, 0xa7, + 0x5e, 0x3b, 0x30, 0x83, 0x98, 0xea, 0x47, 0x15, 0x96, 0x12, 0x42, 0x49, 0x75, 0x15, 0x4a, 0x96, + 0x7b, 0x44, 0xfd, 0xc0, 0x1a, 0xba, 0x3d, 0xcf, 0x0c, 0xc4, 0x95, 0x54, 0xb2, 0x18, 0x4b, 0x89, + 0x19, 0x50, 0x76, 0x6b, 0x37, 0x74, 0x7a, 0xb1, 0x17, 0xd5, 0xf5, 0x2c, 0x29, 0xb8, 0xa1, 0x23, + 0x9c, 0xc7, 0x52, 0xd2, 0x1c, 0x59, 0xbd, 0x29, 0xa6, 0x0c, 0x67, 0xd2, 0xcd, 0x91, 0xd5, 0x48, + 0x91, 0x55, 0x61, 0xd9, 0x0b, 0x6d, 0x3a, 0x0d, 0xcf, 0x72, 0xf8, 0x12, 0x53, 0xa5, 0xf0, 0xf8, + 0x53, 0x58, 0x66, 0x86, 0x37, 0xee, 0xa5, 0x4d, 0xbf, 0x00, 0xf3, 0xa1, 0x4f, 0xbd, 0x9e, 0x35, + 0x90, 0x61, 0xc8, 0xb3, 0x65, 0x63, 0x80, 0xae, 0x43, 0x96, 0x77, 0x06, 0x66, 0x66, 0x71, 0xe3, + 0x62, 0x14, 0xec, 0x57, 0x2e, 0x4f, 0x38, 0x0c, 0xef, 0x00, 0x62, 0x2a, 0x3f, 0xcd, 0xfe, 0x16, + 0xe4, 0x7c, 0x26, 0x90, 0x29, 0x77, 0x29, 0xc9, 0x32, 0x65, 0x09, 0x11, 0x48, 0xfc, 0x83, 0x0a, + 0x86, 0x68, 0x3f, 0xfe, 0xf6, 0xd0, 0x4b, 0xd6, 0x8c, 0xff, 0xa6, 0x6b, 0x77, 0x13, 0xb4, 0xa8, + 0x2a, 0x7b, 0x3e, 0x0d, 0x64, 0xfd, 0x9e, 0x9f, 0x55, 0xbf, 0x3e, 0x29, 0x46, 0xd0, 0x36, 0x0d, + 0x70, 0x03, 0x2a, 0x67, 0xda, 0x2c, 0x5d, 0x71, 0x0d, 0xf2, 0x0e, 0x87, 0x48, 0x5f, 0x94, 0xd2, + 0xbd, 0x96, 0x48, 0x2d, 0xfe, 0x49, 0x85, 0x73, 0x53, 0x15, 0xc9, 0xae, 0x70, 0xe8, 0x0d, 0x1d, + 0x19, 0xeb, 0x64, 0xb4, 0x4a, 0x4c, 0xde, 0x90, 0xe2, 0xc6, 0x20, 0x19, 0xce, 0xb9, 0x54, 0x38, + 0xef, 0x40, 0x9e, 0xa7, 0x76, 0xd4, 0x95, 0x96, 0x52, 0xb7, 0xda, 0x37, 0x2d, 0x6f, 0x6b, 0x45, + 0x3e, 0x38, 0x1a, 0x17, 0xd5, 0x06, 0xe6, 0x28, 0xa0, 0x1e, 0x91, 0xdb, 0xd0, 0xff, 0x20, 0x2f, + 0x3a, 0xc2, 0x5a, 0x96, 0x13, 0x2c, 0x46, 0x04, 0xc9, 0xa6, 0x21, 0x21, 0xf8, 0x5b, 0x15, 0x72, + 0xc2, 0xf4, 0x37, 0x15, 0xab, 0x32, 0x2c, 0x50, 0xb7, 0x3f, 0x1c, 0x58, 0xee, 0x11, 0x2f, 0x91, + 0x1c, 0x89, 0xd7, 0x08, 0xc9, 0xd4, 0x65, 0xb5, 0xa0, 0xc9, 0xfc, 0x5c, 0x83, 0xd5, 0x8e, 0x67, + 0xba, 0xfe, 0x21, 0xf5, 0xb8, 0x61, 0x71, 0x60, 0xf0, 0x97, 0x00, 0x13, 0x7f, 0x27, 0xfc, 0xa4, + 0xfe, 0x35, 0x3f, 0x55, 0x61, 0xde, 0x37, 0x9d, 0x91, 0x1d, 0xf7, 0xc9, 0x38, 0xd0, 0x6d, 0x2e, + 0x96, 0x9e, 0x8a, 0x40, 0xf8, 0x36, 0x14, 0x62, 0x6a, 0x66, 0x79, 0xdc, 0x11, 0x35, 0xc2, 0xbf, + 0xd1, 0x0a, 0xe4, 0x78, 0xbf, 0xe3, 0x8e, 0xd0, 0x88, 0x58, 0xe0, 0x1a, 0xe4, 0x05, 0xdf, 0x44, + 0x2f, 0x7a, 0x8e, 0x58, 0xb0, 0x5e, 0x39, 0xc3, 0x8b, 0xc5, 0x60, 0xe2, 0x42, 0x5c, 0x83, 0xc5, + 0x54, 0xaa, 0xa6, 0xde, 0x2e, 0xf5, 0xb5, 0xde, 0xae, 0x6f, 0xe6, 0xa0, 0x94, 0x9e, 0x15, 0xd0, + 0x6d, 0xc8, 0x06, 0x27, 0x23, 0x61, 0x4d, 0x69, 0xe3, 0xca, 0xec, 0x89, 0x42, 0x2e, 0x3b, 0x27, + 0x23, 0x4a, 0x38, 0x9c, 0x75, 0x63, 0x51, 0x00, 0xe2, 0x49, 0x10, 0xc9, 0x0b, 0x42, 0xc4, 0xda, + 0x31, 0x73, 0xcd, 0x31, 0xb5, 0x47, 0x3c, 0xa8, 0x05, 0xc2, 0xbf, 0x99, 0x2c, 0x74, 0xad, 0x60, + 0x2d, 0x27, 0x64, 0xec, 0x1b, 0x9f, 0x00, 0x4c, 0xc8, 0x51, 0x11, 0xe6, 0xbb, 0xad, 0x0f, 0x5a, + 0x7b, 0x0f, 0x5b, 0xba, 0xc2, 0x16, 0x77, 0xf7, 0xba, 0xad, 0x4e, 0x9d, 0xe8, 0x2a, 0x2a, 0x40, + 0x6e, 0xa7, 0xd6, 0xdd, 0xa9, 0xeb, 0x73, 0x68, 0x11, 0x0a, 0xf7, 0x1b, 0xed, 0xce, 0xde, 0x0e, + 0xa9, 0x35, 0xf5, 0x0c, 0x42, 0x50, 0xe2, 0x9a, 0x89, 0x2c, 0xcb, 0xb6, 0xb6, 0xbb, 0xcd, 0x66, + 0x8d, 0x7c, 0xac, 0xe7, 0xd8, 0x9c, 0xd3, 0x68, 0x6d, 0xef, 0xe9, 0x79, 0xa4, 0xc1, 0x42, 0xbb, + 0x53, 0xeb, 0xd4, 0xdb, 0xf5, 0x8e, 0x3e, 0x8f, 0x1b, 0x90, 0x17, 0x47, 0xff, 0xed, 0x2c, 0xc2, + 0x3d, 0xd0, 0x92, 0x2e, 0x47, 0x57, 0x53, 0x5e, 0x8d, 0xe9, 0xb8, 0x3a, 0xe1, 0xc5, 0x28, 0x7f, + 0x84, 0xfb, 0xa6, 0xf2, 0x27, 0xc3, 0x85, 0x32, 0x7f, 0xbe, 0x56, 0xa1, 0x34, 0x49, 0xfb, 0x6d, + 0xcb, 0xa6, 0xff, 0x44, 0x97, 0x29, 0xc3, 0xc2, 0xa1, 0x65, 0x53, 0x6e, 0x83, 0x38, 0x2e, 0x5e, + 0xcf, 0xac, 0xca, 0x55, 0x58, 0x89, 0xaa, 0xb2, 0xd3, 0xbe, 0xb7, 0x15, 0xd5, 0xe4, 0x7f, 0xdf, + 0x87, 0x42, 0x7c, 0x35, 0x16, 0xa9, 0xfa, 0x83, 0x6e, 0x6d, 0x57, 0x57, 0x58, 0xa4, 0x5a, 0x7b, + 0x9d, 0x9e, 0x58, 0xaa, 0xe8, 0x1c, 0x14, 0x49, 0x7d, 0xa7, 0xfe, 0x51, 0xaf, 0x59, 0xeb, 0xdc, + 0xbd, 0xaf, 0xcf, 0xb1, 0xd0, 0x09, 0x41, 0x6b, 0x4f, 0xca, 0x32, 0x1b, 0xa7, 0x39, 0x58, 0x88, + 0x6c, 0x67, 0xd9, 0xb9, 0x1f, 0xfa, 0xc7, 0x68, 0x65, 0xd6, 0x90, 0x5c, 0x3e, 0x3f, 0x25, 0x95, + 0x1d, 0x42, 0x41, 0x6f, 0x43, 0x8e, 0xcf, 0x55, 0x68, 0xe6, 0x9c, 0x5a, 0x9e, 0x3d, 0x7d, 0x62, + 0x05, 0xdd, 0x83, 0x62, 0x62, 0x1e, 0x3b, 0x63, 0xf7, 0xa5, 0x94, 0x34, 0x3d, 0xba, 0x61, 0xe5, + 0xa6, 0x8a, 0xee, 0x43, 0x31, 0x31, 0x0e, 0xa1, 0x72, 0x2a, 0x99, 0x52, 0x83, 0xd5, 0x84, 0x6b, + 0xc6, 0xfc, 0x84, 0x15, 0x54, 0x07, 0x98, 0x4c, 0x42, 0xe8, 0x62, 0x0a, 0x9c, 0x1c, 0x99, 0xca, + 0xe5, 0x59, 0xaa, 0x98, 0x66, 0x0b, 0x0a, 0xf1, 0x1c, 0x80, 0xd6, 0x66, 0x8c, 0x06, 0x82, 0xe4, + 0xec, 0xa1, 0x01, 0x2b, 0x68, 0x1b, 0xb4, 0x9a, 0x6d, 0xbf, 0x0e, 0x4d, 0x39, 0xa9, 0xf1, 0xa7, + 0x79, 0x6c, 0xb8, 0x70, 0xc6, 0xd3, 0x8b, 0xae, 0xa5, 0x9b, 0xcf, 0x59, 0xf3, 0x44, 0xf9, 0x3f, + 0x7f, 0x88, 0x8b, 0x4f, 0x6b, 0x42, 0x29, 0xfd, 0x8c, 0xa0, 0xb3, 0xc6, 0xe8, 0xb2, 0x11, 0x2b, + 0x66, 0xbf, 0x3b, 0xca, 0x3a, 0x8b, 0xac, 0x96, 0xcc, 0x7f, 0xb4, 0xfa, 0x2a, 0x19, 0x2b, 0xcd, + 0xf2, 0xbf, 0xa6, 0xb9, 0x92, 0xd5, 0xc2, 0x98, 0xb6, 0xde, 0x7d, 0xfa, 0xc2, 0x50, 0x9e, 0xbd, + 0x30, 0x94, 0x97, 0x2f, 0x0c, 0xf5, 0xab, 0xb1, 0xa1, 0x7e, 0x3f, 0x36, 0xd4, 0x27, 0x63, 0x43, + 0x7d, 0x3a, 0x36, 0xd4, 0x5f, 0xc6, 0x86, 0xfa, 0xeb, 0xd8, 0x50, 0x5e, 0x8e, 0x0d, 0xf5, 0xf1, + 0xa9, 0xa1, 0x3c, 0x3d, 0x35, 0x94, 0x67, 0xa7, 0x86, 0xf2, 0x49, 0xbe, 0x6f, 0x5b, 0xd4, 0x0d, + 0x0e, 0xf2, 0xfc, 0x5f, 0xf5, 0xd6, 0xef, 0x01, 0x00, 0x00, 0xff, 0xff, 0x5b, 0xa2, 0xf3, 0x2b, + 0xf2, 0x0e, 0x00, 0x00, } func (x MatchType) String() string { @@ -1543,6 +1670,13 @@ func (x WriteRequest_SourceEnum) String() string { } return strconv.Itoa(int(x)) } +func (x MetricMetadata_MetricType) String() string { + s, ok := MetricMetadata_MetricType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} func (this *WriteRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -1573,6 +1707,14 @@ func (this *WriteRequest) Equal(that interface{}) bool { if this.Source != that1.Source { return false } + if len(this.Metadata) != len(that1.Metadata) { + return false + } + for i := range this.Metadata { + if !this.Metadata[i].Equal(that1.Metadata[i]) { + return false + } + } return true } func (this *WriteResponse) Equal(that interface{}) bool { @@ -2249,6 +2391,39 @@ func (this *LabelMatchers) Equal(that interface{}) bool { } return true } +func (this *MetricMetadata) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*MetricMetadata) + if !ok { + that2, ok := that.(MetricMetadata) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Type != that1.Type { + return false + } + if this.MetricName != that1.MetricName { + return false + } + if this.Help != that1.Help { + return false + } + if this.Unit != that1.Unit { + return false + } + return true +} func (this *Metric) Equal(that interface{}) bool { if that == nil { return this == nil @@ -2366,10 +2541,13 @@ func (this *WriteRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&client.WriteRequest{") s = append(s, "Timeseries: "+fmt.Sprintf("%#v", this.Timeseries)+",\n") s = append(s, "Source: "+fmt.Sprintf("%#v", this.Source)+",\n") + if this.Metadata != nil { + s = append(s, "Metadata: "+fmt.Sprintf("%#v", this.Metadata)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -2663,6 +2841,19 @@ func (this *LabelMatchers) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *MetricMetadata) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&client.MetricMetadata{") + s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") + s = append(s, "MetricName: "+fmt.Sprintf("%#v", this.MetricName)+",\n") + s = append(s, "Help: "+fmt.Sprintf("%#v", this.Help)+",\n") + s = append(s, "Unit: "+fmt.Sprintf("%#v", this.Unit)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *Metric) GoString() string { if this == nil { return "nil" @@ -3240,6 +3431,20 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Metadata) > 0 { + for iNdEx := len(m.Metadata) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Metadata[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintCortex(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } if m.Source != 0 { i = encodeVarintCortex(dAtA, i, uint64(m.Source)) i-- @@ -4135,6 +4340,55 @@ func (m *LabelMatchers) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *MetricMetadata) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetricMetadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetricMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Unit) > 0 { + i -= len(m.Unit) + copy(dAtA[i:], m.Unit) + i = encodeVarintCortex(dAtA, i, uint64(len(m.Unit))) + i-- + dAtA[i] = 0x2a + } + if len(m.Help) > 0 { + i -= len(m.Help) + copy(dAtA[i:], m.Help) + i = encodeVarintCortex(dAtA, i, uint64(len(m.Help))) + i-- + dAtA[i] = 0x22 + } + if len(m.MetricName) > 0 { + i -= len(m.MetricName) + copy(dAtA[i:], m.MetricName) + i = encodeVarintCortex(dAtA, i, uint64(len(m.MetricName))) + i-- + dAtA[i] = 0x12 + } + if m.Type != 0 { + i = encodeVarintCortex(dAtA, i, uint64(m.Type)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *Metric) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -4314,6 +4568,12 @@ func (m *WriteRequest) Size() (n int) { if m.Source != 0 { n += 1 + sovCortex(uint64(m.Source)) } + if len(m.Metadata) > 0 { + for _, e := range m.Metadata { + l = e.Size() + n += 1 + l + sovCortex(uint64(l)) + } + } return n } @@ -4691,6 +4951,30 @@ func (m *LabelMatchers) Size() (n int) { return n } +func (m *MetricMetadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Type != 0 { + n += 1 + sovCortex(uint64(m.Type)) + } + l = len(m.MetricName) + if l > 0 { + n += 1 + l + sovCortex(uint64(l)) + } + l = len(m.Help) + if l > 0 { + n += 1 + l + sovCortex(uint64(l)) + } + l = len(m.Unit) + if l > 0 { + n += 1 + l + sovCortex(uint64(l)) + } + return n +} + func (m *Metric) Size() (n int) { if m == nil { return 0 @@ -4770,9 +5054,15 @@ func (this *WriteRequest) String() string { if this == nil { return "nil" } + repeatedStringForMetadata := "[]*MetricMetadata{" + for _, f := range this.Metadata { + repeatedStringForMetadata += strings.Replace(f.String(), "MetricMetadata", "MetricMetadata", 1) + "," + } + repeatedStringForMetadata += "}" s := strings.Join([]string{`&WriteRequest{`, `Timeseries:` + fmt.Sprintf("%v", this.Timeseries) + `,`, `Source:` + fmt.Sprintf("%v", this.Source) + `,`, + `Metadata:` + repeatedStringForMetadata + `,`, `}`, }, "") return s @@ -5081,6 +5371,19 @@ func (this *LabelMatchers) String() string { }, "") return s } +func (this *MetricMetadata) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&MetricMetadata{`, + `Type:` + fmt.Sprintf("%v", this.Type) + `,`, + `MetricName:` + fmt.Sprintf("%v", this.MetricName) + `,`, + `Help:` + fmt.Sprintf("%v", this.Help) + `,`, + `Unit:` + fmt.Sprintf("%v", this.Unit) + `,`, + `}`, + }, "") + return s +} func (this *Metric) String() string { if this == nil { return "nil" @@ -5215,6 +5518,40 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Metadata = append(m.Metadata, &MetricMetadata{}) + if err := m.Metadata[len(m.Metadata)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipCortex(dAtA[iNdEx:]) @@ -7479,6 +7816,174 @@ func (m *LabelMatchers) Unmarshal(dAtA []byte) error { } return nil } +func (m *MetricMetadata) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetricMetadata: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetricMetadata: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= MetricMetadata_MetricType(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MetricName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MetricName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Help = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Unit = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCortex(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Metric) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto index 8ca40645277..6a341b59983 100644 --- a/pkg/ingester/client/cortex.proto +++ b/pkg/ingester/client/cortex.proto @@ -34,6 +34,7 @@ message WriteRequest { RULE = 1; } SourceEnum Source = 2; + repeated MetricMetadata metadata = 3 [(gogoproto.nullable) = true]; } message WriteResponse {} @@ -142,6 +143,24 @@ message LabelMatchers { repeated LabelMatcher matchers = 1; } +message MetricMetadata { + enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGEHISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; + } + + MetricType type = 1; + string metric_name = 2; + string help = 4; + string unit = 5; +} + message Metric { repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; } diff --git a/pkg/ingester/client/timeseries.go b/pkg/ingester/client/timeseries.go index 165c17ab615..73f619163fb 100644 --- a/pkg/ingester/client/timeseries.go +++ b/pkg/ingester/client/timeseries.go @@ -43,7 +43,7 @@ type PreallocConfig struct{} // RegisterFlags registers configuration settings. func (PreallocConfig) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&expectedTimeseries, "ingester-client.expected-timeseries", expectedTimeseries, "Expected number of timeseries per request, use for preallocations.") + f.IntVar(&expectedTimeseries, "ingester-client.expected-timeseries", expectedTimeseries, "Expected number of timeseries per request, used for preallocations.") f.IntVar(&expectedLabels, "ingester-client.expected-labels", expectedLabels, "Expected number of labels per timeseries, used for preallocations.") f.IntVar(&expectedSamplesPerSeries, "ingester-client.expected-samples-per-series", expectedSamplesPerSeries, "Expected number of samples per timeseries, used for preallocations.") } @@ -266,11 +266,12 @@ func (bs *LabelAdapter) Compare(other LabelAdapter) int { } // ReuseSlice puts the slice back into a sync.Pool for reuse. -func ReuseSlice(slice []PreallocTimeseries) { - for i := range slice { - ReuseTimeseries(slice[i].TimeSeries) +func ReuseSlice(ts []PreallocTimeseries) { + for i := range ts { + ReuseTimeseries(ts[i].TimeSeries) } - slicePool.Put(slice[:0]) //nolint:staticcheck //see comment on slicePool for more details + + slicePool.Put(ts[:0]) //nolint:staticcheck //see comment on slicePool for more details } // ReuseTimeseries puts the timeseries back into a sync.Pool for reuse. diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 37e49fe4c58..45e2df1668b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -372,6 +372,13 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client. } } + if len(req.Metadata) > 0 { + // Given requests can only contain either metadata or samples, no-op if there is metadata for now. + logger := util.WithContext(ctx, util.Logger) + level.Debug(logger).Log("msg", "metadata received in the ingester", "count", len(req.Metadata)) + return &client.WriteResponse{}, nil + } + for _, ts := range req.Timeseries { for _, s := range ts.Samples { // append() copies the memory in `ts.Labels` except on the error path diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 6ba960f4ed4..f8c5528a222 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -171,7 +171,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries, o // Append samples. for _, userID := range userIDs { ctx := user.InjectOrgID(context.Background(), userID) - _, err := ing.Push(ctx, client.ToWriteRequest(matrixToLables(testData[userID]), matrixToSamples(testData[userID]), client.API)) + _, err := ing.Push(ctx, client.ToWriteRequest(matrixToLables(testData[userID]), matrixToSamples(testData[userID]), nil, client.API)) require.NoError(t, err) } @@ -387,11 +387,11 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) { // Append only one series first, expect no error. ctx := user.InjectOrgID(context.Background(), userID) - _, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, client.API)) + _, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, nil, client.API)) require.NoError(t, err) // Append to two series, expect series-exceeded error. - _, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, client.API)) + _, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, nil, client.API)) if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests { t.Fatalf("expected error about exceeding metrics per user, got %v", err) } @@ -444,11 +444,11 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) { // Append only one series first, expect no error. ctx := user.InjectOrgID(context.Background(), userID) - _, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, client.API)) + _, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, nil, client.API)) require.NoError(t, err) // Append to two series, expect series-exceeded error. - _, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, client.API)) + _, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, nil, client.API)) if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests { t.Fatalf("expected error about exceeding series per metric, got %v", err) } @@ -507,7 +507,7 @@ func TestIngesterValidation(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - _, err := ing.Push(ctx, client.ToWriteRequest(tc.lbls, tc.samples, client.API)) + _, err := ing.Push(ctx, client.ToWriteRequest(tc.lbls, tc.samples, nil, client.API)) require.Equal(t, tc.err, err) }) } @@ -617,7 +617,7 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte for i := range allSamples { allSamples[i].TimestampMs = int64(j + 1) } - _, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API)) + _, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, nil, client.API)) if !errorsExpected { require.NoError(b, err) } diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 143ab571d14..93f562546f9 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -60,10 +60,12 @@ func TestIngester_v2Push(t *testing.T) { client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 1, TimestampMs: 9}}, + nil, client.API), client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 2, TimestampMs: 10}}, + nil, client.API), }, expectedErr: nil, @@ -96,10 +98,12 @@ func TestIngester_v2Push(t *testing.T) { client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 2, TimestampMs: 10}}, + nil, client.API), client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 1, TimestampMs: 9}}, + nil, client.API), }, expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(errors.Wrapf(tsdb.ErrOutOfOrderSample, "series=%s, timestamp=%s", metricLabels.String(), model.Time(9).Time().Format(time.RFC3339Nano)), userID).Error()), @@ -135,10 +139,12 @@ func TestIngester_v2Push(t *testing.T) { client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 2, TimestampMs: 1575043969}}, + nil, client.API), client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 1, TimestampMs: 1575043969 - (86400 * 1000)}}, + nil, client.API), }, expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(errors.Wrapf(tsdb.ErrOutOfBounds, "series=%s, timestamp=%s", metricLabels.String(), model.Time(1575043969-(86400*1000)).Time().Format(time.RFC3339Nano)), userID).Error()), @@ -174,10 +180,12 @@ func TestIngester_v2Push(t *testing.T) { client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 2, TimestampMs: 1575043969}}, + nil, client.API), client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 1, TimestampMs: 1575043969}}, + nil, client.API), }, expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(errors.Wrapf(tsdb.ErrAmendSample, "series=%s, timestamp=%s", metricLabels.String(), model.Time(1575043969).Time().Format(time.RFC3339Nano)), userID).Error()), @@ -297,6 +305,7 @@ func TestIngester_v2Push_ShouldHandleTheCaseTheCachedReferenceIsInvalid(t *testi req := client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: float64(j), TimestampMs: int64(j)}}, + nil, client.API) _, err := i.v2Push(ctx, req) @@ -361,10 +370,12 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 1, TimestampMs: 9}}, + nil, client.API), client.ToWriteRequest( []labels.Labels{metricLabels}, []client.Sample{{Value: 2, TimestampMs: 10}}, + nil, client.API), } @@ -989,7 +1000,7 @@ func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*cl }, } - req := client.ToWriteRequest([]labels.Labels{lbls}, samples, client.API) + req := client.ToWriteRequest([]labels.Labels{lbls}, samples, nil, client.API) // Generate the expected response expectedQueryRes := &client.QueryResponse{ diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 53e056ef85f..fe0048d7404 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -366,7 +366,7 @@ func TestIngesterFlush(t *testing.T) { } ) ctx := user.InjectOrgID(context.Background(), userID) - _, err := ing.Push(ctx, client.ToWriteRequest(lbls, sampleData, client.API)) + _, err := ing.Push(ctx, client.ToWriteRequest(lbls, sampleData, nil, client.API)) require.NoError(t, err) // We add a 100ms sleep into the flush loop, such that we can reliably detect diff --git a/pkg/ingester/mapper.go b/pkg/ingester/mapper.go index ae51c4667a4..87c1f622b7d 100644 --- a/pkg/ingester/mapper.go +++ b/pkg/ingester/mapper.go @@ -61,7 +61,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric labelPairs) model.Fingerpr if ok { // FP exists in memory, but is it for the same metric? if metric.equal(s.metric) { - // Yupp. We are done. + // Yup. We are done. return fp } // Collision detected! diff --git a/pkg/ingester/series_map.go b/pkg/ingester/series_map.go index 7fd97f32bbe..a8e4ba70613 100644 --- a/pkg/ingester/series_map.go +++ b/pkg/ingester/series_map.go @@ -13,7 +13,7 @@ import ( const seriesMapShards = 128 // seriesMap maps fingerprints to memory series. All its methods are -// goroutine-safe. A seriesMap is effectively is a goroutine-safe version of +// goroutine-safe. A seriesMap is effectively a goroutine-safe version of // map[model.Fingerprint]*memorySeries. type seriesMap struct { size int32 diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 78ea5e54551..563f63b0701 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -36,7 +36,7 @@ func (a *appender) AddFast(l labels.Labels, ref uint64, t int64, v float64) erro } func (a *appender) Commit() error { - _, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, client.RULE)) + _, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, nil, client.RULE)) a.labels = nil a.samples = nil return err diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index abf69dbec51..7af7876ed6a 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -22,21 +22,23 @@ const ( // limits via flags, or per-user limits via yaml config. type Limits struct { // Distributor enforced limits. - IngestionRate float64 `yaml:"ingestion_rate"` - IngestionRateStrategy string `yaml:"ingestion_rate_strategy"` - IngestionBurstSize int `yaml:"ingestion_burst_size"` - AcceptHASamples bool `yaml:"accept_ha_samples"` - HAClusterLabel string `yaml:"ha_cluster_label"` - HAReplicaLabel string `yaml:"ha_replica_label"` - DropLabels flagext.StringSlice `yaml:"drop_labels"` - MaxLabelNameLength int `yaml:"max_label_name_length"` - MaxLabelValueLength int `yaml:"max_label_value_length"` - MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"` - RejectOldSamples bool `yaml:"reject_old_samples"` - RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"` - CreationGracePeriod time.Duration `yaml:"creation_grace_period"` - EnforceMetricName bool `yaml:"enforce_metric_name"` - SubringSize int `yaml:"user_subring_size"` + IngestionRate float64 `yaml:"ingestion_rate"` + IngestionRateStrategy string `yaml:"ingestion_rate_strategy"` + IngestionBurstSize int `yaml:"ingestion_burst_size"` + AcceptHASamples bool `yaml:"accept_ha_samples"` + HAClusterLabel string `yaml:"ha_cluster_label"` + HAReplicaLabel string `yaml:"ha_replica_label"` + DropLabels flagext.StringSlice `yaml:"drop_labels"` + MaxLabelNameLength int `yaml:"max_label_name_length"` + MaxLabelValueLength int `yaml:"max_label_value_length"` + MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"` + MaxMetadataLength int `yaml:"max_metadata_length"` + RejectOldSamples bool `yaml:"reject_old_samples"` + RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"` + CreationGracePeriod time.Duration `yaml:"creation_grace_period"` + EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name"` + EnforceMetricName bool `yaml:"enforce_metric_name"` + SubringSize int `yaml:"user_subring_size"` // Ingester enforced limits. MaxSeriesPerQuery int `yaml:"max_series_per_query"` @@ -71,10 +73,12 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") + f.IntVar(&l.MaxMetadataLength, "validation.max-metadata-length", 1024, "Maximum length accepted for metric metadata. Metadata refers to Metric Name, HELP and UNIT.") f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Reject old samples.") f.DurationVar(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", 14*24*time.Hour, "Maximum accepted sample age before rejecting.") f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") + f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series that a query can return.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return.") @@ -204,6 +208,12 @@ func (o *Overrides) MaxLabelNamesPerSeries(userID string) int { return o.getOverridesForUser(userID).MaxLabelNamesPerSeries } +// MaxMetadataLength returns maximum length metadata can be. Metadata refers +// to the Metric Name, HELP and UNIT. +func (o *Overrides) MaxMetadataLength(userID string) int { + return o.getOverridesForUser(userID).MaxMetadataLength +} + // RejectOldSamples returns true when we should reject samples older than certain // age. func (o *Overrides) RejectOldSamples(userID string) bool { @@ -272,6 +282,11 @@ func (o *Overrides) EnforceMetricName(userID string) bool { return o.getOverridesForUser(userID).EnforceMetricName } +// EnforceMetadataMetricName whether to enforce the presence of a metric name on metadata. +func (o *Overrides) EnforceMetadataMetricName(userID string) bool { + return o.getOverridesForUser(userID).EnforceMetadataMetricName +} + // CardinalityLimit returns the maximum number of timeseries allowed in a query. func (o *Overrides) CardinalityLimit(userID string) int { return o.getOverridesForUser(userID).CardinalityLimit diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 12476479e0a..b3802d9c7dc 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -17,6 +17,17 @@ import ( const ( discardReasonLabel = "reason" + errMetadataMissingMetricName = "metadata missing metric name" + errMetadataTooLong = "metadata '%s' value too long: %.200q metric %.200q" + + typeMetricName = "METRIC_NAME" + typeHelp = "HELP" + typeUnit = "UNIT" + + metricNameTooLong = "metric_name_too_long" + helpTooLong = "help_too_long" + unitTooLong = "unit_too_long" + errMissingMetricName = "sample missing metric name" errInvalidMetricName = "sample invalid metric name: %.200q" errInvalidLabel = "sample invalid label: %.200q metric %.200q" @@ -55,9 +66,17 @@ var DiscardedSamples = prometheus.NewCounterVec( }, []string{discardReasonLabel, "user"}, ) +var DiscardedMetadata = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cortex_discarded_metadata_total", + Help: "The total number of metadata that were discarded.", + }, + []string{discardReasonLabel, "user"}, +) func init() { prometheus.MustRegister(DiscardedSamples) + prometheus.MustRegister(DiscardedMetadata) } // SampleValidationConfig helps with getting required config to validate sample. @@ -150,6 +169,45 @@ func ValidateLabels(cfg LabelValidationConfig, userID string, ls []client.LabelA return nil } +// MetadataValidationConfig helps with getting required config to validate metadata. +type MetadataValidationConfig interface { + EnforceMetadataMetricName(userID string) bool + MaxMetadataLength(userID string) int +} + +// ValidateMetadata returns an err if a metric metadata is invalid. +func ValidateMetadata(cfg MetadataValidationConfig, userID string, metadata *client.MetricMetadata) error { + if cfg.EnforceMetadataMetricName(userID) && metadata.MetricName == "" { + DiscardedMetadata.WithLabelValues(missingMetricName, userID).Inc() + return httpgrpc.Errorf(http.StatusBadRequest, errMetadataMissingMetricName) + } + + maxMetadataValueLength := cfg.MaxMetadataLength(userID) + var reason string + var cause string + var metadataType string + if len(metadata.MetricName) > maxMetadataValueLength { + metadataType = typeMetricName + reason = metricNameTooLong + cause = metadata.MetricName + } else if len(metadata.Help) > maxMetadataValueLength { + metadataType = typeHelp + reason = helpTooLong + cause = metadata.Help + } else if len(metadata.Unit) > maxMetadataValueLength { + metadataType = typeUnit + reason = unitTooLong + cause = metadata.Unit + } + + if reason != "" { + DiscardedMetadata.WithLabelValues(reason, userID).Inc() + return httpgrpc.Errorf(http.StatusBadRequest, errMetadataTooLong, metadataType, cause, metadata.MetricName) + } + + return nil +} + // this function formats label adapters as a metric name with labels, while preserving // label order, and keeping duplicates. If there are multiple "__name__" labels, only // first one is used as metric name, other ones will be included as regular labels. diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index 1378437a155..56dc9a0f314 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -34,6 +34,19 @@ func (v validateLabelsCfg) MaxLabelValueLength(userID string) int { return v.maxLabelValueLength } +type validateMetadataCfg struct { + enforceMetadataMetricName bool + maxMetadataLength int +} + +func (vm validateMetadataCfg) EnforceMetadataMetricName(userID string) bool { + return vm.enforceMetadataMetricName +} + +func (vm validateMetadataCfg) MaxMetadataLength(userID string) int { + return vm.maxMetadataLength +} + func TestValidateLabels(t *testing.T) { var cfg validateLabelsCfg userID := "testUser" @@ -82,6 +95,50 @@ func TestValidateLabels(t *testing.T) { } } +func TestValidateMetadata(t *testing.T) { + userID := "testUser" + var cfg validateMetadataCfg + cfg.enforceMetadataMetricName = true + cfg.maxMetadataLength = 22 + + for _, c := range []struct { + desc string + metadata *client.MetricMetadata + err error + }{ + { + "with a valid config", + &client.MetricMetadata{MetricName: "go_goroutines", Type: client.COUNTER, Help: "Number of goroutines.", Unit: ""}, + nil, + }, + { + "with no metric name", + &client.MetricMetadata{MetricName: "", Type: client.COUNTER, Help: "Number of goroutines.", Unit: ""}, + httpgrpc.Errorf(http.StatusBadRequest, "metadata missing metric name"), + }, + { + "with a long metric name", + &client.MetricMetadata{MetricName: "go_goroutines_and_routines_and_routines", Type: client.COUNTER, Help: "Number of goroutines.", Unit: ""}, + httpgrpc.Errorf(http.StatusBadRequest, "metadata 'METRIC_NAME' value too long: \"go_goroutines_and_routines_and_routines\" metric \"go_goroutines_and_routines_and_routines\""), + }, + { + "with a long help", + &client.MetricMetadata{MetricName: "go_goroutines", Type: client.COUNTER, Help: "Number of goroutines that currently exist.", Unit: ""}, + httpgrpc.Errorf(http.StatusBadRequest, "metadata 'HELP' value too long: \"Number of goroutines that currently exist.\" metric \"go_goroutines\""), + }, + { + "with a long unit", + &client.MetricMetadata{MetricName: "go_goroutines", Type: client.COUNTER, Help: "Number of goroutines.", Unit: "a_made_up_unit_that_is_really_long"}, + httpgrpc.Errorf(http.StatusBadRequest, "metadata 'UNIT' value too long: \"a_made_up_unit_that_is_really_long\" metric \"go_goroutines\""), + }, + } { + t.Run(c.desc, func(t *testing.T) { + err := ValidateMetadata(cfg, userID, c.metadata) + assert.Equal(t, c.err, err, "wrong error") + }) + } +} + func TestValidateLabelOrder(t *testing.T) { var cfg validateLabelsCfg cfg.maxLabelNameLength = 10