diff --git a/CHANGELOG.md b/CHANGELOG.md index dcd9de5c41f..5b40e3a3904 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Support attributes with empty value (`attribute.EMPTY`) in `go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest`. (#8038) - Add support for per-series start time tracking for cumulative metrics in `go.opentelemetry.io/otel/sdk/metric`. Set `OTEL_GO_X_PER_SERIES_START_TIMESTAMPS=true` to enable. (#8060) +- Add `WithCardinalityLimitSelector` for metric reader for configuring cardinality limits specific to the instrument kind. (#7855) ### Changed diff --git a/sdk/metric/config.go b/sdk/metric/config.go index c6440a1346c..306e5e3cdce 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -160,12 +160,14 @@ func WithExemplarFilter(filter exemplar.Filter) Option { }) } -// WithCardinalityLimit sets the cardinality limit for the MeterProvider. +// WithCardinalityLimit sets the global cardinality limit for the MeterProvider. // // The cardinality limit is the hard limit on the number of metric datapoints // that can be collected for a single instrument in a single collect cycle. // // Setting this to a zero or negative value means no limit is applied. +// This value applies to all instrument kinds, but can be overridden per kind by +// the reader's cardinality limit selector (see [WithCardinalityLimitSelector]). func WithCardinalityLimit(limit int) Option { // For backward compatibility, the environment variable `OTEL_GO_X_CARDINALITY_LIMIT` // can also be used to set this value. diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index daa989d0962..e4f1b98f87c 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -20,13 +20,14 @@ import ( ) type reader struct { - producer sdkProducer - externalProducers []Producer - temporalityFunc TemporalitySelector - aggregationFunc AggregationSelector - collectFunc func(context.Context, *metricdata.ResourceMetrics) error - forceFlushFunc func(context.Context) error - shutdownFunc func(context.Context) error + producer sdkProducer + externalProducers []Producer + temporalityFunc TemporalitySelector + aggregationFunc AggregationSelector + cardinalityLimitSelector CardinalityLimitSelector + collectFunc func(context.Context, *metricdata.ResourceMetrics) error + forceFlushFunc func(context.Context) error + shutdownFunc func(context.Context) error } const envVarResourceAttributes = "OTEL_RESOURCE_ATTRIBUTES" @@ -45,6 +46,13 @@ func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { return r.temporalityFunc(kind) } +func (r *reader) cardinalityLimit(kind InstrumentKind) (int, bool) { + if r.cardinalityLimitSelector != nil { + return r.cardinalityLimitSelector(kind) + } + return 0, true +} + func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error { return r.collectFunc(ctx, rm) } diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 5b0630207b5..0357afd455f 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -32,8 +32,9 @@ type ManualReader struct { isShutdown bool externalProducers atomic.Value - temporalitySelector TemporalitySelector - aggregationSelector AggregationSelector + temporalitySelector TemporalitySelector + aggregationSelector AggregationSelector + cardinalityLimitSelector CardinalityLimitSelector inst *observ.Instrumentation } @@ -45,8 +46,9 @@ var _ = map[Reader]struct{}{&ManualReader{}: {}} func NewManualReader(opts ...ManualReaderOption) *ManualReader { cfg := newManualReaderConfig(opts) r := &ManualReader{ - temporalitySelector: cfg.temporalitySelector, - aggregationSelector: cfg.aggregationSelector, + temporalitySelector: cfg.temporalitySelector, + aggregationSelector: cfg.aggregationSelector, + cardinalityLimitSelector: cfg.cardinalityLimitSelector, } r.externalProducers.Store(cfg.producers) @@ -89,6 +91,11 @@ func (mr *ManualReader) aggregation( return mr.aggregationSelector(kind) } +// cardinalityLimit returns the cardinality limit for kind. +func (mr *ManualReader) cardinalityLimit(kind InstrumentKind) (int, bool) { + return mr.cardinalityLimitSelector(kind) +} + // Shutdown closes any connections and frees any resources used by the reader. // // This method is safe to call concurrently. @@ -179,16 +186,18 @@ func (r *ManualReader) MarshalLog() any { // manualReaderConfig contains configuration options for a ManualReader. type manualReaderConfig struct { - temporalitySelector TemporalitySelector - aggregationSelector AggregationSelector - producers []Producer + temporalitySelector TemporalitySelector + aggregationSelector AggregationSelector + cardinalityLimitSelector CardinalityLimitSelector + producers []Producer } // newManualReaderConfig returns a manualReaderConfig configured with options. func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig { cfg := manualReaderConfig{ - temporalitySelector: DefaultTemporalitySelector, - aggregationSelector: DefaultAggregationSelector, + temporalitySelector: DefaultTemporalitySelector, + aggregationSelector: DefaultAggregationSelector, + cardinalityLimitSelector: defaultCardinalityLimitSelector, } for _, opt := range opts { cfg = opt.applyManual(cfg) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 32a7e19325f..d1efc9f374a 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -26,17 +26,19 @@ const ( // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration - producers []Producer + interval time.Duration + timeout time.Duration + producers []Producer + cardinalityLimitSelector CardinalityLimitSelector } // newPeriodicReaderConfig returns a periodicReaderConfig configured with // options. func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { c := periodicReaderConfig{ - interval: envDuration(envInterval, defaultInterval), - timeout: envDuration(envTimeout, defaultTimeout), + interval: envDuration(envInterval, defaultInterval), + timeout: envDuration(envTimeout, defaultTimeout), + cardinalityLimitSelector: defaultCardinalityLimitSelector, } for _, o := range options { c = o.applyPeriodic(c) @@ -111,12 +113,13 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri context.Background(), ) r := &PeriodicReader{ - interval: conf.interval, - timeout: conf.timeout, - exporter: exporter, - flushCh: make(chan chan error), - cancel: cancel, - done: make(chan struct{}), + interval: conf.interval, + timeout: conf.timeout, + exporter: exporter, + flushCh: make(chan chan error), + cancel: cancel, + done: make(chan struct{}), + cardinalityLimitSelector: conf.cardinalityLimitSelector, rmPool: sync.Pool{ New: func() any { return &metricdata.ResourceMetrics{} @@ -170,6 +173,8 @@ type PeriodicReader struct { rmPool sync.Pool + cardinalityLimitSelector CardinalityLimitSelector + inst *observ.Instrumentation } @@ -222,6 +227,11 @@ func (r *PeriodicReader) aggregation( return r.exporter.Aggregation(kind) } +// cardinalityLimit returns the cardinality limit for kind. +func (r *PeriodicReader) cardinalityLimit(kind InstrumentKind) (int, bool) { + return r.cardinalityLimitSelector(kind) +} + // collectAndExport gather all metric data related to the periodicReader r from // the SDK and exports it with r's exporter. func (r *PeriodicReader) collectAndExport(ctx context.Context) error { diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 5bef5e3878e..34300a786ca 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -395,9 +395,7 @@ func (i *inserter[N]) cachedAggregator( b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation // limits for the builder (an all the created aggregates). - // cardinalityLimit will be 0 by default if unset (or - // unrecognized input). Use that value directly. - b.AggregationLimit = i.pipeline.cardinalityLimit + b.AggregationLimit = i.getCardinalityLimit(kind) in, out, err := i.aggregateFunc(b, stream.Aggregation, kind) if err != nil { return aggVal[N]{0, nil, err} @@ -419,6 +417,18 @@ func (i *inserter[N]) cachedAggregator( return cv.Measure, cv.ID, cv.Err } +// getCardinalityLimit returns the cardinality limit for the given instrument kind. +// When the reader's selector returns fallback = true, the pipeline's global +// limit is used, then the default if global is unset. When fallback is false, +// the selector's limit is used (0 or less means unlimited). +func (i *inserter[N]) getCardinalityLimit(kind InstrumentKind) int { + limit, fallback := i.pipeline.reader.cardinalityLimit(kind) + if fallback { + return i.pipeline.cardinalityLimit + } + return limit +} + // logConflict validates if an instrument with the same case-insensitive name // as id has already been created. If that instrument conflicts with id, a // warning is logged. diff --git a/sdk/metric/provider_test.go b/sdk/metric/provider_test.go index c498277db64..22d9dec4878 100644 --- a/sdk/metric/provider_test.go +++ b/sdk/metric/provider_test.go @@ -244,3 +244,231 @@ func TestMeterProviderCardinalityLimit(t *testing.T) { }) } } + +func TestMeterProviderPerInstrumentCardinalityLimits(t *testing.T) { + const uniqueAttributesCount = 10 + + type metricCase struct { + name string + selector CardinalityLimitSelector + globalLimit int + build func(t *testing.T, meter api.Meter) + wantPoints int + } + + testCases := []metricCase{ + { + name: "counter uses counter-specific limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindCounter { + return 3, false + } + return 0, true + }, + globalLimit: 8, + build: func(t *testing.T, meter api.Meter) { + counter, err := meter.Int64Counter("counter-metric") + require.NoError(t, err) + for i := range uniqueAttributesCount { + counter.Add(t.Context(), 1, api.WithAttributes(attribute.Int("key", i))) + } + }, + wantPoints: 3, + }, + { + name: "histogram uses histogram-specific limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindHistogram { + return 4, false + } + return 0, true + }, + globalLimit: 8, + build: func(t *testing.T, meter api.Meter) { + histogram, err := meter.Int64Histogram("histogram-metric") + require.NoError(t, err) + for i := range uniqueAttributesCount { + histogram.Record(t.Context(), int64(i), api.WithAttributes(attribute.Int("key", i))) + } + }, + wantPoints: 4, + }, + { + name: "gauge uses gauge-specific limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindGauge { + return 5, false + } + return 0, true + }, + globalLimit: 8, + build: func(t *testing.T, meter api.Meter) { + gauge, err := meter.Int64Gauge("gauge-metric") + require.NoError(t, err) + for i := range uniqueAttributesCount { + gauge.Record(t.Context(), int64(i), api.WithAttributes(attribute.Int("key", i))) + } + }, + wantPoints: 5, + }, + { + name: "up down counter uses updowncounter-specific limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindUpDownCounter { + return 2, false + } + return 0, true + }, + globalLimit: 8, + build: func(t *testing.T, meter api.Meter) { + upDownCounter, err := meter.Int64UpDownCounter("updowncounter-metric") + require.NoError(t, err) + for i := range uniqueAttributesCount { + upDownCounter.Add(t.Context(), 1, api.WithAttributes(attribute.Int("key", i))) + } + }, + wantPoints: 2, + }, + { + name: "observable counter uses observable-counter-specific limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindObservableCounter { + return 4, false + } + return 0, true + }, + globalLimit: 8, + build: func(t *testing.T, meter api.Meter) { + obs, err := meter.Int64ObservableCounter( + "observable-counter-metric", + api.WithInt64Callback(func(_ context.Context, o api.Int64Observer) error { + for i := range uniqueAttributesCount { + o.Observe(int64(i), api.WithAttributes(attribute.Int("key", i))) + } + return nil + }), + ) + require.NoError(t, err) + require.NotNil(t, obs) + }, + wantPoints: 4, + }, + { + name: "observable gauge uses observable-gauge-specific limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindObservableGauge { + return 5, false + } + return 0, true + }, + globalLimit: 8, + build: func(t *testing.T, meter api.Meter) { + obs, err := meter.Int64ObservableGauge( + "observable-gauge-metric", + api.WithInt64Callback(func(_ context.Context, o api.Int64Observer) error { + for i := range uniqueAttributesCount { + o.Observe(int64(i), api.WithAttributes(attribute.Int("key", i))) + } + return nil + }), + ) + require.NoError(t, err) + require.NotNil(t, obs) + }, + wantPoints: 5, + }, + { + name: "observable up down counter uses limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindObservableUpDownCounter { + return 3, false + } + return 0, true + }, + globalLimit: 8, + build: func(t *testing.T, meter api.Meter) { + obs, err := meter.Int64ObservableUpDownCounter( + "observable-updowncounter-metric", + api.WithInt64Callback(func(_ context.Context, o api.Int64Observer) error { + for i := range uniqueAttributesCount { + o.Observe(int64(i), api.WithAttributes(attribute.Int("key", i))) + } + return nil + }), + ) + require.NoError(t, err) + require.NotNil(t, obs) + }, + wantPoints: 3, + }, + { + name: "instrument without specific limit falls back to global limit", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindCounter { + return 3, false + } + return 0, true // fall back to global limit for other kinds + }, + globalLimit: 6, + build: func(t *testing.T, meter api.Meter) { + histogram, err := meter.Int64Histogram("histogram-metric") + require.NoError(t, err) + for i := range uniqueAttributesCount { + histogram.Record(t.Context(), int64(i), api.WithAttributes(attribute.Int("key", i))) + } + }, + wantPoints: 6, + }, + { + name: "selector can set specific kind to unlimited while global limit is nonzero (limited)", + selector: func(kind InstrumentKind) (int, bool) { + if kind == InstrumentKindCounter { + return 0, false // unlimited for counter only + } + return 0, true // fallback to global limit + }, + globalLimit: 3, + build: func(t *testing.T, meter api.Meter) { + counter, err := meter.Int64Counter("counter-metric") + require.NoError(t, err) + for i := range uniqueAttributesCount { + counter.Add(t.Context(), 1, api.WithAttributes(attribute.Int("key", i))) + } + }, + wantPoints: uniqueAttributesCount, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + reader := NewManualReader( + WithCardinalityLimitSelector(tc.selector), + ) + mp := NewMeterProvider( + WithReader(reader), + WithCardinalityLimit(tc.globalLimit), + ) + + meter := mp.Meter("test-meter") + tc.build(t, meter) + + var rm metricdata.ResourceMetrics + err := reader.Collect(t.Context(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + + switch data := rm.ScopeMetrics[0].Metrics[0].Data.(type) { + case metricdata.Sum[int64]: + assert.Len(t, data.DataPoints, tc.wantPoints, tc.name) + case metricdata.Histogram[int64]: + assert.Len(t, data.DataPoints, tc.wantPoints, tc.name) + case metricdata.Gauge[int64]: + assert.Len(t, data.DataPoints, tc.wantPoints, tc.name) + default: + t.Fatalf("unexpected data type %T", data) + } + }) + } +} diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 7b205c736c2..99079dd2783 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -59,6 +59,15 @@ type Reader interface { // Reader methods. aggregation(InstrumentKind) Aggregation // nolint:revive // import-shadow for method scoped by type. + // cardinalityLimit returns the cardinality limit for an instrument kind. + // When fallback is true, the pipeline falls back to the provider's global limit. + // When fallback is false, limit is used: 0 or less means no limit (unlimited), + // and a positive value is the limit for that kind. + // + // This method needs to be concurrent safe with itself and all the other + // Reader methods. + cardinalityLimit(InstrumentKind) (limit int, fallback bool) + // Collect gathers and returns all metric data related to the Reader from // the SDK and stores it in rm. An error is returned if this is called // after Shutdown or if rm is nil. @@ -192,6 +201,25 @@ func DefaultAggregationSelector(ik InstrumentKind) Aggregation { panic("unknown instrument kind") } +// CardinalityLimitSelector selects the cardinality limit to use based on the +// InstrumentKind. The cardinality limit is the maximum number of distinct +// attribute sets that can be recorded for a single instrument. +// +// The selector returns (limit, fallback). When fallback is true, the pipeline +// falls back to the provider's global cardinality limit. +// When fallback is false, the limit is applied: a value of 0 or less means +// no limit, and a positive value is the limit for that kind. +// To avoid overriding the provider's global limit, return (0, true). +type CardinalityLimitSelector func(InstrumentKind) (limit int, fallback bool) + +// defaultCardinalityLimitSelector is the default CardinalityLimitSelector used +// if WithCardinalityLimitSelector is not provided. It returns (0, true) for all +// instrument kinds, allowing the pipeline to fall back to the provider's global +// limit. +func defaultCardinalityLimitSelector(_ InstrumentKind) (int, bool) { + return 0, true +} + // ReaderOption is an option which can be applied to manual or Periodic // readers. type ReaderOption interface { @@ -220,3 +248,33 @@ func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConf c.producers = append(c.producers, o.p) return c } + +// WithCardinalityLimitSelector sets the CardinalityLimitSelector a reader will +// use to determine the cardinality limit for an instrument based on its kind. +// If this option is not used, the reader will use the +// defaultCardinalityLimitSelector. +// +// The selector should return (limit, false) to set a positive limit, +// (0, false) to explicitly specify unlimited, or +// (0, true) to fall back to the provider's global limit. +// +// See [CardinalityLimitSelector] for more details. +func WithCardinalityLimitSelector(selector CardinalityLimitSelector) ReaderOption { + return cardinalityLimitSelectorOption{selector: selector} +} + +type cardinalityLimitSelectorOption struct { + selector CardinalityLimitSelector +} + +// applyManual returns a manualReaderConfig with option applied. +func (o cardinalityLimitSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig { + c.cardinalityLimitSelector = o.selector + return c +} + +// applyPeriodic returns a periodicReaderConfig with option applied. +func (o cardinalityLimitSelectorOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig { + c.cardinalityLimitSelector = o.selector + return c +}