Skip to content

Commit

Permalink
[processor/filter] mv from opencensus to otel (open-telemetry#30736)
Browse files Browse the repository at this point in the history
This moves the telemetry generated by the processor from using
opencensus to using otel. Note that the test checks the name of the
metric in the format that it is generated using the meter provider,
hence the change from `processor_filter_datapoints_filtered` to
`processor/filter/datapoints.filtered`. When using the prometheus
exporter, the output name will be the same as before.

Fixes open-telemetry#30734

---------

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
Alex Boten authored and cparkins committed Feb 1, 2024
1 parent 3a4138c commit b655ceb
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 165 deletions.
27 changes: 27 additions & 0 deletions .chloggen/codeboten_rm-census-filter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filterprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: move metrics from OpenCensus to OpenTelemetry

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30736]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 5 additions & 5 deletions processor/filterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/filte
go 1.20

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.93.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.93.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.93.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.46.0
github.com/stretchr/testify v1.8.4
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.93.0
go.opentelemetry.io/collector/confmap v0.93.0
go.opentelemetry.io/collector/consumer v0.93.0
Expand All @@ -26,6 +21,7 @@ require (
)

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down Expand Up @@ -54,8 +50,12 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.93.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.93.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.93.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion processor/filterprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func TestFilterLogProcessorWithOTTL(t *testing.T) {

func TestFilterLogProcessorTelemetry(t *testing.T) {
telemetryTest(t, "FilterLogProcessorTelemetry", func(t *testing.T, tel testTelemetry) {
processor, err := newFilterLogsProcessor(processortest.NewNopCreateSettings(), &Config{
processor, err := newFilterLogsProcessor(tel.NewProcessorCreateSettings(), &Config{
Logs: LogFilters{LogConditions: []string{`IsMatch(body, "operationA")`}},
})
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions processor/filterprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
factory := NewFactory()
fmp, err := factory.CreateMetricsProcessor(
context.Background(),
processortest.NewNopCreateSettings(),
tel.NewProcessorCreateSettings(),
cfg,
next,
)
Expand All @@ -401,7 +401,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
assert.NoError(t, err)

tel.assertMetrics(t, expectedMetrics{
metricDataPointsFiltered: float64(0),
metricDataPointsFiltered: 0,
})

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
Expand All @@ -415,7 +415,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
assert.NoError(t, err)

tel.assertMetrics(t, expectedMetrics{
metricDataPointsFiltered: float64(1),
metricDataPointsFiltered: 1,
})

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
Expand All @@ -429,7 +429,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
assert.NoError(t, err)

tel.assertMetrics(t, expectedMetrics{
metricDataPointsFiltered: float64(2),
metricDataPointsFiltered: 2,
})

assert.NoError(t, fmp.Shutdown(ctx))
Expand Down
93 changes: 40 additions & 53 deletions processor/filterprocessor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle
import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/metadata"
)
Expand All @@ -24,78 +22,67 @@ const (
triggerSpansDropped
)

var (
typeStr = metadata.Type
processorTagKey = tag.MustNewKey(typeStr)
statMetricDataPointsFiltered = stats.Int64("datapoints.filtered", "Number of metric data points dropped by the filter processor", stats.UnitDimensionless)
statLogsFiltered = stats.Int64("logs.filtered", "Number of logs dropped by the filter processor", stats.UnitDimensionless)
statSpansFiltered = stats.Int64("spans.filtered", "Number of spans dropped by the filter processor", stats.UnitDimensionless)
)

func init() {
// TODO: Find a way to handle the error.
_ = view.Register(metricViews()...)
}

func metricViews() []*view.View {
processorTagKeys := []tag.Key{processorTagKey}

return []*view.View{
{
Name: processorhelper.BuildCustomMetricName(typeStr, statMetricDataPointsFiltered.Name()),
Measure: statMetricDataPointsFiltered,
Description: statMetricDataPointsFiltered.Description(),
Aggregation: view.Sum(),
TagKeys: processorTagKeys,
},
{
Name: processorhelper.BuildCustomMetricName(typeStr, statLogsFiltered.Name()),
Measure: statLogsFiltered,
Description: statLogsFiltered.Description(),
Aggregation: view.Sum(),
TagKeys: processorTagKeys,
},
{
Name: processorhelper.BuildCustomMetricName(typeStr, statSpansFiltered.Name()),
Measure: statSpansFiltered,
Description: statSpansFiltered.Description(),
Aggregation: view.Sum(),
TagKeys: processorTagKeys,
},
}
}

type filterProcessorTelemetry struct {
exportCtx context.Context

processorAttr []attribute.KeyValue

datapointsFiltered metric.Int64Counter
logsFiltered metric.Int64Counter
spansFiltered metric.Int64Counter
}

func newfilterProcessorTelemetry(set processor.CreateSettings) (*filterProcessorTelemetry, error) {
processorID := set.ID.String()

exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, processorID))
fpt := &filterProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(metadata.Type, processorID)},
exportCtx: context.Background(),
}

counter, err := metadata.Meter(set.TelemetrySettings).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "datapoints.filtered"),
metric.WithDescription("Number of metric data points dropped by the filter processor"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
fpt := &filterProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(typeStr, processorID)},
exportCtx: exportCtx,
fpt.datapointsFiltered = counter

counter, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "logs.filtered"),
metric.WithDescription("Number of logs dropped by the filter processor"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
fpt.logsFiltered = counter

counter, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "spans.filtered"),
metric.WithDescription("Number of spans dropped by the filter processor"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
fpt.spansFiltered = counter

return fpt, nil
}

func (fpt *filterProcessorTelemetry) record(trigger trigger, dropped int64) {
var triggerMeasure *stats.Int64Measure
var triggerMeasure metric.Int64Counter
switch trigger {
case triggerMetricDataPointsDropped:
triggerMeasure = statMetricDataPointsFiltered
triggerMeasure = fpt.datapointsFiltered
case triggerLogsDropped:
triggerMeasure = statLogsFiltered
triggerMeasure = fpt.logsFiltered
case triggerSpansDropped:
triggerMeasure = statSpansFiltered
triggerMeasure = fpt.spansFiltered
}

stats.Record(fpt.exportCtx, triggerMeasure.M(dropped))
triggerMeasure.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
}
Loading

0 comments on commit b655ceb

Please sign in to comment.