-
Notifications
You must be signed in to change notification settings - Fork 593
feat: Introduce SeriesRefMappingStore #5522
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
48 commits
Select commit
Hold shift + click to select a range
853091b
Stop caching relabel results by SeriesRef
kgeckhart 19267d5
Checkpoint
kgeckhart a463ea1
Fix hashing and remove unique ref benchmarks
kgeckhart 61555cc
wip: microbench
x1unix 11c903f
wip: separate test to capture profile
x1unix 9e09435
wip: bump series count
x1unix a9285bc
fix: avoid unnecessary GetOrAddGlobalRefID call
x1unix 3163114
Fix bool and attempt to optmize map growth
kgeckhart 9a49635
Make sure pool will work properly, and better benches
kgeckhart c76e3c6
Make sure fanout clear will not clear labelstore
kgeckhart 0b2bd4a
feat: use labels as fallback to lookup mapping
x1unix 83d6006
feat: pass labels to create mapping
x1unix 89b6aba
feat: make MappingStore iface public
x1unix 2688a06
feat: maintain and clean label hash index state
x1unix ae0eba6
feat: update seriesrefmapping_test.go
x1unix 31d4a58
feat: update tests after method signature changes
x1unix a76652d
fix: tests build errors
x1unix 3fa1603
feat: call Fanout.Clear after consumer component shutdown
x1unix f73b6cc
feat: add CLI flag to disable LabelStore
x1unix 1d882b5
fix: rephrase flag description
x1unix 6c168c8
feat: remove debug test
x1unix f95ba6d
fix: return LabelStoreService
x1unix e253584
fix: cli docs
x1unix 71a1f37
fix: labelstore tests
x1unix 9376f04
fix: golangci-lint
x1unix c88fc1f
fix: skip TestHashCollisions for now
x1unix 9c3a2cc
fix: nullptr dereference
x1unix 3f8160a
fix: use labelstore by default
x1unix 50768bf
fix: labelstore stub service name
x1unix cf5fdc9
fix: Definition result
x1unix bde1225
feat: remove obsolete BenchmarkStoreFlows and relocate BenchmarkAppen…
x1unix 81836e3
feat: capture passthrough flow in BenchmarkAppenderFlows
x1unix e0056dc
fix: TestSliceIsEmptyAfterReturn
x1unix 3ab155f
fix: address some review comments
x1unix a913cf0
fix: add missing latency tracking
x1unix f476123
fix: address race condition during appendToChildren
x1unix 45eb78a
feat: add append tests for seriesrefmapping
x1unix 47f12ec
fix: update iface impl after rebase
x1unix a1058fa
fix: fanout_test.go
x1unix da490bc
fix: remove bench results output
x1unix 23efb4e
fix: AI bot review comments
x1unix d645f4c
fix: harden fanout/appender ref handling edge cases
x1unix 90dadf2
fix: revert mutex type change in labelstore
x1unix 9ecc959
Always create a mapping if a ref is non-zero
kgeckhart 1696e3d
Make sure the children actually changed before clearing
kgeckhart bc48773
Ensure changes from passthrough to seriesref are safe in both directions
kgeckhart da86f32
Add prometheus prefix with more generic name to cli flag
kgeckhart 4fdcc53
Remove misleading comment and change misleading variable name
kgeckhart File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package appenders | ||
|
|
||
| import ( | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/prometheus/storage" | ||
| ) | ||
|
|
||
| // New returns an appropriate appender based on the number of children. | ||
| func New(children []storage.Appender, store *SeriesRefMappingStore, deadRefThreshold storage.SeriesRef, writeLatency prometheus.Histogram, samplesForwarded prometheus.Counter) storage.Appender { | ||
| // No destination, no work to do. | ||
| if len(children) == 0 { | ||
| return Noop{} | ||
| } | ||
|
|
||
| // Single destination, no need to fanout. | ||
| if len(children) == 1 { | ||
| return NewPassthrough(children[0], deadRefThreshold, writeLatency, samplesForwarded) | ||
| } | ||
|
|
||
| return NewSeriesRefMapping(children, store, writeLatency, samplesForwarded) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| package appenders | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/prometheus/model/labels" | ||
| "github.com/prometheus/prometheus/storage" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestNew_NoChildrenReturnsNoop(t *testing.T) { | ||
| app := New(nil, nil, 0, nil, nil) | ||
|
|
||
| _, ok := app.(Noop) | ||
| assert.True(t, ok, "expected Noop appender for zero children") | ||
| } | ||
|
|
||
| func TestNew_SingleChildReturnsPassthrough(t *testing.T) { | ||
| child := &mockAppender{} | ||
| app := New([]storage.Appender{child}, nil, 0, nil, nil) | ||
|
|
||
| _, ok := app.(*passthrough) | ||
| assert.True(t, ok, "expected passthrough appender for single child") | ||
| } | ||
|
|
||
| func TestNew_MultipleChildrenReturnsSeriesRefMapping(t *testing.T) { | ||
| store := NewSeriesRefMappingStore(nil) | ||
| t.Cleanup(func() { store.Clear() }) | ||
|
|
||
| child1 := &mockAppender{} | ||
| child2 := &mockAppender{} | ||
| app := New([]storage.Appender{child1, child2}, store, 0, nil, nil) | ||
|
|
||
| _, ok := app.(*seriesRefMapping) | ||
| assert.True(t, ok, "expected seriesRefMapping appender for multiple children") | ||
| } | ||
|
|
||
| func TestNew_PassthroughReceivesDeadRefThreshold(t *testing.T) { | ||
| store := NewSeriesRefMappingStore(nil) | ||
|
|
||
| // Issue a mapping so nextUniqueRef advances past 1. | ||
| lbls := labels.FromStrings("job", "test") | ||
| store.CreateMapping([]storage.SeriesRef{100, 200}, lbls) | ||
|
|
||
| // Clear advances firstRefOfCurrentGeneration to the current nextUniqueRef. | ||
| threshold := store.Clear() | ||
| require.Greater(t, uint64(threshold), uint64(0)) | ||
|
|
||
| sf := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_forwarded", Help: "test"}) | ||
| child := &mockAppender{appendFn: func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { | ||
| return ref, nil // echo back whatever ref we receive | ||
| }} | ||
| app := New([]storage.Appender{child}, store, threshold, nil, sf) | ||
|
|
||
| // A ref below the threshold must be zeroed by the passthrough. | ||
| staleRef := threshold - 1 | ||
| _, err := app.Append(staleRef, lbls, 1, 1.0) | ||
| require.NoError(t, err) | ||
| require.Equal(t, storage.SeriesRef(0), child.appendRefs[0], | ||
| "passthrough must zero refs below the dead ref threshold") | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| package appenders | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "github.com/prometheus/prometheus/model/exemplar" | ||
| "github.com/prometheus/prometheus/model/histogram" | ||
| "github.com/prometheus/prometheus/model/labels" | ||
| "github.com/prometheus/prometheus/model/metadata" | ||
| "github.com/prometheus/prometheus/storage" | ||
| ) | ||
|
|
||
| type Noop struct { | ||
| } | ||
|
|
||
| func (n Noop) Appender(_ context.Context) storage.Appender { | ||
| return n | ||
| } | ||
|
|
||
| func (n Noop) Append(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { | ||
| return ref, nil | ||
| } | ||
|
|
||
| func (n Noop) Commit() error { | ||
| return nil | ||
| } | ||
|
|
||
| func (n Noop) Rollback() error { | ||
| return nil | ||
| } | ||
|
|
||
| func (n Noop) SetOptions(_ *storage.AppendOptions) { | ||
| } | ||
|
|
||
| func (n Noop) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { | ||
| return ref, nil | ||
| } | ||
|
|
||
| func (n Noop) AppendHistogram(ref storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { | ||
| return ref, nil | ||
| } | ||
|
|
||
| func (n Noop) AppendHistogramSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { | ||
| return ref, nil | ||
| } | ||
|
|
||
| func (n Noop) UpdateMetadata(ref storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { | ||
| return ref, nil | ||
| } | ||
|
|
||
| func (n Noop) AppendSTZeroSample(ref storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { | ||
| return ref, nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| package appenders | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/prometheus/model/exemplar" | ||
| "github.com/prometheus/prometheus/model/histogram" | ||
| "github.com/prometheus/prometheus/model/labels" | ||
| "github.com/prometheus/prometheus/model/metadata" | ||
| "github.com/prometheus/prometheus/storage" | ||
| ) | ||
|
|
||
| type passthrough struct { | ||
| wrapping storage.Appender | ||
| start time.Time | ||
| writeLatency prometheus.Histogram | ||
| samplesForwarded prometheus.Counter | ||
| // deadRefThreshold marks the boundary of the current ref generation. Any incoming | ||
| // ref below this value is from a previous generation and meaningless to this child; | ||
| // it must be zeroed so the child allocates a fresh ref. | ||
| deadRefThreshold storage.SeriesRef | ||
| } | ||
|
|
||
| func NewPassthrough(wrapping storage.Appender, deadRefThreshold storage.SeriesRef, writeLatency prometheus.Histogram, samplesForwarded prometheus.Counter) storage.Appender { | ||
| return &passthrough{ | ||
| wrapping: wrapping, | ||
| deadRefThreshold: deadRefThreshold, | ||
| writeLatency: writeLatency, | ||
| samplesForwarded: samplesForwarded, | ||
| } | ||
| } | ||
|
|
||
| // sanitizeRef zeros ref if it is from a previous generation. | ||
| func (p *passthrough) sanitizeRef(ref storage.SeriesRef) storage.SeriesRef { | ||
| if ref != 0 && ref < p.deadRefThreshold { | ||
| return 0 | ||
| } | ||
| return ref | ||
| } | ||
|
|
||
| func (p *passthrough) Commit() error { | ||
| defer p.recordLatency() | ||
| return p.wrapping.Commit() | ||
| } | ||
|
|
||
| func (p *passthrough) Rollback() error { | ||
| defer p.recordLatency() | ||
| return p.wrapping.Rollback() | ||
| } | ||
|
|
||
| func (p *passthrough) recordLatency() { | ||
| if p.start.IsZero() { | ||
| return | ||
| } | ||
| duration := time.Since(p.start) | ||
| p.writeLatency.Observe(duration.Seconds()) | ||
| } | ||
|
|
||
| func (p *passthrough) SetOptions(opts *storage.AppendOptions) { | ||
| p.wrapping.SetOptions(opts) | ||
| } | ||
|
|
||
| func (p *passthrough) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { | ||
| if p.start.IsZero() { | ||
| p.start = time.Now() | ||
| } | ||
|
|
||
| ref, err := p.wrapping.Append(p.sanitizeRef(ref), l, t, v) | ||
|
|
||
| if err == nil { | ||
| p.samplesForwarded.Inc() | ||
| } | ||
|
|
||
| return ref, err | ||
| } | ||
|
|
||
| func (p *passthrough) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { | ||
| if p.start.IsZero() { | ||
| p.start = time.Now() | ||
| } | ||
| return p.wrapping.AppendExemplar(p.sanitizeRef(ref), l, e) | ||
| } | ||
|
|
||
| func (p *passthrough) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { | ||
| if p.start.IsZero() { | ||
| p.start = time.Now() | ||
| } | ||
| return p.wrapping.AppendHistogram(p.sanitizeRef(ref), l, t, h, fh) | ||
| } | ||
|
|
||
| func (p *passthrough) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { | ||
| if p.start.IsZero() { | ||
| p.start = time.Now() | ||
| } | ||
| return p.wrapping.AppendHistogramSTZeroSample(p.sanitizeRef(ref), l, t, st, h, fh) | ||
| } | ||
|
|
||
| func (p *passthrough) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { | ||
| if p.start.IsZero() { | ||
| p.start = time.Now() | ||
| } | ||
| return p.wrapping.UpdateMetadata(p.sanitizeRef(ref), l, m) | ||
| } | ||
|
|
||
| func (p *passthrough) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64) (storage.SeriesRef, error) { | ||
| if p.start.IsZero() { | ||
| p.start = time.Now() | ||
| } | ||
| return p.wrapping.AppendSTZeroSample(p.sanitizeRef(ref), l, t, st) | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.