Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of LabelHints.Limit for LabelNames and LabelValues requests (#8805) #10410

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375 #10403
* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_reader_last_produced_offset_requests_total`, `cortex_ingest_storage_reader_last_produced_offset_failures_total`, `cortex_ingest_storage_reader_last_produced_offset_request_duration_seconds`, `cortex_ingest_storage_reader_partition_start_offset_requests_total`, `cortex_ingest_storage_reader_partition_start_offset_failures_total`, `cortex_ingest_storage_reader_partition_start_offset_request_duration_seconds` metrics. #10462
* [CHANGE] Ingester: Set `-ingester.ooo-native-histograms-ingestion-enabled` to true by default. #10483
* [CHANGE] Distributor, querier, ingester and store-gateway: Make use of LabelHints.Limit for label names and values requests. #10410
santileira marked this conversation as resolved.
Show resolved Hide resolved
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
Expand Down
17 changes: 13 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -1971,13 +1972,13 @@ func queryIngesterPartitionsRingZoneSorter(preferredZone string) ring.ZoneSorter

// LabelValuesForLabelName returns the label values associated with the given labelName, among all series with samples
// timestamp between from and to, and series labels matching the optional matchers.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, matchers)
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, hints, matchers)
if err != nil {
return nil, err
}
Expand All @@ -2004,6 +2005,10 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
// We need the values returned to be sorted.
slices.Sort(values)

if hints != nil && hints.Limit > 0 && len(values) > hints.Limit {
values = values[:hints.Limit]
}

return values, nil
}

Expand Down Expand Up @@ -2693,13 +2698,13 @@ func maxFromZones[T ~float64 | ~uint64](seriesCountByZone map[string]T) (val T)

// LabelNames returns the names of all labels from series with samples timestamp between from and to, and matching
// the input optional series label matchers. The returned label names are sorted.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToLabelNamesRequest(from, to, matchers)
req, err := ingester_client.ToLabelNamesRequest(from, to, hints, matchers)
if err != nil {
return nil, err
}
Expand All @@ -2725,6 +2730,10 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match

slices.Sort(values)

if hints != nil && hints.Limit > 0 && len(values) > hints.Limit {
values = values[:hints.Limit]
}

return values, nil
}

Expand Down
63 changes: 59 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
promtestutil "github.com/prometheus/prometheus/util/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -3290,6 +3291,7 @@ func TestDistributor_LabelNames(t *testing.T) {

tests := map[string]struct {
shuffleShardSize int
hints *storage.LabelHints
matchers []*labels.Matcher
expectedResult []string
expectedIngesters int
Expand All @@ -3308,6 +3310,14 @@ func TestDistributor_LabelNames(t *testing.T) {
expectedResult: []string{labels.MetricName, "reason", "status"},
expectedIngesters: numIngesters,
},
"should filter metrics by single matcher and apply limit": {
santileira marked this conversation as resolved.
Show resolved Hide resolved
hints: &storage.LabelHints{Limit: 2},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName, "reason"},
expectedIngesters: numIngesters,
},
"should filter metrics by multiple matchers": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "status", "200"),
Expand All @@ -3316,6 +3326,15 @@ func TestDistributor_LabelNames(t *testing.T) {
expectedResult: []string{labels.MetricName, "status"},
expectedIngesters: numIngesters,
},
"should filter metrics by multiple matchers and apply limit": {
hints: &storage.LabelHints{Limit: 1},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "status", "200"),
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName},
expectedIngesters: numIngesters,
},
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
shuffleShardSize: 3,
matchers: []*labels.Matcher{
Expand All @@ -3324,6 +3343,15 @@ func TestDistributor_LabelNames(t *testing.T) {
expectedResult: []string{labels.MetricName, "reason", "status"},
expectedIngesters: 3,
},
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled and apply limit": {
shuffleShardSize: 3,
hints: &storage.LabelHints{Limit: 1},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName},
expectedIngesters: 3,
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -3365,7 +3393,7 @@ func TestDistributor_LabelNames(t *testing.T) {
require.NoError(t, err)
}

names, err := ds[0].LabelNames(ctx, now, now, testData.matchers...)
names, err := ds[0].LabelNames(ctx, now, now, testData.hints, testData.matchers...)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expectedResult, names)

Expand Down Expand Up @@ -3551,6 +3579,7 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {
tests := map[string]struct {
from, to model.Time
expectedLabelValues []string
hints *storage.LabelHints
matchers []*labels.Matcher
}{
"all time selected, no matchers": {
Expand All @@ -3569,6 +3598,18 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {
expectedLabelValues: []string{"label_1"},
matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "reason", "broken")},
},
"all time selected, no matchers, hints provided without limit": {
from: 0,
to: 300_000,
hints: &storage.LabelHints{Limit: 0},
expectedLabelValues: []string{"label_0", "label_1"},
},
"all time selected, no matchers, limit provided": {
from: 0,
to: 300_000,
hints: &storage.LabelHints{Limit: 1},
expectedLabelValues: []string{"label_0"},
},
}

for testName, testCase := range tests {
Expand Down Expand Up @@ -3599,7 +3640,7 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {
require.NoError(t, err)
}

response, err := ds[0].LabelValuesForLabelName(ctx, testCase.from, testCase.to, labels.MetricName, testCase.matchers...)
response, err := ds[0].LabelValuesForLabelName(ctx, testCase.from, testCase.to, labels.MetricName, testCase.hints, testCase.matchers...)
require.NoError(t, err)
assert.ElementsMatch(t, response, testCase.expectedLabelValues)
})
Expand Down Expand Up @@ -6447,7 +6488,7 @@ func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesR
return nil, errFail
}

labelName, from, to, matchers, err := client.FromLabelValuesRequest(req)
labelName, from, to, hints, matchers, err := client.FromLabelValuesRequest(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -6481,6 +6522,9 @@ func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesR

slices.Sort(response)

if hints != nil && hints.Limit > 0 && len(response) > hints.Limit {
response = response[:hints.Limit]
}
return &client.LabelValuesResponse{LabelValues: response}, nil
}

Expand All @@ -6498,21 +6542,32 @@ func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesReq
return nil, errFail
}

_, _, matchers, err := client.FromLabelNamesRequest(req)
_, _, hints, matchers, err := client.FromLabelNamesRequest(req)
if err != nil {
return nil, err
}

response := client.LabelNamesResponse{}
labelsSet := map[string]struct{}{}

for _, ts := range i.timeseries {
santileira marked this conversation as resolved.
Show resolved Hide resolved
if match(ts.Labels, matchers) {
for _, lbl := range ts.Labels {
if _, ok := labelsSet[lbl.Name]; ok {
santileira marked this conversation as resolved.
Show resolved Hide resolved
continue
}

labelsSet[lbl.Name] = struct{}{}
response.LabelNames = append(response.LabelNames, lbl.Name)
}
}
}
slices.Sort(response.LabelNames)

if hints != nil && hints.Limit > 0 && len(response.LabelNames) > hints.Limit {
response.LabelNames = response.LabelNames[:hints.Limit]
}

santileira marked this conversation as resolved.
Show resolved Hide resolved
return &response, nil
}

Expand Down
39 changes: 30 additions & 9 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/mimir/pkg/mimirpb"
)
Expand Down Expand Up @@ -108,62 +109,82 @@ func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse)
}

// ToLabelValuesRequest builds a LabelValuesRequest proto
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, hints *storage.LabelHints, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
ms, err := ToLabelMatchers(matchers)
if err != nil {
return nil, err
}

var limit int64
if hints != nil && hints.Limit > 0 {
limit = int64(hints.Limit)
}
return &LabelValuesRequest{
LabelName: string(labelName),
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: limit,
}, nil
}

// FromLabelValuesRequest unpacks a LabelValuesRequest proto
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*labels.Matcher, error) {
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, *storage.LabelHints, []*labels.Matcher, error) {
var err error
var hints *storage.LabelHints
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return "", 0, 0, nil, err
return "", 0, 0, nil, nil, err
}
}

return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, matchers, nil
if req.Limit > 0 {
hints = &storage.LabelHints{Limit: int(req.Limit)}
}

return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, hints, matchers, nil
}

// ToLabelNamesRequest builds a LabelNamesRequest proto
func ToLabelNamesRequest(from, to model.Time, matchers []*labels.Matcher) (*LabelNamesRequest, error) {
func ToLabelNamesRequest(from, to model.Time, hints *storage.LabelHints, matchers []*labels.Matcher) (*LabelNamesRequest, error) {
ms, err := ToLabelMatchers(matchers)
if err != nil {
return nil, err
}

var limit int64
if hints != nil && hints.Limit > 0 {
limit = int64(hints.Limit)
}

return &LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: limit,
}, nil
}

// FromLabelNamesRequest unpacks a LabelNamesRequest proto
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, []*labels.Matcher, error) {
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, *storage.LabelHints, []*labels.Matcher, error) {
var err error
var hints *storage.LabelHints
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return 0, 0, nil, err
return 0, 0, nil, nil, err
}
}

return req.StartTimestampMs, req.EndTimestampMs, matchers, nil
if req.Limit != 0 {
hints = &storage.LabelHints{Limit: int(req.Limit)}
}

return req.StartTimestampMs, req.EndTimestampMs, hints, matchers, nil
}

func ToActiveSeriesRequest(matchers []*labels.Matcher) (*ActiveSeriesRequest, error) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/ingester/client/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -73,15 +74,17 @@ func TestLabelNamesRequest(t *testing.T) {
mint, maxt = 0, 10
)

hints := &storage.LabelHints{Limit: 10}
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}

req, err := ToLabelNamesRequest(mint, maxt, matchers)
req, err := ToLabelNamesRequest(mint, maxt, hints, matchers)
require.NoError(t, err)

actualMinT, actualMaxT, actualMatchers, err := FromLabelNamesRequest(req)
actualMinT, actualMaxT, actualHints, actualMatchers, err := FromLabelNamesRequest(req)
require.NoError(t, err)

assert.Equal(t, int64(mint), actualMinT)
assert.Equal(t, int64(maxt), actualMaxT)
assert.Equal(t, hints, actualHints)
assert.Equal(t, matchers, actualMatchers)
}
Loading