Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [BUGFIX] Fix configuration for TLS server validation, TLS skip verify was hardcoded to true for all TLS configurations and prevented validation of server certificates. #3030
* [BUGFIX] Fixes the Alertmanager panicking when no `-alertmanager.web.external-url` is provided. #3017
* [BUGFIX] Fixes the registration of the Alertmanager API metrics `cortex_alertmanager_alerts_received_total` and `cortex_alertmanager_alerts_invalid_total`. #3065
* [BUGFIX] An index optimisation actually slows things down when using caching. Moved it to the right location. #2973
* [BUGFIX] Ingester: If push request contained both valid and invalid samples, valid samples were ingested but not stored to WAL of the chunks storage. This has been fixed. #3067

## 1.3.0 / 2020-08-21
Expand Down
31 changes: 21 additions & 10 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,16 +460,6 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro
} else if matcher.Type == labels.MatchEqual {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value)
} else if matcher.Type == labels.MatchRegexp && len(FindSetMatches(matcher.Value)) > 0 {
set := FindSetMatches(matcher.Value)
for _, v := range set {
var qs []IndexQuery
qs, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, v)
if err != nil {
break
}
queries = append(queries, qs...)
}
} else {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name)
Expand Down Expand Up @@ -550,13 +540,34 @@ func (c *baseStore) parseIndexEntries(_ context.Context, entries []IndexEntry, m
return nil, nil
}

matchSet := map[string]struct{}{}
if matcher != nil && matcher.Type == labels.MatchRegexp {
set := FindSetMatches(matcher.Value)
for _, v := range set {
matchSet[v] = struct{}{}
}
}

result := make([]string, 0, len(entries))
for _, entry := range entries {
chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value)
if err != nil {
return nil, err
}

// If the matcher is like a set (=~"a|b|c|d|...") and
// the label value is not in that set move on.
if len(matchSet) > 0 {
if _, ok := matchSet[string(labelValue)]; !ok {
continue
}

// If its in the set, then add it to set, we don't need to run
// matcher on it again.
result = append(result, chunkKey)
continue
}

if matcher != nil && !matcher.Matches(string(labelValue)) {
continue
}
Expand Down
205 changes: 24 additions & 181 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"math/rand"
"reflect"
"sort"
"sync"
"testing"
"time"

Expand All @@ -23,7 +21,6 @@ import (

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -507,6 +504,10 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
`foo{bar=~"beep|baz"}`,
[]Chunk{chunk1, chunk2},
},
{
`foo{bar=~"beeping|baz"}`,
[]Chunk{chunk1},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]Chunk{chunk1, chunk2},
Expand Down Expand Up @@ -546,177 +547,6 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
}
}

// TestChunkStore_verifyRegexSetOptimizations tests if chunks are fetched correctly when we have the metric name
func TestChunkStore_verifyRegexSetOptimizations(t *testing.T) {
ctx := context.Background()
now := model.Now()

testCases := []struct {
query string
expect []string
}{
{
`foo`,
[]string{"foo"},
},
{
`foo{bar="baz"}`,
[]string{"foo{bar=\"baz\"}"},
},
{
`foo{bar!="baz"}`,
[]string{"foo"},
},
{
`foo{toms="code", bar="beep"}`,
[]string{"foo{bar=\"beep\"}", "foo{toms=\"code\"}"},
},
{
`foo{bar=~"beep"}`,
[]string{"foo{bar=\"beep\"}"},
},
{
`foo{bar=~"beep|baz"}`,
[]string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}"},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}", "foo{toms=\"code\"}"},
},
{
`foo{bar=~".+"}`,
[]string{"foo{bar}"},
},
}

for _, schema := range schemas {
var storeCfg StoreConfig
flagext.DefaultValues(&storeCfg)

schemaCfg := DefaultSchemaConfig("", schema, 0)
schemaObj, err := schemaCfg.Configs[0].CreateSchema()
require.NoError(t, err)

var mockSchema = &mockBaseSchema{schema: schemaObj}

switch s := schemaObj.(type) {
case StoreSchema:
schemaObj = mockStoreSchema{mockBaseSchema: mockSchema, schema: s}
case SeriesStoreSchema:
schemaObj = mockSeriesStoreSchema{mockBaseSchema: mockSchema, schema: s}
}

store := newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schemaObj, storeCfg)
defer store.Stop()

from := now.Add(-time.Hour)
through := now

for _, tc := range testCases {
t.Run(fmt.Sprintf("%s / %s", tc.query, schema), func(t *testing.T) {
// reset queries for test
mockSchema.resetQueries()

t.Log("========= Running query", tc.query, "with schema", schema)
matchers, err := parser.ParseMetricSelector(tc.query)
if err != nil {
t.Fatal(err)
}

_, err = store.Get(ctx, userID, from, through, matchers...)
require.NoError(t, err)

qs := mockSchema.getQueries()
sort.Strings(qs)

if !reflect.DeepEqual(tc.expect, qs) {
t.Fatalf("%s: wrong queries - %s", tc.query, test.Diff(tc.expect, qs))
}
})
}
}
}

