diff --git a/.chloggen/pubsubreceiver-encodingextensions.yaml b/.chloggen/pubsubreceiver-encodingextensions.yaml new file mode 100644 index 000000000000..efbeca18fe5f --- /dev/null +++ b/.chloggen/pubsubreceiver-encodingextensions.yaml @@ -0,0 +1,11 @@ +change_type: enhancement + +component: googlecloudpubsubreceiver + +note: Added support for encoding extensions. + +issues: [37109] + +subtext: + +change_logs: [user] diff --git a/receiver/googlecloudpubsubreceiver/README.md b/receiver/googlecloudpubsubreceiver/README.md index bb8edb3b4dcd..21b4a419b650 100644 --- a/receiver/googlecloudpubsubreceiver/README.md +++ b/receiver/googlecloudpubsubreceiver/README.md @@ -41,29 +41,53 @@ receivers: ## Encoding -You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data -by looking at the `ce-type` and `content-type` attributes of the message. Only when those attributes are not set -must the `encoding` field in the configuration be set. +The `encoding` option allows you to specify an Encoding Extension for decoding messages on the subscription. An +extension needs to be configured in the `extensions` section, and added to the pipeline in the collector's configuration file. 
-| ce-type | ce-datacontenttype | encoding | description | -|-----------------------------------|----------------------|-------------------|------------------------------------------------| -| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message | -| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message | -| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message | -| - | - | otlp_proto_trace | Decode OTLP trace message | -| - | - | otlp_proto_metric | Decode OTLP trace message | -| - | - | otlp_proto_log | Decode OTLP trace message | -| - | - | cloud_logging | Decode [Cloud Logging] [LogEntry] message type | -| - | - | raw_text | Wrap in an OTLP log message | +The following example shows how to use the text encoding extension for ingesting arbitrary text messages on a +subscription, wrapping them in OTLP Log messages. Note that not all extensions support all signals. + +```yaml +extensions: + text_encoding: + encoding: utf8 + unmarshaling_separator: "\r?\n" + +service: + extensions: [text_encoding] + pipelines: + logs: + receivers: [googlecloudpubsub] + processors: [] + exporters: [debug] +``` -When the `encoding` configuration is set, the attributes on the message are ignored. +The receiver also supports built-in encodings for the native OTLP encodings, without the need to specify an Encoding +Extension. The non-OTLP built-in encodings will be deprecated as soon as extensions for the formats are available. + +| encoding | description | +|-------------------|------------------------------------------------| +| otlp_proto_trace | Decode OTLP trace message | +| otlp_proto_metric | Decode OTLP metric message | +| otlp_proto_log | Decode OTLP log message | +| cloud_logging | Decode [Cloud Logging] [LogEntry] message type | +| raw_text | Wrap in an OTLP log message | With `cloud_logging`, the receiver can be used to bring Cloud Logging messages into an OpenTelemetry pipeline. 
You'll first need to [set up a logging sink][sink-docs] with a Pub/Sub topic as its destination. Note that the `cloud_logging` integration is considered **alpha** as the semantic convention on some of the conversion are not stabilized yet. With `raw_text`, the receiver can be used for ingesting arbitrary text message on a Pubsub subscription, wrapping them -in OTLP Log messages, making it a convenient way to ingest raw log lines from Pubsub. +in OTLP Log messages. + +When no encoding is specified, the receiver will try to discover the type of the data by looking at the `ce-type` and +`content-type` attributes of the message. These message attributes are set by the `googlecloudpubsubexporter`. + +| ce-type | ce-datacontenttype | encoding | description | +|-----------------------------------|----------------------|-------------------|------------------------------------------------| +| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message | +| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message | +| org.opentelemetry.otlp.logs.v1 | application/protobuf | | Decode OTLP log message | [Cloud Logging]: https://cloud.google.com/logging [LogEntry]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry diff --git a/receiver/googlecloudpubsubreceiver/config.go b/receiver/googlecloudpubsubreceiver/config.go index 8dbdb8b9a3e7..d167a29dd63b 100644 --- a/receiver/googlecloudpubsubreceiver/config.go +++ b/receiver/googlecloudpubsubreceiver/config.go @@ -35,51 +35,6 @@ type Config struct { ClientID string `mapstructure:"client_id"` } -func (config *Config) validateForLog() error { - err := config.validate() - if err != nil { - return err - } - switch config.Encoding { - case "": - case "otlp_proto_log": - case "raw_text": - case "raw_json": - case "cloud_logging": - default: - return fmt.Errorf("log encoding %v is not supported. 
supported encoding formats include [otlp_proto_log,raw_text,raw_json,cloud_logging]", config.Encoding) - } - return nil -} - -func (config *Config) validateForTrace() error { - err := config.validate() - if err != nil { - return err - } - switch config.Encoding { - case "": - case "otlp_proto_trace": - default: - return fmt.Errorf("trace encoding %v is not supported. supported encoding formats include [otlp_proto_trace]", config.Encoding) - } - return nil -} - -func (config *Config) validateForMetric() error { - err := config.validate() - if err != nil { - return err - } - switch config.Encoding { - case "": - case "otlp_proto_metric": - default: - return fmt.Errorf("metric encoding %v is not supported. supported encoding formats include [otlp_proto_metric]", config.Encoding) - } - return nil -} - func (config *Config) validate() error { if !subscriptionMatcher.MatchString(config.Subscription) { return fmt.Errorf("subscription '%s' is not a valid format, use 'projects//subscriptions/'", config.Subscription) diff --git a/receiver/googlecloudpubsubreceiver/config_test.go b/receiver/googlecloudpubsubreceiver/config_test.go index 7dfb798ab6be..6b86acc14bb4 100644 --- a/receiver/googlecloudpubsubreceiver/config_test.go +++ b/receiver/googlecloudpubsubreceiver/config_test.go @@ -63,9 +63,6 @@ func TestLoadConfig(t *testing.T) { func TestConfigValidation(t *testing.T) { factory := NewFactory() c := factory.CreateDefaultConfig().(*Config) - assert.Error(t, c.validateForTrace()) - assert.Error(t, c.validateForLog()) - assert.Error(t, c.validateForMetric()) c.Subscription = "projects/000project/subscriptions/my-subscription" assert.Error(t, c.validate()) c.Subscription = "projects/my-project/topics/my-topic" @@ -73,60 +70,3 @@ func TestConfigValidation(t *testing.T) { c.Subscription = "projects/my-project/subscriptions/my-subscription" assert.NoError(t, c.validate()) } - -func TestTraceConfigValidation(t *testing.T) { - factory := NewFactory() - c := 
factory.CreateDefaultConfig().(*Config) - c.Subscription = "projects/my-project/subscriptions/my-subscription" - assert.NoError(t, c.validateForTrace()) - - c.Encoding = "otlp_proto_metric" - assert.Error(t, c.validateForTrace()) - c.Encoding = "otlp_proto_log" - assert.Error(t, c.validateForTrace()) - c.Encoding = "raw_text" - assert.Error(t, c.validateForTrace()) - c.Encoding = "raw_json" - assert.Error(t, c.validateForTrace()) - - c.Encoding = "otlp_proto_trace" - assert.NoError(t, c.validateForTrace()) -} - -func TestMetricConfigValidation(t *testing.T) { - factory := NewFactory() - c := factory.CreateDefaultConfig().(*Config) - c.Subscription = "projects/my-project/subscriptions/my-subscription" - assert.NoError(t, c.validateForMetric()) - - c.Encoding = "otlp_proto_trace" - assert.Error(t, c.validateForMetric()) - c.Encoding = "otlp_proto_log" - assert.Error(t, c.validateForMetric()) - c.Encoding = "raw_text" - assert.Error(t, c.validateForMetric()) - c.Encoding = "raw_json" - assert.Error(t, c.validateForMetric()) - - c.Encoding = "otlp_proto_metric" - assert.NoError(t, c.validateForMetric()) -} - -func TestLogConfigValidation(t *testing.T) { - factory := NewFactory() - c := factory.CreateDefaultConfig().(*Config) - c.Subscription = "projects/my-project/subscriptions/my-subscription" - assert.NoError(t, c.validateForLog()) - - c.Encoding = "otlp_proto_trace" - assert.Error(t, c.validateForLog()) - c.Encoding = "otlp_proto_metric" - assert.Error(t, c.validateForLog()) - - c.Encoding = "raw_text" - assert.NoError(t, c.validateForLog()) - c.Encoding = "raw_json" - assert.NoError(t, c.validateForLog()) - c.Encoding = "otlp_proto_log" - assert.NoError(t, c.validateForLog()) -} diff --git a/receiver/googlecloudpubsubreceiver/factory.go b/receiver/googlecloudpubsubreceiver/factory.go index 96ccc49d5814..802718a55fb4 100644 --- a/receiver/googlecloudpubsubreceiver/factory.go +++ b/receiver/googlecloudpubsubreceiver/factory.go @@ -71,7 +71,7 @@ func (factory 
*pubsubReceiverFactory) CreateTraces( cfg component.Config, consumer consumer.Traces, ) (receiver.Traces, error) { - err := cfg.(*Config).validateForTrace() + err := cfg.(*Config).validate() if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (factory *pubsubReceiverFactory) CreateMetrics( cfg component.Config, consumer consumer.Metrics, ) (receiver.Metrics, error) { - err := cfg.(*Config).validateForMetric() + err := cfg.(*Config).validate() if err != nil { return nil, err } @@ -107,7 +107,7 @@ func (factory *pubsubReceiverFactory) CreateLogs( cfg component.Config, consumer consumer.Logs, ) (receiver.Logs, error) { - err := cfg.(*Config).validateForLog() + err := cfg.(*Config).validate() if err != nil { return nil, err } diff --git a/receiver/googlecloudpubsubreceiver/go.mod b/receiver/googlecloudpubsubreceiver/go.mod index f0a8fd4c2845..ce68b9893147 100644 --- a/receiver/googlecloudpubsubreceiver/go.mod +++ b/receiver/googlecloudpubsubreceiver/go.mod @@ -9,6 +9,7 @@ require ( github.com/googleapis/gax-go/v2 v2.14.1 github.com/iancoleman/strcase v0.3.0 github.com/json-iterator/go v1.1.12 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.114.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.118.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250121185328-fbefb22cc2b3 @@ -37,7 +38,7 @@ require ( cloud.google.com/go/iam v1.2.2 // indirect cloud.google.com/go/longrunning v0.6.2 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -55,7 +56,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect 
github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.einride.tech/aip v0.68.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect @@ -92,3 +93,5 @@ retract ( v0.76.1 v0.65.0 ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding diff --git a/receiver/googlecloudpubsubreceiver/go.sum b/receiver/googlecloudpubsubreceiver/go.sum index 63a63dac35fe..838a13d37be6 100644 --- a/receiver/googlecloudpubsubreceiver/go.sum +++ b/receiver/googlecloudpubsubreceiver/go.sum @@ -22,8 +22,9 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -100,8 +101,9 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent 
v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= diff --git a/receiver/googlecloudpubsubreceiver/internal/log_entry.go b/receiver/googlecloudpubsubreceiver/internal/log_entry.go index 282ed890e6f3..8ebc3f2fd9a3 100644 --- a/receiver/googlecloudpubsubreceiver/internal/log_entry.go +++ b/receiver/googlecloudpubsubreceiver/internal/log_entry.go @@ -5,7 +5,6 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bytes" - "context" "encoding/hex" stdjson "encoding/json" "errors" @@ -20,7 +19,6 @@ import ( jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/reflect/protoreflect" @@ -113,7 +111,7 @@ func getLogEntryDescriptor() protoreflect.MessageDescriptor { // schema; this ensures that a numeric value in the input is correctly // translated to either an integer or a double in the 
output. It falls back to // plain JSON decoding if payload type is not available in the proto registry. -func TranslateLogEntry(_ context.Context, _ *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) { +func TranslateLogEntry(data []byte) (pcommon.Resource, plog.LogRecord, error) { lr := plog.NewLogRecord() res := pcommon.NewResource() diff --git a/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go b/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go index a5b959b06013..5eef975189f0 100644 --- a/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go +++ b/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go @@ -4,7 +4,6 @@ package internal import ( - "context" "fmt" "testing" "time" @@ -15,7 +14,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/multierr" - "go.uber.org/zap" ) type Log struct { @@ -70,15 +68,12 @@ func TestTranslateLogEntry(t *testing.T) { }{ // TODO: Add publicly shareable log test data. 
} - - logger, _ := zap.NewDevelopment() - for _, tt := range tests { var errs error wantRes, wantLr, err := generateLog(t, tt.want) errs = multierr.Append(errs, err) - gotRes, gotLr, err := TranslateLogEntry(context.TODO(), logger, []byte(tt.input)) + gotRes, gotLr, err := TranslateLogEntry([]byte(tt.input)) errs = multierr.Append(errs, err) errs = multierr.Combine(errs, compareResources(wantRes, gotRes), compareLogRecords(wantLr, gotLr)) diff --git a/receiver/googlecloudpubsubreceiver/receiver.go b/receiver/googlecloudpubsubreceiver/receiver.go index caecafc0135a..9fb36f6b1d6d 100644 --- a/receiver/googlecloudpubsubreceiver/receiver.go +++ b/receiver/googlecloudpubsubreceiver/receiver.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" ) @@ -44,29 +45,71 @@ type pubsubReceiver struct { startOnce sync.Once } -type encoding int +type buildInEncoding int const ( - unknown encoding = iota - otlpProtoTrace = iota - otlpProtoMetric = iota - otlpProtoLog = iota - rawTextLog = iota - cloudLogging = iota + unknown buildInEncoding = iota + otlpProtoTrace = iota + otlpProtoMetric = iota + otlpProtoLog = iota + rawTextLog = iota + cloudLogging = iota ) -type compression int +type buildInCompression int const ( - uncompressed compression = iota - gZip = iota + uncompressed buildInCompression = iota + gZip = iota ) -func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) error { +// consumerCount returns the number of attached consumers, useful for detecting errors in pipelines +func (receiver *pubsubReceiver) consumerCount() int { + count := 0 + if receiver.logsConsumer != nil { + count++ + } + if receiver.metricsConsumer != nil { + count++ + } + if receiver.tracesConsumer != nil { + count++ + } + return count +} + +func 
(receiver *pubsubReceiver) Start(ctx context.Context, host component.Host) error { if receiver.tracesConsumer == nil && receiver.metricsConsumer == nil && receiver.logsConsumer == nil { return errors.New("cannot start receiver: no consumers were specified") } + var createHandlerFn func(context.Context) error + + if receiver.config.Encoding != "" { + if receiver.consumerCount() > 1 { + return errors.New("cannot start receiver: multiple consumers were attached, but encoding was specified") + } + encodingID := convertEncoding(receiver.config.Encoding) + if encodingID == unknown { + err := receiver.setMarshallerFromExtension(host) + if err != nil { + return err + } + } else { + err := receiver.setMarshallerFromEncodingID(encodingID) + if err != nil { + return err + } + } + createHandlerFn = receiver.createReceiverHandler + } else { + // we will rely on the attributes of the message to determine the signal, so we need all proto unmarshalers + receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} + receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} + receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} + createHandlerFn = receiver.createMultiplexingReceiverHandler + } + var startErr error receiver.startOnce.Do(func() { client, err := newSubscriberClient(ctx, receiver.config, receiver.userAgent) @@ -76,18 +119,79 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) err } receiver.client = client - err = receiver.createReceiverHandler(ctx) + err = createHandlerFn(ctx) if err != nil { startErr = fmt.Errorf("failed to create ReceiverHandler: %w", err) return } }) - receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} - receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} - receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} return startErr } +func (receiver *pubsubReceiver) setMarshallerFromExtension(host component.Host) error { + extensionID := component.ID{} + err := extensionID.UnmarshalText([]byte(receiver.config.Encoding)) + if 
err != nil { + return errors.New("cannot start receiver: neither a build in encoder, or an extension") + } + extensions := host.GetExtensions() + if extension, ok := extensions[extensionID]; ok { + if receiver.tracesConsumer != nil { + receiver.tracesUnmarshaler, ok = extension.(encoding.TracesUnmarshalerExtension) + if !ok { + return fmt.Errorf("cannot start receiver: extension %q is not a trace unmarshaler", extensionID) + } + } + if receiver.logsConsumer != nil { + receiver.logsUnmarshaler, ok = extension.(encoding.LogsUnmarshalerExtension) + if !ok { + return fmt.Errorf("cannot start receiver: extension %q is not a logs unmarshaler", extensionID) + } + } + if receiver.metricsConsumer != nil { + receiver.metricsUnmarshaler, ok = extension.(encoding.MetricsUnmarshalerExtension) + if !ok { + return fmt.Errorf("cannot start receiver: extension %q is not a metrics unmarshaler", extensionID) + } + } + } else { + return fmt.Errorf("cannot start receiver: extension %q not found", extensionID) + } + return nil +} + +func (receiver *pubsubReceiver) setMarshallerFromEncodingID(encodingID buildInEncoding) error { + if receiver.tracesConsumer != nil { + switch encodingID { + case otlpProtoTrace: + receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} + default: + return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for traces", receiver.config.Encoding) + } + } + if receiver.logsConsumer != nil { + switch encodingID { + case otlpProtoLog: + receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} + case rawTextLog: + receiver.logsUnmarshaler = unmarshalLogStrings{} + case cloudLogging: + receiver.logsUnmarshaler = unmarshalCloudLoggingLogEntry{} + default: + return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for logs", receiver.config.Encoding) + } + } + if receiver.metricsConsumer != nil { + switch encodingID { + case otlpProtoMetric: + receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} + default: + return 
fmt.Errorf("cannot start receiver: build in encoding %s is not supported for metrics", receiver.config.Encoding) + } + } + return nil +} + func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { if receiver.handler != nil { receiver.logger.Info("Stopping Google Pubsub receiver") @@ -103,13 +207,9 @@ func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { return client.Close() } -func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error { - if receiver.logsConsumer == nil { - return nil - } - data := string(message.Message.Data) - timestamp := message.GetMessage().PublishTime +type unmarshalLogStrings struct{} +func (unmarshalLogStrings) UnmarshalLogs(data []byte) (plog.Logs, error) { out := plog.NewLogs() logs := out.ResourceLogs() rls := logs.AppendEmpty() @@ -117,22 +217,34 @@ func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *p ills := rls.ScopeLogs().AppendEmpty() lr := ills.LogRecords().AppendEmpty() - lr.Body().SetStr(data) - lr.SetTimestamp(pcommon.NewTimestampFromTime(timestamp.AsTime())) + lr.Body().SetStr(string(data)) + return out, nil +} + +func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, payload []byte) error { + if receiver.logsConsumer == nil { + return nil + } + unmarshall := unmarshalLogStrings{} + out, err := unmarshall.UnmarshalLogs(payload) + if err != nil { + return err + } return receiver.logsConsumer.ConsumeLogs(ctx, out) } -func (receiver *pubsubReceiver) handleCloudLoggingLogEntry(ctx context.Context, message *pubsubpb.ReceivedMessage) error { - resource, lr, err := internal.TranslateLogEntry(ctx, receiver.logger, message.Message.Data) +type unmarshalCloudLoggingLogEntry struct{} + +func (unmarshalCloudLoggingLogEntry) UnmarshalLogs(data []byte) (plog.Logs, error) { + resource, lr, err := internal.TranslateLogEntry(data) + out := plog.NewLogs() lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) if err 
!= nil { - receiver.logger.Error("got an error", zap.Error(err)) - return err + return out, err } - out := plog.NewLogs() logs := out.ResourceLogs() rls := logs.AppendEmpty() resource.CopyTo(rls.Resource()) @@ -140,10 +252,10 @@ func (receiver *pubsubReceiver) handleCloudLoggingLogEntry(ctx context.Context, ills := rls.ScopeLogs().AppendEmpty() lr.CopyTo(ills.LogRecords().AppendEmpty()) - return receiver.logsConsumer.ConsumeLogs(ctx, out) + return out, nil } -func decompress(payload []byte, compression compression) ([]byte, error) { +func decompress(payload []byte, compression buildInCompression) ([]byte, error) { if compression == gZip { reader, err := gzip.NewReader(bytes.NewReader(payload)) if err != nil { @@ -154,7 +266,7 @@ func decompress(payload []byte, compression compression) ([]byte, error) { return payload, nil } -func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, compression compression) error { +func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, compression buildInCompression) error { payload, err := decompress(payload, compression) if err != nil { return err @@ -170,7 +282,7 @@ func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, return nil } -func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte, compression compression) error { +func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte, compression buildInCompression) error { payload, err := decompress(payload, compression) if err != nil { return err @@ -186,7 +298,7 @@ func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte return nil } -func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, compression compression) error { +func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, compression buildInCompression) error { payload, err := decompress(payload, compression) if err != nil { return 
err @@ -202,9 +314,9 @@ func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, c return nil } -func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (encoding, compression) { - otlpEncoding := unknown - otlpCompression := uncompressed +func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (otlpEncoding buildInEncoding, otlpCompression buildInCompression) { + otlpEncoding = unknown + otlpCompression = uncompressed ceType := attributes["ce-type"] ceContentType := attributes["content-type"] @@ -222,18 +334,7 @@ func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (en } if otlpEncoding == unknown && receiver.config.Encoding != "" { - switch receiver.config.Encoding { - case "otlp_proto_trace": - otlpEncoding = otlpProtoTrace - case "otlp_proto_metric": - otlpEncoding = otlpProtoMetric - case "otlp_proto_log": - otlpEncoding = otlpProtoLog - case "cloud_logging": - otlpEncoding = cloudLogging - case "raw_text": - otlpEncoding = rawTextLog - } + otlpEncoding = convertEncoding(receiver.config.Encoding) } ceContentEncoding := attributes["content-encoding"] @@ -246,10 +347,26 @@ func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (en otlpCompression = gZip } } - return otlpEncoding, otlpCompression + return } -func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error { +func convertEncoding(encodingConfig string) (encoding buildInEncoding) { + switch encodingConfig { + case "otlp_proto_trace": + return otlpProtoTrace + case "otlp_proto_metric": + return otlpProtoMetric + case "otlp_proto_log": + return otlpProtoLog + case "cloud_logging": + return cloudLogging + case "raw_text": + return rawTextLog + } + return unknown +} + +func (receiver *pubsubReceiver) createMultiplexingReceiverHandler(ctx context.Context) error { var err error receiver.handler, err = internal.NewHandler( ctx, @@ -274,16 +391,14 @@ func (receiver *pubsubReceiver) 
createReceiverHandler(ctx context.Context) error if receiver.logsConsumer != nil { return receiver.handleLog(ctx, payload, compression) } - case cloudLogging: + case rawTextLog: if receiver.logsConsumer != nil { - return receiver.handleCloudLoggingLogEntry(ctx, message) + return receiver.handleLogStrings(ctx, payload) } - case rawTextLog: - return receiver.handleLogStrings(ctx, message) - case unknown: + default: return errors.New("unknown encoding") } - return errors.New("unknown encoding") + return nil }) if err != nil { return err @@ -291,3 +406,40 @@ func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error receiver.handler.RecoverableStream(ctx) return nil } + +func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error { + var err error + var handlerFn func(context.Context, *pubsubpb.ReceivedMessage) error + compression := uncompressed + if receiver.tracesConsumer != nil { + handlerFn = func(ctx context.Context, message *pubsubpb.ReceivedMessage) error { + payload := message.Message.Data + return receiver.handleTrace(ctx, payload, compression) + } + } + if receiver.logsConsumer != nil { + handlerFn = func(ctx context.Context, message *pubsubpb.ReceivedMessage) error { + payload := message.Message.Data + return receiver.handleLog(ctx, payload, compression) + } + } + if receiver.metricsConsumer != nil { + handlerFn = func(ctx context.Context, message *pubsubpb.ReceivedMessage) error { + payload := message.Message.Data + return receiver.handleMetric(ctx, payload, compression) + } + } + + receiver.handler, err = internal.NewHandler( + ctx, + receiver.logger, + receiver.client, + receiver.config.ClientID, + receiver.config.Subscription, + handlerFn) + if err != nil { + return err + } + receiver.handler.RecoverableStream(ctx) + return nil +} diff --git a/receiver/googlecloudpubsubreceiver/receiver_test.go b/receiver/googlecloudpubsubreceiver/receiver_test.go index 01ed07c3f7ae..72eca61b1315 100644 --- 
a/receiver/googlecloudpubsubreceiver/receiver_test.go +++ b/receiver/googlecloudpubsubreceiver/receiver_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -24,13 +25,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/testdata" ) -func TestStartReceiverNoSubscription(t *testing.T) { - ctx := context.Background() - // Start a fake server running locally. +func createBaseReceiver() (*pstest.Server, *pubsubReceiver) { srv := pstest.NewServer() - defer srv.Close() core, _ := observer.New(zap.WarnLevel) - receiver := &pubsubReceiver{ + return srv, &pubsubReceiver{ logger: zap.New(core), userAgent: "test-user-agent", @@ -44,15 +42,46 @@ func TestStartReceiverNoSubscription(t *testing.T) { Subscription: "projects/my-project/subscriptions/otlp", }, } +} + +type fakeUnmarshalLog struct{} + +func (fakeUnmarshalLog) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (fakeUnmarshalLog) Shutdown(_ context.Context) error { + return nil +} + +func (fakeUnmarshalLog) UnmarshalLogs(_ []byte) (plog.Logs, error) { + return plog.NewLogs(), nil +} + +type fakeHost struct{} + +func (fakeHost) GetExtensions() map[component.ID]component.Component { + ext := make(map[component.ID]component.Component) + extensionID := component.ID{} + _ = extensionID.UnmarshalText([]byte("text_encoding")) + ext[extensionID] = fakeUnmarshalLog{} + return ext +} + +func TestStartReceiverNoSubscription(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() defer func() { + assert.NoError(t, srv.Close()) assert.NoError(t, receiver.Shutdown(ctx)) }() + receiver.tracesConsumer = consumertest.NewNop() 
receiver.metricsConsumer = consumertest.NewNop() receiver.logsConsumer = consumertest.NewNop() // No error is thrown as the stream is handled async, // no locks should be kept though - assert.NoError(t, receiver.Start(ctx, nil)) + assert.NoError(t, receiver.Start(ctx, fakeHost{})) } func TestReceiver(t *testing.T) { @@ -103,14 +132,14 @@ func TestReceiver(t *testing.T) { metricsConsumer: metricSink, logsConsumer: logSink, } - assert.NoError(t, receiver.Start(ctx, nil)) + assert.NoError(t, receiver.Start(ctx, fakeHost{})) receiver.tracesConsumer = traceSink receiver.metricsConsumer = metricSink receiver.logsConsumer = logSink // No error is thrown as the stream is handled async, // no locks should be kept though - assert.NoError(t, receiver.Start(ctx, nil)) + assert.NoError(t, receiver.Start(ctx, fakeHost{})) time.Sleep(1 * time.Second) @@ -156,3 +185,125 @@ func TestReceiver(t *testing.T) { assert.NoError(t, receiver.Shutdown(ctx)) assert.NoError(t, receiver.Shutdown(ctx)) } + +func TestEncodingMultipleConsumersForAnEncoding(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.metricsConsumer = consumertest.NewNop() + receiver.logsConsumer = consumertest.NewNop() + receiver.config.Encoding = "foo" + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "multiple consumers were attached") +} + +func TestEncodingBuildInProtoTrace(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_trace" + + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.NotNil(t, receiver.tracesConsumer) + assert.Nil(t, receiver.metricsConsumer) + assert.Nil(t, 
receiver.logsConsumer) +} + +func TestEncodingBuildInProtoMetric(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.metricsConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_metric" + + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.Nil(t, receiver.tracesConsumer) + assert.NotNil(t, receiver.metricsConsumer) + assert.Nil(t, receiver.logsConsumer) +} + +func TestEncodingBuildInProtoLog(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.logsConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_log" + + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.Nil(t, receiver.tracesConsumer) + assert.Nil(t, receiver.metricsConsumer) + assert.NotNil(t, receiver.logsConsumer) +} + +func TestEncodingConsumerMismatch(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_log" + + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "build in encoding otlp_proto_log is not supported for traces") +} + +func TestEncodingNotFound(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "foo" + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "extension \"foo\" not found") +} + +func TestEncodingExtensionMismatch(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + 
defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "text_encoding" + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "extension \"text_encoding\" is not a trace unmarshaler") +} + +func TestEncodingExtension(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.logsConsumer = consumertest.NewNop() + receiver.config.Encoding = "text_encoding" + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.Nil(t, receiver.tracesConsumer) + assert.Nil(t, receiver.metricsConsumer) + assert.NotNil(t, receiver.logsConsumer) +}