Skip to content

Commit

Permalink
[processor/filter] mv from opencensus to otel
Browse files Browse the repository at this point in the history
This moves the telemetry generated by the processor from using opencensus to using otel. Note that the test checks the name of the metric in the format that it is generated using the meter provider, hence the change from `processor_filter_datapoints_filtered` to `processor/filter/datapoints.filtered`. When using the prometheus exporter, the output name will be the same as before.

Fixes open-telemetry#30734

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
Alex Boten committed Jan 24, 2024
1 parent 409ecbf commit 851c20a
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 163 deletions.
10 changes: 5 additions & 5 deletions processor/filterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/filte
go 1.20

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.93.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.93.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.93.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.46.0
github.com/stretchr/testify v1.8.4
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.93.0
go.opentelemetry.io/collector/confmap v0.93.0
go.opentelemetry.io/collector/consumer v0.93.0
Expand All @@ -26,6 +21,7 @@ require (
)

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down Expand Up @@ -54,8 +50,12 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.93.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.93.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.93.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion processor/filterprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func TestFilterLogProcessorWithOTTL(t *testing.T) {

func TestFilterLogProcessorTelemetry(t *testing.T) {
telemetryTest(t, "FilterLogProcessorTelemetry", func(t *testing.T, tel testTelemetry) {
processor, err := newFilterLogsProcessor(processortest.NewNopCreateSettings(), &Config{
processor, err := newFilterLogsProcessor(tel.NewProcessorCreateSettings(), &Config{
Logs: LogFilters{LogConditions: []string{`IsMatch(body, "operationA")`}},
})
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions processor/filterprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
factory := NewFactory()
fmp, err := factory.CreateMetricsProcessor(
context.Background(),
processortest.NewNopCreateSettings(),
tel.NewProcessorCreateSettings(),
cfg,
next,
)
Expand All @@ -401,7 +401,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
assert.NoError(t, err)

tel.assertMetrics(t, expectedMetrics{
metricDataPointsFiltered: float64(0),
metricDataPointsFiltered: 0,
})

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
Expand All @@ -415,7 +415,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
assert.NoError(t, err)

tel.assertMetrics(t, expectedMetrics{
metricDataPointsFiltered: float64(1),
metricDataPointsFiltered: 1,
})

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
Expand All @@ -429,7 +429,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
assert.NoError(t, err)

tel.assertMetrics(t, expectedMetrics{
metricDataPointsFiltered: float64(2),
metricDataPointsFiltered: 2,
})

assert.NoError(t, fmp.Shutdown(ctx))
Expand Down
93 changes: 40 additions & 53 deletions processor/filterprocessor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle
import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/metadata"
)
Expand All @@ -24,78 +22,67 @@ const (
triggerSpansDropped
)

var (
typeStr = metadata.Type
processorTagKey = tag.MustNewKey(typeStr)
statMetricDataPointsFiltered = stats.Int64("datapoints.filtered", "Number of metric data points dropped by the filter processor", stats.UnitDimensionless)
statLogsFiltered = stats.Int64("logs.filtered", "Number of logs dropped by the filter processor", stats.UnitDimensionless)
statSpansFiltered = stats.Int64("spans.filtered", "Number of spans dropped by the filter processor", stats.UnitDimensionless)
)

func init() {
// TODO: Find a way to handle the error.
_ = view.Register(metricViews()...)
}

func metricViews() []*view.View {
processorTagKeys := []tag.Key{processorTagKey}

return []*view.View{
{
Name: processorhelper.BuildCustomMetricName(typeStr, statMetricDataPointsFiltered.Name()),
Measure: statMetricDataPointsFiltered,
Description: statMetricDataPointsFiltered.Description(),
Aggregation: view.Sum(),
TagKeys: processorTagKeys,
},
{
Name: processorhelper.BuildCustomMetricName(typeStr, statLogsFiltered.Name()),
Measure: statLogsFiltered,
Description: statLogsFiltered.Description(),
Aggregation: view.Sum(),
TagKeys: processorTagKeys,
},
{
Name: processorhelper.BuildCustomMetricName(typeStr, statSpansFiltered.Name()),
Measure: statSpansFiltered,
Description: statSpansFiltered.Description(),
Aggregation: view.Sum(),
TagKeys: processorTagKeys,
},
}
}

type filterProcessorTelemetry struct {
exportCtx context.Context

processorAttr []attribute.KeyValue

datapointsFiltered metric.Int64Counter
logsFiltered metric.Int64Counter
spansFiltered metric.Int64Counter
}

func newfilterProcessorTelemetry(set processor.CreateSettings) (*filterProcessorTelemetry, error) {
processorID := set.ID.String()

exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, processorID))
fpt := &filterProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(metadata.Type, processorID)},
exportCtx: context.Background(),
}

counter, err := metadata.Meter(set.TelemetrySettings).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "datapoints.filtered"),
metric.WithDescription("Number of metric data points dropped by the filter processor"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
fpt := &filterProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(typeStr, processorID)},
exportCtx: exportCtx,
fpt.datapointsFiltered = counter

counter, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "logs.filtered"),
metric.WithDescription("Number of logs dropped by the filter processor"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
fpt.logsFiltered = counter

counter, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "spans.filtered"),
metric.WithDescription("Number of spans dropped by the filter processor"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
fpt.spansFiltered = counter

return fpt, nil
}

func (fpt *filterProcessorTelemetry) record(trigger trigger, dropped int64) {
var triggerMeasure *stats.Int64Measure
var triggerMeasure metric.Int64Counter
switch trigger {
case triggerMetricDataPointsDropped:
triggerMeasure = statMetricDataPointsFiltered
triggerMeasure = fpt.datapointsFiltered
case triggerLogsDropped:
triggerMeasure = statLogsFiltered
triggerMeasure = fpt.logsFiltered
case triggerSpansDropped:
triggerMeasure = statSpansFiltered
triggerMeasure = fpt.spansFiltered
}

stats.Record(fpt.exportCtx, triggerMeasure.M(dropped))
triggerMeasure.Add(fpt.exportCtx, dropped)
}
Loading

0 comments on commit 851c20a

Please sign in to comment.