From 21725d7df4b0694fabdaa303918e6d8c1f8cf303 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 21 Jan 2025 12:10:17 +0100 Subject: [PATCH 1/3] [chore][service] Add views to drop metrics based on levels For the following components: - processor/batch - contrib's internal/otelarrow/netstats - otel-arrow library --- processor/batchprocessor/batch_processor.go | 11 ++- processor/batchprocessor/metrics.go | 4 -- service/telemetry/metrics.go | 77 +++++++++++++++++++++ 3 files changed, 86 insertions(+), 6 deletions(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index f06c8010033..6dc5ed828ee 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -264,10 +264,17 @@ func (b *shard[T]) sendItems(trigger trigger) { return } var bytes int - if b.processor.telemetry.detailed { + bpt := b.processor.telemetry + + // Check if the instrument is enabled to calculate the size of the batch in bytes. + // See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled + batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes + instr, ok := batchSendSizeBytes.(interface{ Enabled(context.Context) bool }) + if !ok || instr.Enabled(bpt.exportCtx) { bytes = b.batch.sizeBytes(req) } - b.processor.telemetry.record(trigger, int64(sent), int64(bytes)) + + bpt.record(trigger, int64(sent), int64(bytes)) } // singleShardBatcher is used when metadataKeys is empty, to avoid the diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index 01621ad86e6..6ec1386faa8 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -9,7 +9,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata" "go.opentelemetry.io/collector/processor/internal" @@ -23,8 +22,6 @@ const ( ) type batchProcessorTelemetry struct { - detailed bool - exportCtx context.Context processorAttr metric.MeasurementOption @@ -44,7 +41,6 @@ func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinali return &batchProcessorTelemetry{ exportCtx: context.Background(), - detailed: set.MetricsLevel == configtelemetry.LevelDetailed, telemetryBuilder: telemetryBuilder, processorAttr: attrs, }, nil diff --git a/service/telemetry/metrics.go b/service/telemetry/metrics.go index 0b2690c0af1..70deb0220b2 100644 --- a/service/telemetry/metrics.go +++ b/service/telemetry/metrics.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/instrumentation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" "go.uber.org/multierr" @@ -36,6 +37,15 @@ type meterProviderSettings struct { asyncErrorChannel chan error } +func dropViewOption(instrument sdkmetric.Instrument) sdkmetric.Option { + return sdkmetric.WithView(sdkmetric.NewView( + instrument, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationDrop{}, + }, + )) +} + // newMeterProvider creates a new MeterProvider from Config. func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) { if set.cfg.Level == configtelemetry.LevelNone || len(set.cfg.Readers) == 0 { @@ -56,6 +66,73 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m opts = append(opts, sdkmetric.WithReader(r)) } + // otel-arrow library metrics + // See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176 + if set.cfg.Level < configtelemetry.LevelNormal { + scope := instrumentation.Scope{Name: "otel-arrow/pkg/otel/arrow_record"} + opts = append(opts, + dropViewOption(sdkmetric.Instrument{ + Name: "arrow_batch_records", + Scope: scope, + }), + dropViewOption(sdkmetric.Instrument{ + Name: "arrow_schema_resets", + Scope: scope, + }), + dropViewOption(sdkmetric.Instrument{ + Name: "arrow_memory_inuse", + Scope: scope, + }), + ) + } + + // contrib's internal/otelarrow/netstats metrics + // See + // - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130 + // - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165 + if set.cfg.Level < configtelemetry.LevelDetailed { + scope := instrumentation.Scope{Name: "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"} + // Compressed size metrics. + opts = append(opts, dropViewOption(sdkmetric.Instrument{ + Name: "otelcol_*_compressed_size", + Scope: scope, + })) + + opts = append(opts, dropViewOption(sdkmetric.Instrument{ + Name: "otelcol_*_compressed_size", + Scope: scope, + })) + + // makeRecvMetrics for exporters. + opts = append(opts, dropViewOption(sdkmetric.Instrument{ + Name: "otelcol_exporter_recv", + Scope: scope, + })) + opts = append(opts, dropViewOption(sdkmetric.Instrument{ + Name: "otelcol_exporter_recv_wire", + Scope: scope, + })) + + // makeSentMetrics for receivers. + opts = append(opts, dropViewOption(sdkmetric.Instrument{ + Name: "otelcol_receiver_sent", + Scope: scope, + })) + opts = append(opts, dropViewOption(sdkmetric.Instrument{ + Name: "otelcol_receiver_sent_wire", + Scope: scope, + })) + } + + // Batch processor metrics + if set.cfg.Level < configtelemetry.LevelDetailed { + scope := instrumentation.Scope{Name: "go.opentelemetry.io/collector/processor/batchprocessor"} + opts = append(opts, dropViewOption(sdkmetric.Instrument{ + Name: "otelcol_processor_batch_batch_send_size_bytes", + Scope: scope, + })) + } + var err error mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality) if err != nil { From ba34135418f5215f9e691579ddf3801022d6e4ca Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 21 Jan 2025 17:47:21 +0100 Subject: [PATCH 2/3] Add test for 'Enabled' interface --- service/telemetry/metrics_test.go | 69 +++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/service/telemetry/metrics_test.go b/service/telemetry/metrics_test.go index 35aa2114d21..0a7edebf79a 100644 --- a/service/telemetry/metrics_test.go +++ b/service/telemetry/metrics_test.go @@ -10,11 +10,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/require" "go.opentelemetry.io/contrib/config" "go.opentelemetry.io/otel/metric" + sdkresource "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" @@ -232,3 +234,70 @@ func getMetricsFromPrometheus(t *testing.T, endpoint string) map[string]*io_prom return parsed } + +// Test that the MeterProvider implements the 'Enabled' functionality. +// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled. +func TestInstrumentEnabled(t *testing.T) { + prom := promtest.GetAvailableLocalAddressPrometheus(t) + set := meterProviderSettings{ + res: sdkresource.Default(), + cfg: MetricsConfig{ + Level: configtelemetry.LevelDetailed, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: prom}}, + }}, + }, + asyncErrorChannel: make(chan error), + } + meterProvider, err := newMeterProvider(set, false) + defer func() { + if prov, ok := meterProvider.(interface{ Shutdown(context.Context) error }); ok { + require.NoError(t, prov.Shutdown(context.Background())) + } + }() + require.NoError(t, err) + + meter := meterProvider.Meter("go.opentelemetry.io/collector/service/telemetry") + + type enabledInstrument interface{ Enabled(context.Context) bool } + + intCnt, err := meter.Int64Counter("int64.counter") + require.NoError(t, err) + _, ok := intCnt.(enabledInstrument) + assert.True(t, ok, "Int64Counter does not implement the experimental 'Enabled' method") + + intUpDownCnt, err := meter.Int64UpDownCounter("int64.updowncounter") + require.NoError(t, err) + _, ok = intUpDownCnt.(enabledInstrument) + assert.True(t, ok, "Int64UpDownCounter does not implement the experimental 'Enabled' method") + + intHist, err := meter.Int64Histogram("int64.updowncounter") + require.NoError(t, err) + _, ok = intHist.(enabledInstrument) + assert.True(t, ok, "Int64Histogram does not implement the experimental 'Enabled' method") + + intGauge, err := meter.Int64Gauge("int64.updowncounter") + require.NoError(t, err) + _, ok = intGauge.(enabledInstrument) + assert.True(t, ok, "Int64Gauge does not implement the experimental 'Enabled' method") + + floatCnt, err := meter.Float64Counter("int64.updowncounter") + require.NoError(t, err) + _, ok = floatCnt.(enabledInstrument) + assert.True(t, ok, "Float64Counter does not implement the experimental 'Enabled' method") + + floatUpDownCnt, err := meter.Float64UpDownCounter("int64.updowncounter") + require.NoError(t, err) + _, ok = floatUpDownCnt.(enabledInstrument) + assert.True(t, ok, "Float64UpDownCounter does not implement the experimental 'Enabled' method") + + floatHist, err := meter.Float64Histogram("int64.updowncounter") + require.NoError(t, err) + _, ok = floatHist.(enabledInstrument) + assert.True(t, ok, "Float64Histogram does not implement the experimental 'Enabled' method") + + floatGauge, err := meter.Float64Gauge("int64.updowncounter") + require.NoError(t, err) + _, ok = floatGauge.(enabledInstrument) + assert.True(t, ok, "Float64Gauge does not implement the experimental 'Enabled' method") +} From 1beb31a86b81faeb9b1754305cc3b6c2d92f8713 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 21 Jan 2025 17:47:48 +0100 Subject: [PATCH 3/3] make fmt --- service/telemetry/metrics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/telemetry/metrics_test.go b/service/telemetry/metrics_test.go index 0a7edebf79a..9e86b20e92c 100644 --- a/service/telemetry/metrics_test.go +++ b/service/telemetry/metrics_test.go @@ -10,9 +10,9 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/contrib/config" "go.opentelemetry.io/otel/metric" @@ -242,7 +242,7 @@ func TestInstrumentEnabled(t *testing.T) { set := meterProviderSettings{ res: sdkresource.Default(), cfg: MetricsConfig{ - Level: configtelemetry.LevelDetailed, + Level: configtelemetry.LevelDetailed, Readers: []config.MetricReader{{ Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: prom}}, }},