From 3bf7fa9f159a7c76b1bcdd640c765b333766f748 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 28 Aug 2024 13:11:36 -0600 Subject: [PATCH] fix: sanatize structured metadata at query time (#13983) --- pkg/logql/log/pipeline.go | 22 ++++++++++++++++----- pkg/logql/log/pipeline_test.go | 36 ++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 1396e552c655..efd3acadd41e 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -6,6 +6,8 @@ import ( "sync" "unsafe" + "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + "github.com/prometheus/prometheus/model/labels" ) @@ -67,7 +69,7 @@ func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline { } n.mu.RUnlock() - sp := &noopStreamPipeline{n.baseBuilder.ForLabels(labels, h)} + sp := &noopStreamPipeline{n.baseBuilder.ForLabels(labels, h), make([]int, 0, 10)} n.mu.Lock() defer n.mu.Unlock() @@ -92,7 +94,8 @@ func IsNoopPipeline(p Pipeline) bool { } type noopStreamPipeline struct { - builder *LabelsBuilder + builder *LabelsBuilder + offsetsBuf []int } func (n noopStreamPipeline) ReferencedStructuredMetadata() bool { @@ -101,6 +104,9 @@ func (n noopStreamPipeline) ReferencedStructuredMetadata() bool { func (n noopStreamPipeline) Process(_ int64, line []byte, structuredMetadata ...labels.Label) ([]byte, LabelsResult, bool) { n.builder.Reset() + for i, lb := range structuredMetadata { + structuredMetadata[i].Name = prometheus.NormalizeLabel(lb.Name) + } n.builder.Add(StructuredMetadataLabel, structuredMetadata...) return line, n.builder.LabelsResult(), true } @@ -176,12 +182,13 @@ func NewPipeline(stages []Stage) Pipeline { } type streamPipeline struct { - stages []Stage - builder *LabelsBuilder + stages []Stage + builder *LabelsBuilder + offsetsBuf []int } func NewStreamPipeline(stages []Stage, labelsBuilder *LabelsBuilder) StreamPipeline { - return &streamPipeline{stages, labelsBuilder} + return &streamPipeline{stages, labelsBuilder, make([]int, 0, 10)} } func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline { @@ -220,6 +227,11 @@ func (p *streamPipeline) ReferencedStructuredMetadata() bool { func (p *streamPipeline) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]byte, LabelsResult, bool) { var ok bool p.builder.Reset() + + for i, lb := range structuredMetadata { + structuredMetadata[i].Name = prometheus.NormalizeLabel(lb.Name) + } + p.builder.Add(StructuredMetadataLabel, structuredMetadata...) for _, s := range p.stages { diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index ffa5df0d50b9..78a97778919f 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -56,6 +56,18 @@ func TestNoopPipeline(t *testing.T) { require.Equal(t, expectedLabelsResults.String(), lbr.String()) require.Equal(t, true, matches) + // test structured metadata with disallowed label names + structuredMetadata = append(labels.FromStrings("y", "1", "z", "2"), labels.Label{Name: "zsomething-bad", Value: "foo"}) + expectedStructuredMetadata := append(labels.FromStrings("y", "1", "z", "2"), labels.Label{Name: "zsomething_bad", Value: "foo"}) + expectedLabelsResults = append(lbs, expectedStructuredMetadata...) + + l, lbr, matches = pipeline.ForStream(lbs).Process(0, []byte(""), structuredMetadata...) + require.Equal(t, []byte(""), l) + require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedStructuredMetadata, labels.EmptyLabels()), lbr) + require.Equal(t, expectedLabelsResults.Hash(), lbr.Hash()) + require.Equal(t, expectedLabelsResults.String(), lbr.String()) + require.Equal(t, true, matches) + pipeline.Reset() require.Len(t, pipeline.cache, 0) } @@ -171,6 +183,17 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { require.Equal(t, nil, lbr) require.Equal(t, false, matches) + // test structured metadata with disallowed label names + withBadLabel := append(structuredMetadata, labels.Label{Name: "zsomething-bad", Value: "foo"}) + expectedStructuredMetadata := append(structuredMetadata, labels.Label{Name: "zsomething_bad", Value: "foo"}) + expectedLabelsResults = append(lbs, expectedStructuredMetadata...) + + _, lbr, matches = p.ForStream(lbs).Process(0, []byte(""), withBadLabel...) + require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedStructuredMetadata, labels.EmptyLabels()), lbr) + require.Equal(t, expectedLabelsResults.Hash(), lbr.Hash()) + require.Equal(t, expectedLabelsResults.String(), lbr.String()) + require.Equal(t, true, matches) + // Reset caches p.baseBuilder.del = []string{"foo", "bar"} p.baseBuilder.add = [numValidCategories]labels.Labels{ @@ -566,6 +589,19 @@ func Benchmark_Pipeline(b *testing.B) { } }) + b.Run("pipeline bytes no invalid structured metadata", func(b *testing.B) { + b.ResetTimer() + for n := 0; n < b.N; n++ { + resLine, resLbs, resMatches = sp.Process(0, line, labels.Label{Name: "valid_name", Value: "foo"}) + } + }) + b.Run("pipeline string with invalid structured metadata", func(b *testing.B) { + b.ResetTimer() + for n := 0; n < b.N; n++ { + resLine, resLbs, resMatches = sp.Process(0, line, labels.Label{Name: "invalid-name", Value: "foo"}, labels.Label{Name: "other-invalid-name", Value: "foo"}) + } + }) + extractor, err := NewLineSampleExtractor(CountExtractor, stages, []string{"cluster", "level"}, false, false) require.NoError(b, err) ex := extractor.ForStream(lbs)