Skip to content

Commit

Permalink
WIP [processor/groupbyattrs] move from OpenCensus to OpenTelemetry
Browse files Browse the repository at this point in the history
Fixes open-telemetry#30763

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
Alex Boten committed Feb 1, 2024
1 parent 0e28e15 commit a90086d
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 132 deletions.
27 changes: 27 additions & 0 deletions .chloggen/codeboten_rm-groupbyattrs-census.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: groupbyattributesprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: move metrics from OpenCensus to OpenTelemetry

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30763]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
34 changes: 19 additions & 15 deletions processor/groupbyattrsprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ package groupbyattrsprocessor // import "github.com/open-telemetry/opentelemetry

import (
"context"
"sync"

"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
Expand All @@ -21,15 +19,8 @@ var (
consumerCapabilities = consumer.Capabilities{MutatesData: true}
)

var once sync.Once

// NewFactory returns a new factory for the Filter processor.
func NewFactory() processor.Factory {
once.Do(func() {
// TODO: as with other -contrib factories registering metrics, this is causing the error being ignored
_ = view.Register(metricViews()...)
})

return processor.NewFactory(
metadata.Type,
createDefaultConfig,
Expand All @@ -45,23 +36,27 @@ func createDefaultConfig() component.Config {
}
}

func createGroupByAttrsProcessor(logger *zap.Logger, attributes []string) *groupByAttrsProcessor {
func createGroupByAttrsProcessor(set processor.CreateSettings, attributes []string) (*groupByAttrsProcessor, error) {
var nonEmptyAttributes []string
presentAttributes := make(map[string]struct{})

for _, str := range attributes {
if str != "" {
_, isPresent := presentAttributes[str]
if isPresent {
logger.Warn("A grouping key is already present", zap.String("key", str))
set.Logger.Warn("A grouping key is already present", zap.String("key", str))
} else {
nonEmptyAttributes = append(nonEmptyAttributes, str)
presentAttributes[str] = struct{}{}
}
}
}

return &groupByAttrsProcessor{logger: logger, groupByKeys: nonEmptyAttributes}
it, err := newProcessorTelemetry(set.TelemetrySettings)
if err != nil {
return nil, err
}
return &groupByAttrsProcessor{logger: set.Logger, groupByKeys: nonEmptyAttributes, internalTelemetry: it}, nil
}

// createTracesProcessor creates a trace processor based on this config.
Expand All @@ -72,7 +67,10 @@ func createTracesProcessor(
nextConsumer consumer.Traces) (processor.Traces, error) {

oCfg := cfg.(*Config)
gap := createGroupByAttrsProcessor(set.Logger, oCfg.GroupByKeys)
gap, err := createGroupByAttrsProcessor(set, oCfg.GroupByKeys)
if err != nil {
return nil, err
}

return processorhelper.NewTracesProcessor(
ctx,
Expand All @@ -91,7 +89,10 @@ func createLogsProcessor(
nextConsumer consumer.Logs) (processor.Logs, error) {

oCfg := cfg.(*Config)
gap := createGroupByAttrsProcessor(set.Logger, oCfg.GroupByKeys)
gap, err := createGroupByAttrsProcessor(set, oCfg.GroupByKeys)
if err != nil {
return nil, err
}

return processorhelper.NewLogsProcessor(
ctx,
Expand All @@ -110,7 +111,10 @@ func createMetricsProcessor(
nextConsumer consumer.Metrics) (processor.Metrics, error) {

oCfg := cfg.(*Config)
gap := createGroupByAttrsProcessor(set.Logger, oCfg.GroupByKeys)
gap, err := createGroupByAttrsProcessor(set, oCfg.GroupByKeys)
if err != nil {
return nil, err
}

return processorhelper.NewMetricsProcessor(
ctx,
Expand Down
8 changes: 5 additions & 3 deletions processor/groupbyattrsprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
"go.uber.org/zap"
)

func TestDefaultConfiguration(t *testing.T) {
Expand Down Expand Up @@ -41,12 +41,14 @@ func TestCreateTestProcessor(t *testing.T) {

func TestNoKeys(t *testing.T) {
// This is allowed since can be used for compacting data
gap := createGroupByAttrsProcessor(zap.NewNop(), []string{})
gap, err := createGroupByAttrsProcessor(processortest.NewNopCreateSettings(), []string{})
require.NoError(t, err)
assert.NotNil(t, gap)
}

func TestDuplicateKeys(t *testing.T) {
gbap := createGroupByAttrsProcessor(zap.NewNop(), []string{"foo", "foo", ""})
gbap, err := createGroupByAttrsProcessor(processortest.NewNopCreateSettings(), []string{"foo", "foo", ""})
require.NoError(t, err)
assert.NotNil(t, gbap)
assert.EqualValues(t, []string{"foo"}, gbap.groupByKeys)
}
184 changes: 110 additions & 74 deletions processor/groupbyattrsprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,87 +4,123 @@
package groupbyattrsprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor/internal/metadata"
)

var (
mNumGroupedSpans = stats.Int64("num_grouped_spans", "Number of spans that had attributes grouped", stats.UnitDimensionless)
mNumNonGroupedSpans = stats.Int64("num_non_grouped_spans", "Number of spans that did not have attributes grouped", stats.UnitDimensionless)
mDistSpanGroups = stats.Int64("span_groups", "Distribution of groups extracted for spans", stats.UnitDimensionless)
type internalTelemetry struct {
mNumGroupedSpans metric.Int64Counter
mNumNonGroupedSpans metric.Int64Counter
mDistSpanGroups metric.Int64Histogram

mNumGroupedLogs = stats.Int64("num_grouped_logs", "Number of logs that had attributes grouped", stats.UnitDimensionless)
mNumNonGroupedLogs = stats.Int64("num_non_grouped_logs", "Number of logs that did not have attributes grouped", stats.UnitDimensionless)
mDistLogGroups = stats.Int64("log_groups", "Distribution of groups extracted for logs", stats.UnitDimensionless)
mNumGroupedLogs metric.Int64Counter
mNumNonGroupedLogs metric.Int64Counter
mDistLogGroups metric.Int64Histogram

mNumGroupedMetrics = stats.Int64("num_grouped_metrics", "Number of metrics that had attributes grouped", stats.UnitDimensionless)
mNumNonGroupedMetrics = stats.Int64("num_non_grouped_metrics", "Number of metrics that did not have attributes grouped", stats.UnitDimensionless)
mDistMetricGroups = stats.Int64("metric_groups", "Distribution of groups extracted for metrics", stats.UnitDimensionless)
)
mNumGroupedMetrics metric.Int64Counter
mNumNonGroupedMetrics metric.Int64Counter
mDistMetricGroups metric.Int64Histogram
}

func newProcessorTelemetry(set component.TelemetrySettings) (*internalTelemetry, error) {
it := internalTelemetry{}

counter, err := metadata.Meter(set).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "num_grouped_spans"),
metric.WithDescription("Number of spans that had attributes grouped"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
it.mNumGroupedSpans = counter

counter, err = metadata.Meter(set).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "num_non_grouped_spans"),
metric.WithDescription("Number of spans that did not have attributes grouped"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}

it.mNumNonGroupedSpans = counter

histo, err := metadata.Meter(set).Int64Histogram(
processorhelper.BuildCustomMetricName(metadata.Type, "span_groups"),
metric.WithDescription("Distribution of groups extracted for spans"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}

it.mDistSpanGroups = histo

counter, err = metadata.Meter(set).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "num_grouped_logs"),
metric.WithDescription("Number of logs that had attributes grouped"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
it.mNumGroupedLogs = counter

counter, err = metadata.Meter(set).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "num_non_grouped_logs"),
metric.WithDescription("Number of logs that did not have attributes grouped"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}

it.mNumNonGroupedLogs = counter

histo, err = metadata.Meter(set).Int64Histogram(
processorhelper.BuildCustomMetricName(metadata.Type, "log_groups"),
metric.WithDescription("Distribution of groups extracted for logs"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}

// metricViews return the metrics views according to given telemetry level.
func metricViews() []*view.View {
distributionGroups := view.Distribution(1, 2, 5, 10, 20, 50, 100, 500, 2000)

return []*view.View{
{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mNumGroupedSpans.Name()),
Measure: mNumGroupedSpans,
Description: mNumGroupedSpans.Description(),
Aggregation: view.Sum(),
},
{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mNumNonGroupedSpans.Name()),
Measure: mNumNonGroupedSpans,
Description: mNumNonGroupedSpans.Description(),
Aggregation: view.Sum(),
},
{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mDistSpanGroups.Name()),
Measure: mDistSpanGroups,
Description: mDistSpanGroups.Description(),
Aggregation: distributionGroups,
},

{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mNumGroupedLogs.Name()),
Measure: mNumGroupedLogs,
Description: mNumGroupedLogs.Description(),
Aggregation: view.Sum(),
},
{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mNumNonGroupedLogs.Name()),
Measure: mNumNonGroupedLogs,
Description: mNumNonGroupedLogs.Description(),
Aggregation: view.Sum(),
},
{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mDistLogGroups.Name()),
Measure: mDistLogGroups,
Description: mDistLogGroups.Description(),
Aggregation: distributionGroups,
},

{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mNumGroupedMetrics.Name()),
Measure: mNumGroupedMetrics,
Description: mNumGroupedMetrics.Description(),
Aggregation: view.Sum(),
},
{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mNumNonGroupedMetrics.Name()),
Measure: mNumNonGroupedMetrics,
Description: mNumNonGroupedMetrics.Description(),
Aggregation: view.Sum(),
},
{
Name: processorhelper.BuildCustomMetricName(string(metadata.Type), mDistMetricGroups.Name()),
Measure: mDistMetricGroups,
Description: mDistMetricGroups.Description(),
Aggregation: distributionGroups,
},
it.mDistLogGroups = histo

counter, err = metadata.Meter(set).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "num_grouped_metrics"),
metric.WithDescription("Number of metrics that had attributes grouped"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
it.mNumGroupedMetrics = counter

counter, err = metadata.Meter(set).Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "num_non_grouped_metrics"),
metric.WithDescription("Number of metrics that did not have attributes grouped"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
it.mNumNonGroupedMetrics = counter

histo, err = metadata.Meter(set).Int64Histogram(
processorhelper.BuildCustomMetricName(metadata.Type, "metric_groups"),
metric.WithDescription("Distribution of groups extracted for metrics"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
it.mDistMetricGroups = histo

return &it, nil
}
22 changes: 0 additions & 22 deletions processor/groupbyattrsprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,3 @@
// SPDX-License-Identifier: Apache-2.0

package groupbyattrsprocessor

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestProcessorMetrics(t *testing.T) {
expectedViewNames := []string{
"processor/groupbyattrs/num_grouped_spans",
"processor/groupbyattrs/num_non_grouped_spans",
"processor/groupbyattrs/span_groups",
"processor/groupbyattrs/num_grouped_logs",
"processor/groupbyattrs/num_non_grouped_logs",
"processor/groupbyattrs/log_groups",
}

views := metricViews()
for i, viewName := range expectedViewNames {
assert.Equal(t, viewName, views[i].Name)
}
}
Loading

0 comments on commit a90086d

Please sign in to comment.