Skip to content

Commit

Permalink
feat: configurable mock destination for pipeline testing (#2012)
Browse files Browse the repository at this point in the history
This new exporter mocks a destination, and allows one to simulate
various back-pressure scenarios, like long time to get a response for
the export, and simulating some export requests being rejected.

It is useful for pipeline development and testing
  • Loading branch information
blumamir authored Dec 16, 2024
1 parent 5c4ab5b commit 793e2c9
Show file tree
Hide file tree
Showing 22 changed files with 1,114 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,7 @@ dev-debug-destination:
.PHONY: dev-add-nop-destination
dev-nop-destination:
kubectl apply -f ./tests/nop-exporter.yaml

.PHONY: dev-add-backpressue-destination
dev-backpressue-destination:
kubectl apply -f ./tests/backpressure-exporter.yaml
2 changes: 2 additions & 0 deletions collector/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ exporters:
- gomod: go.opentelemetry.io/collector/exporter/otlphttpexporter v0.106.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/odigos/exporter/azureblobstorageexporter v0.106.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/odigos/exporter/googlecloudstorageexporter v0.106.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/odigos/exporter/mockdestinationexporter v0.106.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter v0.106.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter v0.106.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuredataexplorerexporter v0.106.0
Expand Down Expand Up @@ -116,5 +117,6 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/odigos/processor/odigossqldboperationprocessor => ../processors/odigossqldboperationprocessor
- github.com/open-telemetry/opentelemetry-collector-contrib/odigos/exporter/azureblobstorageexporter => ../exporters/azureblobstorageexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/odigos/exporter/googlecloudstorageexporter => ../exporters/googlecloudstorageexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/odigos/exporter/mockdestinationexporter => ../exporters/mockdestinationexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/odigos/processor/odigostrafficmetrics => ../processors/odigostrafficmetrics
- go.opentelemetry.io/collector/odigos/providers/odigosfileprovider => ../providers/odigosfileprovider
1 change: 1 addition & 0 deletions collector/exporters/mockdestinationexporter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
19 changes: 19 additions & 0 deletions collector/exporters/mockdestinationexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Mock Destination Exporter

This exporter can be used for development and testing.
It allows you to mock a specific behavior of a destination exporter.

## Configuration

The following configuration options are available:

- `response_duration` can be used to set the duration of time until the export response is returned. can be used to simulate slow receivers (due to errors, network issues, etc).
- `reject_fraction` number from 0 to 1 that determines the fraction of exports that mocks a rejection of the export request.

Example:

```yaml
│ mockdestination:
│ reject_fraction: 0.5
│ response_duration: 500ms
```
33 changes: 33 additions & 0 deletions collector/exporters/mockdestinationexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package mockdestinationexporter

import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
)

// Config contains the main configuration options for the mockdestination exporter
type Config struct {

// ResponseDuration is the amount of time the exporter will wait before returning a response.
// It can be used to simulate loaded and slow destinations.
ResponseDuration time.Duration `mapstructure:"response_duration"`

// RejectFraction is the fraction of exports that will randomly be rejected.
// Set to 0 to disable rejection, and to 1 to reject all exports.
// Can be used to simulate destinations that are back-pressuring the collector.
RejectFraction float64 `mapstructure:"reject_fraction"`
}

func (c *Config) Validate() error {
if c.ResponseDuration < 0 {
return fmt.Errorf("response_duration must be a non-negative duration")
}
if c.RejectFraction < 0 || c.RejectFraction > 1 {
return fmt.Errorf("reject_fraction must be a fraction between 0 and 1")
}
return nil
}

var _ component.Config = (*Config)(nil)
15 changes: 15 additions & 0 deletions collector/exporters/mockdestinationexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package mockdestinationexporter

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/confmap"
)

func TestUnmarshalDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, confmap.New().Unmarshal(&cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
}
62 changes: 62 additions & 0 deletions collector/exporters/mockdestinationexporter/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package mockdestinationexporter

import (
"context"
"errors"
"math/rand/v2"
"time"

"go.opentelemetry.io/collector/exporter"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

type MockDestinationExporter struct {
config *Config
logger *zap.Logger
}

func NewMockDestinationExporter(config *Config,
params exporter.Settings) (*MockDestinationExporter, error) {

if config == nil {
return nil, errors.New("mock destination exporter config is nil")
}

logger := params.Logger

mockDestinationExporter := &MockDestinationExporter{
config: config,
logger: logger,
}
return mockDestinationExporter, nil
}

func (e *MockDestinationExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *MockDestinationExporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
return e.mockExport(ctx)
}

func (e *MockDestinationExporter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
return e.mockExport(ctx)
}

func (e *MockDestinationExporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
return e.mockExport(ctx)
}

func (e *MockDestinationExporter) mockExport(context.Context) error {
// not taking care of ctx cancel and shutdown as this is a dummy exporter and not used in production
<-time.After(e.config.ResponseDuration)
if rand.Float64() < e.config.RejectFraction {
return errors.New("export rejected by mock destination")
}
return nil
}
85 changes: 85 additions & 0 deletions collector/exporters/mockdestinationexporter/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package mockdestinationexporter

import (
"context"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/odigos/exporter/mockdestinationexporter/internal/metadata"
"go.opentelemetry.io/collector/exporter"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// NewFactory creates a factory for GCS exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
metadata.Type,
createDefaultConfig,
exporter.WithLogs(createLogsExporter, component.StabilityLevelBeta),
exporter.WithTraces(createTracesExporter, component.StabilityLevelBeta),
exporter.WithMetrics(createMetricsExporter, component.StabilityLevelBeta))
}

func createDefaultConfig() component.Config {
return &Config{
ResponseDuration: time.Millisecond * 100,
RejectFraction: 0,
}
}

func createLogsExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config) (exporter.Logs, error) {

pCfg := cfg.(*Config)
gcsExporter, err := NewMockDestinationExporter(pCfg, set)
if err != nil {
return nil, err
}

return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
gcsExporter.ConsumeLogs)
}

func createTracesExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config) (exporter.Traces, error) {

pCfg := cfg.(*Config)
gcsExporter, err := NewMockDestinationExporter(pCfg, set)
if err != nil {
return nil, err
}

return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
gcsExporter.ConsumeTraces,
)
}

func createMetricsExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config) (exporter.Metrics, error) {

pCfg := cfg.(*Config)
gcsExporter, err := NewMockDestinationExporter(pCfg, set)
if err != nil {
return nil, err
}

return exporterhelper.NewMetricsExporter(
ctx,
set,
cfg,
gcsExporter.ConsumeMetrics,
)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 793e2c9

Please sign in to comment.