diff --git a/docs/sources/reference/cli/run.md b/docs/sources/reference/cli/run.md index f1fca370831..e5d6f048b1b 100644 --- a/docs/sources/reference/cli/run.md +++ b/docs/sources/reference/cli/run.md @@ -66,19 +66,28 @@ The following flags are supported: * `--stability.level`: The minimum permitted stability level of functionality. Supported values: `experimental`, `public-preview`, and `generally-available` (default `"generally-available"`). * `--feature.community-components.enabled`: Enable community components (default `false`). * `--feature.component-shutdown-deadline`: Maximum duration to wait for a component to shut down before giving up and logging an error (default `"10m"`). +* `--feature.prometheus.direct-fanout.enabled`: Enable experimental direct fanout for metric forwarding without a global label store. * `--windows.priority`: The priority to set for the {{< param "PRODUCT_NAME" >}} process when running on Windows. This is only available on Windows. Supported values: `above_normal`, `below_normal`, `normal`, `high`, `idle`, or `realtime` (default `"normal"`). {{< admonition type="note" >}} -The `--windows.priority` flag is in [Public preview][] and is not covered by {{< param "FULL_PRODUCT_NAME" >}} [backward compatibility][] guarantees. +The `--feature.prometheus.direct-fanout.enabled` flag is an [experimental][] feature. +Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement. +To enable and use an experimental feature, you must set the `stability.level` [flag](#permitted-stability-levels) to `experimental`. -### Deprecated flags +[experimental]: https://grafana.com/docs/release-life-cycle/ +{{< /admonition >}} -* `--feature.prometheus.metric-validation-scheme`: This flag is deprecated and has no effect. You can configure the metric validation scheme individually for each `prometheus.scrape` component in your {{< param "PRODUCT_NAME" >}} configuration file. 
+{{< admonition type="note" >}} +The `--windows.priority` flag is in [Public preview][] and is not covered by {{< param "FULL_PRODUCT_NAME" >}} [backward compatibility][] guarantees. [Public preview]: https://grafana.com/docs/release-life-cycle/ [backward compatibility]: ../../../introduction/backward-compatibility/ {{< /admonition >}} +### Deprecated flags + +* `--feature.prometheus.metric-validation-scheme`: This flag is deprecated and has no effect. You can configure the metric validation scheme individually for each `prometheus.scrape` component in your {{< param "PRODUCT_NAME" >}} configuration file. + ## Update the configuration file The configuration file can be reloaded from disk by either: diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index 217c81b962d..1d7d36b1da8 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -19,7 +19,6 @@ import ( "github.com/fatih/color" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/util" "github.com/grafana/ckit/advertise" "github.com/grafana/ckit/peer" "github.com/prometheus/client_golang/prometheus" @@ -28,6 +27,8 @@ import ( "go.opentelemetry.io/otel" "golang.org/x/exp/maps" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/alloy/internal/alloyseed" "github.com/grafana/alloy/internal/boringcrypto" "github.com/grafana/alloy/internal/component" @@ -170,6 +171,7 @@ depending on the nature of the reload error. cmd.Flags().StringVar(&r.windowsPriority, "windows.priority", r.windowsPriority, fmt.Sprintf("Process priority to use when running on windows. This flag is currently in public preview. 
Supported values: %s", strings.Join(slices.Collect(windowspriority.PriorityValues()), ", "))) } cmd.Flags().DurationVar(&r.taskShutdownDeadline, "feature.component-shutdown-deadline", r.taskShutdownDeadline, "Maximum duration to wait for a component to shut down before giving up and logging an error") + cmd.Flags().BoolVar(&r.enableDirectFanout, "feature.prometheus.direct-fanout.enabled", r.enableDirectFanout, "Enable experimental direct fanout for metric forwarding without a global label store") addDeprecatedFlags(cmd) return cmd @@ -184,6 +186,7 @@ type alloyRun struct { enablePprof bool disableReporting bool clusterEnabled bool + enableDirectFanout bool clusterNodeName string clusterAdvAddr string clusterJoinAddr string @@ -208,6 +211,18 @@ type alloyRun struct { taskShutdownDeadline time.Duration } +func (fr *alloyRun) checkExperimentalFlags() error { + if fr.minStability.Permits(featuregate.StabilityExperimental) { + return nil + } + + if fr.enableDirectFanout { + return fmt.Errorf("the '--feature.prometheus.direct-fanout.enabled' can be used only at experimental stability level") + } + + return nil +} + func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { var wg sync.WaitGroup defer wg.Wait() @@ -219,6 +234,10 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { return fmt.Errorf("path argument not provided") } + if err := fr.checkExperimentalFlags(); err != nil { + return err + } + // Buffer logs until log format has been determined l, err := logging.NewDeferred(os.Stderr) if err != nil { @@ -370,7 +389,11 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { return fmt.Errorf("failed to create otel service") } - labelService := labelstore.New(l, reg) + if fr.enableDirectFanout { + level.Info(l).Log("msg", "global label store is disabled") + } + + labelService := labelstore.New(l, reg, !fr.enableDirectFanout) alloyseed.Init(fr.storagePath, l) f, err := alloy_runtime.New(alloy_runtime.Options{ diff 
--git a/internal/component/otelcol/exporter/prometheus/prometheus.go b/internal/component/otelcol/exporter/prometheus/prometheus.go index e4887d6ea73..9c0ca39dec2 100644 --- a/internal/component/otelcol/exporter/prometheus/prometheus.go +++ b/internal/component/otelcol/exporter/prometheus/prometheus.go @@ -8,6 +8,8 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/prometheus/storage" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/otelcol" "github.com/grafana/alloy/internal/component/otelcol/exporter/prometheus/internal/convert" @@ -15,7 +17,6 @@ import ( "github.com/grafana/alloy/internal/component/prometheus" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/service/labelstore" - "github.com/prometheus/prometheus/storage" ) func init() { @@ -116,6 +117,8 @@ func New(o component.Options, c Arguments) (*Component, error) { // Run implements Component. func (c *Component) Run(ctx context.Context) error { + defer c.fanout.Clear() + for { select { case <-ctx.Done(): diff --git a/internal/component/prometheus/appenders/new.go b/internal/component/prometheus/appenders/new.go new file mode 100644 index 00000000000..77462369ea7 --- /dev/null +++ b/internal/component/prometheus/appenders/new.go @@ -0,0 +1,21 @@ +package appenders + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/storage" +) + +// New returns an appropriate appender based on the number of children. +func New(children []storage.Appender, store *SeriesRefMappingStore, deadRefThreshold storage.SeriesRef, writeLatency prometheus.Histogram, samplesForwarded prometheus.Counter) storage.Appender { + // No destination, no work to do. + if len(children) == 0 { + return Noop{} + } + + // Single destination, no need to fanout. 
+ if len(children) == 1 { + return NewPassthrough(children[0], deadRefThreshold, writeLatency, samplesForwarded) + } + + return NewSeriesRefMapping(children, store, writeLatency, samplesForwarded) +} diff --git a/internal/component/prometheus/appenders/new_test.go b/internal/component/prometheus/appenders/new_test.go new file mode 100644 index 00000000000..a58b55f9a51 --- /dev/null +++ b/internal/component/prometheus/appenders/new_test.go @@ -0,0 +1,63 @@ +package appenders + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNew_NoChildrenReturnsNoop(t *testing.T) { + app := New(nil, nil, 0, nil, nil) + + _, ok := app.(Noop) + assert.True(t, ok, "expected Noop appender for zero children") +} + +func TestNew_SingleChildReturnsPassthrough(t *testing.T) { + child := &mockAppender{} + app := New([]storage.Appender{child}, nil, 0, nil, nil) + + _, ok := app.(*passthrough) + assert.True(t, ok, "expected passthrough appender for single child") +} + +func TestNew_MultipleChildrenReturnsSeriesRefMapping(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + + child1 := &mockAppender{} + child2 := &mockAppender{} + app := New([]storage.Appender{child1, child2}, store, 0, nil, nil) + + _, ok := app.(*seriesRefMapping) + assert.True(t, ok, "expected seriesRefMapping appender for multiple children") +} + +func TestNew_PassthroughReceivesDeadRefThreshold(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + + // Issue a mapping so nextUniqueRef advances past 1. + lbls := labels.FromStrings("job", "test") + store.CreateMapping([]storage.SeriesRef{100, 200}, lbls) + + // Clear advances firstRefOfCurrentGeneration to the current nextUniqueRef. 
+ threshold := store.Clear() + require.Greater(t, uint64(threshold), uint64(0)) + + sf := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_forwarded", Help: "test"}) + child := &mockAppender{appendFn: func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return ref, nil // echo back whatever ref we receive + }} + app := New([]storage.Appender{child}, store, threshold, nil, sf) + + // A ref below the threshold must be zeroed by the passthrough. + staleRef := threshold - 1 + _, err := app.Append(staleRef, lbls, 1, 1.0) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(0), child.appendRefs[0], + "passthrough must zero refs below the dead ref threshold") +} diff --git a/internal/component/prometheus/appenders/noop.go b/internal/component/prometheus/appenders/noop.go new file mode 100644 index 00000000000..97fdbbf46e9 --- /dev/null +++ b/internal/component/prometheus/appenders/noop.go @@ -0,0 +1,53 @@ +package appenders + +import ( + "context" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +type Noop struct { +} + +func (n Noop) Appender(_ context.Context) storage.Appender { + return n +} + +func (n Noop) Append(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return ref, nil +} + +func (n Noop) Commit() error { + return nil +} + +func (n Noop) Rollback() error { + return nil +} + +func (n Noop) SetOptions(_ *storage.AppendOptions) { +} + +func (n Noop) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { + return ref, nil +} + +func (n Noop) AppendHistogram(ref storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return ref, nil +} 
+ +func (n Noop) AppendHistogramSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return ref, nil +} + +func (n Noop) UpdateMetadata(ref storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { + return ref, nil +} + +func (n Noop) AppendSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { + return ref, nil +} diff --git a/internal/component/prometheus/appenders/passthrough.go b/internal/component/prometheus/appenders/passthrough.go new file mode 100644 index 00000000000..709a470ab9b --- /dev/null +++ b/internal/component/prometheus/appenders/passthrough.go @@ -0,0 +1,111 @@ +package appenders + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +type passthrough struct { + wrapping storage.Appender + start time.Time + writeLatency prometheus.Histogram + samplesForwarded prometheus.Counter + // deadRefThreshold marks the boundary of the current ref generation. Any incoming + // ref below this value is from a previous generation and meaningless to this child; + // it must be zeroed so the child allocates a fresh ref. + deadRefThreshold storage.SeriesRef +} + +func NewPassthrough(wrapping storage.Appender, deadRefThreshold storage.SeriesRef, writeLatency prometheus.Histogram, samplesForwarded prometheus.Counter) storage.Appender { + return &passthrough{ + wrapping: wrapping, + deadRefThreshold: deadRefThreshold, + writeLatency: writeLatency, + samplesForwarded: samplesForwarded, + } +} + +// sanitizeRef zeros ref if it is from a previous generation. 
+func (p *passthrough) sanitizeRef(ref storage.SeriesRef) storage.SeriesRef { + if ref != 0 && ref < p.deadRefThreshold { + return 0 + } + return ref +} + +func (p *passthrough) Commit() error { + defer p.recordLatency() + return p.wrapping.Commit() +} + +func (p *passthrough) Rollback() error { + defer p.recordLatency() + return p.wrapping.Rollback() +} + +func (p *passthrough) recordLatency() { + if p.start.IsZero() { + return + } + duration := time.Since(p.start) + p.writeLatency.Observe(duration.Seconds()) +} + +func (p *passthrough) SetOptions(opts *storage.AppendOptions) { + p.wrapping.SetOptions(opts) +} + +func (p *passthrough) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if p.start.IsZero() { + p.start = time.Now() + } + + ref, err := p.wrapping.Append(p.sanitizeRef(ref), l, t, v) + + if err == nil { + p.samplesForwarded.Inc() + } + + return ref, err +} + +func (p *passthrough) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + if p.start.IsZero() { + p.start = time.Now() + } + return p.wrapping.AppendExemplar(p.sanitizeRef(ref), l, e) +} + +func (p *passthrough) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if p.start.IsZero() { + p.start = time.Now() + } + return p.wrapping.AppendHistogram(p.sanitizeRef(ref), l, t, h, fh) +} + +func (p *passthrough) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if p.start.IsZero() { + p.start = time.Now() + } + return p.wrapping.AppendHistogramSTZeroSample(p.sanitizeRef(ref), l, t, st, h, fh) +} + +func (p *passthrough) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + if p.start.IsZero() { + p.start = time.Now() + } + return 
p.wrapping.UpdateMetadata(p.sanitizeRef(ref), l, m) +} + +func (p *passthrough) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64) (storage.SeriesRef, error) { + if p.start.IsZero() { + p.start = time.Now() + } + return p.wrapping.AppendSTZeroSample(p.sanitizeRef(ref), l, t, st) +} diff --git a/internal/component/prometheus/appenders/passthrough_test.go b/internal/component/prometheus/appenders/passthrough_test.go new file mode 100644 index 00000000000..b79bd08a951 --- /dev/null +++ b/internal/component/prometheus/appenders/passthrough_test.go @@ -0,0 +1,245 @@ +package appenders + +import ( + "errors" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/util/testappender" +) + +func TestPassthrough_Append(t *testing.T) { + collecting := testappender.NewCollectingAppender() + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_samples_forwarded", + Help: "Test samples forwarded", + }) + a := NewPassthrough(collecting, 0, nil, samplesForwarded) + + testLabels := labels.FromStrings("metric", "test_metric", "job", "test_job") + timestamp := time.Now().UnixMilli() + value := 42.0 + + // Test Append + ref, err := a.Append(storage.SeriesRef(100), testLabels, timestamp, value) + require.NoError(t, err) + assert.Equal(t, storage.SeriesRef(100), ref) + + samples := collecting.CollectedSamples() + require.Len(t, samples, 1) + + sample := samples[testLabels.String()] + require.NotNil(t, sample) + assert.Equal(t, timestamp, sample.Timestamp) + assert.Equal(t, value, 
sample.Value) + assert.Equal(t, testLabels, sample.Labels) + + expected := ` + # HELP test_samples_forwarded Test samples forwarded + # TYPE test_samples_forwarded counter + test_samples_forwarded 1 + ` + err = testutil.CollectAndCompare(samplesForwarded, strings.NewReader(expected), "test_samples_forwarded") + require.NoError(t, err) +} + +func TestPassthrough_AppendError(t *testing.T) { + // Create a failing appender + failingAppender := &failingAppender{} + + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_samples_forwarded", + Help: "Test samples forwarded", + }) + a := NewPassthrough(failingAppender, 0, nil, samplesForwarded) + + testLabels := labels.FromStrings("metric", "test_metric") + timestamp := time.Now().UnixMilli() + value := 42.0 + + _, err := a.Append(storage.SeriesRef(100), testLabels, timestamp, value) + require.Error(t, err) + + expected := ` + # HELP test_samples_forwarded Test samples forwarded + # TYPE test_samples_forwarded counter + test_samples_forwarded 0 + ` + err = testutil.CollectAndCompare(samplesForwarded, strings.NewReader(expected), "test_samples_forwarded") + require.NoError(t, err) +} + +func TestPassthrough_AppendHistogram(t *testing.T) { + collecting := testappender.NewCollectingAppender() + + a := NewPassthrough(collecting, 0, nil, nil) + + testLabels := labels.FromStrings("histogram", "test_histogram") + timestamp := time.Now().UnixMilli() + + // Create a test histogram + h := &histogram.Histogram{ + Count: 10, + Sum: 100.5, + Schema: 1, + } + + // Test AppendHistogram + ref, err := a.AppendHistogram(storage.SeriesRef(200), testLabels, timestamp, h, nil) + require.NoError(t, err) + assert.Equal(t, storage.SeriesRef(200), ref) + + // Verify histogram was forwarded + histograms := collecting.CollectedHistograms() + require.Len(t, histograms, 1) + + histSample := histograms[testLabels.String()] + require.NotNil(t, histSample) + assert.Equal(t, timestamp, histSample.Timestamp) + assert.Equal(t, 
testLabels, histSample.Labels) + assert.Equal(t, h, histSample.Histogram) + assert.Nil(t, histSample.FloatHistogram) +} + +func TestPassthrough_UpdateMetadata(t *testing.T) { + collecting := testappender.NewCollectingAppender() + + a := NewPassthrough(collecting, 0, nil, nil) + + testLabels := labels.FromStrings("metric", "test_metric") + testMetadata := metadata.Metadata{ + Type: "counter", + Help: "Test counter metric", + Unit: "seconds", + } + + // Test UpdateMetadata + ref, err := a.UpdateMetadata(storage.SeriesRef(300), testLabels, testMetadata) + require.NoError(t, err) + assert.Equal(t, storage.SeriesRef(300), ref) + + // Verify metadata was forwarded + m := collecting.CollectedMetadata() + require.Len(t, m, 1) + + metadataEntry := m[testLabels.String()] + assert.Equal(t, testMetadata, metadataEntry) +} + +func TestPassthrough_Commit(t *testing.T) { + collecting := testappender.NewCollectingAppender() + + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_samples_forwarded", + Help: "Test samples forwarded", + }) + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "test_write_latency", + Help: "Test write latency", + }) + a := NewPassthrough(collecting, 0, writeLatency, samplesForwarded) + + testLabels := labels.FromStrings("metric", "test_metric") + timestamp := time.Now().UnixMilli() + _, err := a.Append(storage.SeriesRef(100), testLabels, timestamp, 42.0) + require.NoError(t, err) + + err = a.Commit() + require.NoError(t, err) + + // Verify histogram recorded exactly one observation + count := testutil.CollectAndCount(writeLatency, "test_write_latency") + assert.Equal(t, 1, count, "should have recorded one latency observation") +} + +func TestPassthrough_Rollback(t *testing.T) { + collecting := testappender.NewCollectingAppender() + + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_samples_forwarded", + Help: "Test samples forwarded", + }) + writeLatency := 
prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "test_write_latency", + Help: "Test write latency", + }) + a := NewPassthrough(collecting, 0, writeLatency, samplesForwarded) + + testLabels := labels.FromStrings("metric", "test_metric") + timestamp := time.Now().UnixMilli() + _, err := a.Append(storage.SeriesRef(100), testLabels, timestamp, 42.0) + require.NoError(t, err) + + err = a.Rollback() + require.NoError(t, err) + + // Verify histogram recorded exactly one observation + count := testutil.CollectAndCount(writeLatency, "test_write_latency") + assert.Equal(t, 1, count, "should have recorded one latency observation") +} + +func TestPassthrough_AppendExemplar(t *testing.T) { + collecting := testappender.NewCollectingAppender() + + a := NewPassthrough(collecting, 0, nil, nil) + + testLabels := labels.FromStrings("metric", "test_metric") + testExemplar := exemplar.Exemplar{ + Labels: labels.FromStrings("trace_id", "12345"), + Value: 1.0, + Ts: time.Now().UnixMilli(), + } + + // Test AppendExemplar - this will panic in collectingappender but we test that passthrough forwards it + require.Panics(t, func() { + _, _ = a.AppendExemplar(storage.SeriesRef(400), testLabels, testExemplar) + }) +} + +// failingAppender is a test appender that always returns an error on Append +type failingAppender struct{} + +func (f *failingAppender) Append(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return ref, errors.New("append failed") +} + +func (f *failingAppender) Commit() error { + return nil +} + +func (f *failingAppender) Rollback() error { + return nil +} + +func (f *failingAppender) SetOptions(*storage.AppendOptions) {} + +func (f *failingAppender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { + return ref, errors.New("append exemplar failed") +} + +func (f *failingAppender) AppendHistogram(ref storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ 
*histogram.FloatHistogram) (storage.SeriesRef, error) { + return ref, errors.New("append histogram failed") +} + +func (f *failingAppender) AppendHistogramSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return ref, errors.New("append histogram st zero failed") +} + +func (f *failingAppender) UpdateMetadata(ref storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { + return ref, errors.New("update metadata failed") +} + +func (f *failingAppender) AppendSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { + return ref, errors.New("append st zero failed") +} diff --git a/internal/component/prometheus/appenders/seriesrefmapping.go b/internal/component/prometheus/appenders/seriesrefmapping.go new file mode 100644 index 00000000000..21ca24c98ea --- /dev/null +++ b/internal/component/prometheus/appenders/seriesrefmapping.go @@ -0,0 +1,528 @@ +package appenders + +import ( + "sync" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + "go.uber.org/atomic" +) + +type MappingStore interface { + GetMapping(uniqueRef storage.SeriesRef, lbls labels.Labels) []storage.SeriesRef + CreateMapping(refResults []storage.SeriesRef, lbls labels.Labels) storage.SeriesRef + UpdateMapping(uniqueRef storage.SeriesRef, refResults []storage.SeriesRef, lbls labels.Labels) + TrackAppendedSeries(ts int64, cell *Cell) + GetCellForAppendedSeries() *Cell +} + +type seriesRefMapping struct { + start time.Time + children []storage.Appender + store MappingStore + + uniqueRefCell *Cell + + // childRefs is reused for each append call to 
avoid allocations. This is safe because storage.Appender should never + // have concurrent calls to Append methods. + childRefs []storage.SeriesRef + writeLatency prometheus.Histogram + samplesForwarded prometheus.Counter +} + +func NewSeriesRefMapping(children []storage.Appender, store MappingStore, writeLatency prometheus.Histogram, samplesForwarded prometheus.Counter) storage.Appender { + uniqueRefCell := store.GetCellForAppendedSeries() + + return &seriesRefMapping{ + children: children, + store: store, + writeLatency: writeLatency, + samplesForwarded: samplesForwarded, + + uniqueRefCell: uniqueRefCell, + childRefs: make([]storage.SeriesRef, 0, len(children)), + } +} + +func (s *seriesRefMapping) SetOptions(opts *storage.AppendOptions) { + for _, c := range s.children { + c.SetOptions(opts) + } +} + +func (s *seriesRefMapping) Commit() error { + defer s.recordLatency() + + s.store.TrackAppendedSeries(time.Now().Unix(), s.uniqueRefCell) + + var multiErr error + for _, c := range s.children { + err := c.Commit() + if err != nil { + multiErr = multierror.Append(multiErr, err) + } + } + return multiErr +} + +func (s *seriesRefMapping) Rollback() error { + defer s.recordLatency() + + // We still track rolled back series so we can properly + // clean up any series that was appended + s.store.TrackAppendedSeries(time.Now().Unix(), s.uniqueRefCell) + + var multiErr error + for _, c := range s.children { + err := c.Rollback() + if err != nil { + multiErr = multierror.Append(multiErr, err) + } + } + return multiErr +} + +func (s *seriesRefMapping) recordLatency() { + if s.start.IsZero() { + return + } + + duration := time.Since(s.start) + s.writeLatency.Observe(duration.Seconds()) +} + +func (s *seriesRefMapping) resetFields() { + // Reset childRefs slice length to 0 for reuse + s.childRefs = s.childRefs[:0] +} + +func (s *seriesRefMapping) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return s.appendToChildren(ref, l, 
func(appender storage.Appender, ref storage.SeriesRef) (storage.SeriesRef, error) { + newRef, err := appender.Append(ref, l, t, v) + if err == nil { + s.samplesForwarded.Inc() + } + return newRef, err + }) +} + +func (s *seriesRefMapping) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + return s.appendToChildren(ref, l, func(appender storage.Appender, ref storage.SeriesRef) (storage.SeriesRef, error) { + return appender.AppendExemplar(ref, l, e) + }) +} + +func (s *seriesRefMapping) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return s.appendToChildren(ref, l, func(appender storage.Appender, ref storage.SeriesRef) (storage.SeriesRef, error) { + return appender.AppendHistogram(ref, l, t, h, fh) + }) +} + +func (s *seriesRefMapping) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return s.appendToChildren(ref, l, func(appender storage.Appender, ref storage.SeriesRef) (storage.SeriesRef, error) { + return appender.AppendHistogramSTZeroSample(ref, l, t, st, h, fh) + }) +} + +func (s *seriesRefMapping) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + return s.appendToChildren(ref, l, func(appender storage.Appender, ref storage.SeriesRef) (storage.SeriesRef, error) { + return appender.UpdateMetadata(ref, l, m) + }) +} + +func (s *seriesRefMapping) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64) (storage.SeriesRef, error) { + return s.appendToChildren(ref, l, func(appender storage.Appender, ref storage.SeriesRef) (storage.SeriesRef, error) { + return appender.AppendSTZeroSample(ref, l, t, st) + }) +} + +type appenderFunc func(appender storage.Appender, ref storage.SeriesRef) (storage.SeriesRef, error) + +func (s 
*seriesRefMapping) appendToChildren(ref storage.SeriesRef, lbls labels.Labels, af appenderFunc) (storage.SeriesRef, error) { + defer s.resetFields() + + if s.start.IsZero() { + s.start = time.Now() + } + + // Check if the incoming ref has ref mappings + existingChildRefs := s.store.GetMapping(ref, lbls) + + var appendErr error + + // Sanity check: if we have existing child refs, they must match the number of children + if existingChildRefs != nil && len(existingChildRefs) == len(s.children) { + s.uniqueRefCell.Refs = append(s.uniqueRefCell.Refs, ref) + + refUpdateRequired := false + for childIndex, childRef := range existingChildRefs { + newChildRef, err := af(s.children[childIndex], childRef) + if err != nil { + appendErr = multierror.Append(appendErr, err) + } + + if newChildRef != childRef { + refUpdateRequired = true + } + + // Track refs in the local reuse buffer instead of mutating the shared mapping slice. + s.childRefs = append(s.childRefs, newChildRef) + } + + if appendErr != nil { + return 0, appendErr + } + + if refUpdateRequired { + s.store.UpdateMapping(ref, s.childRefs, lbls) + } + + return ref, nil + } + + // No existing mapping, proceed with normal append to all children. + var nonZeroCount int + var nonZeroRef storage.SeriesRef + for _, child := range s.children { + childRef, err := af(child, ref) + if err != nil { + appendErr = multierror.Append(appendErr, err) + } + + s.childRefs = append(s.childRefs, childRef) + if childRef != 0 { + nonZeroCount++ + nonZeroRef = childRef + } + } + + if appendErr != nil { + return 0, appendErr + } + + if nonZeroCount == 0 { + // All children returned ref 0, so return the input ref + return ref, nil + } + + if nonZeroCount == 1 { + // Only one child allocated a ref; return it directly — no mapping needed. 
+ return nonZeroRef, nil + } + + uniqueRef := s.store.CreateMapping(s.childRefs, lbls) + s.uniqueRefCell.Refs = append(s.uniqueRefCell.Refs, uniqueRef) + return uniqueRef, nil +} + +type uniqRefChildren struct { + childRefs []storage.SeriesRef + labelHash uint64 +} + +type SeriesRefMappingStore struct { + // refMappingMu protects uniqueRefToChildRefs, labelHashToUniqueRef and nextUniqueRef + refMappingMu sync.RWMutex + // uniqueRefToChildRefs maps the unique ref to the expected child ref in order + uniqueRefToChildRefs map[storage.SeriesRef]uniqRefChildren + // labelHashToUniqueRef maps the label hash to unique ref. + labelHashToUniqueRef map[uint64]storage.SeriesRef + + // nextUniqueRef is the next ref ID we will hand out + nextUniqueRef storage.SeriesRef + // firstRefOfCurrentGeneration is the first ref issued after the last Clear(). Any ref + // below this value is from a previous generation and must be treated as unmapped. + firstRefOfCurrentGeneration storage.SeriesRef + + // timestampTrackingMu protects uniqueRefTimestamps and cellPool + timestampTrackingMu sync.Mutex + // uniqueRefTimestamps maps unique refs to their last append timestamp + uniqueRefTimestamps map[storage.SeriesRef]int64 + // cellPool is used to pool slices of SeriesRefs used for tracking unique refs in TrackAppendedSeries. 
+ cellPool sync.Pool + + // Cleanup goroutine coordination (no lock required) + startRefCleanup sync.Once + cleanupStarted atomic.Bool + stopCleanup chan struct{} + cleanupStopped chan struct{} + + // Metrics (safe for concurrent access, no lock required) + activeMappings prometheus.Gauge + trackedRefs prometheus.Gauge + refsCleaned prometheus.Counter + uniqueRefsTotal prometheus.Counter +} + +func NewSeriesRefMappingStore(reg prometheus.Registerer) *SeriesRefMappingStore { + activeMappings := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alloy_fanout_mapping_store_mappings_total", + Help: "Number of active unique ref mappings in the store.", + }) + trackedRefs := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alloy_fanout_mapping_store_tracked_refs_total", + Help: "Number of refs being tracked for timestamp-based cleanup.", + }) + refsCleaned := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alloy_fanout_mapping_store_refs_cleaned_total", + Help: "Total number of stale refs cleaned up over time.", + }) + uniqueRefsTotal := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alloy_fanout_mapping_store_unique_refs_created_total", + Help: "Total number of unique refs created.", + }) + + if reg != nil { + _ = reg.Register(activeMappings) + _ = reg.Register(trackedRefs) + _ = reg.Register(refsCleaned) + _ = reg.Register(uniqueRefsTotal) + } + + return &SeriesRefMappingStore{ + uniqueRefToChildRefs: make(map[storage.SeriesRef]uniqRefChildren), + nextUniqueRef: 1, + uniqueRefTimestamps: make(map[storage.SeriesRef]int64), + labelHashToUniqueRef: make(map[uint64]storage.SeriesRef), + cellPool: sync.Pool{ + New: func() any { + return &Cell{Refs: make([]storage.SeriesRef, 0, 100)} + }, + }, + stopCleanup: make(chan struct{}), + cleanupStopped: make(chan struct{}), + activeMappings: activeMappings, + trackedRefs: trackedRefs, + refsCleaned: refsCleaned, + uniqueRefsTotal: uniqueRefsTotal, + } +} + +type Cell struct { + Refs []storage.SeriesRef +} + +// 
GetMapping returns existing child ref results for the given unique ref if one exists.
//
// If the passed uniqueRef is zero, the method will attempt to find a mapping using the passed labels.
// Returns nil if no mapping exists.
//
// The returned slice must be treated as read-only. Callers that need to change a mapping
// must provide an updated slice to UpdateMapping. Concurrent appenders may race to update
// the same mapping with different values, which is safe because stale mappings are
// self-correcting - using a stale ref will cause the child appender to return a new ref
// on the next append.
func (s *SeriesRefMappingStore) GetMapping(uniqueRef storage.SeriesRef, lbls labels.Labels) []storage.SeriesRef {
	// Hash the labels exactly once, outside the lock: the hash is needed both for the
	// zero-ref fallback lookup and for the generation-collision guard below, and hashing
	// before locking keeps the critical section as short as possible.
	labelHash := lbls.Hash()

	s.refMappingMu.RLock()
	defer s.refMappingMu.RUnlock()

	if uniqueRef == 0 {
		// Some consumers don't memo the global ref. Try to look up a ref by label hash.
		gotRef, ok := s.labelHashToUniqueRef[labelHash]
		if !ok {
			return nil
		}

		uniqueRef = gotRef
	}

	// Refs below firstRefOfCurrentGeneration were issued before the last Clear() and are
	// no longer valid — the children they mapped to may have changed.
	if uniqueRef < s.firstRefOfCurrentGeneration {
		return nil
	}

	if mapping, ok := s.uniqueRefToChildRefs[uniqueRef]; ok {
		// Guard against numeric collisions with refs cached from a previous generation.
		if mapping.labelHash != labelHash {
			return nil
		}
		return mapping.childRefs
	}
	return nil
}

// CreateMapping creates a new unique ref mapping for the given child ref results.
+func (s *SeriesRefMappingStore) CreateMapping(refResults []storage.SeriesRef, lbls labels.Labels) storage.SeriesRef { + // Start cleanup goroutine on first mapping + s.startRefCleanup.Do(func() { + s.cleanupStarted.Store(true) + go s.cleanupStaleRefs() + }) + + // Store a copy of the child ref results directly + childRefSlice := make([]storage.SeriesRef, len(refResults)) + copy(childRefSlice, refResults) + + // Hash labels to for the fallback lookup table + labelHash := lbls.Hash() + + s.refMappingMu.Lock() + defer s.refMappingMu.Unlock() + + // Create a new unique ref + uniqueRef := s.nextUniqueRef + s.nextUniqueRef++ + + s.labelHashToUniqueRef[labelHash] = uniqueRef + s.uniqueRefToChildRefs[uniqueRef] = uniqRefChildren{ + childRefs: childRefSlice, + labelHash: labelHash, + } + + s.activeMappings.Inc() + s.uniqueRefsTotal.Inc() + + return uniqueRef +} + +func (s *SeriesRefMappingStore) UpdateMapping(uniqueRef storage.SeriesRef, refResults []storage.SeriesRef, lbls labels.Labels) { + if uniqueRef == 0 { + return + } + + childRefSlice := make([]storage.SeriesRef, len(refResults)) + copy(childRefSlice, refResults) + + s.refMappingMu.Lock() + defer s.refMappingMu.Unlock() + + newHash := lbls.Hash() + prev, ok := s.uniqueRefToChildRefs[uniqueRef] + if ok && prev.labelHash != newHash { + delete(s.labelHashToUniqueRef, prev.labelHash) + s.labelHashToUniqueRef[newHash] = uniqueRef + } + + s.uniqueRefToChildRefs[uniqueRef] = uniqRefChildren{ + childRefs: childRefSlice, + labelHash: lbls.Hash(), + } +} + +func (s *SeriesRefMappingStore) TrackAppendedSeries(ts int64, cell *Cell) { + s.timestampTrackingMu.Lock() + defer s.timestampTrackingMu.Unlock() + + for _, r := range cell.Refs { + s.uniqueRefTimestamps[r] = ts + } + + s.trackedRefs.Set(float64(len(s.uniqueRefTimestamps))) + + cell.Refs = cell.Refs[:0] + s.cellPool.Put(cell) +} + +func (s *SeriesRefMappingStore) GetCellForAppendedSeries() *Cell { + return s.cellPool.Get().(*Cell) +} + +func (s *SeriesRefMappingStore) 
cleanupStaleRefs() { + defer close(s.cleanupStopped) + + ticker := time.NewTicker(15 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + cutoffTime := time.Now().Add(-15 * time.Minute).Unix() + + // Hold both locks to prevent race condition where a ref could be + // appended after we delete it from uniqueRefCell but before + // we delete it from uniqueRefToChildRefs + s.timestampTrackingMu.Lock() + s.refMappingMu.Lock() + + staleRefCount := 0 + for ref, ts := range s.uniqueRefTimestamps { + if ts < cutoffTime { + staleRefCount++ + + v, ok := s.uniqueRefToChildRefs[ref] + if ok { + delete(s.labelHashToUniqueRef, v.labelHash) + } + + delete(s.uniqueRefTimestamps, ref) + delete(s.uniqueRefToChildRefs, ref) + } + } + + // Update metrics + if staleRefCount > 0 { + s.refsCleaned.Add(float64(staleRefCount)) + s.activeMappings.Sub(float64(staleRefCount)) + s.trackedRefs.Set(float64(len(s.uniqueRefTimestamps))) + } + + s.refMappingMu.Unlock() + s.timestampTrackingMu.Unlock() + + case <-s.stopCleanup: + return + } + } +} + +// Clear will clear all internal mappings and stop the cleaner goroutine if it is running. +// It is safe to re-use the same instance after calling Clear. +// Returns the generation boundary; any ref below this value is stale. +func (s *SeriesRefMappingStore) Clear() storage.SeriesRef { + // Stop the cleanup goroutine and wait for it to be stopped so we can + // avoid a possible deadlock with cleanup that also holds both locks + if s.cleanupStarted.Load() { + select { + case <-s.stopCleanup: + // Already closed + default: + close(s.stopCleanup) + <-s.cleanupStopped + } + } + + // We need to hold both locks to do this safely and we do it in the same order as + // cleanupStaleRefs. We stopped and waited for the background worker that calls it + // to finish but some extra safety won't hurt. 
+ s.timestampTrackingMu.Lock() + defer s.timestampTrackingMu.Unlock() + + s.refMappingMu.Lock() + defer s.refMappingMu.Unlock() + + clear(s.uniqueRefToChildRefs) + clear(s.uniqueRefTimestamps) + clear(s.labelHashToUniqueRef) + + // reset the pool + s.cellPool = sync.Pool{ + New: func() any { + return &Cell{Refs: make([]storage.SeriesRef, 0, 100)} + }, + } + + // NOTE: We do NOT reset nextUniqueRef here. Resetting it would cause ref collisions + // with components like prometheus.scrape which will keep re-sending the same cached refs. + // We continue incrementing to ensure all refs remain unique across the lifetime of the process. + s.firstRefOfCurrentGeneration = s.nextUniqueRef + + // Reset metrics + s.activeMappings.Set(0) + s.trackedRefs.Set(0) + + // Reset channels and flags + s.stopCleanup = make(chan struct{}) + s.cleanupStopped = make(chan struct{}) + s.startRefCleanup = sync.Once{} + s.cleanupStarted.Store(false) + + return s.firstRefOfCurrentGeneration +} diff --git a/internal/component/prometheus/appenders/seriesrefmapping_test.go b/internal/component/prometheus/appenders/seriesrefmapping_test.go new file mode 100644 index 00000000000..07fc25ff05a --- /dev/null +++ b/internal/component/prometheus/appenders/seriesrefmapping_test.go @@ -0,0 +1,816 @@ +package appenders + +import ( + "errors" + "strconv" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func TestSeriesRefMappingStore_GetMappingReturnsNilForUnknownRef(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + + require.Nil(t, store.GetMapping(0, labels.EmptyLabels())) + require.Nil(t, 
store.GetMapping(1, labels.EmptyLabels())) + require.Nil(t, store.GetMapping(999, labels.EmptyLabels())) + require.Nil(t, store.GetMapping(storage.SeriesRef(12345), labels.EmptyLabels())) +} + +func TestSeriesRefMappingStore_CreatedMappingCanBeRetrieved(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + + childRefs := []storage.SeriesRef{1, 2, 3} + lbls := labels.NewBuilder(labels.EmptyLabels()).Set("foo", "bar").Labels() + + uniqueRef := store.CreateMapping(childRefs, lbls) + + // Case 1: get by unique ref + got := store.GetMapping(uniqueRef, lbls) + require.NotNil(t, got) + require.Equal(t, childRefs, got) + + // Case 1: rely on label hash fallback + got = store.GetMapping(0, lbls) + require.NotNil(t, got) + require.Equal(t, childRefs, got) +} + +func TestSeriesRefMappingStore_EachCreatedMappingGetsUniqueRef(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + + type mappingAndLabels struct { + refs []storage.SeriesRef + labels labels.Labels + } + + refs := make(map[storage.SeriesRef]bool) + mappings := make(map[storage.SeriesRef]mappingAndLabels) + + for i := range 100 { + lbls := labels.NewBuilder(labels.EmptyLabels()).Set("k", strconv.Itoa(i)).Labels() + childRefs := []storage.SeriesRef{storage.SeriesRef(i), storage.SeriesRef(i + 1)} + uniqueRef := store.CreateMapping(childRefs, lbls) + + // Verify this ref is unique + require.False(t, refs[uniqueRef], "ref %d was already used", uniqueRef) + refs[uniqueRef] = true + mappings[uniqueRef] = mappingAndLabels{ + refs: childRefs, + labels: lbls, + } + } + + // Verify all mappings can be retrieved independently + for uniqueRef, mp := range mappings { + retrieved := store.GetMapping(uniqueRef, mp.labels) + require.Equal(t, mp.refs, retrieved) + } +} + +func TestSeriesRefMappingStore_UpdateMappingChangesReturnedValue(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := 
labels.EmptyLabels() + + originalRefs := []storage.SeriesRef{1, 2, 3} + uniqueRef := store.CreateMapping(originalRefs, lbls) + + updatedRefs := []storage.SeriesRef{4, 5, 6} + store.UpdateMapping(uniqueRef, updatedRefs, lbls) + + retrieved := store.GetMapping(uniqueRef, lbls) + require.Equal(t, updatedRefs, retrieved) + require.NotEqual(t, originalRefs, retrieved) +} + +func TestSeriesRefMappingStore_UpdateMappingWithZeroRefDoesNothing(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + lbls := labels.EmptyLabels() + + store.UpdateMapping(0, []storage.SeriesRef{1, 2, 3}, lbls) + + // Should still return nil + require.Nil(t, store.GetMapping(0, lbls)) +} + +func TestSeriesRefMappingStore_TrackAppendedSeriesDoesNotPanic(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + + cell := store.GetCellForAppendedSeries() + cell.Refs = append(cell.Refs, 1, 2, 3) + + require.NotPanics(t, func() { + store.TrackAppendedSeries(time.Now().Unix(), cell) + }) +} + +func TestSeriesRefMappingStore_SliceIsEmptyAfterReturn(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + + cell1 := store.GetCellForAppendedSeries() + cell1.Refs = append(cell1.Refs, 1, 2, 3) + store.TrackAppendedSeries(time.Now().Unix(), cell1) + + cell2 := store.GetCellForAppendedSeries() + require.NotNil(t, cell2) + require.Equal(t, 0, len(cell2.Refs), "slice returned should always have length 0") +} + +func TestSeriesRefMappingStore_RefsAreEventuallyCleanedUp(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := labels.EmptyLabels() + + // Create and track a mapping with old timestamp + childRefs := []storage.SeriesRef{1, 2, 3} + uniqueRef := store.CreateMapping(childRefs, lbls) + + oldTimestamp := time.Now().Add(-20 * time.Minute).Unix() + cell := store.GetCellForAppendedSeries() + cell.Refs = append(cell.Refs, uniqueRef) + store.TrackAppendedSeries(oldTimestamp, cell) + + // Verify mapping exists 
initially + require.NotNil(t, store.GetMapping(uniqueRef, lbls)) + + // Wait for cleanup to run (15 minute ticker + some buffer) + time.Sleep(16 * time.Minute) + + // Mapping should be cleaned up + require.Nil(t, store.GetMapping(uniqueRef, lbls)) + }) +} + +func TestSeriesRefMappingStore_RecentlyTrackedRefsAreNotCleanedUp(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := labels.EmptyLabels() + + // Create and track a mapping with recent timestamp + childRefs := []storage.SeriesRef{1, 2, 3} + uniqueRef := store.CreateMapping(childRefs, lbls) + + recentTimestamp := time.Now().Unix() + cell := store.GetCellForAppendedSeries() + cell.Refs = append(cell.Refs, uniqueRef) + store.TrackAppendedSeries(recentTimestamp, cell) + + // Wait for a cleanup cycle + time.Sleep(16 * time.Minute) + + // Mapping should still exist + require.NotNil(t, store.GetMapping(uniqueRef, lbls)) + }) +} + +func TestSeriesRefMappingStore_TrackingRefAgainUpdatesTimestamp(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := labels.EmptyLabels() + + // Create and track a mapping with old timestamp + childRefs := []storage.SeriesRef{1, 2, 3} + uniqueRef := store.CreateMapping(childRefs, lbls) + + oldTimestamp := time.Now().Add(-20 * time.Minute).Unix() + cell1 := store.GetCellForAppendedSeries() + cell1.Refs = append(cell1.Refs, uniqueRef) + store.TrackAppendedSeries(oldTimestamp, cell1) + + // Wait a bit + time.Sleep(1 * time.Minute) + + // Track the same ref again with current timestamp + currentTimestamp := time.Now().Unix() + cell2 := store.GetCellForAppendedSeries() + cell2.Refs = append(cell2.Refs, uniqueRef) + store.TrackAppendedSeries(currentTimestamp, cell2) + + // Wait for cleanup cycle + time.Sleep(16 * time.Minute) + + // Mapping should NOT be cleaned up because timestamp was refreshed + require.NotNil(t, 
store.GetMapping(uniqueRef, lbls)) + }) +} + +func TestSeriesRefMappingStore_ClearRemovesAllMappings(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + lbls := labels.EmptyLabels() + + // Create several mappings + var uniqueRefs []storage.SeriesRef + for i := range 10 { + childRefs := []storage.SeriesRef{storage.SeriesRef(i), storage.SeriesRef(i + 1)} + uniqueRef := store.CreateMapping(childRefs, lbls) + uniqueRefs = append(uniqueRefs, uniqueRef) + + // Track them + cell := store.GetCellForAppendedSeries() + cell.Refs = append(cell.Refs, uniqueRef) + store.TrackAppendedSeries(time.Now().Unix(), cell) + } + + // Verify they exist + for _, ref := range uniqueRefs { + require.NotNil(t, store.GetMapping(ref, lbls)) + } + + // Clear advances the generation boundary past all previously issued refs. + threshold := store.Clear() + require.Greater(t, uint64(threshold), uint64(uniqueRefs[len(uniqueRefs)-1]), + "threshold must be above all previously issued refs") + + // Verify all are gone + for _, ref := range uniqueRefs { + require.Nil(t, store.GetMapping(ref, lbls)) + } +} + +func TestSeriesRefMappingStore_ClearIsIdempotent(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + lbls := labels.EmptyLabels() + + // Create some mappings + childRefs := []storage.SeriesRef{1, 2, 3} + store.CreateMapping(childRefs, lbls) + + // Clear multiple times + require.NotPanics(t, func() { + store.Clear() + store.Clear() + store.Clear() + }) +} + +func TestSeriesRefMappingStore_CanBeReusedAfterClear(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := labels.EmptyLabels() + + // Create multiple mappings before clear + childRefs1 := []storage.SeriesRef{1, 2, 3} + uniqueRef1 := store.CreateMapping(childRefs1, lbls) + childRefs2 := []storage.SeriesRef{7, 8, 9} + uniqueRef2 := store.CreateMapping(childRefs2, lbls) + + // Clear + store.Clear() + + // Create new mappings after clear + childRefs3 := []storage.SeriesRef{4, 5, 6} + 
uniqueRef3 := store.CreateMapping(childRefs3, lbls) + + // New mapping should work + retrieved := store.GetMapping(uniqueRef3, lbls) + require.NotNil(t, retrieved) + require.Equal(t, childRefs3, retrieved) + + // Old mappings should not exist + require.Nil(t, store.GetMapping(uniqueRef1, lbls)) + require.Nil(t, store.GetMapping(uniqueRef2, lbls)) +} + +func TestSeriesRefMappingStore_ConcurrentReadsAreConsistent(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := labels.EmptyLabels() + + // Create a mapping + childRefs := []storage.SeriesRef{1, 2, 3} + uniqueRef := store.CreateMapping(childRefs, lbls) + + // Spawn many goroutines reading the same mapping + var wg sync.WaitGroup + numReaders := 100 + + for range numReaders { + wg.Go(func() { + for range 100 { + retrieved := store.GetMapping(uniqueRef, lbls) + require.NotNil(t, retrieved) + require.Equal(t, childRefs, retrieved) + } + }) + } + + wg.Wait() +} + +func TestSeriesRefMappingStore_ConcurrentCreatesGetUniqueRefs(t *testing.T) { + store := NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := labels.EmptyLabels() + + var wg sync.WaitGroup + numCreators := 50 + refsPerCreator := 20 + + refsChan := make(chan storage.SeriesRef, numCreators*refsPerCreator) + + for i := range numCreators { + wg.Add(1) + go func(id int) { + defer wg.Done() + + for j := range refsPerCreator { + childRefs := []storage.SeriesRef{storage.SeriesRef(id*1000 + j)} + uniqueRef := store.CreateMapping(childRefs, lbls) + refsChan <- uniqueRef + } + }(i) + } + + wg.Wait() + close(refsChan) + + // Collect all refs and verify no duplicates + seenRefs := make(map[storage.SeriesRef]bool) + count := 0 + for ref := range refsChan { + require.False(t, seenRefs[ref], "duplicate ref %d", ref) + seenRefs[ref] = true + count++ + } + + require.Equal(t, numCreators*refsPerCreator, count) +} + +func TestSeriesRefMappingStore_ConcurrentTrackingIsCorrect(t *testing.T) { + store := 
NewSeriesRefMappingStore(nil) + t.Cleanup(func() { store.Clear() }) + lbls := labels.EmptyLabels() + + // Create some mappings + var uniqueRefs []storage.SeriesRef + for i := range 50 { + childRefs := []storage.SeriesRef{storage.SeriesRef(i)} + uniqueRef := store.CreateMapping(childRefs, lbls) + uniqueRefs = append(uniqueRefs, uniqueRef) + } + + // Track them concurrently from multiple goroutines + var wg sync.WaitGroup + numTrackers := 10 + + for i := range numTrackers { + wg.Add(1) + go func(id int) { + defer wg.Done() + + // Each tracker tracks a subset of refs + for j := id; j < len(uniqueRefs); j += numTrackers { + cell := store.GetCellForAppendedSeries() + cell.Refs = append(cell.Refs, uniqueRefs[j]) + store.TrackAppendedSeries(time.Now().Unix(), cell) + } + }(i) + } + + wg.Wait() + + // All refs should still be retrievable (tracking shouldn't break anything) + for _, ref := range uniqueRefs { + require.NotNil(t, store.GetMapping(ref, lbls)) + } +} + +func TestSeriesRefMapping_AppendReusesExistingMapping(t *testing.T) { + store := newMockMappingStore() + store.mappingByRef[77] = []storage.SeriesRef{101, 202} + + child1 := &mockAppender{} + child2 := &mockAppender{} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_write_latency_reuse", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_samples_forwarded_reuse", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + lbls := labels.FromStrings("job", "test") + ref, err := app.Append(77, lbls, 123, 42) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(77), ref) + require.Equal(t, []storage.SeriesRef{101}, child1.appendRefs) + require.Equal(t, []storage.SeriesRef{202}, child2.appendRefs) + require.Len(t, store.createCalls, 0) + require.Len(t, store.updateCalls, 0) + require.Equal(t, []storage.SeriesRef{77}, store.cell.Refs) 
+ require.Equal(t, float64(2), testutil.ToFloat64(samplesForwarded)) +} + +func TestSeriesRefMapping_AppendUpdatesExistingMappingWhenRefsChange(t *testing.T) { + store := newMockMappingStore() + store.mappingByRef[33] = []storage.SeriesRef{11, 22} + + child1 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 111, nil + }} + child2 := &mockAppender{} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_write_latency_update", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_samples_forwarded_update", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + lbls := labels.FromStrings("job", "test") + ref, err := app.Append(33, lbls, 123, 42) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(33), ref) + require.Len(t, store.updateCalls, 1) + require.Equal(t, storage.SeriesRef(33), store.updateCalls[0].uniqueRef) + require.Equal(t, []storage.SeriesRef{111, 22}, store.updateCalls[0].refs) + require.Len(t, store.createCalls, 0) +} + +func TestSeriesRefMapping_AppendAllChildrenZeroPassesThroughInputRef(t *testing.T) { + store := newMockMappingStore() + zeroFn := func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 0, nil + } + child1 := &mockAppender{appendFn: zeroFn} + child2 := &mockAppender{appendFn: zeroFn} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_all_zero_latency", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_all_zero_forwarded", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + ref, err := app.Append(42, labels.FromStrings("job", "test"), 1, 1) + require.NoError(t, err) + require.Equal(t, 
storage.SeriesRef(42), ref) + require.Len(t, store.createCalls, 0) + require.Empty(t, store.cell.Refs) +} + +func TestSeriesRefMapping_AppendSingleNonZeroChildReturnsChildRefDirectly(t *testing.T) { + store := newMockMappingStore() + child1 := &mockAppender{} + child2 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 77, nil + }} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_single_nonzero_latency", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_single_nonzero_forwarded", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + // The single non-zero child ref is returned directly — no mapping created. + ref, err := app.Append(0, labels.FromStrings("job", "test"), 1, 1) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(77), ref) + require.Len(t, store.createCalls, 0) + require.Empty(t, store.cell.Refs) +} + +func TestSeriesRefMapping_AppendSecondAppendUsesChildRefsFromMapping(t *testing.T) { + store := newMockMappingStore() + + // Both children return non-zero so a mapping is created on the first append. 
+ child1 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 5001, nil + }} + child2 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 77, nil + }} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_write_latency_single_no_leak", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_samples_forwarded_single_no_leak", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + lbls := labels.FromStrings("job", "single") + + // First append: both children return non-zero, so a mapping is created. + ref, err := app.Append(0, lbls, 1, 1) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(1000), ref) + + // Second append: mapping is found and each child is called with its stored child ref. 
+ _, err = app.Append(ref, lbls, 2, 2) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{0, 5001}, child1.appendRefs) + require.Equal(t, []storage.SeriesRef{0, 77}, child2.appendRefs) +} + +func TestSeriesRefMapping_AppendErrorSkipsMappingUpdate(t *testing.T) { + store := newMockMappingStore() + store.mappingByRef[88] = []storage.SeriesRef{11, 22} + + child1 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 111, nil + }} + child2 := &mockAppender{appendFn: func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return ref, errors.New("child append failed") + }} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_write_latency_error", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_samples_forwarded_error", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + ref, err := app.Append(88, labels.EmptyLabels(), 1, 1) + require.Error(t, err) + require.Equal(t, storage.SeriesRef(0), ref) + require.Len(t, store.updateCalls, 0) + require.Len(t, store.createCalls, 0) + require.Equal(t, float64(1), testutil.ToFloat64(samplesForwarded)) +} + +func TestSeriesRefMapping_CommitTracksRefsAndAggregatesErrors(t *testing.T) { + store := newMockMappingStore() + store.mappingByRef[101] = []storage.SeriesRef{11, 22} + + child1 := &mockAppender{commitFn: func() error { return errors.New("child1 commit failed") }} + child2 := &mockAppender{commitFn: func() error { return errors.New("child2 commit failed") }} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_write_latency_commit", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_samples_forwarded_commit", Help: 
"test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + _, err := app.Append(101, labels.EmptyLabels(), 1, 1) + require.NoError(t, err) + + err = app.Commit() + require.ErrorContains(t, err, "child1 commit failed") + require.ErrorContains(t, err, "child2 commit failed") + require.Len(t, store.trackCalls, 1) + require.Equal(t, []storage.SeriesRef{101}, store.trackCalls[0].refs) + require.Empty(t, store.cell.Refs) + require.Equal(t, 1, child1.commitCalls) + require.Equal(t, 1, child2.commitCalls) +} + +func TestSeriesRefMapping_RollbackTracksRefs(t *testing.T) { + store := newMockMappingStore() + store.mappingByRef[202] = []storage.SeriesRef{33, 44} + + child1 := &mockAppender{rollbackFn: func() error { return nil }} + child2 := &mockAppender{rollbackFn: func() error { return errors.New("child2 rollback failed") }} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_write_latency_rollback", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_samples_forwarded_rollback", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + _, err := app.Append(202, labels.EmptyLabels(), 1, 1) + require.NoError(t, err) + + err = app.Rollback() + require.ErrorContains(t, err, "child2 rollback failed") + require.Len(t, store.trackCalls, 1) + require.Equal(t, []storage.SeriesRef{202}, store.trackCalls[0].refs) + require.Equal(t, 1, child1.rollbackCalls) + require.Equal(t, 1, child2.rollbackCalls) +} + +// TestSeriesRefMapping_MappingReusedOnSubsequentAppends verifies that the second +// append for a series uses the existing mapping and does not create a new one. 
+func TestSeriesRefMapping_MappingReusedOnSubsequentAppends(t *testing.T) { + store := newMockMappingStore() + + child1 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 5001, nil + }} + child2 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 6002, nil + }} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_reuse_latency", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_reuse_forwarded", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + lbls := labels.FromStrings("job", "test") + + // First append: no existing mapping, creates one. + ref, err := app.Append(0, lbls, 1, 1) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(1000), ref) + require.Len(t, store.createCalls, 1) + + // Second append: existing mapping is found and reused, no new create. + ref2, err := app.Append(ref, lbls, 2, 2) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(1000), ref2) + require.Len(t, store.createCalls, 1, "no new mapping should be created on second append") + require.Len(t, store.updateCalls, 0) +} + +// TestSeriesRefMapping_ChildRefChangeUpdatesMapping verifies that when a child +// returns a different ref than the one stored in the mapping, the mapping is +// updated so subsequent appends use the new child ref. +func TestSeriesRefMapping_ChildRefChangeUpdatesMapping(t *testing.T) { + store := newMockMappingStore() + + // child2 starts at 6002 then changes to 9999 on the second append. 
+ child2Calls := 0 + child1 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + return 5001, nil + }} + child2 := &mockAppender{appendFn: func(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + child2Calls++ + if child2Calls == 1 { + return 6002, nil + } + return 9999, nil + }} + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "test_series_ref_mapping_child_change_latency", Help: "test"}) + samplesForwarded := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_series_ref_mapping_child_change_forwarded", Help: "test"}) + app := NewSeriesRefMapping([]storage.Appender{child1, child2}, store, writeLatency, samplesForwarded) + + lbls := labels.FromStrings("job", "test") + + // First append: creates mapping with [5001, 6002]. + ref, err := app.Append(0, lbls, 1, 1) + require.NoError(t, err) + require.Len(t, store.createCalls, 1) + require.Equal(t, []storage.SeriesRef{5001, 6002}, store.createCalls[0].refs) + + // Second append: child2 returns 9999 instead of 6002, mapping should be updated. + _, err = app.Append(ref, lbls, 2, 2) + require.NoError(t, err) + require.Len(t, store.updateCalls, 1) + require.Equal(t, []storage.SeriesRef{5001, 9999}, store.updateCalls[0].refs) + + // Third append: child2 is called with updated ref 9999. 
+ _, err = app.Append(ref, lbls, 3, 3) + require.NoError(t, err) + require.Equal(t, storage.SeriesRef(9999), child2.appendRefs[2]) +} + +type createCall struct { + refs []storage.SeriesRef + lbls labels.Labels +} + +type updateCall struct { + uniqueRef storage.SeriesRef + refs []storage.SeriesRef + lbls labels.Labels +} + +type trackCall struct { + ts int64 + refs []storage.SeriesRef +} + +type mockMappingStore struct { + mappingByRef map[storage.SeriesRef][]storage.SeriesRef + mappingByHash map[uint64]storage.SeriesRef + createCalls []createCall + updateCalls []updateCall + trackCalls []trackCall + createRef storage.SeriesRef + cell *Cell +} + +func newMockMappingStore() *mockMappingStore { + return &mockMappingStore{ + mappingByRef: map[storage.SeriesRef][]storage.SeriesRef{}, + mappingByHash: map[uint64]storage.SeriesRef{}, + createRef: 1000, + cell: &Cell{Refs: make([]storage.SeriesRef, 0, 10)}, + } +} + +func (m *mockMappingStore) GetMapping(uniqueRef storage.SeriesRef, lbls labels.Labels) []storage.SeriesRef { + if uniqueRef == 0 { + mappedRef, ok := m.mappingByHash[lbls.Hash()] + if !ok { + return nil + } + uniqueRef = mappedRef + } + + refs, ok := m.mappingByRef[uniqueRef] + if !ok { + return nil + } + + return copyRefs(refs) +} + +func (m *mockMappingStore) CreateMapping(refResults []storage.SeriesRef, lbls labels.Labels) storage.SeriesRef { + newRef := m.createRef + m.createRef++ + + copiedRefs := copyRefs(refResults) + m.mappingByRef[newRef] = copiedRefs + m.mappingByHash[lbls.Hash()] = newRef + m.createCalls = append(m.createCalls, createCall{refs: copiedRefs, lbls: lbls}) + + return newRef +} + +func (m *mockMappingStore) UpdateMapping(uniqueRef storage.SeriesRef, refResults []storage.SeriesRef, lbls labels.Labels) { + copiedRefs := copyRefs(refResults) + m.mappingByRef[uniqueRef] = copiedRefs + m.mappingByHash[lbls.Hash()] = uniqueRef + m.updateCalls = append(m.updateCalls, updateCall{uniqueRef: uniqueRef, refs: copiedRefs, lbls: lbls}) +} + +func (m 
*mockMappingStore) TrackAppendedSeries(ts int64, cell *Cell) { + m.trackCalls = append(m.trackCalls, trackCall{ts: ts, refs: copyRefs(cell.Refs)}) + cell.Refs = cell.Refs[:0] +} + +func (m *mockMappingStore) GetCellForAppendedSeries() *Cell { + return m.cell +} + +func copyRefs(in []storage.SeriesRef) []storage.SeriesRef { + out := make([]storage.SeriesRef, len(in)) + copy(out, in) + return out +} + +type mockAppender struct { + appendFn func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) + appendExemplarFn func(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) + appendHistogramFn func(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) + appendHistogramSTZeroSampleFn func(ref storage.SeriesRef, l labels.Labels, t, st int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) + updateMetadataFn func(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) + appendSTZeroSampleFn func(ref storage.SeriesRef, l labels.Labels, t, st int64) (storage.SeriesRef, error) + commitFn func() error + rollbackFn func() error + setOptionsFn func(opts *storage.AppendOptions) + + appendRefs []storage.SeriesRef + commitCalls int + rollbackCalls int +} + +func (m *mockAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + m.appendRefs = append(m.appendRefs, ref) + if m.appendFn != nil { + return m.appendFn(ref, l, t, v) + } + return ref, nil +} + +func (m *mockAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + if m.appendExemplarFn != nil { + return m.appendExemplarFn(ref, l, e) + } + return ref, nil +} + +func (m *mockAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) 
(storage.SeriesRef, error) { + if m.appendHistogramFn != nil { + return m.appendHistogramFn(ref, l, t, h, fh) + } + return ref, nil +} + +func (m *mockAppender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if m.appendHistogramSTZeroSampleFn != nil { + return m.appendHistogramSTZeroSampleFn(ref, l, t, st, h, fh) + } + return ref, nil +} + +func (m *mockAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, md metadata.Metadata) (storage.SeriesRef, error) { + if m.updateMetadataFn != nil { + return m.updateMetadataFn(ref, l, md) + } + return ref, nil +} + +func (m *mockAppender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64) (storage.SeriesRef, error) { + if m.appendSTZeroSampleFn != nil { + return m.appendSTZeroSampleFn(ref, l, t, st) + } + return ref, nil +} + +func (m *mockAppender) Commit() error { + m.commitCalls++ + if m.commitFn != nil { + return m.commitFn() + } + return nil +} + +func (m *mockAppender) Rollback() error { + m.rollbackCalls++ + if m.rollbackFn != nil { + return m.rollbackFn() + } + return nil +} + +func (m *mockAppender) SetOptions(opts *storage.AppendOptions) { + if m.setOptionsFn != nil { + m.setOptionsFn(opts) + } +} diff --git a/internal/component/prometheus/enrich/enrich.go b/internal/component/prometheus/enrich/enrich.go index 0a3aa43bf29..749aed5b701 100644 --- a/internal/component/prometheus/enrich/enrich.go +++ b/internal/component/prometheus/enrich/enrich.go @@ -152,6 +152,7 @@ func New(opts component.Options, args Arguments) (*Component, error) { // Run implements component.Component. 
func (c *Component) Run(ctx context.Context) error { defer c.exited.Store(true) + defer c.fanout.Clear() <-ctx.Done() diff --git a/internal/component/prometheus/fanout.go b/internal/component/prometheus/fanout.go index 7433874e013..091ac1d4047 100644 --- a/internal/component/prometheus/fanout.go +++ b/internal/component/prometheus/fanout.go @@ -2,6 +2,7 @@ package prometheus import ( "context" + "slices" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/prometheus/prometheus/storage" "go.uber.org/atomic" + "github.com/grafana/alloy/internal/component/prometheus/appenders" "github.com/grafana/alloy/internal/service/labelstore" ) @@ -37,6 +39,22 @@ type Fanout struct { // lastSeriesCount stores the number of series that were sent through the last appender. It helps to estimate how // much memory to allocate for the staleness trackers. lastSeriesCount atomic.Int64 + + useLabelStore bool + seriesRefMappingStore *appenders.SeriesRefMappingStore + // deadRefThreshold is updated on every children change. Any store-issued ref + // below this value was issued before the last topology change and must be + // zeroed before forwarding to a child appender. + deadRefThreshold storage.SeriesRef +} + +func normalizeChildren(children []storage.Appendable) []storage.Appendable { + if len(children) == 0 { + return nil + } + + cloned := slices.Clone(children) + return slices.DeleteFunc(cloned, func(i storage.Appendable) bool { return i == nil }) } // NewFanout creates a fanout appendable. @@ -48,26 +66,52 @@ func NewFanout(children []storage.Appendable, componentID string, register prome }) _ = register.Register(wl) + // Note: this only covers calls to Append. It could make more sense when upstream changes to AppendV2 where there will be + // only a single Append function. But we might want to make this a CounterVec with different labels for the + // different appended types. 
s := prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_forwarded_samples_total", Help: "Total number of samples sent to downstream components.", }) _ = register.Register(s) + useLabelStore := ls != nil && ls.Enabled() + return &Fanout{ - children: children, + children: normalizeChildren(children), componentID: componentID, writeLatency: wl, samplesCounter: s, ls: ls, + + useLabelStore: useLabelStore, + seriesRefMappingStore: appenders.NewSeriesRefMappingStore(register), } } // UpdateChildren allows changing of the children of the fanout. +// +// When children change, the store is cleared to start a new ref generation. +// This guards against two ref collision hazards that arise when the appender +// type changes between passthrough (1 child) and seriesRefMapping (N children): +// +// passthrough → seriesRefMapping: a cached raw child ref (e.g. WAL ref) may collide +// numerically with a store-issued unique ref for a different series. The store guards +// against this with a label hash check on every mapping lookup. +// +// seriesRefMapping → passthrough: a cached store-issued unique ref is meaningless to +// the child and must not be forwarded. Clear returns the new generation boundary; +// the passthrough zeros any ref below it before forwarding. func (f *Fanout) UpdateChildren(children []storage.Appendable) { + c := normalizeChildren(children) + f.mut.Lock() defer f.mut.Unlock() - f.children = children + + if !slices.Equal(f.children, c) { + f.deadRefThreshold = f.seriesRefMappingStore.Clear() + } + f.children = c } // Appender satisfies the Appendable interface. 
@@ -94,19 +138,27 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { ctx = scrape.ContextWithMetricMetadataStore(ctx, NoopMetadataStore{}) } - app := &appender{ - children: make([]storage.Appender, 0), - fanout: f, - stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()), + children := make([]storage.Appender, 0, len(f.children)) + for _, c := range f.children { + children = append(children, c.Appender(ctx)) } - for _, x := range f.children { - if x == nil { - continue + if f.useLabelStore { + return &appender{ + children: children, + fanout: f, + stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()), } - app.children = append(app.children, x.Appender(ctx)) } - return app + + return appenders.New(children, f.seriesRefMappingStore, f.deadRefThreshold, f.writeLatency, f.samplesCounter) +} + +func (f *Fanout) Clear() { + f.mut.Lock() + defer f.mut.Unlock() + + f.deadRefThreshold = f.seriesRefMappingStore.Clear() } type appender struct { @@ -235,7 +287,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int if ref == 0 { ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l)) } - // TODO histograms are not currently tracked for staleness causing them to be held forever + var multiErr error for _, x := range a.children { _, err := x.AppendHistogram(ref, l, t, h, fh) diff --git a/internal/component/prometheus/fanout_test.go b/internal/component/prometheus/fanout_test.go index 138417a5ad6..b70db5b7544 100644 --- a/internal/component/prometheus/fanout_test.go +++ b/internal/component/prometheus/fanout_test.go @@ -1,29 +1,316 @@ -package prometheus +package prometheus_test import ( + "context" + "fmt" + "strconv" "testing" + "time" - "github.com/prometheus/client_golang/prometheus" - - "github.com/grafana/alloy/internal/service/labelstore" - + "github.com/go-kit/log" + promclient "github.com/prometheus/client_golang/prometheus" + 
"github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/grafana/alloy/internal/component/prometheus" + "github.com/grafana/alloy/internal/component/prometheus/remotewrite" + "github.com/grafana/alloy/internal/service/labelstore" ) func TestRollback(t *testing.T) { - ls := labelstore.New(nil, prometheus.DefaultRegisterer) - fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls) + ls := labelstore.New(nil, promclient.DefaultRegisterer) + fanout := prometheus.NewFanout([]storage.Appendable{prometheus.NewFanout(nil, "1", promclient.DefaultRegisterer, ls)}, "", promclient.DefaultRegisterer, ls) app := fanout.Appender(t.Context()) err := app.Rollback() require.NoError(t, err) } func TestCommit(t *testing.T) { - ls := labelstore.New(nil, prometheus.DefaultRegisterer) - fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls) + ls := labelstore.New(nil, promclient.DefaultRegisterer) + fanout := prometheus.NewFanout([]storage.Appendable{prometheus.NewFanout(nil, "1", promclient.DefaultRegisterer, ls)}, "", promclient.DefaultRegisterer, ls) app := fanout.Appender(t.Context()) err := app.Commit() require.NoError(t, err) } + +func TestNewFanoutIgnoresNilChildren(t *testing.T) { + ls := labelstore.New(nil, promclient.DefaultRegisterer) + fanout := prometheus.NewFanout([]storage.Appendable{nil, nil}, "", promclient.DefaultRegisterer, ls) + app := fanout.Appender(t.Context()) + err := app.Commit() + require.NoError(t, err) +} + +func TestNewFanoutWithNilLabelStore(t *testing.T) { + fanout := prometheus.NewFanout([]storage.Appendable{noopStore{}}, "", 
promclient.DefaultRegisterer, nil) + app := fanout.Appender(t.Context()) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), time.Now().UnixMilli(), 1.0) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) +} + +type benchAppenderFlowsItem struct { + series []labels.Labels + targetsCount int + useLabelStore bool +} + +func (i benchAppenderFlowsItem) name() string { + key := "seriesref" + if i.useLabelStore { + key = "labelstore" + } + + return fmt.Sprintf("pipeline=%s/targets=%d/metrics=%d", key, i.targetsCount, len(i.series)) +} + +// go test -bench="BenchmarkAppenderFlows" . -run ^$ -benchmem -count 6 -benchtime 5s | tee benchmarks +// benchstat -row '.name /targets /metrics' -col '/pipeline' benchmarks +func BenchmarkAppenderFlows(b *testing.B) { + labels := setupMetrics(2000) + cases := []benchAppenderFlowsItem{ + { + series: labels, + targetsCount: 1, + useLabelStore: true, + }, + { + series: labels, + targetsCount: 2, + useLabelStore: true, + }, + { + series: labels, + targetsCount: 1, + useLabelStore: false, + }, + { + series: labels, + targetsCount: 2, + useLabelStore: false, + }, + } + + for _, c := range cases { + now := time.Now().UnixMilli() + ls := labelstore.New(log.NewNopLogger(), promclient.DefaultRegisterer, c.useLabelStore) + + children := make([]storage.Appendable, c.targetsCount) + for i := range c.targetsCount { + children[i] = remotewrite.NewInterceptor(strconv.Itoa(i), &atomic.Bool{}, noopDebugDataPublisher{}, ls, noopStore{}) + } + fanout := prometheus.NewFanout(children, "fanout", promclient.DefaultRegisterer, ls) + + tname := c.name() + b.Run(tname, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + app := fanout.Appender(b.Context()) + for _, metric := range c.series { + app.Append(0, metric, now, 1.0) + } + app.Commit() + + b.StopTimer() + fanout.Clear() + b.StartTimer() + } + }) + } +} + +type noopAppender struct { + refCounter atomic.Uint64 +} + +func (n noopAppender) 
Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) { + return storage.SeriesRef(n.refCounter.Inc()), nil +} + +func (n noopAppender) Commit() error { + return nil +} + +func (n noopAppender) Rollback() error { + return nil +} + +func (n noopAppender) SetOptions(*storage.AppendOptions) { +} + +func (n noopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) { + return storage.SeriesRef(n.refCounter.Inc()), nil +} + +func (n noopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) { + return storage.SeriesRef(n.refCounter.Inc()), nil +} + +func (n noopAppender) AppendHistogramSTZeroSample(storage.SeriesRef, labels.Labels, int64, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) { + return storage.SeriesRef(n.refCounter.Inc()), nil +} + +func (n noopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { + return storage.SeriesRef(n.refCounter.Inc()), nil +} + +func (n noopAppender) AppendSTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) { + return storage.SeriesRef(n.refCounter.Inc()), nil +} + +type noopStore struct { + refCounter atomic.Uint64 +} + +func (n noopStore) Querier(int64, int64) (storage.Querier, error) { + return nil, nil +} + +func (n noopStore) ChunkQuerier(int64, int64) (storage.ChunkQuerier, error) { + return nil, nil +} + +func (n noopStore) Appender(context.Context) storage.Appender { + return noopAppender(n) +} + +func (n noopStore) StartTime() (int64, error) { + return 0, nil +} + +func (n noopStore) Close() error { + return nil +} + +// recordingAppender records the ref passed to each Append call. 
+type recordingAppender struct { + nextRef storage.SeriesRef + appendRefs []storage.SeriesRef +} + +func (r *recordingAppender) Append(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { + r.appendRefs = append(r.appendRefs, ref) + if ref == 0 { + r.nextRef++ + return r.nextRef, nil + } + return ref, nil +} +func (r *recordingAppender) Commit() error { + return nil +} + +func (r *recordingAppender) Rollback() error { + return nil +} + +func (r *recordingAppender) SetOptions(*storage.AppendOptions) {} +func (r *recordingAppender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { + return ref, nil +} +func (r *recordingAppender) AppendHistogram(ref storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return ref, nil +} +func (r *recordingAppender) AppendHistogramSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return ref, nil +} +func (r *recordingAppender) UpdateMetadata(ref storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { + return ref, nil +} +func (r *recordingAppender) AppendSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { + return ref, nil +} + +// recordingStore is a storage.Appendable backed by a recordingAppender. 
+type recordingStore struct{ appender *recordingAppender } + +func newRecordingStore() *recordingStore { + return &recordingStore{appender: &recordingAppender{nextRef: 5000}} +} +func (s *recordingStore) Appender(context.Context) storage.Appender { return s.appender } + +// TestFanout_SeriesRefMappingToPassthroughTransition verifies that when the fanout +// transitions from seriesRefMapping (2 children) to passthrough (1 child) via +// UpdateChildren, store-issued unique refs cached by callers are zeroed before +// forwarding so the child allocates a fresh ref. +func TestFanout_SeriesRefMappingToPassthroughTransition(t *testing.T) { + child1 := newRecordingStore() + child2 := newRecordingStore() + fanout := prometheus.NewFanout([]storage.Appendable{child1, child2}, "test", promclient.NewRegistry(), nil) + + // Phase 1 (2 children → seriesRefMapping): store issues unique ref 1 for lblsA. + app1 := fanout.Appender(t.Context()) + lblsA := labels.FromStrings("job", "seriesA") + uniqueRef, err := app1.Append(0, lblsA, 1, 1.0) + require.NoError(t, err) + require.NoError(t, app1.Commit()) + require.Equal(t, storage.SeriesRef(1), uniqueRef) + + // Transition to 1 child. + walChild := newRecordingStore() + fanout.UpdateChildren([]storage.Appendable{walChild}) + + // Phase 2 (1 child → passthrough): caller re-sends the store-issued unique ref. + // The passthrough must zero it so the child allocates a fresh ref. + app2 := fanout.Appender(t.Context()) + _, err = app2.Append(uniqueRef, lblsA, 2, 2.0) + require.NoError(t, err) + require.NoError(t, app2.Commit()) + + // The child must have been called with ref=0, not the store-issued unique ref. 
+ require.Equal(t, storage.SeriesRef(0), walChild.appender.appendRefs[0], + "child must be called with ref=0, not the store-issued unique ref") +} + +// TestFanout_PassthroughToSeriesRefMappingTransition verifies that when the fanout +// transitions from passthrough (1 child) to seriesRefMapping (2 children) via +// UpdateChildren, a cached raw child ref that collides numerically with a new +// store-issued unique ref for a different series is handled correctly via label +// hash guards. +func TestFanout_PassthroughToSeriesRefMappingTransition(t *testing.T) { + walChild := newRecordingStore() + fanout := prometheus.NewFanout([]storage.Appendable{walChild}, "test", promclient.NewRegistry(), nil) + + // Phase 1 (1 child → passthrough): child returns raw ref 5001 for lblsB. + app1 := fanout.Appender(t.Context()) + lblsB := labels.FromStrings("job", "seriesB") + passthroughRef, err := app1.Append(0, lblsB, 1, 1.0) + require.NoError(t, err) + require.NoError(t, app1.Commit()) + // passthrough returns the raw child ref directly. + require.Equal(t, storage.SeriesRef(5001), passthroughRef) + + // Transition to 2 children. + child1 := newRecordingStore() + child2 := newRecordingStore() + fanout.UpdateChildren([]storage.Appendable{child1, child2}) + + // Phase 2 (2 children → seriesRefMapping): store issues unique ref 1 for lblsA. + app2 := fanout.Appender(t.Context()) + lblsA := labels.FromStrings("job", "seriesA") + _, err = app2.Append(0, lblsA, 2, 2.0) + require.NoError(t, err) + + // Caller re-sends passthroughRef for lblsB. The label hash guard must prevent + // it from matching lblsA's mapping and force a fresh append for lblsB. + refB, err := app2.Append(passthroughRef, lblsB, 3, 3.0) + require.NoError(t, err) + require.NoError(t, app2.Commit()) + + // lblsB must have its own mapping distinct from lblsA's. 
+ require.NotEqual(t, storage.SeriesRef(1), refB, + "seriesB must not reuse seriesA's store-issued unique ref") + + // Both children must have been called with passthroughRef for lblsB, not seriesA's child refs. + require.Equal(t, passthroughRef, child1.appender.appendRefs[1], + "child1 must be called with passthrough ref for seriesB") + require.Equal(t, passthroughRef, child2.appender.appendRefs[1], + "child2 must be called with passthrough ref for seriesB") +} diff --git a/internal/component/prometheus/operator/common/crdmanager.go b/internal/component/prometheus/operator/common/crdmanager.go index cdaa323b982..b327115364d 100644 --- a/internal/component/prometheus/operator/common/crdmanager.go +++ b/internal/component/prometheus/operator/common/crdmanager.go @@ -183,6 +183,7 @@ func (c *crdManager) Run(ctx context.Context) error { // Start prometheus scrape manager. alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer, c.ls) + defer alloyAppendable.Clear() // TODO: Expose EnableCreatedTimestampZeroIngestion: https://github.com/grafana/alloy/issues/4045 scrapeOpts := &scrape.Options{ diff --git a/internal/component/prometheus/pipeline_test.go b/internal/component/prometheus/pipeline_test.go index 38e6e673427..1d293299584 100644 --- a/internal/component/prometheus/pipeline_test.go +++ b/internal/component/prometheus/pipeline_test.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "log/slog" + "strconv" + "sync" "testing" "time" @@ -18,6 +20,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/prometheus" + "github.com/grafana/alloy/internal/component/prometheus/appenders" "github.com/grafana/alloy/internal/component/prometheus/relabel" "github.com/grafana/alloy/internal/component/prometheus/remotewrite" "github.com/grafana/alloy/internal/component/prometheus/scrape" @@ -33,166 +36,371 @@ import ( // This test simulates a scrape -> remote_write pipeline, without actually scraping func 
TestPipeline(t *testing.T) { - pipeline, ls, destination := newDefaultPipeline(t, util.TestLogger(t)) - - // We need to use a future timestamp since remote_write will ignore any - // sample which is earlier than the time when it started. Adding a minute - // ensures that our samples will never get ignored. - sampleTimestamp := time.Now().Add(time.Minute).UnixMilli() - - // Send metrics to our component. These will be written to the WAL and - // subsequently written to our HTTP server. - lset1 := labels.FromStrings("foo", "bar") - ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12) - lset2 := labels.FromStrings("fizz", "buzz") - ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34) - - expect := []*testappender.MetricSample{{ - Labels: lset1, - Timestamp: sampleTimestamp, - Value: 12, - }, { - Labels: lset2, - Timestamp: sampleTimestamp, - Value: 34, - }} - - require.EventuallyWithT(t, func(t *assert.CollectT) { - require.Len(t, destination.CollectedSamples(), 2) - require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples())) - }, 5*time.Second, 10*time.Millisecond, "timed out waiting for metrics to be written to destination") - - ref := ls.GetOrAddGlobalRefID(lset1) - require.NotZero(t, ref) - // Append result ref should match the labelstore ref - require.Equal(t, ref, ref1) - localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref) - require.NotZero(t, localRef) - - ref = ls.GetOrAddGlobalRefID(lset2) - require.NotZero(t, ref) - // Append result ref should match the labelstore ref - require.Equal(t, ref, ref2) - localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) - require.NotZero(t, localRef) + tests := []struct { + name string + useLabelStore bool + }{ + {"without_labelstore", false}, + {"with_labelstore", true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + destination := testappender.NewCollectingAppender() + cfg := 
refTrackingConfig{useLabelStore: tt.useLabelStore} + pipeline, ls, _ := newRemoteWritePipeline(t, util.TestLogger(t), 1, destination, cfg) + sampleTimestamp := time.Now().UnixMilli() + + // Send metrics to our component. These will be written to the WAL and to the collecting appender + lset1 := labels.FromStrings("foo", "bar") + ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12) + require.NotZero(t, ref1) + + lset2 := labels.FromStrings("fizz", "buzz") + ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34) + require.NotZero(t, ref2) + + expect := []*testappender.MetricSample{{ + Labels: lset1, + Timestamp: sampleTimestamp, + Value: 12, + }, { + Labels: lset2, + Timestamp: sampleTimestamp, + Value: 34, + }} + + require.EventuallyWithT(t, func(t *assert.CollectT) { + require.Len(t, destination.CollectedSamples(), 2) + require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples())) + }, 5*time.Second, 10*time.Millisecond, "timed out waiting for metrics to be written to destination") + + if tt.useLabelStore { + ref := ls.GetOrAddGlobalRefID(lset1) + require.NotZero(t, ref) + // Append result ref should match the labelstore ref + require.Equal(t, ref, ref1) + localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref) + require.NotZero(t, localRef) + + ref = ls.GetOrAddGlobalRefID(lset2) + require.NotZero(t, ref) + // Append result ref should match the labelstore ref + require.Equal(t, ref, ref2) + localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) + require.NotZero(t, localRef) + } + }) + } } // This test simulates a scrape -> relabel -> remote_write pipeline, without actually scraping func TestRelabelPipeline(t *testing.T) { - pipeline, ls, destination := newRelabelPipeline(t, util.TestLogger(t)) - - // We need to use a future timestamp since remote_write will ignore any - // sample which is earlier than the time when it started. 
Adding a minute - // ensures that our samples will never get ignored. - sampleTimestamp := time.Now().Add(time.Minute).UnixMilli() - - // Send metrics to our component. These will be written to the WAL and - // subsequently written to our HTTP server. - lset1 := labels.FromStrings("foo", "bar") - ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12) - lset2 := labels.FromStrings("fizz", "buzz") - ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34) - - expect := []*testappender.MetricSample{{ - Labels: labels.NewBuilder(lset1).Set("lbl", "foo").Labels(), - Timestamp: sampleTimestamp, - Value: 12, - }, { - Labels: labels.NewBuilder(lset2).Set("lbl", "foo").Labels(), - Timestamp: sampleTimestamp, - Value: 34, - }} - - require.EventuallyWithT(t, func(t *assert.CollectT) { - require.Len(t, destination.CollectedSamples(), 2) - require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples())) - }, 1*time.Minute, 100*time.Millisecond, "timed out waiting for metrics to be written to destination") - - ref := ls.GetOrAddGlobalRefID(lset1) - require.NotZero(t, ref) - // Append result ref should match the labelstore ref - require.Equal(t, ref, ref1) - // This was relabeled, so we shouldn't have a local ref - localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref) - require.Zero(t, localRef) - - ref = ls.GetOrAddGlobalRefID(lset2) - require.NotZero(t, ref) - // Append result ref should match the labelstore ref - require.Equal(t, ref, ref2) - - // This was relabeled, so we shouldn't have a local ref - localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) - require.Zero(t, localRef) - - lset1Relabeled := labels.NewBuilder(lset1).Set("lbl", "foo").Labels() - ref = ls.GetOrAddGlobalRefID(lset1Relabeled) - require.NotZero(t, ref) - localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) - require.NotZero(t, localRef) - - lset2Relabeled := labels.NewBuilder(lset2).Set("lbl", 
"foo").Labels() - ref = ls.GetOrAddGlobalRefID(lset2Relabeled) - require.NotZero(t, ref) - localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) - require.NotZero(t, localRef) + tests := []struct { + name string + useLabelStore bool + }{ + {"without_labelstore", false}, + {"with_labelstore", true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + destination := testappender.NewCollectingAppender() + cfg := refTrackingConfig{useLabelStore: tt.useLabelStore} + pipeline, ls, _ := newRelabelPipeline(t, util.TestLogger(t), destination, cfg) + sampleTimestamp := time.Now().UnixMilli() + + // Send metrics to our component. These will be written to the WAL and to the collecting appender + lset1 := labels.FromStrings("foo", "bar") + ref1 := sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12) + require.NotZero(t, ref1) + + lset2 := labels.FromStrings("fizz", "buzz") + ref2 := sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34) + require.NotZero(t, ref2) + + expect := []*testappender.MetricSample{{ + Labels: labels.NewBuilder(lset1).Set("lbl", "foo").Labels(), + Timestamp: sampleTimestamp, + Value: 12, + }, { + Labels: labels.NewBuilder(lset2).Set("lbl", "foo").Labels(), + Timestamp: sampleTimestamp, + Value: 34, + }} + + require.EventuallyWithT(t, func(t *assert.CollectT) { + require.Len(t, destination.CollectedSamples(), 2) + require.ElementsMatch(t, expect, maps.Values(destination.CollectedSamples())) + }, 5*time.Second, 100*time.Millisecond, "timed out waiting for metrics to be written to destination") + + if tt.useLabelStore { + ref := ls.GetOrAddGlobalRefID(lset1) + require.NotZero(t, ref) + // Append result ref should match the labelstore ref + require.Equal(t, ref, ref1) + // This was relabeled, so we shouldn't have a local ref + localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref) + require.Zero(t, localRef) + + ref = ls.GetOrAddGlobalRefID(lset2) + require.NotZero(t, ref) + 
// Append result ref should match the labelstore ref + require.Equal(t, ref, ref2) + + // This was relabeled, so we shouldn't have a local ref + localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) + require.Zero(t, localRef) + + lset1Relabeled := labels.NewBuilder(lset1).Set("lbl", "foo").Labels() + ref = ls.GetOrAddGlobalRefID(lset1Relabeled) + require.NotZero(t, ref) + localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) + require.NotZero(t, localRef) + + lset2Relabeled := labels.NewBuilder(lset2).Set("lbl", "foo").Labels() + ref = ls.GetOrAddGlobalRefID(lset2Relabeled) + require.NotZero(t, ref) + localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref) + require.NotZero(t, localRef) + } + }) + } +} + +type refTrackingConfig struct { + useLabelStore bool } +func (ref refTrackingConfig) TestNameString() string { + if ref.useLabelStore { + return "labelstore" + } + + return "seriesrefmapping" +} + +// go test -bench="BenchmarkPipelines" ./internal/component/prometheus -run ^$ -benchmem -count 6 -benchtime 5s | tee benchmarks +// benchstat -row ".name /pipeline /remotewritecomponents /new-metrics" -filter "/new-metrics:(10 OR 1000)" -col /reftrackingconfig benchmarks +// benchstat -row ".name /pipeline /remotewritecomponents /concurrent-existing-metrics" -filter "/concurrent-existing-metrics:(10 OR 1000)" -col /reftrackingconfig benchmarks func BenchmarkPipelines(b *testing.B) { - tests := []struct { + pipelineTypes := []struct { name string - pipelineBuilder func(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender) + pipelineBuilder func(t testing.TB, logger log.Logger, rwComponents int, refTrackingConfig refTrackingConfig) (storage.Appendable, labelstore.LabelStore, clearCacheFunc) }{ - {"default", newDefaultPipeline}, - {"relabel", newRelabelPipeline}, + { + name: "remote_write", + pipelineBuilder: func(t testing.TB, logger log.Logger, rwComponents int, refTrackingConfig 
refTrackingConfig) (storage.Appendable, labelstore.LabelStore, clearCacheFunc) { + return newRemoteWritePipeline(t, logger, rwComponents, appenders.Noop{}, refTrackingConfig) + }, + }, + { + name: "relabel-remote_write", + pipelineBuilder: func(t testing.TB, logger log.Logger, _ int, refTrackingConfig refTrackingConfig) (storage.Appendable, labelstore.LabelStore, clearCacheFunc) { + return newRelabelPipeline(t, logger, appenders.Noop{}, refTrackingConfig) + }, + }, } - numberOfMetrics := []int{2, 10, 100, 1000} + testConfigs := []struct { + numberOfRWComponents int + refTrackingConfig refTrackingConfig + skipFor map[string]struct{} + }{ + { + numberOfRWComponents: 1, + refTrackingConfig: refTrackingConfig{ + useLabelStore: true, + }, + }, + { + numberOfRWComponents: 1, + refTrackingConfig: refTrackingConfig{ + useLabelStore: false, + }, + }, + { + numberOfRWComponents: 2, + refTrackingConfig: refTrackingConfig{ + useLabelStore: true, + }, + skipFor: map[string]struct{}{"relabel-remote_write": {}}, + }, + { + numberOfRWComponents: 2, + refTrackingConfig: refTrackingConfig{ + useLabelStore: false, + }, + skipFor: map[string]struct{}{"relabel-remote_write": {}}, + }, + } + // Simulates appending various numbers of new metrics sequentially + numberOfMetrics := []int{10, 1000} for _, n := range numberOfMetrics { - for _, tt := range tests { - // Don't need care about the labelstore and destination for benchmarks - pipeline, _, _ := tt.pipelineBuilder(b, log.NewNopLogger()) - b.Run(fmt.Sprintf("%s/%d-metrics", tt.name, n), func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - - for b.Loop() { - for i := 0; i < n; i++ { - sendMetric( - b, - pipeline.Appender(b.Context()), - labels.FromStrings(fmt.Sprintf("metric-%d", i), fmt.Sprintf("metric-%d", i)), - time.Now().Add(time.Minute).UnixMilli(), - float64(i), - ) + for _, pipelineType := range pipelineTypes { + for _, config := range testConfigs { + if _, skip := config.skipFor[pipelineType.name]; skip { + continue + } 
+ + testName := fmt.Sprintf("pipeline=%s/remotewritecomponents=%d/reftrackingconfig=%s/new-metrics=%d", + pipelineType.name, config.numberOfRWComponents, config.refTrackingConfig.TestNameString(), n) + + b.Run(testName, func(b *testing.B) { + pipeline, _, clearCache := pipelineType.pipelineBuilder(b, log.NewNopLogger(), config.numberOfRWComponents, config.refTrackingConfig) + metrics := setupMetrics(n) + b.ReportAllocs() + b.ResetTimer() + + for b.Loop() { + a := pipeline.Appender(b.Context()) + for i, metric := range metrics { + a.Append(0, metric, time.Now().UnixMilli(), float64(i)) + } + a.Commit() + + b.StopTimer() + clearCache() + b.StartTimer() } + }) + } + } + } + + // Simulate concurrently appending from multiple scrapers for known metrics + concurrency := []int{10, 1000} + for _, c := range concurrency { + for _, pipelineType := range pipelineTypes { + for _, config := range testConfigs { + if _, skip := config.skipFor[pipelineType.name]; skip { + continue } - }) + testName := fmt.Sprintf("pipeline=%s/remotewritecomponents=%d/reftrackingconfig=%s/concurrent-existing-metrics=%d", + pipelineType.name, config.numberOfRWComponents, config.refTrackingConfig.TestNameString(), c) + + b.Run(testName, func(b *testing.B) { + pipeline, _, _ := pipelineType.pipelineBuilder(b, log.NewNopLogger(), config.numberOfRWComponents, config.refTrackingConfig) + var metricsForAppenders [][]labels.Labels + numMetrics := 1000 + for appenderIndex := range c { + metrics := setupMetrics(numMetrics, fmt.Sprintf("concurrency-%d", appenderIndex)) + + // Send them through once so further appends can use "known refs" + a := pipeline.Appender(b.Context()) + for metricIndex, metric := range metrics { + expectedRef := storage.SeriesRef(appenderIndex*numMetrics + metricIndex + 1) + ref, err := a.Append(expectedRef, metric, time.Now().UnixMilli(), float64(metricIndex)) + require.NoError(b, err) + require.Equal(b, expectedRef, ref) + } + require.NoError(b, a.Commit()) + + metricsForAppenders = 
append(metricsForAppenders, metrics) + } + b.ReportAllocs() + b.ResetTimer() + + for b.Loop() { + var wg sync.WaitGroup + errCh := make(chan error, 1) + + reportErr := func(err error) { + if err == nil { + return + } + select { + case errCh <- err: + default: + } + } + + for appenderIndex := range c { + wg.Add(1) + go func(appenderIndex int) { + defer wg.Done() + + a := pipeline.Appender(b.Context()) + for metricIndex, metric := range metricsForAppenders[appenderIndex] { + ref := storage.SeriesRef(appenderIndex*numMetrics + metricIndex + 1) + _, err := a.Append(ref, metric, time.Now().UnixMilli(), float64(metricIndex)) + if err != nil { + reportErr(fmt.Errorf("append failed for appender=%d metric=%d: %w", appenderIndex, metricIndex, err)) + return + } + } + if err := a.Commit(); err != nil { + reportErr(fmt.Errorf("commit failed for appender=%d: %w", appenderIndex, err)) + } + }(appenderIndex) + } + + wg.Wait() + select { + case err := <-errCh: + b.Fatal(err) + default: + } + } + }) + } + } + } +} + +func setupMetrics(numberOfMetrics int, extraLabels ...string) []labels.Labels { + metrics := make([]labels.Labels, 0, numberOfMetrics) + for i := range numberOfMetrics { + key := fmt.Sprintf("metric-%d", i) + value := strconv.Itoa(i) + lbls := labels.FromStrings(key, value) + for _, extraLabel := range extraLabels { + lbls = labels.NewBuilder(lbls).Set(extraLabel, "value").Labels() } + metrics = append(metrics, lbls) } + return metrics } -func newDefaultPipeline(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender) { - ls := labelstore.New(logger, promclient.DefaultRegisterer) - rwAppendable, rwDestination := newRemoteWriteComponent(t, logger, ls) - pipelineAppendable := prometheus.NewFanout([]storage.Appendable{rwAppendable}, "", promclient.DefaultRegisterer, ls) - scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", livedebugging.NewLiveDebugging(), pipelineAppendable) +type clearCacheFunc = 
func() + +func newRemoteWritePipeline(t testing.TB, logger log.Logger, numberOfRemoteWriteComponents int, destination storage.Appender, config refTrackingConfig) (storage.Appendable, labelstore.LabelStore, clearCacheFunc) { + ls := labelstore.New(logger, promclient.DefaultRegisterer, config.useLabelStore) + + destAppendable := testappender.ConstantAppendable{Inner: destination} + + rwAppendables := make([]storage.Appendable, 0, numberOfRemoteWriteComponents) + for range numberOfRemoteWriteComponents { + rwAppendable := newRemoteWriteComponent(t, logger, ls, destAppendable) + rwAppendables = append(rwAppendables, rwAppendable) + } + pipelineAppendable := prometheus.NewFanout(rwAppendables, "", promclient.DefaultRegisterer, ls) + scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", noopDebugDataPublisher{}, pipelineAppendable) - return scrapeInterceptor, ls, rwDestination + return scrapeInterceptor, ls, func() { + pipelineAppendable.Clear() + ls.Clear() + } } -func newRelabelPipeline(t testing.TB, logger log.Logger) (storage.Appendable, labelstore.LabelStore, testappender.CollectingAppender) { - ls := labelstore.New(logger, promclient.DefaultRegisterer) - rwAppendable, rwDestination := newRemoteWriteComponent(t, logger, ls) +func newRelabelPipeline(t testing.TB, logger log.Logger, destination storage.Appender, config refTrackingConfig) (storage.Appendable, labelstore.LabelStore, clearCacheFunc) { + ls := labelstore.New(logger, promclient.DefaultRegisterer, config.useLabelStore) + + destAppendable := testappender.ConstantAppendable{Inner: destination} + rwAppendable := newRemoteWriteComponent(t, logger, ls, destAppendable) relabelAppendable := newRelabelComponent(t, logger, []storage.Appendable{rwAppendable}, ls) pipelineAppendable := prometheus.NewFanout([]storage.Appendable{relabelAppendable}, "", promclient.DefaultRegisterer, ls) - scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", livedebugging.NewLiveDebugging(), 
pipelineAppendable) + scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", noopDebugDataPublisher{}, pipelineAppendable) - return scrapeInterceptor, ls, rwDestination + return scrapeInterceptor, ls, func() { + pipelineAppendable.Clear() + ls.Clear() + } } -func newRemoteWriteComponent(t testing.TB, logger log.Logger, ls *labelstore.Service) (storage.Appendable, testappender.CollectingAppender) { +func newRemoteWriteComponent(t testing.TB, logger log.Logger, ls labelstore.LabelStore, destination storage.Appendable) storage.Appendable { walDir := t.TempDir() walStorage, err := wal.NewStorage(logger, promclient.NewRegistry(), walDir) @@ -204,15 +412,14 @@ func newRemoteWriteComponent(t testing.TB, logger log.Logger, ls *labelstore.Ser ), ) - inMemoryAppendable := testappender.ConstantAppendable{Inner: testappender.NewCollectingAppender()} - store := storage.NewFanout(fanoutLogger, walStorage, testStorage{inMemoryAppendable: inMemoryAppendable}) + store := storage.NewFanout(fanoutLogger, walStorage, testStorage{destination: destination}) t.Cleanup(func() { store.Close() walStorage.Close() }) - return remotewrite.NewInterceptor("prometheus.remote_write.test", &atomic.Bool{}, livedebugging.NewLiveDebugging(), ls, store), inMemoryAppendable.Inner + return remotewrite.NewInterceptor("prometheus.remote_write.test", &atomic.Bool{}, noopDebugDataPublisher{}, ls, store) } type testStorage struct { @@ -220,11 +427,11 @@ type testStorage struct { storage.Queryable storage.ChunkQueryable - inMemoryAppendable storage.Appendable + destination storage.Appendable } func (t testStorage) Appender(ctx context.Context) storage.Appender { - return t.inMemoryAppendable.Appender(ctx) + return t.destination.Appender(ctx) } func (t testStorage) StartTime() (int64, error) { @@ -235,7 +442,7 @@ func (t testStorage) Close() error { return nil } -func newRelabelComponent(t testing.TB, logger log.Logger, forwardTo []storage.Appendable, ls *labelstore.Service) storage.Appendable { 
+func newRelabelComponent(t testing.TB, logger log.Logger, forwardTo []storage.Appendable, ls labelstore.LabelStore) storage.Appendable { cfg := `forward_to = [] rule { action = "replace" @@ -273,3 +480,8 @@ func sendMetric(t testing.TB, appender storage.Appender, labels labels.Labels, t return uint64(ref) } + +// noopDebugDataPublisher is a no-op implementation of livedebugging.DebugDataPublisher for testing. +type noopDebugDataPublisher struct{} + +func (n noopDebugDataPublisher) PublishIfActive(livedebugging.Data) {} diff --git a/internal/component/prometheus/receive_http/receive_http.go b/internal/component/prometheus/receive_http/receive_http.go index 197c263804e..98c71bdd220 100644 --- a/internal/component/prometheus/receive_http/receive_http.go +++ b/internal/component/prometheus/receive_http/receive_http.go @@ -128,6 +128,7 @@ func New(opts component.Options, args Arguments) (*Component, error) { // Run satisfies the Component interface. func (c *Component) Run(ctx context.Context) error { + defer c.fanout.Clear() defer func() { c.updateMut.Lock() defer c.updateMut.Unlock() diff --git a/internal/component/prometheus/relabel/relabel.go b/internal/component/prometheus/relabel/relabel.go index 3cf7f8c12d8..378f65273a1 100644 --- a/internal/component/prometheus/relabel/relabel.go +++ b/internal/component/prometheus/relabel/relabel.go @@ -91,7 +91,7 @@ type Component struct { debugDataPublisher livedebugging.DebugDataPublisher cacheMut sync.RWMutex - cache *lru.Cache[storage.SeriesRef, labels.Labels] + cache *lru.Cache[uint64, labels.Labels] } var ( @@ -101,7 +101,7 @@ var ( // New creates a new prometheus.relabel component. 
func New(o component.Options, args Arguments) (*Component, error) { - cache, err := lru.New[storage.SeriesRef, labels.Labels](args.CacheSize) + cache, err := lru.New[uint64, labels.Labels](args.CacheSize) if err != nil { return nil, err } @@ -162,7 +162,7 @@ func New(o component.Options, args Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - newLbl := c.relabel(ref, v, l) + newLbl := c.relabel(v, l) if newLbl.IsEmpty() { return 0, nil } @@ -176,7 +176,7 @@ func New(o component.Options, args Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - newLbl := c.relabel(ref, 0, l) + newLbl := c.relabel(0, l) if newLbl.IsEmpty() { return 0, nil } @@ -189,7 +189,7 @@ func New(o component.Options, args Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - newLbl := c.relabel(ref, 0, l) + newLbl := c.relabel(0, l) if newLbl.IsEmpty() { return 0, nil } @@ -202,7 +202,7 @@ func New(o component.Options, args Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - newLbl := c.relabel(ref, 0, l) + newLbl := c.relabel(0, l) if newLbl.IsEmpty() { return 0, nil } @@ -227,6 +227,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. 
func (c *Component) Run(ctx context.Context) error { defer c.exited.Store(true) + defer c.fanout.Clear() <-ctx.Done() return nil @@ -247,7 +248,7 @@ func (c *Component) Update(args component.Arguments) error { return nil } -func (c *Component) relabel(ref storage.SeriesRef, val float64, lbls labels.Labels) labels.Labels { +func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { c.mut.RLock() defer c.mut.RUnlock() @@ -257,7 +258,7 @@ func (c *Component) relabel(ref storage.SeriesRef, val float64, lbls labels.Labe relabelled labels.Labels keep bool ) - newLbls, found := c.getFromCache(ref) + newLbls, found := c.getFromCache(lbls) if found { c.cacheHits.Inc() // If newLbls is empty but cache entry was found then we want to keep the value empty, if it's not we want to reuse the labels @@ -269,12 +270,12 @@ func (c *Component) relabel(ref storage.SeriesRef, val float64, lbls labels.Labe // slice. relabelled, keep = relabel.Process(lbls.Copy(), c.mrc...) c.cacheMisses.Inc() - c.addToCache(ref, relabelled, keep) + c.addToCache(lbls, relabelled, keep) } // If stale remove from the cache, the reason we don't exit early is so the stale value can propagate. if value.IsStaleNaN(val) { - c.deleteFromCache(ref) + c.deleteFromCache(lbls) } // Set the cache size to the cache.len // TODO(@mattdurham): Instead of setting this each time could collect on demand for better performance. 
@@ -297,37 +298,42 @@ func (c *Component) relabel(ref storage.SeriesRef, val float64, lbls labels.Labe return relabelled } -func (c *Component) getFromCache(id storage.SeriesRef) (labels.Labels, bool) { +func (c *Component) getFromCache(lbls labels.Labels) (labels.Labels, bool) { c.cacheMut.RLock() defer c.cacheMut.RUnlock() - fm, found := c.cache.Get(id) + hash := lbls.Hash() + fm, found := c.cache.Get(hash) + return fm, found } -func (c *Component) deleteFromCache(id storage.SeriesRef) { +func (c *Component) deleteFromCache(lbls labels.Labels) { c.cacheMut.Lock() defer c.cacheMut.Unlock() c.cacheDeletes.Inc() - c.cache.Remove(id) + + hash := lbls.Hash() + c.cache.Remove(hash) } func (c *Component) clearCache(cacheSize int) { c.cacheMut.Lock() defer c.cacheMut.Unlock() - cache, _ := lru.New[storage.SeriesRef, labels.Labels](cacheSize) + cache, _ := lru.New[uint64, labels.Labels](cacheSize) c.cache = cache } -func (c *Component) addToCache(originalID storage.SeriesRef, lbls labels.Labels, keep bool) { +func (c *Component) addToCache(lbls labels.Labels, relabeled labels.Labels, keep bool) { c.cacheMut.Lock() defer c.cacheMut.Unlock() + hash := lbls.Hash() if !keep { - c.cache.Add(originalID, labels.EmptyLabels()) + c.cache.Add(hash, labels.EmptyLabels()) return } - c.cache.Add(originalID, lbls) + c.cache.Add(hash, relabeled) } func (c *Component) LiveDebugging() {} diff --git a/internal/component/prometheus/relabel/relabel_test.go b/internal/component/prometheus/relabel/relabel_test.go index 0eee251432e..88a7460f954 100644 --- a/internal/component/prometheus/relabel/relabel_test.go +++ b/internal/component/prometheus/relabel/relabel_test.go @@ -28,7 +28,7 @@ import ( func TestUpdateReset(t *testing.T) { relabeller := generateRelabel(t) lbls := labels.FromStrings("__address__", "localhost") - relabeller.relabel(storage.SeriesRef(1), 0, lbls) + relabeller.relabel(0, lbls) require.True(t, relabeller.cache.Len() == 1) _ = relabeller.Update(Arguments{ CacheSize: 100000, 
@@ -73,7 +73,7 @@ func TestNil(t *testing.T) { require.NoError(t, err) lbls := labels.FromStrings("__address__", "localhost") - relabeller.relabel(storage.SeriesRef(1), 0, lbls) + relabeller.relabel(0, lbls) } func TestLRU(t *testing.T) { @@ -81,7 +81,7 @@ func TestLRU(t *testing.T) { for i := 0; i < 600_000; i++ { lbls := labels.FromStrings("__address__", "localhost", "inc", strconv.Itoa(i)) - relabeller.relabel(storage.SeriesRef(i), 0, lbls) + relabeller.relabel(0, lbls) } require.Equal(t, 100_000, relabeller.cache.Len()) } @@ -89,15 +89,16 @@ func TestLRU(t *testing.T) { func TestLRUNaN(t *testing.T) { relabeller := generateRelabel(t) lbls := labels.FromStrings("__address__", "localhost") - ref := storage.SeriesRef(1) - relabeller.relabel(ref, 0, lbls) + relabeled := relabeller.relabel(0, lbls) - _, found := relabeller.getFromCache(ref) + require.NotEqual(t, lbls, relabeled) + + _, found := relabeller.getFromCache(lbls) require.True(t, found) - relabeller.relabel(ref, math.Float64frombits(value.StaleNaN), lbls) + relabeller.relabel(math.Float64frombits(value.StaleNaN), lbls) - _, found = relabeller.getFromCache(ref) + _, found = relabeller.getFromCache(lbls) require.False(t, found) } @@ -105,7 +106,7 @@ func TestMetrics(t *testing.T) { relabeller := generateRelabel(t) lbls := labels.FromStrings("__address__", "localhost") - relabeller.relabel(storage.SeriesRef(1), 0, lbls) + relabeller.relabel(0, lbls) m := &dto.Metric{} err := relabeller.metricsProcessed.Write(m) require.NoError(t, err) @@ -118,14 +119,15 @@ func BenchmarkCache(b *testing.B) { return ref, nil })) var entry storage.Appendable - _, _ = New(component.Options{ + _, err := New(component.Options{ ID: "1", Logger: util.TestAlloyLogger(b), OnStateChange: func(e component.Exports) { newE := e.(Exports) entry = newE.Receiver }, - Registerer: prom.NewRegistry(), + Registerer: prom.NewRegistry(), + GetServiceData: getServiceData, }, Arguments{ ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: 
[]*alloy_relabel.Config{ @@ -137,7 +139,9 @@ func BenchmarkCache(b *testing.B) { Action: "replace", }, }, + CacheSize: 100_000, }) + require.NoError(b, err) lbls := labels.FromStrings("__address__", "localhost") app := entry.Appender(b.Context()) @@ -230,3 +234,40 @@ func getServiceData(name string) (any, error) { return nil, fmt.Errorf("service not found %s", name) } } + +// TestHashCollision demonstrates that the cache can return incorrect results when +// two different labelsets have the same hash. This is a known limitation of using +// hash as the cache key without collision detection. +func TestHashCollision(t *testing.T) { + relabeller := generateRelabel(t) + + // These two series have the same XXHash; thanks to https://github.com/pstibrany/labels_hash_collisions + ls1 := labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl") + ls2 := labels.FromStrings("__name__", "metric", "lbl", "RqcXatm") + + if ls1.Hash() != ls2.Hash() { + // These ones are the same when using -tags slicelabels + ls1 = labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "l6CQ5y") + ls2 = labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "v7uDlF") + } + + if ls1.Hash() != ls2.Hash() { + t.Skip("Unable to find colliding label hashes for this labels implementation") + } + + // Relabel the first labelset - this will cache the result + relabeled1 := relabeller.relabel(0, ls1) + require.NotEmpty(t, relabeled1) + + // Relabel the second labelset - due to hash collision, this will return + // the cached result from ls1 instead of relabeling ls2 + relabeled2 := relabeller.relabel(0, ls2) + require.NotEmpty(t, relabeled2) + + // This documents an inherited deficiency + t.Log("Expected failure: hash collision causes cache to return wrong labels") + require.True(t, labels.Equal(relabeled1, relabeled2), + "Hash collision: different input labels produced same cached output. 
"+ + "ls1=%s, ls2=%s, relabeled1=%s, relabeled2=%s", + ls1.String(), ls2.String(), relabeled1.String(), relabeled2.String()) +} diff --git a/internal/component/prometheus/remotewrite/interceptor.go b/internal/component/prometheus/remotewrite/interceptor.go index dbd2ae0ce02..fe0231e93cb 100644 --- a/internal/component/prometheus/remotewrite/interceptor.go +++ b/internal/component/prometheus/remotewrite/interceptor.go @@ -51,15 +51,23 @@ func NewInterceptor(componentID string, exited *atomic.Bool, debugDataPublisher // treat the remote_write ID as a "local ID" and translate it to a "global // ID" to ensure Alloy compatibility. - prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { if exited.Load() { return 0, fmt.Errorf("%s has exited", componentID) } - localRef := ls.GetLocalRefID(componentID, uint64(globalRef)) - newLocalRef, nextErr := next.Append(storage.SeriesRef(localRef), l, t, v) - if nextErr == nil { - handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef)) + var finalRef storage.SeriesRef + var err error + if ls.Enabled() { + localRef := ls.GetLocalRefID(componentID, uint64(ref)) + newLocalRef, nextErr := next.Append(storage.SeriesRef(localRef), l, t, v) + if nextErr == nil { + handleLocalLink(uint64(ref), l, localRef, uint64(newLocalRef)) + } + finalRef = ref + err = nextErr + } else { + finalRef, err = next.Append(ref, l, t, v) } debugDataPublisher.PublishIfActive(livedebugging.NewData( @@ -70,17 +78,25 @@ func NewInterceptor(componentID string, exited *atomic.Bool, debugDataPublisher return fmt.Sprintf("sample: ts=%d, labels=%s, value=%f", t, l, v) }, )) - return globalRef, nextErr + return finalRef, err }), - prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h 
*histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) { + prometheus.WithHistogramHook(func(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) { if exited.Load() { return 0, fmt.Errorf("%s has exited", componentID) } - localRef := ls.GetLocalRefID(componentID, uint64(globalRef)) - newLocalRef, nextErr := next.AppendHistogram(storage.SeriesRef(localRef), l, t, h, fh) - if nextErr == nil { - handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef)) + var finalRef storage.SeriesRef + var err error + if ls.Enabled() { + localRef := ls.GetLocalRefID(componentID, uint64(ref)) + newLocalRef, nextErr := next.AppendHistogram(storage.SeriesRef(localRef), l, t, h, fh) + if nextErr == nil { + handleLocalLink(uint64(ref), l, localRef, uint64(newLocalRef)) + } + finalRef = ref + err = nextErr + } else { + finalRef, err = next.AppendHistogram(ref, l, t, h, fh) } debugDataPublisher.PublishIfActive(livedebugging.NewData( @@ -99,17 +115,25 @@ func NewInterceptor(componentID string, exited *atomic.Bool, debugDataPublisher return data }, )) - return globalRef, nextErr + return finalRef, err }), - prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) { + prometheus.WithMetadataHook(func(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) { if exited.Load() { return 0, fmt.Errorf("%s has exited", componentID) } - localRef := ls.GetLocalRefID(componentID, uint64(globalRef)) - newLocalRef, nextErr := next.UpdateMetadata(storage.SeriesRef(localRef), l, m) - if nextErr == nil { - handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef)) + var finalRef storage.SeriesRef + var err error + if ls.Enabled() { + localRef := ls.GetLocalRefID(componentID, uint64(ref)) + 
newLocalRef, nextErr := next.UpdateMetadata(storage.SeriesRef(localRef), l, m) + if nextErr == nil { + handleLocalLink(uint64(ref), l, localRef, uint64(newLocalRef)) + } + finalRef = ref + err = nextErr + } else { + finalRef, err = next.UpdateMetadata(ref, l, m) } debugDataPublisher.PublishIfActive(livedebugging.NewData( @@ -120,17 +144,25 @@ func NewInterceptor(componentID string, exited *atomic.Bool, debugDataPublisher return fmt.Sprintf("metadata: labels=%s, type=%q, unit=%q, help=%q", l, m.Type, m.Unit, m.Help) }, )) - return globalRef, nextErr + return finalRef, err }), - prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) { + prometheus.WithExemplarHook(func(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) { if exited.Load() { return 0, fmt.Errorf("%s has exited", componentID) } - localRef := ls.GetLocalRefID(componentID, uint64(globalRef)) - newLocalRef, nextErr := next.AppendExemplar(storage.SeriesRef(localRef), l, e) - if nextErr == nil { - handleLocalLink(uint64(globalRef), l, localRef, uint64(newLocalRef)) + var finalRef storage.SeriesRef + var err error + if ls.Enabled() { + localRef := ls.GetLocalRefID(componentID, uint64(ref)) + newLocalRef, nextErr := next.AppendExemplar(storage.SeriesRef(localRef), l, e) + if nextErr == nil { + handleLocalLink(uint64(ref), l, localRef, uint64(newLocalRef)) + } + finalRef = ref + err = nextErr + } else { + finalRef, err = next.AppendExemplar(ref, l, e) } debugDataPublisher.PublishIfActive(livedebugging.NewData( @@ -141,7 +173,7 @@ func NewInterceptor(componentID string, exited *atomic.Bool, debugDataPublisher return fmt.Sprintf("exemplar: ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value) }, )) - return globalRef, nextErr + return finalRef, err }), ) } diff --git a/internal/component/prometheus/scrape/scrape.go 
b/internal/component/prometheus/scrape/scrape.go index a502b493977..889a5cf5c3c 100644 --- a/internal/component/prometheus/scrape/scrape.go +++ b/internal/component/prometheus/scrape/scrape.go @@ -331,7 +331,8 @@ func New(o component.Options, args Arguments) (*Component, error) { targetsGauge := client_prometheus.NewGauge(client_prometheus.GaugeOpts{ Name: "prometheus_scrape_targets_gauge", - Help: "Number of targets this component is configured to scrape"}) + Help: "Number of targets this component is configured to scrape", + }) err = o.Registerer.Register(targetsGauge) if err != nil { return nil, err @@ -339,7 +340,8 @@ func New(o component.Options, args Arguments) (*Component, error) { movedTargetsCounter := client_prometheus.NewCounter(client_prometheus.CounterOpts{ Name: "prometheus_scrape_targets_moved_total", - Help: "Number of targets that have moved from this cluster node to another one"}) + Help: "Number of targets that have moved from this cluster node to another one", + }) err = o.Registerer.Register(movedTargetsCounter) if err != nil { return nil, err @@ -385,6 +387,7 @@ func New(o component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) error { defer c.scraper.Stop() defer c.unregisterer.UnregisterAll() + defer c.appendable.Clear() targetSetsChan := make(chan map[string][]*targetgroup.Group) @@ -430,12 +433,7 @@ func (c *Component) Run(ctx context.Context) error { } } -func (c *Component) distributeTargets( - targets []discovery.Target, - jobName string, - args Arguments, -) (map[string][]*targetgroup.Group, []*scrape.Target) { - +func (c *Component) distributeTargets(targets []discovery.Target, jobName string, args Arguments) (map[string][]*targetgroup.Group, []*scrape.Target) { var ( newDistTargets = discovery.NewDistributedTargets(args.Clustering.Enabled, c.cluster, targets) oldDistributedTargets *discovery.DistributedTargets diff --git a/internal/service/labelstore/data.go 
b/internal/service/labelstore/data.go index 5e8ca95fc49..850aaebd1dc 100644 --- a/internal/service/labelstore/data.go +++ b/internal/service/labelstore/data.go @@ -1,8 +1,14 @@ package labelstore -import "github.com/prometheus/prometheus/model/labels" +import ( + "github.com/prometheus/prometheus/model/labels" + + alloy_service "github.com/grafana/alloy/internal/service" +) type LabelStore interface { + alloy_service.Service + // AddLocalLink adds a mapping from local to global id for the given component. AddLocalLink(componentID string, globalRefID uint64, localRefID uint64) @@ -21,6 +27,12 @@ type LabelStore interface { // ReplaceLocalLink updates an existing local to global mapping for a component. ReplaceLocalLink(componentID string, globalRefID uint64, cachedLocalRef uint64, newLocalRef uint64) + + // Clear removes all mappings from the label store. Only used for testing. + Clear() + + // Enabled allows checking if the label store is enabled before doing work to generate global ref ids and track staleness. 
+ Enabled() bool } type StalenessTracker struct { diff --git a/internal/service/labelstore/service.go b/internal/service/labelstore/service.go index 3a786388ca4..cbf8f60d6c0 100644 --- a/internal/service/labelstore/service.go +++ b/internal/service/labelstore/service.go @@ -36,12 +36,35 @@ type staleMarker struct { type Arguments struct{} -var _ alloy_service.Service = (*Service)(nil) +var ( + _ alloy_service.Service = (*Service)(nil) + _ alloy_service.Service = (*disabledStore)(nil) +) + +type LabelStoreService interface { + alloy_service.Service + LabelStore +} -func New(l log.Logger, r prometheus.Registerer) *Service { +func New(l log.Logger, r prometheus.Registerer, enabled ...bool) LabelStoreService { if l == nil { l = log.NewNopLogger() } + + e := true + if len(enabled) != 0 { + e = enabled[0] + } + + if !e { + level.Info(l).Log("msg", "labelstore service is disabled") + return disabledStore{} + } + + return newLabelStore(l, r) +} + +func newLabelStore(l log.Logger, r prometheus.Registerer) *Service { s := &Service{ log: l, globalRefID: 0, @@ -55,6 +78,7 @@ func New(l log.Logger, r prometheus.Registerer) *Service { Help: "Last time stale check was ran expressed in unix timestamp.", }), } + _ = r.Register(s.lastStaleCheck) _ = r.Register(s) return s @@ -261,6 +285,20 @@ func (s *Service) CheckAndRemoveStaleMarkers() { } } +func (s *Service) Clear() { + s.mut.Lock() + defer s.mut.Unlock() + + s.globalRefID = 0 + s.mappings = make(map[string]*remoteWriteMapping) + s.labelsHashToGlobal = make(map[uint64]uint64) + s.staleGlobals = make(map[uint64]*staleMarker) +} + +func (s *Service) Enabled() bool { + return true +} + func (rw *remoteWriteMapping) deleteStaleIDs(globalID uint64) { localID, found := rw.globalToLocal[globalID] if !found { @@ -276,3 +314,54 @@ type remoteWriteMapping struct { localToGlobal map[uint64]uint64 globalToLocal map[uint64]uint64 } + +type disabledStore struct{} + +func (d disabledStore) Definition() alloy_service.Definition { + return 
alloy_service.Definition{ + Name: ServiceName, + ConfigType: Arguments{}, + DependsOn: nil, + Stability: featuregate.StabilityGenerallyAvailable, + } +} + +func (d disabledStore) Run(ctx context.Context, host alloy_service.Host) error { + <-ctx.Done() + return nil +} + +func (d disabledStore) Update(newConfig any) error { + return nil +} + +func (d disabledStore) Data() any { + return d +} + +func (d disabledStore) AddLocalLink(componentID string, globalRefID uint64, localRefID uint64) { +} + +func (d disabledStore) GetOrAddGlobalRefID(l labels.Labels) uint64 { + return 0 +} + +func (d disabledStore) GetLocalRefID(componentID string, globalRefID uint64) uint64 { + return 0 +} + +func (d disabledStore) TrackStaleness(ids []StalenessTracker) { +} + +func (d disabledStore) CheckAndRemoveStaleMarkers() { +} + +func (d disabledStore) ReplaceLocalLink(componentID string, globalRefID uint64, cachedLocalRef uint64, newLocalRef uint64) { +} + +func (d disabledStore) Clear() { +} + +func (d disabledStore) Enabled() bool { + return false +} diff --git a/internal/service/labelstore/service_test.go b/internal/service/labelstore/service_test.go index 42fcf8ab94d..39589f640d0 100644 --- a/internal/service/labelstore/service_test.go +++ b/internal/service/labelstore/service_test.go @@ -2,6 +2,7 @@ package labelstore import ( "math" + "os" "strconv" "sync" "testing" @@ -15,7 +16,7 @@ import ( ) func TestAddingMarker(t *testing.T) { - mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.FromStrings("__name__", "test") globalID := mapping.GetOrAddGlobalRefID(l) shouldBeSameGlobalID := mapping.GetOrAddGlobalRefID(l) @@ -24,7 +25,7 @@ func TestAddingMarker(t *testing.T) { } func TestAddingDifferentMarkers(t *testing.T) { - mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := 
labels.FromStrings("__name__", "test") l2 := labels.FromStrings("__name__", "roar") globalID := mapping.GetOrAddGlobalRefID(l) @@ -34,7 +35,7 @@ func TestAddingDifferentMarkers(t *testing.T) { } func TestAddingLocalMapping(t *testing.T) { - mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.FromStrings("__name__", "test") globalID := mapping.GetOrAddGlobalRefID(l) @@ -49,7 +50,7 @@ func TestAddingLocalMapping(t *testing.T) { } func TestAddingLocalMappings(t *testing.T) { - mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.FromStrings("__name__", "test") globalID := mapping.GetOrAddGlobalRefID(l) @@ -71,7 +72,7 @@ func TestAddingLocalMappings(t *testing.T) { } func TestReplaceLocalMappings(t *testing.T) { - mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.FromStrings("__name__", "test") globalID := mapping.GetOrAddGlobalRefID(l) @@ -99,7 +100,7 @@ func TestReplaceLocalMappings(t *testing.T) { } func TestReplaceWithoutAddingLocalMapping(t *testing.T) { - mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.FromStrings("__name__", "test") globalID := mapping.GetOrAddGlobalRefID(l) @@ -112,7 +113,7 @@ func TestReplaceWithoutAddingLocalMapping(t *testing.T) { } func TestStaleness(t *testing.T) { - mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.FromStrings("__name__", "test") l2 := labels.FromStrings("__name__", "test2") @@ -137,7 +138,7 @@ func TestStaleness(t *testing.T) { } func TestRemovingStaleness(t *testing.T) { - mapping := New(log.NewNopLogger(), 
prometheus.DefaultRegisterer) + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.FromStrings("__name__", "test") global1 := mapping.GetOrAddGlobalRefID(l) @@ -162,9 +163,38 @@ func TestRemovingStaleness(t *testing.T) { require.Len(t, mapping.staleGlobals, 0) } +func TestHashCollisions(t *testing.T) { + // TODO: address hash collisions + env := os.Getenv("TEST_HASH_COLLISIONS") + if ok, _ := strconv.ParseBool(env); !ok { + t.Skip("Skipping TestHashCollisions as TEST_HASH_COLLISIONS is not set") + return + } + + mapping := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) + // These two series have the same XXHash; thanks to https://github.com/pstibrany/labels_hash_collisions + ls1 := labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl") + ls2 := labels.FromStrings("__name__", "metric", "lbl", "RqcXatm") + + if ls1.Hash() != ls2.Hash() { + // These ones are the same when using -tags slicelabels + ls1 = labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "l6CQ5y") + ls2 = labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "v7uDlF") + } + + if ls1.Hash() != ls2.Hash() { + t.Fatalf("This code needs to be updated: find new labels with colliding hash values.") + } + + globalID1 := mapping.GetOrAddGlobalRefID(ls1) + globalID2 := mapping.GetOrAddGlobalRefID(ls2) + + require.NotEqual(t, globalID1, globalID2) +} + func BenchmarkStaleness(b *testing.B) { b.StopTimer() - ls := New(log.NewNopLogger(), prometheus.DefaultRegisterer) + ls := newLabelStore(log.NewNopLogger(), prometheus.DefaultRegisterer) tracking := make([]StalenessTracker, 100_000) for i := 0; i < 100_000; i++ { diff --git a/internal/util/testappender/collectingappender.go b/internal/util/testappender/collectingappender.go index 763999cb721..449efe33acb 100644 --- a/internal/util/testappender/collectingappender.go +++ b/internal/util/testappender/collectingappender.go @@ -133,7 +133,7 @@ func (c *collectingAppender) 
AppendHistogramSTZeroSample(ref storage.SeriesRef, func (c *collectingAppender) SetOptions(_ *storage.AppendOptions) {} type ConstantAppendable struct { - Inner CollectingAppender + Inner storage.Appender } func (c ConstantAppendable) Appender(_ context.Context) storage.Appender {