[receiver/googlecloudpubsub] Add support for encoding extensions (#37109) (#37137)

#### Description
Added support for encoding extensions. Setting the encoding field in the
config now references the extension. If the extension is not found, the
receiver falls back to searching the internal encoders.

To make the built-in encoders consistent with the extensions, they now
have the same interface.
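
The lookup order described here can be sketched roughly as follows. This is a minimal illustration, not the receiver's actual code: `unmarshaler`, `namedUnmarshaler`, `resolveEncoding`, and the two maps are hypothetical names that stand in for the collector's extension registry and the receiver's built-in encoders.

```go
package main

import "fmt"

// unmarshaler is a hypothetical stand-in for the single interface that both
// encoding extensions and built-in encoders are assumed to satisfy.
type unmarshaler interface {
	Name() string
}

// namedUnmarshaler is a trivial implementation used only for illustration.
type namedUnmarshaler string

func (n namedUnmarshaler) Name() string { return string(n) }

// resolveEncoding checks the configured extensions first and falls back to
// the built-in encoders when no extension matches the encoding name.
func resolveEncoding(encoding string, extensions, builtIn map[string]unmarshaler) (unmarshaler, error) {
	if u, ok := extensions[encoding]; ok {
		return u, nil
	}
	if u, ok := builtIn[encoding]; ok {
		return u, nil
	}
	return nil, fmt.Errorf("encoding %q is neither a configured extension nor a built-in encoder", encoding)
}

func main() {
	extensions := map[string]unmarshaler{
		"text_encoding": namedUnmarshaler("extension:text_encoding"),
	}
	builtIn := map[string]unmarshaler{
		"otlp_proto_log": namedUnmarshaler("builtin:otlp_proto_log"),
		"cloud_logging":  namedUnmarshaler("builtin:cloud_logging"),
	}
	for _, enc := range []string{"text_encoding", "cloud_logging", "unknown"} {
		u, err := resolveEncoding(enc, extensions, builtIn)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(enc, "->", u.Name())
	}
}
```

Because both kinds of decoder share one interface in this sketch, the caller does not need to know which source the decoder came from.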

#### Link to tracking issue
Fixes #37109

#### Testing
- Added tests for the receiver to handle the new extensions and the
built-in encoders
- Removed the tests in the configuration, as they are now handled in the
receiver tests

#### Documentation
Reworked the encoding section of the README, with an example of a
text_encoding extension
alexvanboxel authored Jan 24, 2025
1 parent 4abd68a commit 6dcced1
Showing 11 changed files with 431 additions and 200 deletions.
11 changes: 11 additions & 0 deletions .chloggen/pubsubreceiver-encodingextensions.yaml
@@ -0,0 +1,11 @@
change_type: enhancement

component: googlecloudpubsubreceiver

note: Added support for encoding extensions.

issues: [37109]

subtext:

change_logs: [user]
54 changes: 39 additions & 15 deletions receiver/googlecloudpubsubreceiver/README.md
@@ -41,29 +41,53 @@ receivers:
## Encoding
You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data
by looking at the `ce-type` and `content-type` attributes of the message. Only when those attributes are not set
must the `encoding` field in the configuration be set.
The `encoding` option allows you to specify Encoding Extensions for decoding messages on the subscription. An
extension needs to be configured in the `extensions` section and added to the pipeline in the collector's configuration file.

| ce-type | ce-datacontenttype | encoding | description |
|-----------------------------------|----------------------|-------------------|------------------------------------------------|
| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message |
| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message |
| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message |
| - | - | otlp_proto_trace | Decode OTLP trace message |
| - | - | otlp_proto_metric | Decode OTLP trace message |
| - | - | otlp_proto_log | Decode OTLP trace message |
| - | - | cloud_logging | Decode [Cloud Logging] [LogEntry] message type |
| - | - | raw_text | Wrap in an OTLP log message |
The following example shows how to use the text encoding extension for ingesting arbitrary text messages on a
subscription, wrapping them in OTLP Log messages. Note that not all extensions support all signals.

```yaml
extensions:
  text_encoding:
    encoding: utf8
    unmarshaling_separator: "\r?\n"
service:
  extensions: [text_encoding]
  pipelines:
    logs:
      receivers: [googlecloudpubsub]
      processors: []
      exporters: [debug]
```

When the `encoding` configuration is set, the attributes on the message are ignored.
The receiver also supports built-in encodings for the native OTLP encodings, without the need to specify an Encoding
Extension. The non-OTLP built-in encodings will be deprecated as soon as extensions for the formats are available.

| encoding | description |
|-------------------|------------------------------------------------|
| otlp_proto_trace | Decode OTLP trace message |
| otlp_proto_metric | Decode OTLP metric message                     |
| otlp_proto_log    | Decode OTLP log message                        |
| cloud_logging | Decode [Cloud Logging] [LogEntry] message type |
| raw_text | Wrap in an OTLP log message |

With `cloud_logging`, the receiver can be used to bring Cloud Logging messages into an OpenTelemetry pipeline. You'll
first need to [set up a logging sink][sink-docs] with a Pub/Sub topic as its destination. Note that the `cloud_logging`
integration is considered **alpha** because the semantic conventions for some of the conversions are not yet stable.

With `raw_text`, the receiver can be used for ingesting arbitrary text messages on a Pubsub subscription, wrapping them
in OTLP Log messages, making it a convenient way to ingest raw log lines from Pubsub.
in OTLP Log messages.

When no encoding is specified, the receiver will try to discover the type of the data by looking at the `ce-type` and
`content-type` attributes of the message. These message attributes are set by the `googlecloudpubsubexporter`.

| ce-type | ce-datacontenttype | encoding | description |
|-----------------------------------|----------------------|-------------------|------------------------------------------------|
| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message |
| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message |
| org.opentelemetry.otlp.logs.v1 | application/protobuf | | Decode OTLP log message |

[Cloud Logging]: https://cloud.google.com/logging
[LogEntry]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
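
The attribute-based discovery in the last README table can be illustrated with a small sketch. It is an assumption-laden example, not the receiver's code: `detectEncoding` is a hypothetical helper, and the attribute keys `ce-type` and `content-type` are taken from the README prose rather than from the implementation.

```go
package main

import "fmt"

// detectEncoding maps message attributes to a built-in encoding name,
// following the ce-type table in the README. It returns false when the
// attributes do not identify a known OTLP payload.
func detectEncoding(attrs map[string]string) (string, bool) {
	// Assumption: only protobuf payloads are auto-detected.
	if attrs["content-type"] != "application/protobuf" {
		return "", false
	}
	switch attrs["ce-type"] {
	case "org.opentelemetry.otlp.traces.v1":
		return "otlp_proto_trace", true
	case "org.opentelemetry.otlp.metrics.v1":
		return "otlp_proto_metric", true
	case "org.opentelemetry.otlp.logs.v1":
		return "otlp_proto_log", true
	}
	return "", false
}

func main() {
	attrs := map[string]string{
		"ce-type":      "org.opentelemetry.otlp.metrics.v1",
		"content-type": "application/protobuf",
	}
	if enc, ok := detectEncoding(attrs); ok {
		fmt.Println("detected encoding:", enc)
	} else {
		fmt.Println("no encoding detected; the encoding field must be set")
	}
}
```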
45 changes: 0 additions & 45 deletions receiver/googlecloudpubsubreceiver/config.go
@@ -35,51 +35,6 @@ type Config struct {
ClientID string `mapstructure:"client_id"`
}

func (config *Config) validateForLog() error {
err := config.validate()
if err != nil {
return err
}
switch config.Encoding {
case "":
case "otlp_proto_log":
case "raw_text":
case "raw_json":
case "cloud_logging":
default:
return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json,cloud_logging]", config.Encoding)
}
return nil
}

func (config *Config) validateForTrace() error {
err := config.validate()
if err != nil {
return err
}
switch config.Encoding {
case "":
case "otlp_proto_trace":
default:
return fmt.Errorf("trace encoding %v is not supported. supported encoding formats include [otlp_proto_trace]", config.Encoding)
}
return nil
}

func (config *Config) validateForMetric() error {
err := config.validate()
if err != nil {
return err
}
switch config.Encoding {
case "":
case "otlp_proto_metric":
default:
return fmt.Errorf("metric encoding %v is not supported. supported encoding formats include [otlp_proto_metric]", config.Encoding)
}
return nil
}

func (config *Config) validate() error {
if !subscriptionMatcher.MatchString(config.Subscription) {
return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", config.Subscription)
60 changes: 0 additions & 60 deletions receiver/googlecloudpubsubreceiver/config_test.go
@@ -63,70 +63,10 @@ func TestLoadConfig(t *testing.T) {
func TestConfigValidation(t *testing.T) {
factory := NewFactory()
c := factory.CreateDefaultConfig().(*Config)
assert.Error(t, c.validateForTrace())
assert.Error(t, c.validateForLog())
assert.Error(t, c.validateForMetric())
c.Subscription = "projects/000project/subscriptions/my-subscription"
assert.Error(t, c.validate())
c.Subscription = "projects/my-project/topics/my-topic"
assert.Error(t, c.validate())
c.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, c.validate())
}

func TestTraceConfigValidation(t *testing.T) {
factory := NewFactory()
c := factory.CreateDefaultConfig().(*Config)
c.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, c.validateForTrace())

c.Encoding = "otlp_proto_metric"
assert.Error(t, c.validateForTrace())
c.Encoding = "otlp_proto_log"
assert.Error(t, c.validateForTrace())
c.Encoding = "raw_text"
assert.Error(t, c.validateForTrace())
c.Encoding = "raw_json"
assert.Error(t, c.validateForTrace())

c.Encoding = "otlp_proto_trace"
assert.NoError(t, c.validateForTrace())
}

func TestMetricConfigValidation(t *testing.T) {
factory := NewFactory()
c := factory.CreateDefaultConfig().(*Config)
c.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, c.validateForMetric())

c.Encoding = "otlp_proto_trace"
assert.Error(t, c.validateForMetric())
c.Encoding = "otlp_proto_log"
assert.Error(t, c.validateForMetric())
c.Encoding = "raw_text"
assert.Error(t, c.validateForMetric())
c.Encoding = "raw_json"
assert.Error(t, c.validateForMetric())

c.Encoding = "otlp_proto_metric"
assert.NoError(t, c.validateForMetric())
}

func TestLogConfigValidation(t *testing.T) {
factory := NewFactory()
c := factory.CreateDefaultConfig().(*Config)
c.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, c.validateForLog())

c.Encoding = "otlp_proto_trace"
assert.Error(t, c.validateForLog())
c.Encoding = "otlp_proto_metric"
assert.Error(t, c.validateForLog())

c.Encoding = "raw_text"
assert.NoError(t, c.validateForLog())
c.Encoding = "raw_json"
assert.NoError(t, c.validateForLog())
c.Encoding = "otlp_proto_log"
assert.NoError(t, c.validateForLog())
}
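
With the per-signal checks removed, the only validation still exercised by `TestConfigValidation` is the subscription format. The sketch below shows one way such a check could behave. The regular expression is an assumption chosen to match the three cases in the test; the real `subscriptionMatcher` definition is not part of this diff, and `subscriptionPattern` and `validateSubscription` are hypothetical names.

```go
package main

import (
	"fmt"
	"regexp"
)

// subscriptionPattern is an assumed pattern: a project ID that starts with a
// lowercase letter, followed by a non-empty subscription name.
var subscriptionPattern = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/subscriptions/[^/]+$`)

// validateSubscription returns an error when the subscription path does not
// follow projects/<project_id>/subscriptions/<name>.
func validateSubscription(s string) error {
	if !subscriptionPattern.MatchString(s) {
		return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", s)
	}
	return nil
}

func main() {
	for _, s := range []string{
		"projects/000project/subscriptions/my-subscription",
		"projects/my-project/topics/my-topic",
		"projects/my-project/subscriptions/my-subscription",
	} {
		fmt.Printf("%s -> %v\n", s, validateSubscription(s))
	}
}
```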
6 changes: 3 additions & 3 deletions receiver/googlecloudpubsubreceiver/factory.go
@@ -71,7 +71,7 @@ func (factory *pubsubReceiverFactory) CreateTraces(
cfg component.Config,
consumer consumer.Traces,
) (receiver.Traces, error) {
err := cfg.(*Config).validateForTrace()
err := cfg.(*Config).validate()
if err != nil {
return nil, err
}
@@ -89,7 +89,7 @@ func (factory *pubsubReceiverFactory) CreateMetrics(
cfg component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {
err := cfg.(*Config).validateForMetric()
err := cfg.(*Config).validate()
if err != nil {
return nil, err
}
@@ -107,7 +107,7 @@ func (factory *pubsubReceiverFactory) CreateLogs(
cfg component.Config,
consumer consumer.Logs,
) (receiver.Logs, error) {
err := cfg.(*Config).validateForLog()
err := cfg.(*Config).validate()
if err != nil {
return nil, err
}
7 changes: 5 additions & 2 deletions receiver/googlecloudpubsubreceiver/go.mod
@@ -9,6 +9,7 @@ require (
github.com/googleapis/gax-go/v2 v2.14.1
github.com/iancoleman/strcase v0.3.0
github.com/json-iterator/go v1.1.12
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.114.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250121185328-fbefb22cc2b3
@@ -37,7 +38,7 @@ require (
cloud.google.com/go/iam v1.2.2 // indirect
cloud.google.com/go/longrunning v0.6.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@@ -55,7 +56,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.einride.tech/aip v0.68.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
@@ -92,3 +93,5 @@ retract (
v0.76.1
v0.65.0
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding
6 changes: 4 additions & 2 deletions receiver/googlecloudpubsubreceiver/go.sum

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions receiver/googlecloudpubsubreceiver/internal/log_entry.go
@@ -5,7 +5,6 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"bytes"
"context"
"encoding/hex"
stdjson "encoding/json"
"errors"
@@ -20,7 +19,6 @@ import (
jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
@@ -113,7 +111,7 @@ func getLogEntryDescriptor() protoreflect.MessageDescriptor {
// schema; this ensures that a numeric value in the input is correctly
// translated to either an integer or a double in the output. It falls back to
// plain JSON decoding if payload type is not available in the proto registry.
func TranslateLogEntry(_ context.Context, _ *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) {
func TranslateLogEntry(data []byte) (pcommon.Resource, plog.LogRecord, error) {
lr := plog.NewLogRecord()
res := pcommon.NewResource()

@@ -4,7 +4,6 @@
package internal

import (
"context"
"fmt"
"testing"
"time"
@@ -15,7 +14,6 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"
)

type Log struct {
@@ -70,15 +68,12 @@ func TestTranslateLogEntry(t *testing.T) {
}{
// TODO: Add publicly shareable log test data.
}

logger, _ := zap.NewDevelopment()

for _, tt := range tests {
var errs error
wantRes, wantLr, err := generateLog(t, tt.want)
errs = multierr.Append(errs, err)

gotRes, gotLr, err := TranslateLogEntry(context.TODO(), logger, []byte(tt.input))
gotRes, gotLr, err := TranslateLogEntry([]byte(tt.input))
errs = multierr.Append(errs, err)
errs = multierr.Combine(errs, compareResources(wantRes, gotRes), compareLogRecords(wantLr, gotLr))
