Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][service] Drop component metrics depending on level #12143

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,17 @@ func (b *shard[T]) sendItems(trigger trigger) {
return
}
var bytes int
if b.processor.telemetry.detailed {
bpt := b.processor.telemetry

// Check if the instrument is enabled to calculate the size of the batch in bytes.
// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled
batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes
instr, ok := batchSendSizeBytes.(interface{ Enabled(context.Context) bool })
if !ok || instr.Enabled(bpt.exportCtx) {
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
bytes = b.batch.sizeBytes(req)
}
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))

bpt.record(trigger, int64(sent), int64(bytes))
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
Expand Down
4 changes: 0 additions & 4 deletions processor/batchprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/internal"
Expand All @@ -23,8 +22,6 @@ const (
)

type batchProcessorTelemetry struct {
detailed bool

exportCtx context.Context

processorAttr metric.MeasurementOption
Expand All @@ -44,7 +41,6 @@ func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinali

return &batchProcessorTelemetry{
exportCtx: context.Background(),
detailed: set.MetricsLevel == configtelemetry.LevelDetailed,
telemetryBuilder: telemetryBuilder,
processorAttr: attrs,
}, nil
Expand Down
77 changes: 77 additions & 0 deletions service/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/multierr"
Expand All @@ -36,6 +37,15 @@ type meterProviderSettings struct {
asyncErrorChannel chan error
}

func dropViewOption(instrument sdkmetric.Instrument) sdkmetric.Option {
return sdkmetric.WithView(sdkmetric.NewView(
instrument,
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationDrop{},
},
))
}

// newMeterProvider creates a new MeterProvider from Config.
func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) {
if set.cfg.Level == configtelemetry.LevelNone || len(set.cfg.Readers) == 0 {
Expand All @@ -56,6 +66,73 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
opts = append(opts, sdkmetric.WithReader(r))
}

// otel-arrow library metrics
// See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176
if set.cfg.Level < configtelemetry.LevelNormal {
scope := instrumentation.Scope{Name: "otel-arrow/pkg/otel/arrow_record"}
opts = append(opts,
dropViewOption(sdkmetric.Instrument{
Name: "arrow_batch_records",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_schema_resets",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_memory_inuse",
Scope: scope,
}),
)
}

// contrib's internal/otelarrow/netstats metrics
// See
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"}
// Compressed size metrics.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

// makeRecvMetrics for exporters.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv_wire",
Scope: scope,
}))

// makeSentMetrics for receivers.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent_wire",
Scope: scope,
}))
}

// Batch processor metrics
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "go.opentelemetry.io/collector/processor/batchprocessor"}
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_processor_batch_batch_send_size_bytes",
Scope: scope,
}))
}

var err error
mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil {
Expand Down
Loading