diff --git a/.chloggen/kafka-receiver-profiles.yaml b/.chloggen/kafka-receiver-profiles.yaml new file mode 100644 index 0000000000000..cf4dfc0e13d21 --- /dev/null +++ b/.chloggen/kafka-receiver-profiles.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add profiles support + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41479] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index d819eb15f19ed..2035e8f7b0904 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -3,12 +3,14 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: metrics, logs, traces | +| Stability | [development]: profiles | +| | [beta]: metrics, logs, traces | | Distributions | [core], [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fkafka%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fkafka) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fkafka%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fkafka) | | Code coverage | [![codecov](https://codecov.io/github/open-telemetry/opentelemetry-collector-contrib/graph/main/badge.svg?component=receiver_kafka)](https://app.codecov.io/gh/open-telemetry/opentelemetry-collector-contrib/tree/main/?components%5B0%5D=receiver_kafka&displayType=list) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@pavolloffay](https://www.github.com/pavolloffay), [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@axw](https://www.github.com/axw) | +[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development [beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta [core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib @@ -48,6 +50,9 @@ The following settings can be optionally configured: - `traces` - `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces. - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). +- `profiles` + - `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles. + - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`). If this is set, it will take precedence over the default value for those fields. - `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`). @@ -109,10 +114,10 @@ The following settings can be optionally configured: **Note: this can block the entire partition in case a message processing returns a permanent error** - `header_extraction`: - `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel pipeline - - `headers` (default = []): List of headers they'd like to extract from kafka record. - **Note: Matching pattern will be `exact`. Regexes are not supported as of now.** + - `headers` (default = []): List of headers they'd like to extract from kafka record. + **Note: Matching pattern will be `exact`. Regexes are not supported as of now.** - `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration in case of errors - - `enabled`: (default = false) Whether to enable backoff when next consumers return errors + - `enabled`: (default = false) Whether to enable backoff when next consumers return errors - `initial_interval`: The time to wait after the first error before retrying - `max_interval`: The upper bound on backoff interval between consecutive retries - `multiplier`: The value multiplied by the backoff interval bounds @@ -190,7 +195,7 @@ be configured to extract and attach specific headers as resource attributes. e.g ```yaml receivers: kafka: - header_extraction: + header_extraction: extract_headers: true headers: ["header1", "header2"] ``` diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 03fc49ea10875..df9a7bdaf78fb 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -27,6 +27,9 @@ type Config struct { // Traces holds configuration about how traces should be consumed. Traces TopicEncodingConfig `mapstructure:"traces"` + // Profiles holds configuration about how profiles should be consumed. + Profiles TopicEncodingConfig `mapstructure:"profiles"` + // Topic holds the name of the Kafka topic from which to consume data. // // Topic has no default. If explicitly specified, it will take precedence @@ -80,6 +83,9 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { if zeroConfig.Traces.Topic == "" { c.Traces.Topic = c.Topic } + if zeroConfig.Profiles.Topic == "" { + c.Profiles.Topic = c.Topic + } } if c.Encoding != "" { if zeroConfig.Logs.Encoding == "" { @@ -91,6 +97,9 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { if zeroConfig.Traces.Encoding == "" { c.Traces.Encoding = c.Encoding } + if zeroConfig.Profiles.Encoding == "" { + c.Profiles.Encoding = c.Encoding + } } // Set OnPermanentError default value to inherit from OnError for backward compatibility @@ -119,6 +128,7 @@ type TopicEncodingConfig struct { // - "otlp_spans" for traces // - "otlp_metrics" for metrics // - "otlp_logs" for logs + // - "otlp_profiles" for profiles Topic string `mapstructure:"topic"` // Encoding holds the expected encoding of messages for the signal type diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 724bb0e4fdb12..9629132c4a9e9 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -58,6 +58,10 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", }, + Profiles: TopicEncodingConfig{ + Topic: "spans", + Encoding: "otlp_proto", + }, Topic: "spans", ErrorBackOff: configretry.BackOffConfig{ Enabled: false, @@ -88,6 +92,10 @@ func TestLoadConfig(t *testing.T) { Topic: "legacy_topic", Encoding: "otlp_proto", }, + Profiles: TopicEncodingConfig{ + Topic: "legacy_topic", + Encoding: "otlp_proto", + }, Topic: "legacy_topic", ErrorBackOff: configretry.BackOffConfig{ Enabled: false, @@ -111,6 +119,10 @@ func TestLoadConfig(t *testing.T) { Topic: "otlp_spans", Encoding: "legacy_encoding", }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Encoding: "legacy_encoding", + }, Encoding: "legacy_encoding", ErrorBackOff: configretry.BackOffConfig{ Enabled: false, @@ -158,6 +170,10 @@ func TestLoadConfig(t *testing.T) { Topic: "otlp_spans", Encoding: "otlp_proto", }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Encoding: "otlp_proto", + }, ErrorBackOff: configretry.BackOffConfig{ Enabled: true, InitialInterval: 1 * time.Second, @@ -189,6 +205,10 @@ func TestLoadConfig(t *testing.T) { Topic: "otlp_spans", Encoding: "otlp_proto", }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Encoding: "otlp_proto", + }, ErrorBackOff: configretry.BackOffConfig{ Enabled: false, }, @@ -211,6 +231,10 @@ func TestLoadConfig(t *testing.T) { Topic: "otlp_spans", Encoding: "otlp_proto", }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Encoding: "otlp_proto", + }, MessageMarking: MessageMarking{ After: true, OnError: true, @@ -238,6 +262,10 @@ func TestLoadConfig(t *testing.T) { Topic: "otlp_spans", Encoding: "otlp_proto", }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Encoding: "otlp_proto", + }, MessageMarking: MessageMarking{ After: false, OnError: false, @@ -265,6 +293,10 @@ func TestLoadConfig(t *testing.T) { Topic: "otlp_spans", Encoding: "otlp_proto", }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Encoding: "otlp_proto", + }, MessageMarking: MessageMarking{ After: true, OnError: true, diff --git a/receiver/kafkareceiver/documentation.md b/receiver/kafkareceiver/documentation.md index 306d31b63e2c1..8b9aff415c790 100644 --- a/receiver/kafkareceiver/documentation.md +++ b/receiver/kafkareceiver/documentation.md @@ -278,6 +278,21 @@ Number of metric points failed to be unmarshaled | topic | The Kafka topic. | Any Str | | partition | The Kafka topic partition. | Any Int | +### otelcol_kafka_receiver_unmarshal_failed_profiles + +Number of profiles failed to be unmarshaled + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| topic | The Kafka topic. | Any Str | +| partition | The Kafka topic partition. | Any Int | + ### otelcol_kafka_receiver_unmarshal_failed_spans Number of spans failed to be unmarshaled diff --git a/receiver/kafkareceiver/encoding.go b/receiver/kafkareceiver/encoding.go index 385e85cf85d52..04c361f7f409b 100644 --- a/receiver/kafkareceiver/encoding.go +++ b/receiver/kafkareceiver/encoding.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" @@ -127,6 +128,24 @@ func loadEncodingExtension[T any](host component.Host, encoding, signalType stri return unmarshaler, nil } +func newProfilesUnmarshaler(encoding string, _ receiver.Settings, host component.Host) (pprofile.Unmarshaler, error) { + // Extensions take precedence. + if unmarshaler, err := loadEncodingExtension[pprofile.Unmarshaler](host, encoding, "profiles"); err != nil { + if !errors.Is(err, errInvalidComponentType) && !errors.Is(err, errUnknownEncodingExtension) { + return nil, err + } + } else { + return unmarshaler, nil + } + switch encoding { + case "otlp_proto": + return &pprofile.ProtoUnmarshaler{}, nil + case "otlp_json": + return &pprofile.JSONUnmarshaler{}, nil + } + return nil, fmt.Errorf("unrecognized profiles encoding %q", encoding) +} + // encodingToComponentID attempts to parse the encoding string as a component ID. func encodingToComponentID(encoding string) (*component.ID, error) { var id component.ID diff --git a/receiver/kafkareceiver/encoding_test.go b/receiver/kafkareceiver/encoding_test.go index 60cc1d60ed907..63092401f34e8 100644 --- a/receiver/kafkareceiver/encoding_test.go +++ b/receiver/kafkareceiver/encoding_test.go @@ -16,12 +16,15 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/receiver/receivertest" "golang.org/x/text/encoding/unicode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pprofiletest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" zipkinthriftconverter "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinthriftconverter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" @@ -41,6 +44,10 @@ var ( component.Component ptrace.Unmarshaler } + customProfilesUnmarshalerExtension struct { + component.Component + pprofile.Unmarshaler + } ) func TestNewLogsUnmarshaler(t *testing.T) { @@ -376,6 +383,60 @@ func TestNewTracesUnmarshalerExtension(t *testing.T) { assert.Nil(t, u) } +func TestNewProfilesUnmarshaler(t *testing.T) { + profiles := testdata.GenerateProfiles(3) + + otlpProtoProfiles, err := (&pprofile.ProtoMarshaler{}).MarshalProfiles(profiles) + require.NoError(t, err) + otlpJSONProfiles, err := (&pprofile.JSONMarshaler{}).MarshalProfiles(profiles) + require.NoError(t, err) + + for _, tc := range []struct { + encoding string + input []byte + check func(*testing.T, pprofile.Profiles) + }{ + { + encoding: "otlp_proto", + input: otlpProtoProfiles, + check: func(t *testing.T, actual pprofile.Profiles) { + assert.NoError(t, pprofiletest.CompareProfiles(profiles, actual)) + }, + }, + { + encoding: "otlp_json", + input: otlpJSONProfiles, + check: func(t *testing.T, actual pprofile.Profiles) { + assert.NoError(t, pprofiletest.CompareProfiles(profiles, actual)) + }, + }, + } { + t.Run(tc.encoding, func(t *testing.T) { + u := mustNewProfilesUnmarshaler(t, tc.encoding, componenttest.NewNopHost()) + out, err := u.UnmarshalProfiles(tc.input) + require.NoError(t, err) + tc.check(t, out) + }) + } +} + +func TestNewProfilesUnmarshalerExtension(t *testing.T) { + settings := receivertest.NewNopSettings(metadata.Type) + + // Verify extensions take precedence over built-in unmarshalers. + u := mustNewProfilesUnmarshaler(t, "otlp_proto", extensionsHost{ + component.MustNewID("otlp_proto"): &customProfilesUnmarshalerExtension, + }) + assert.Equal(t, &customProfilesUnmarshalerExtension, u) + + // Specifying an extension for a different type should fail fast. + u, err := newProfilesUnmarshaler("not_profiles", settings, extensionsHost{ + component.MustNewID("not_profiles"): &customLogsUnmarshalerExtension, + }) + require.EqualError(t, err, `extension "not_profiles" is not a profiles unmarshaler`) + assert.Nil(t, u) +} + func mustNewLogsUnmarshaler(tb testing.TB, encoding string, host component.Host) plog.Unmarshaler { settings := receivertest.NewNopSettings(metadata.Type) u, err := newLogsUnmarshaler(encoding, settings, host) @@ -397,6 +458,13 @@ func mustNewTracesUnmarshaler(tb testing.TB, encoding string, host component.Hos return u } +func mustNewProfilesUnmarshaler(tb testing.TB, encoding string, host component.Host) pprofile.Unmarshaler { + settings := receivertest.NewNopSettings(metadata.Type) + u, err := newProfilesUnmarshaler(encoding, settings, host) + require.NoError(tb, err) + return u +} + type extensionsHost map[component.ID]component.Component func (h extensionsHost) GetExtensions() map[component.ID]component.Component { diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index f1a9c077872eb..f6b2246c2fb36 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -8,7 +8,9 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/xreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" @@ -23,16 +25,20 @@ const ( defaultTracesTopic = "otlp_spans" defaultTracesEncoding = "otlp_proto" + + defaultProfilesTopic = "otlp_profiles" + defaultProfilesEncoding = "otlp_proto" ) // NewFactory creates Kafka receiver factory. func NewFactory() receiver.Factory { - return receiver.NewFactory( + return xreceiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithTraces(createTracesReceiver, metadata.TracesStability), - receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), - receiver.WithLogs(createLogsReceiver, metadata.LogsStability), + xreceiver.WithTraces(createTracesReceiver, metadata.TracesStability), + xreceiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + xreceiver.WithLogs(createLogsReceiver, metadata.LogsStability), + xreceiver.WithProfiles(createProfilesReceiver, metadata.ProfilesStability), ) } @@ -52,6 +58,10 @@ func createDefaultConfig() component.Config { Topic: defaultTracesTopic, Encoding: defaultTracesEncoding, }, + Profiles: TopicEncodingConfig{ + Topic: defaultProfilesTopic, + Encoding: defaultProfilesEncoding, + }, MessageMarking: MessageMarking{ After: false, OnError: false, @@ -89,3 +99,12 @@ func createLogsReceiver( ) (receiver.Logs, error) { return newLogsReceiver(cfg.(*Config), set, nextConsumer) } + +func createProfilesReceiver( + _ context.Context, + set receiver.Settings, + cfg component.Config, + nextConsumer xconsumer.Profiles, +) (xreceiver.Profiles, error) { + return newProfilesReceiver(cfg.(*Config), set, nextConsumer) +} diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index baab2d08842aa..ed080eeb39460 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/receiver/xreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" @@ -124,3 +125,37 @@ func TestWithLogsUnmarshalers(t *testing.T) { assert.NotNil(t, receiver) }) } + +func TestCreateProfiles(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Brokers = []string{"invalid:9092"} + cfg.ProtocolVersion = "2.0.0" + r, err := createProfilesReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) + require.NoError(t, err) + require.NoError(t, r.Start(t.Context(), componenttest.NewNopHost())) + assert.NoError(t, r.Shutdown(t.Context())) +} + +func TestWithProfilesUnmarshalers(t *testing.T) { + f := NewFactory() + + t.Run("custom_encoding", func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Profiles.Encoding = "custom" + receiver, err := f.(xreceiver.Factory).CreateProfiles(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) + profilesConsumer, ok := receiver.(*saramaConsumer) + require.True(t, ok) + require.Equal(t, "custom", profilesConsumer.config.Profiles.Encoding) + require.NoError(t, err) + require.NotNil(t, receiver) + }) + t.Run("default_encoding", func(t *testing.T) { + cfg := createDefaultConfig() + receiver, err := f.(xreceiver.Factory).CreateProfiles(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) + profilesConsumer, ok := receiver.(*saramaConsumer) + require.True(t, ok) + require.Equal(t, defaultProfilesEncoding, profilesConsumer.config.Profiles.Encoding) + require.NoError(t, err) + assert.NotNil(t, receiver) + }) +} diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index 91c5b388d022c..8930485954df8 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -31,13 +31,16 @@ require ( go.opentelemetry.io/collector/consumer v1.39.0 go.opentelemetry.io/collector/consumer/consumererror v0.133.0 go.opentelemetry.io/collector/consumer/consumertest v0.133.0 + go.opentelemetry.io/collector/consumer/xconsumer v0.133.0 go.opentelemetry.io/collector/exporter v0.133.0 go.opentelemetry.io/collector/featuregate v1.39.0 go.opentelemetry.io/collector/pdata v1.39.0 + go.opentelemetry.io/collector/pdata/pprofile v0.133.0 go.opentelemetry.io/collector/pdata/testdata v0.133.0 go.opentelemetry.io/collector/receiver v1.39.0 go.opentelemetry.io/collector/receiver/receiverhelper v0.133.0 go.opentelemetry.io/collector/receiver/receivertest v0.133.0 + go.opentelemetry.io/collector/receiver/xreceiver v0.133.0 go.opentelemetry.io/otel v1.37.0 go.opentelemetry.io/otel/metric v1.37.0 go.opentelemetry.io/otel/sdk/metric v1.37.0 @@ -109,14 +112,11 @@ require ( go.opentelemetry.io/collector/config/configcompression v1.39.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.39.0 // indirect go.opentelemetry.io/collector/config/configoptional v0.133.0 // indirect - go.opentelemetry.io/collector/consumer/xconsumer v0.133.0 // indirect go.opentelemetry.io/collector/extension v1.39.0 // indirect go.opentelemetry.io/collector/extension/xextension v0.133.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.133.0 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.133.0 // indirect go.opentelemetry.io/collector/pdata/xpdata v0.133.0 // indirect go.opentelemetry.io/collector/pipeline v1.39.0 // indirect - go.opentelemetry.io/collector/receiver/xreceiver v0.133.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect go.opentelemetry.io/otel/log v0.13.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect diff --git a/receiver/kafkareceiver/internal/metadata/generated_status.go b/receiver/kafkareceiver/internal/metadata/generated_status.go index 2ed2339579052..ac65f6ca09e8a 100644 --- a/receiver/kafkareceiver/internal/metadata/generated_status.go +++ b/receiver/kafkareceiver/internal/metadata/generated_status.go @@ -12,7 +12,8 @@ var ( ) const ( - MetricsStability = component.StabilityLevelBeta - LogsStability = component.StabilityLevelBeta - TracesStability = component.StabilityLevelBeta + ProfilesStability = component.StabilityLevelDevelopment + MetricsStability = component.StabilityLevelBeta + LogsStability = component.StabilityLevelBeta + TracesStability = component.StabilityLevelBeta ) diff --git a/receiver/kafkareceiver/internal/metadata/generated_telemetry.go b/receiver/kafkareceiver/internal/metadata/generated_telemetry.go index a43aa18bd125e..9d56a3fbc556d 100644 --- a/receiver/kafkareceiver/internal/metadata/generated_telemetry.go +++ b/receiver/kafkareceiver/internal/metadata/generated_telemetry.go @@ -43,6 +43,7 @@ type TelemetryBuilder struct { KafkaReceiverRecordsDelay metric.Float64Histogram KafkaReceiverUnmarshalFailedLogRecords metric.Int64Counter KafkaReceiverUnmarshalFailedMetricPoints metric.Int64Counter + KafkaReceiverUnmarshalFailedProfiles metric.Int64Counter KafkaReceiverUnmarshalFailedSpans metric.Int64Counter } @@ -180,6 +181,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("1"), ) errs = errors.Join(errs, err) + builder.KafkaReceiverUnmarshalFailedProfiles, err = builder.meter.Int64Counter( + "otelcol_kafka_receiver_unmarshal_failed_profiles", + metric.WithDescription("Number of profiles failed to be unmarshaled"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) builder.KafkaReceiverUnmarshalFailedSpans, err = builder.meter.Int64Counter( "otelcol_kafka_receiver_unmarshal_failed_spans", metric.WithDescription("Number of spans failed to be unmarshaled"), diff --git a/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest.go b/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest.go index c1a9d0b4a3ba1..7236a563a5cc9 100644 --- a/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest.go +++ b/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest.go @@ -284,6 +284,22 @@ func AssertEqualKafkaReceiverUnmarshalFailedMetricPoints(t *testing.T, tt *compo metricdatatest.AssertEqual(t, want, got, opts...) } +func AssertEqualKafkaReceiverUnmarshalFailedProfiles(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_kafka_receiver_unmarshal_failed_profiles", + Description: "Number of profiles failed to be unmarshaled", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_kafka_receiver_unmarshal_failed_profiles") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualKafkaReceiverUnmarshalFailedSpans(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_kafka_receiver_unmarshal_failed_spans", diff --git a/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest_test.go b/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest_test.go index 5b4cc4221d052..69af98a591a45 100644 --- a/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest_test.go +++ b/receiver/kafkareceiver/internal/metadatatest/generated_telemetrytest_test.go @@ -37,6 +37,7 @@ func TestSetupTelemetry(t *testing.T) { tb.KafkaReceiverRecordsDelay.Record(context.Background(), 1) tb.KafkaReceiverUnmarshalFailedLogRecords.Add(context.Background(), 1) tb.KafkaReceiverUnmarshalFailedMetricPoints.Add(context.Background(), 1) + tb.KafkaReceiverUnmarshalFailedProfiles.Add(context.Background(), 1) tb.KafkaReceiverUnmarshalFailedSpans.Add(context.Background(), 1) AssertEqualKafkaBrokerClosed(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, @@ -89,6 +90,9 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualKafkaReceiverUnmarshalFailedMetricPoints(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualKafkaReceiverUnmarshalFailedProfiles(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualKafkaReceiverUnmarshalFailedSpans(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 701aa799fd4ed..ed59fddc03cfa 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -13,12 +13,15 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver/xreceiver" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" @@ -35,7 +38,7 @@ type newConsumeMessageFunc func(host component.Host, obsrecv *receiverhelper.Obs ) (consumeMessageFunc, error) // messageHandler provides a generic interface for handling messages for a pdata type. -type messageHandler[T plog.Logs | pmetric.Metrics | ptrace.Traces] interface { +type messageHandler[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Profiles] interface { // unmarshalData unmarshals the message payload into a pdata type (plog.Logs, etc.) // and returns the number of items (log records, metric data points, spans) within it. unmarshalData(data []byte) (T, int, error) @@ -135,6 +138,30 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu return newReceiver(config, set, []string{config.Traces.Topic}, consumeFn) } +func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xconsumer.Profiles) (xreceiver.Profiles, error) { + consumeFn := func(host component.Host, + obsrecv *receiverhelper.ObsReport, + telBldr *metadata.TelemetryBuilder, + ) (consumeMessageFunc, error) { + unmarshaler, err := newProfilesUnmarshaler(config.Profiles.Encoding, set, host) + if err != nil { + return nil, err + } + return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { + return processMessage(ctx, message, config, set.Logger, telBldr, + &profilesHandler{ + unmarshaler: unmarshaler, + obsrecv: obsrecv, + consumer: nextConsumer, + encoding: config.Profiles.Encoding, + }, + attrs, + ) + }, nil + } + return newReceiver(config, set, []string{config.Profiles.Topic}, consumeFn) +} + func newReceiver( config *Config, set receiver.Settings, @@ -273,8 +300,49 @@ func (*tracesHandler) getUnmarshalFailureCounter(telBldr *metadata.TelemetryBuil return telBldr.KafkaReceiverUnmarshalFailedSpans } +type profilesHandler struct { + unmarshaler pprofile.Unmarshaler + obsrecv *receiverhelper.ObsReport + consumer xconsumer.Profiles + encoding string +} + +func (h *profilesHandler) unmarshalData(data []byte) (pprofile.Profiles, int, error) { + profiles, err := h.unmarshaler.UnmarshalProfiles(data) + if err != nil { + return pprofile.Profiles{}, 0, err + } + return profiles, profiles.SampleCount(), nil +} + +func (h *profilesHandler) consumeData(ctx context.Context, data pprofile.Profiles) error { + return h.consumer.ConsumeProfiles(ctx, data) +} + +func (h *profilesHandler) startObsReport(ctx context.Context) context.Context { + return h.obsrecv.StartTracesOp(ctx) +} + +func (h *profilesHandler) endObsReport(ctx context.Context, n int, err error) { + h.obsrecv.EndTracesOp(ctx, h.encoding, n, err) +} + +func (*profilesHandler) getResources(data pprofile.Profiles) iter.Seq[pcommon.Resource] { + return func(yield func(pcommon.Resource) bool) { + for _, rm := range data.ResourceProfiles().All() { + if !yield(rm.Resource()) { + return + } + } + } +} + +func (*profilesHandler) getUnmarshalFailureCounter(telBldr *metadata.TelemetryBuilder) metric.Int64Counter { + return telBldr.KafkaReceiverUnmarshalFailedProfiles +} + // processMessage is a generic function that processes any KafkaMessage using a messageHandler -func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces]( +func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Profiles]( ctx context.Context, message kafkaMessage, config *Config, diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index ac5d58e8c9ea6..edb6c61ee239f 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/receiver" @@ -706,6 +707,60 @@ func TestNewMetricsReceiver(t *testing.T) { }) } +func TestNewProfilesReceiver(t *testing.T) { + runTestForClients(t, func(t *testing.T) { + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_profiles")) + + var sink consumertest.ProfilesSink + receiverConfig.HeaderExtraction.ExtractHeaders = true + receiverConfig.HeaderExtraction.Headers = []string{"key1"} + set, tel, _ := mustNewSettings(t) + r, err := newProfilesReceiver(receiverConfig, set, &sink) + require.NoError(t, err) + + // Send some profiles to the otlp_profiles topic. + profiles := testdata.GenerateProfiles(1) + data, err := (&pprofile.ProtoMarshaler{}).MarshalProfiles(profiles) + require.NoError(t, err) + results := kafkaClient.ProduceSync(t.Context(), + &kgo.Record{ + Topic: "otlp_profiles", + Value: data, + Headers: []kgo.RecordHeader{ + {Key: "key1", Value: []byte("value1")}, + }, + }, + &kgo.Record{Topic: "otlp_profiles", Value: []byte("junk")}, + ) + require.NoError(t, results.FirstErr()) + + err = r.Start(t.Context(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, r.Shutdown(context.Background())) //nolint:usetesting + }) + + // There should be one failed message due to the invalid message payload. + // It may not be available immediately, as the receiver may not have processed it yet. + assert.Eventually(t, func() bool { + _, err := tel.GetMetric("otelcol_kafka_receiver_unmarshal_failed_profiles") + return err == nil + }, 10*time.Second, 100*time.Millisecond) + metadatatest.AssertEqualKafkaReceiverUnmarshalFailedProfiles(t, tel, []metricdata.DataPoint[int64]{{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("topic", "otlp_profiles"), + attribute.Int64("partition", 0), + ), + }}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // There should be one successfully processed batch of profiles. + assert.Len(t, sink.AllProfiles(), 1) + _, ok := sink.AllProfiles()[0].ResourceProfiles().At(0).Resource().Attributes().Get("kafka.header.key1") + require.True(t, ok) + }) +} + func TestComponentStatus(t *testing.T) { t.Parallel() _, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) diff --git a/receiver/kafkareceiver/metadata.yaml b/receiver/kafkareceiver/metadata.yaml index f3a615704ac19..1489083276934 100644 --- a/receiver/kafkareceiver/metadata.yaml +++ b/receiver/kafkareceiver/metadata.yaml @@ -4,6 +4,7 @@ status: class: receiver stability: beta: [metrics, logs, traces] + development: [profiles] distributions: - core - contrib @@ -204,3 +205,11 @@ telemetry: value_type: int monotonic: true attributes: [topic, partition] + kafka_receiver_unmarshal_failed_profiles: + enabled: true + description: Number of profiles failed to be unmarshaled + unit: "1" + sum: + value_type: int + monotonic: true + attributes: [topic, partition]