diff --git a/CHANGELOG.md b/CHANGELOG.md index 390570cd68f..1b2f7d76996 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), and how often this metric is updated (`-ingester.active-series-update-period`) and max idle time for series to be considered inactive (`-ingester.active-series-idle-timeout`). #3153 * [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195 +* [BUGFIX] Handle hash-collisions in the query path. #3192 ## 1.4.0-rc.0 in progress diff --git a/integration/querier_test.go b/integration/querier_test.go index ebc80b273ca..5594e176ae5 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -619,3 +619,104 @@ func TestQuerierWithChunksStorage(t *testing.T) { assertServiceMetricsPrefixes(t, Querier, querier) assertServiceMetricsPrefixes(t, TableManager, tableManager) } + +func TestHashCollisionHandling(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) + flags := mergeFlags(ChunksStorageFlags, map[string]string{}) + + // Start dependencies. + dynamo := e2edb.NewDynamoDB() + + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, dynamo)) + + tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "") + require.NoError(t, s.StartAndWaitReady(tableManager)) + + // Wait until the first table-manager sync has completed, so that we're + // sure the tables have been created. + require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds")) + + // Start Cortex components for the write path. + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push a series for each user to Cortex. + now := time.Now() + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-0") + require.NoError(t, err) + + var series []prompb.TimeSeries + var expectedVector model.Vector + // Generate two series which collide on fingerprints and fast fingerprints. + tsMillis := e2e.TimeToMilliseconds(now) + metric1 := []prompb.Label{ + {Name: "A", Value: "K6sjsNNczPl"}, + {Name: labels.MetricName, Value: "fingerprint_collision"}, + } + metric2 := []prompb.Label{ + {Name: "A", Value: "cswpLMIZpwt"}, + {Name: labels.MetricName, Value: "fingerprint_collision"}, + } + + series = append(series, prompb.TimeSeries{ + Labels: metric1, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: tsMillis}, + }, + }) + expectedVector = append(expectedVector, &model.Sample{ + Metric: prompbLabelsToModelMetric(metric1), + Value: model.SampleValue(float64(0)), + Timestamp: model.Time(tsMillis), + }) + series = append(series, prompb.TimeSeries{ + Labels: metric2, + Samples: []prompb.Sample{ + {Value: float64(1), Timestamp: tsMillis}, + }, + }) + expectedVector = append(expectedVector, &model.Sample{ + Metric: prompbLabelsToModelMetric(metric2), + Value: model.SampleValue(float64(1)), + Timestamp: model.Time(tsMillis), + }) + + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(querier)) + + // Wait until the querier has updated the ring. + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Query the series. + c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-0") + require.NoError(t, err) + + result, err := c.Query("fingerprint_collision", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + require.Equal(t, expectedVector, result.(model.Vector)) +} + +func prompbLabelsToModelMetric(pbLabels []prompb.Label) model.Metric { + metric := model.Metric{} + + for _, l := range pbLabels { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + + return metric +} diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index a7813c6453f..abf52c9be41 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -170,32 +170,32 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return nil, err } - hashToChunkseries := map[model.Fingerprint]ingester_client.TimeSeriesChunk{} - hashToTimeSeries := map[model.Fingerprint]ingester_client.TimeSeries{} + hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{} + hashToTimeSeries := map[string]ingester_client.TimeSeries{} for _, result := range results { response := result.(*ingester_client.QueryStreamResponse) // Parse any chunk series for _, series := range response.Chunkseries { - hash := client.FastFingerprint(series.Labels) - existing := hashToChunkseries[hash] + key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels)) + existing := hashToChunkseries[key] existing.Labels = series.Labels existing.Chunks = append(existing.Chunks, series.Chunks...) - hashToChunkseries[hash] = existing + hashToChunkseries[key] = existing } // Parse any time series for _, series := range response.Timeseries { - hash := client.FastFingerprint(series.Labels) - existing := hashToTimeSeries[hash] + key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels)) + existing := hashToTimeSeries[key] existing.Labels = series.Labels if existing.Samples == nil { existing.Samples = series.Samples } else { existing.Samples = mergeSamples(existing.Samples, series.Samples) } - hashToTimeSeries[hash] = existing + hashToTimeSeries[key] = existing } } diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index 6d840669efe..19d1f1b73b0 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -328,6 +328,16 @@ func Fingerprint(labels labels.Labels) model.Fingerprint { return model.Fingerprint(sum) } +// LabelsToKeyString is used to form a string to be used as +// the hashKey. Don't print, use l.String() for printing. +func LabelsToKeyString(l labels.Labels) string { + // We are allocating 1024, even though most series are less than 600b long. + // But this is not an issue as this function is being inlined when called in a loop + // and buffer allocated is a static buffer and not a dynamic buffer on the heap. + b := make([]byte, 0, 1024) + return string(l.Bytes(b)) +} + // MarshalJSON implements json.Marshaler. func (s Sample) MarshalJSON() ([]byte, error) { t, err := json.Marshal(model.Time(s.TimestampMs)) diff --git a/pkg/ingester/client/compat_test.go b/pkg/ingester/client/compat_test.go index a3ab479bc27..e3c8ac46088 100644 --- a/pkg/ingester/client/compat_test.go +++ b/pkg/ingester/client/compat_test.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "sort" + "strconv" "testing" "unsafe" @@ -216,3 +217,53 @@ func verifyCollision(t *testing.T, collision bool, ls1 labels.Labels, ls2 labels t.Errorf("expected different fingerprints for %v (%016x) and %v (%016x)", ls1.String(), Fingerprint(ls1), ls2.String(), Fingerprint(ls2)) } } + +// The main usecase for `LabelsToKeyString` is to generate hashKeys +// for maps. We are benchmarking that here. +func BenchmarkSeriesMap(b *testing.B) { + benchmarkSeriesMap(100000, b) +} + +func benchmarkSeriesMap(numSeries int, b *testing.B) { + series := makeSeries(numSeries) + sm := make(map[string]int, numSeries) + + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i, s := range series { + sm[LabelsToKeyString(s)] = i + } + + for _, s := range series { + _, ok := sm[LabelsToKeyString(s)] + if !ok { + b.Fatal("element missing") + } + } + + if len(sm) != numSeries { + b.Fatal("the number of series expected:", numSeries, "got:", len(sm)) + } + } +} + +func makeSeries(n int) []labels.Labels { + series := make([]labels.Labels, 0, n) + for i := 0; i < n; i++ { + series = append(series, labels.FromMap(map[string]string{ + "label0": "value0", + "label1": "value1", + "label2": "value2", + "label3": "value3", + "label4": "value4", + "label5": "value5", + "label6": "value6", + "label7": "value7", + "label8": "value8", + "label9": strconv.Itoa(i), + })) + } + + return series +} diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index b36db0a2429..da1170005d7 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -53,10 +53,10 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ... // Series in the returned set are sorted alphabetically by labels. func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet { - chunksBySeries := map[model.Fingerprint][]chunk.Chunk{} + chunksBySeries := map[string][]chunk.Chunk{} for _, c := range chunks { - fp := client.Fingerprint(c.Metric) - chunksBySeries[fp] = append(chunksBySeries[fp], c) + key := client.LabelsToKeyString(c.Metric) + chunksBySeries[key] = append(chunksBySeries[key], c) } series := make([]storage.Series, 0, len(chunksBySeries))