type mockBaseSchema struct {
schema BaseSchema

mu sync.Mutex
queries []string
}

func (m *mockBaseSchema) getQueries() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.queries
}

func (m *mockBaseSchema) resetQueries() {
m.mu.Lock()
defer m.mu.Unlock()
m.queries = nil
}

func (m *mockBaseSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, metricName)
m.mu.Unlock()

return m.schema.GetReadQueriesForMetric(from, through, userID, metricName)
}

func (m *mockBaseSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, fmt.Sprintf("%s{%s}", metricName, labelName))
m.mu.Unlock()

return m.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
}

func (m *mockBaseSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, fmt.Sprintf("%s{%s=%q}", metricName, labelName, labelValue))
m.mu.Unlock()
return m.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
}

func (m *mockBaseSchema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return m.schema.FilterReadQueries(queries, shard)
}

type mockStoreSchema struct {
*mockBaseSchema
schema StoreSchema
}

func (m mockStoreSchema) GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return m.schema.GetWriteEntries(from, through, userID, metricName, labels, chunkID)
}

type mockSeriesStoreSchema struct {
*mockBaseSchema
schema SeriesStoreSchema
}

func (m mockSeriesStoreSchema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) {
return m.schema.GetCacheKeysAndLabelWriteEntries(from, through, userID, metricName, labels, chunkID)
}

func (m mockSeriesStoreSchema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return m.schema.GetChunkWriteEntries(from, through, userID, metricName, labels, chunkID)
}

func (m mockSeriesStoreSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return m.schema.GetChunksForSeries(from, through, userID, seriesID)
}

func (m mockSeriesStoreSchema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return m.schema.GetLabelNamesForSeries(from, through, userID, seriesID)
}

func (m mockSeriesStoreSchema) GetSeriesDeleteEntries(from, through model.Time, userID string, metric labels.Labels, hasChunksForIntervalFunc hasChunksForIntervalFunc) ([]IndexEntry, error) {
return m.schema.GetSeriesDeleteEntries(from, through, userID, metric, hasChunksForIntervalFunc)
}

func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) *labels.Matcher {
return labels.MustNewMatcher(matchType, name, value)
}
Expand Down Expand Up @@ -1006,13 +836,13 @@ func TestStoreMaxLookBack(t *testing.T) {
require.Equal(t, now, chunks[0].Through)
}

func benchmarkParseIndexEntries(i int64, b *testing.B) {
func benchmarkParseIndexEntries(i int64, regex string, b *testing.B) {
b.ReportAllocs()
b.StopTimer()
store := &store{}
ctx := context.Background()
entries := generateIndexEntries(i)
matcher, err := labels.NewMatcher(labels.MatchRegexp, "", ".*")
matcher, err := labels.NewMatcher(labels.MatchRegexp, "", regex)
if err != nil {
b.Fatal(err)
}
Expand All @@ -1022,16 +852,29 @@ func benchmarkParseIndexEntries(i int64, b *testing.B) {
if err != nil {
b.Fatal(err)
}
if len(keys) != len(entries)/2 {
if regex == ".*" && len(keys) != len(entries)/2 {
b.Fatalf("expected keys:%d got:%d", len(entries)/2, len(keys))
}
}
}

func BenchmarkParseIndexEntries500(b *testing.B) { benchmarkParseIndexEntries(500, b) }
func BenchmarkParseIndexEntries2500(b *testing.B) { benchmarkParseIndexEntries(2500, b) }
func BenchmarkParseIndexEntries10000(b *testing.B) { benchmarkParseIndexEntries(10000, b) }
func BenchmarkParseIndexEntries50000(b *testing.B) { benchmarkParseIndexEntries(50000, b) }
func BenchmarkParseIndexEntries500(b *testing.B) { benchmarkParseIndexEntries(500, ".*", b) }
func BenchmarkParseIndexEntries2500(b *testing.B) { benchmarkParseIndexEntries(2500, ".*", b) }
func BenchmarkParseIndexEntries10000(b *testing.B) { benchmarkParseIndexEntries(10000, ".*", b) }
func BenchmarkParseIndexEntries50000(b *testing.B) { benchmarkParseIndexEntries(50000, ".*", b) }

func BenchmarkParseIndexEntriesRegexSet500(b *testing.B) {
benchmarkParseIndexEntries(500, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}
func BenchmarkParseIndexEntriesRegexSet2500(b *testing.B) {
benchmarkParseIndexEntries(2500, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}
func BenchmarkParseIndexEntriesRegexSet10000(b *testing.B) {
benchmarkParseIndexEntries(10000, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}
func BenchmarkParseIndexEntriesRegexSet50000(b *testing.B) {
benchmarkParseIndexEntries(50000, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}

func generateIndexEntries(n int64) []IndexEntry {
res := make([]IndexEntry, 0, n)
Expand Down