diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 8b2e16d496ab..311b1bae2745 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -32,7 +33,44 @@ import ( "go.opentelemetry.io/collector/processor/processortest" ) +// Helper function to set up feature gates for testing +func setFeatureGateForTest(t *testing.T, useExporterHelper bool) func() { + originalValue := useExporterHelper + if useExporterHelper { + originalValue = false // Default is false (Alpha stage) + } + + require.NoError(t, featuregate.GlobalRegistry().Set(useExporterHelperGate, useExporterHelper)) + + // Return cleanup function + return func() { + require.NoError(t, featuregate.GlobalRegistry().Set(useExporterHelperGate, originalValue)) + } +} + +// Helper function to set up propagateErrors feature gate for testing +func setPropagateErrorsForTest(t *testing.T, usePropagateErrors bool) func() { + originalValue := usePropagateErrors + if usePropagateErrors { + originalValue = false // Default is false (Alpha stage) + } + + require.NoError(t, featuregate.GlobalRegistry().Set(propagateErrorsGate, usePropagateErrors)) + + // Return cleanup function + return func() { + require.NoError(t, featuregate.GlobalRegistry().Set(propagateErrorsGate, originalValue)) + } +} + func TestProcessorShutdown(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testProcessorShutdown(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testProcessorShutdown(t, true) }) // Test new implementation +} + +func testProcessorShutdown(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + factory := NewFactory() ctx := context.Background() @@ -60,6 +98,13 @@ func TestProcessorShutdown(t *testing.T) { } func TestProcessorLifecycle(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testProcessorLifecycle(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testProcessorLifecycle(t, true) }) // Test new implementation +} + +func testProcessorLifecycle(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + factory := NewFactory() ctx := context.Background() @@ -84,14 +129,30 @@ func TestProcessorLifecycle(t *testing.T) { } func TestBatchProcessorSpansDelivered(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchProcessorSpansDelivered(t, false, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchProcessorSpansDelivered(t, true, false) }) // Test new implementation + t.Run(t.Name()+"HelperWithPropagateErrors", func(t *testing.T) { testBatchProcessorSpansDelivered(t, true, true) }) // Test new implementation with propagate errors +} + +func testBatchProcessorSpansDelivered(t *testing.T, useExporterHelper, usePropagateErrors bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + defer setPropagateErrorsForTest(t, usePropagateErrors)() + sink := new(consumertest.TracesSink) + + // Create a wrapper around the sink that logs when it's called + wrappedSink, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + return sink.ConsumeTraces(ctx, td) + }) + require.NoError(t, err) + cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, wrappedSink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) - requestCount := 1000 + requestCount := 2 // Reduced for easier debugging spansPerRequest := 100 sentResourceSpans := ptrace.NewTraces().ResourceSpans() for requestNum := 0; requestNum < requestCount; requestNum++ { @@ -124,24 +185,48 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { } func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchProcessorSpansDeliveredEnforceBatchSize(t, false, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchProcessorSpansDeliveredEnforceBatchSize(t, true, false) }) // Test new implementation + t.Run(t.Name()+"HelperWithPropagateErrors", func(t *testing.T) { testBatchProcessorSpansDeliveredEnforceBatchSize(t, true, true) }) // Test new implementation with propagate errors +} + +func testBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T, useExporterHelper, usePropagateErrors bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + if usePropagateErrors { + err := featuregate.GlobalRegistry().Set("processor.batch.propagateerrors", true) + require.NoError(t, err) + defer func() { + err := featuregate.GlobalRegistry().Set("processor.batch.propagateerrors", false) + require.NoError(t, err) + }() + } + sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 + cfg.Timeout = 10 * time.Second traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) requestCount := 1000 spansPerRequest := 150 + var wg sync.WaitGroup + wg.Add(requestCount) + // Requests are in parallel for requestNum := 0; requestNum < requestCount; requestNum++ { - td := testdata.GenerateTraces(spansPerRequest) - spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() - for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { - spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) - } - require.NoError(t, traces.ConsumeTraces(context.Background(), td)) + go func() { + defer wg.Done() + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) + }() } + wg.Wait() // Added to test logic that check for empty resources. td := ptrace.NewTraces() @@ -163,6 +248,13 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { } func TestBatchProcessorSentBySize(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchProcessorSentBySize(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchProcessorSentBySize(t, true) }) // Test new implementation +} + +func testBatchProcessorSentBySize(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + const ( sendBatchSize = 20 requestCount = 100 @@ -257,6 +349,13 @@ func TestBatchProcessorSentBySize(t *testing.T) { } func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchProcessorSentBySizeWithMaxSize(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchProcessorSentBySizeWithMaxSize(t, true) }) // Test new implementation +} + +func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + const ( sendBatchSize = 20 sendBatchMaxSize = 37 @@ -356,6 +455,13 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { } func TestBatchProcessorSentByTimeout(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchProcessorSentByTimeout(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchProcessorSentByTimeout(t, true) }) // Test new implementation +} + +func testBatchProcessorSentByTimeout(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) sendBatchSize := 100 @@ -402,6 +508,13 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { } func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchProcessorTraceSendWhenClosing(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchProcessorTraceSendWhenClosing(t, true) }) // Test new implementation +} + +func testBatchProcessorTraceSendWhenClosing(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + cfg := &Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, @@ -426,6 +539,13 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { } func TestBatchMetricProcessor_ReceivingData(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchMetricProcessor_ReceivingData(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchMetricProcessor_ReceivingData(t, true) }) // Test new implementation +} + +func testBatchMetricProcessor_ReceivingData(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := &Config{ @@ -473,6 +593,13 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { } func TestBatchMetricProcessorBatchSize(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchMetricProcessorBatchSize(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchMetricProcessorBatchSize(t, true) }) // Test new implementation +} + +func testBatchMetricProcessorBatchSize(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + tel := componenttest.NewTelemetry() sizer := &pmetric.ProtoMarshaler{} @@ -570,6 +697,13 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { } func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchMetrics_UnevenBatchMaxSize(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchMetrics_UnevenBatchMaxSize(t, true) }) // Test new implementation +} + +func testBatchMetrics_UnevenBatchMaxSize(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + ctx := context.Background() sink := new(metricsSink) metricsCount := 50 @@ -590,6 +724,13 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { } func TestBatchMetricsProcessor_Timeout(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchMetricsProcessor_Timeout(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchMetricsProcessor_Timeout(t, true) }) // Test new implementation +} + +func testBatchMetricsProcessor_Timeout(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 101, @@ -634,6 +775,13 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { } func TestBatchMetricProcessor_Shutdown(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchMetricProcessor_Shutdown(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchMetricProcessor_Shutdown(t, true) }) // Test new implementation +} + +func testBatchMetricProcessor_Shutdown(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + cfg := &Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, @@ -773,6 +921,13 @@ func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pmetric.Metrics) er } func TestBatchLogProcessor_ReceivingData(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchLogProcessor_ReceivingData(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchLogProcessor_ReceivingData(t, true) }) // Test new implementation +} + +func testBatchLogProcessor_ReceivingData(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := &Config{ @@ -820,6 +975,13 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { } func TestBatchLogProcessor_BatchSize(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchLogProcessor_BatchSize(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchLogProcessor_BatchSize(t, true) }) // Test new implementation +} + +func testBatchLogProcessor_BatchSize(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + tel := componenttest.NewTelemetry() sizer := &plog.ProtoMarshaler{} @@ -915,6 +1077,13 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { } func TestBatchLogsProcessor_Timeout(t *testing.T) { + testBatchLogsProcessor_Timeout(t, false) // Test legacy implementation + testBatchLogsProcessor_Timeout(t, true) // Test new implementation +} + +func testBatchLogsProcessor_Timeout(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 100, @@ -959,6 +1128,13 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { } func TestBatchLogProcessor_Shutdown(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchLogProcessor_Shutdown(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchLogProcessor_Shutdown(t, true) }) // Test new implementation +} + +func testBatchLogProcessor_Shutdown(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + cfg := &Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, @@ -1006,6 +1182,13 @@ func logsReceivedBySeverityText(lds []plog.Logs) map[string]plog.LogRecord { } func TestShutdown(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testShutdown(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testShutdown(t, true) }) // Test new implementation +} + +func testShutdown(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + factory := NewFactory() processortest.VerifyShutdown(t, factory, factory.CreateDefaultConfig()) } @@ -1167,6 +1350,13 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { } func TestBatchZeroConfig(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchZeroConfig(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchZeroConfig(t, true) }) // Test new implementation +} + +func testBatchZeroConfig(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + // This is a no-op configuration. No need for a timer, no // minimum, no maximum, just a pass through. cfg := &Config{} @@ -1204,6 +1394,13 @@ func TestBatchZeroConfig(t *testing.T) { } func TestBatchSplitOnly(t *testing.T) { + t.Run(t.Name()+"Legacy", func(t *testing.T) { testBatchSplitOnly(t, false) }) // Test legacy implementation + t.Run(t.Name()+"Helper", func(t *testing.T) { testBatchSplitOnly(t, true) }) // Test new implementation +} + +func testBatchSplitOnly(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + const maxBatch = 10 const requestCount = 5 const logsPerRequest = 100 diff --git a/processor/batchprocessor/capture_deadlock.sh b/processor/batchprocessor/capture_deadlock.sh new file mode 100644 index 000000000000..b8f6bfc8bc6b --- /dev/null +++ b/processor/batchprocessor/capture_deadlock.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +# Script to capture deadlock stack trace from batch processor test + +echo "Starting deadlock capture..." +cd /home/jmacd/src/otel/collector/processor/batchprocessor + +# Start the test in background and capture PID +echo "Starting test in background..." +go test -race -c . + +#TEST=TestBatchProcessorSpansDeliveredEnforceBatchSize/TestBatchProcessorSpansDeliveredEnforceBatchSizeHelper +TEST=TestBatchProcessorSpansDeliveredEnforceBatchSize/TestBatchProcessorSpansDeliveredEnforceBatchSizeHelperWithPropagateErrors + +echo "Testing ${TEST}" +./batchprocessor.test -test.v -test.run ${TEST} 2> deadlock_err 1> deadlock_out & +TEST_PID=$! + +echo "Test started with PID: $TEST_PID" + +# Wait for the test to reach deadlock state +echo "Waiting 10 seconds for deadlock to occur..." +sleep 10 + +# Send SIGABRT to get stack trace +echo "Sending SIGABRT to capture stack trace..." +kill -ABRT $TEST_PID + +# Wait a moment for the signal to be processed +sleep 2 + +kill -9 $TEST_PID 2>/dev/null + +cat deadlock_err +echo "To access the whole stacktrace (also shown above), see ./deadlock_err" +echo "To view test output see ./deadlock_out" diff --git a/processor/batchprocessor/config.go b/processor/batchprocessor/config.go index 8a1b20176263..18533382534d 100644 --- a/processor/batchprocessor/config.go +++ b/processor/batchprocessor/config.go @@ -66,5 +66,11 @@ func (cfg *Config) Validate() error { if cfg.Timeout < 0 { return errors.New("timeout must be greater or equal to 0") } + + if useExporterHelper.IsEnabled() { + if len(cfg.MetadataKeys) != 0 { + return errors.New("metadata_keys not supported while this feature flag is alpha") + } + } return nil } diff --git a/processor/batchprocessor/exporterhelper_processor.go b/processor/batchprocessor/exporterhelper_processor.go new file mode 100644 index 000000000000..e2af365d028e --- /dev/null +++ b/processor/batchprocessor/exporterhelper_processor.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package batchprocessor // import "go.opentelemetry.io/collector/processor/batchprocessor" + +import ( + "context" + "runtime" + + "go.opentelemetry.io/collector/config/configoptional" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/processor" +) + +// translateToExporterHelperConfig converts legacy batchprocessor config to exporterhelper config +func translateToExporterHelperConfig(cfg *Config) exporterhelper.QueueBatchConfig { + // These settings match legacy behavior + queueBatchConfig := exporterhelper.QueueBatchConfig{ + Enabled: true, + WaitForResult: propagateErrors.IsEnabled(), + BlockOnOverflow: true, + Sizer: exporterhelper.RequestSizerTypeItems, + QueueSize: int64(runtime.NumCPU()) * int64(max(cfg.SendBatchSize, cfg.SendBatchMaxSize, 100)), + NumConsumers: 1, + } + + if cfg.SendBatchSize > 0 || cfg.SendBatchMaxSize > 0 || cfg.Timeout > 0 { + batchConfig := exporterhelper.BatchConfig{ + FlushTimeout: cfg.Timeout, + Sizer: exporterhelper.RequestSizerTypeItems, + MinSize: 0, // Default: no minimum + MaxSize: 0, // Default: no maximum + } + + // Map send_batch_size to MinSize (minimum items to trigger batch) + if cfg.SendBatchSize > 0 { + batchConfig.MinSize = int64(cfg.SendBatchSize) + } + + // Map send_batch_max_size to MaxSize (maximum items in batch) + if cfg.SendBatchMaxSize > 0 { + batchConfig.MaxSize = int64(cfg.SendBatchMaxSize) + } + + queueBatchConfig.Batch = configoptional.Some(batchConfig) + } + + return queueBatchConfig +} + +// newTracesProcessorWithExporterHelper creates a new traces processor using exporterhelper components. +func newTracesProcessorWithExporterHelper(set processor.Settings, nextConsumer consumer.Traces, cfg *Config) (processor.Traces, error) { + set.Logger.Info("Creating traces processor with ExporterHelper") + + queueBatchConfig := translateToExporterHelperConfig(cfg) + + exporterSet := exporter.Settings{ + ID: set.ID, + TelemetrySettings: set.TelemetrySettings, + BuildInfo: set.BuildInfo, + } + + result, err := exporterhelper.NewTraces( + context.Background(), + exporterSet, + cfg, + nextConsumer.ConsumeTraces, + exporterhelper.WithQueue(queueBatchConfig), + ) + return result, err +} + +// newMetricsProcessorWithExporterHelper creates a new metrics processor using exporterhelper components. +func newMetricsProcessorWithExporterHelper(set processor.Settings, nextConsumer consumer.Metrics, cfg *Config) (processor.Metrics, error) { + queueBatchConfig := translateToExporterHelperConfig(cfg) + + exporterSet := exporter.Settings{ + ID: set.ID, + TelemetrySettings: set.TelemetrySettings, + BuildInfo: set.BuildInfo, + } + + return exporterhelper.NewMetrics( + context.Background(), + exporterSet, + cfg, + nextConsumer.ConsumeMetrics, + exporterhelper.WithQueue(queueBatchConfig), + ) +} + +// newLogsProcessorWithExporterHelper creates a new logs processor using exporterhelper components. +func newLogsProcessorWithExporterHelper(set processor.Settings, nextConsumer consumer.Logs, cfg *Config) (processor.Logs, error) { + queueBatchConfig := translateToExporterHelperConfig(cfg) + + exporterSet := exporter.Settings{ + ID: set.ID, + TelemetrySettings: set.TelemetrySettings, + BuildInfo: set.BuildInfo, + } + + return exporterhelper.NewLogs( + context.Background(), + exporterSet, + cfg, + nextConsumer.ConsumeLogs, + exporterhelper.WithQueue(queueBatchConfig), + ) +} diff --git a/processor/batchprocessor/factory.go b/processor/batchprocessor/factory.go index 2cb52a12c396..bbf69e934b59 100644 --- a/processor/batchprocessor/factory.go +++ b/processor/batchprocessor/factory.go @@ -49,7 +49,11 @@ func createTraces( cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { - return newTracesBatchProcessor(set, nextConsumer, cfg.(*Config)) + batchCfg := cfg.(*Config) + if useExporterHelper.IsEnabled() { + return newTracesProcessorWithExporterHelper(set, nextConsumer, batchCfg) + } + return newTracesBatchProcessor(set, nextConsumer, batchCfg) } func createMetrics( @@ -58,7 +62,11 @@ func createMetrics( cfg component.Config, nextConsumer consumer.Metrics, ) (processor.Metrics, error) { - return newMetricsBatchProcessor(set, nextConsumer, cfg.(*Config)) + batchCfg := cfg.(*Config) + if useExporterHelper.IsEnabled() { + return newMetricsProcessorWithExporterHelper(set, nextConsumer, batchCfg) + } + return newMetricsBatchProcessor(set, nextConsumer, batchCfg) } func createLogs( @@ -67,5 +75,9 @@ func createLogs( cfg component.Config, nextConsumer consumer.Logs, ) (processor.Logs, error) { - return newLogsBatchProcessor(set, nextConsumer, cfg.(*Config)) + batchCfg := cfg.(*Config) + if useExporterHelper.IsEnabled() { + return newLogsProcessorWithExporterHelper(set, nextConsumer, batchCfg) + } + return newLogsBatchProcessor(set, nextConsumer, batchCfg) } diff --git a/processor/batchprocessor/factory_test.go b/processor/batchprocessor/factory_test.go index b0e7e2a8be38..52f7eadde859 100644 --- a/processor/batchprocessor/factory_test.go +++ b/processor/batchprocessor/factory_test.go @@ -10,6 +10,10 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor/processortest" ) @@ -22,21 +26,38 @@ func TestCreateDefaultConfig(t *testing.T) { } func TestCreateProcessor(t *testing.T) { + t.Run(t.Name() + "Legacy", func(t *testing.T) { testCreateProcessor(t, false) }) // Test legacy implementation + t.Run(t.Name() + "Helper", func(t *testing.T) { testCreateProcessor(t, true) }) // Test new implementation +} + +func testCreateProcessor(t *testing.T, useExporterHelper bool) { + defer setFeatureGateForTest(t, useExporterHelper)() + factory := NewFactory() cfg := factory.CreateDefaultConfig() creationSet := processortest.NewNopSettings(factory.Type()) - tp, err := factory.CreateTraces(context.Background(), creationSet, cfg, nil) + + tc, _ := consumer.NewTraces(func(context.Context, ptrace.Traces) error { + return nil + }) + tp, err := factory.CreateTraces(context.Background(), creationSet, cfg, tc) assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") assert.NoError(t, tp.Shutdown(context.Background())) - mp, err := factory.CreateMetrics(context.Background(), creationSet, cfg, nil) + mc, _ := consumer.NewMetrics(func(context.Context, pmetric.Metrics) error { + return nil + }) + mp, err := factory.CreateMetrics(context.Background(), creationSet, cfg, mc) assert.NotNil(t, mp) assert.NoError(t, err, "cannot create metric processor") assert.NoError(t, mp.Shutdown(context.Background())) - lp, err := factory.CreateLogs(context.Background(), creationSet, cfg, nil) + lc, _ := consumer.NewLogs(func(context.Context, plog.Logs) error { + return nil + }) + lp, err := factory.CreateLogs(context.Background(), creationSet, cfg, lc) assert.NotNil(t, lp) assert.NoError(t, err, "cannot create logs processor") assert.NoError(t, lp.Shutdown(context.Background())) diff --git a/processor/batchprocessor/featuregates.go b/processor/batchprocessor/featuregates.go new file mode 100644 index 000000000000..b8ade37e6fe1 --- /dev/null +++ b/processor/batchprocessor/featuregates.go @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package batchprocessor // import "go.opentelemetry.io/collector/processor/batchprocessor" + +import ( + "go.opentelemetry.io/collector/featuregate" +) + +const ( + // useExporterHelperGate controls whether to use exporterhelper components + // for batching instead of the legacy implementation. + useExporterHelperGate = "processor.batch.useexporterhelper" + + // propagateErrorsGate controls whether to propagate errors from the next + // consumer instead of suppressing them (legacy behavior). + propagateErrorsGate = "processor.batch.propagateerrors" +) + +var ( + // UseExporterHelper is the feature gate for using exporterhelper components. + useExporterHelper = featuregate.GlobalRegistry().MustRegister( + useExporterHelperGate, + featuregate.StageAlpha, + featuregate.WithRegisterDescription("Use exporterhelper components for batching instead of legacy implementation"), + featuregate.WithRegisterFromVersion("v0.131.0"), + ) + + // PropagateErrors is the feature gate for propagating errors instead of suppressing them. + propagateErrors = featuregate.GlobalRegistry().MustRegister( + propagateErrorsGate, + featuregate.StageAlpha, + featuregate.WithRegisterDescription("Propagate errors from next consumer instead of suppressing them"), + featuregate.WithRegisterFromVersion("v0.131.0"), + ) +) diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 670f3ac653c9..365a546d34bc 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -7,16 +7,20 @@ require ( go.opentelemetry.io/collector/client v1.37.0 go.opentelemetry.io/collector/component v1.37.0 go.opentelemetry.io/collector/component/componenttest v0.131.0 + go.opentelemetry.io/collector/config/configoptional v0.131.0 go.opentelemetry.io/collector/confmap v1.37.0 go.opentelemetry.io/collector/consumer v1.37.0 go.opentelemetry.io/collector/consumer/consumererror v0.131.0 go.opentelemetry.io/collector/consumer/consumertest v0.131.0 + go.opentelemetry.io/collector/exporter v1.36.1 + go.opentelemetry.io/collector/featuregate v1.37.0 go.opentelemetry.io/collector/pdata v1.37.0 go.opentelemetry.io/collector/pdata/testdata v0.131.0 go.opentelemetry.io/collector/processor v1.37.0 - go.opentelemetry.io/collector/processor/processortest v0.131.0 + go.opentelemetry.io/collector/processor/processortest v0.130.1 go.opentelemetry.io/otel v1.37.0 go.opentelemetry.io/otel/metric v1.37.0 + go.opentelemetry.io/otel/sdk v1.37.0 go.opentelemetry.io/otel/sdk/metric v1.37.0 go.opentelemetry.io/otel/trace v1.37.0 go.uber.org/goleak v1.3.0 @@ -24,6 +28,7 @@ require ( ) require ( + github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -43,15 +48,17 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.131.0 // indirect + go.opentelemetry.io/collector/config/configretry v1.37.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.131.0 // indirect - go.opentelemetry.io/collector/featuregate v1.37.0 // indirect + go.opentelemetry.io/collector/extension v1.37.0 // indirect + go.opentelemetry.io/collector/extension/xextension v0.131.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.131.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.131.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.131.0 // indirect go.opentelemetry.io/collector/pipeline v0.131.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.131.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect go.opentelemetry.io/otel/log v0.13.0 // indirect - go.opentelemetry.io/otel/sdk v1.37.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/net v0.40.0 // indirect @@ -103,3 +110,7 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/c replace go.opentelemetry.io/collector/featuregate => ../../featuregate replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry + +replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/exporter => ../../exporter diff --git a/processor/batchprocessor/go.sum b/processor/batchprocessor/go.sum index 985d043b9af1..1257030e5d9d 100644 --- a/processor/batchprocessor/go.sum +++ b/processor/batchprocessor/go.sum @@ -1,3 +1,5 @@ +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -57,6 +59,26 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/collector/config/configoptional v0.131.0 h1:l71mCsUeF3t8L5V9Z+vxVyd4CLXkRU4Z8IRFYlSsaWU= +go.opentelemetry.io/collector/config/configoptional v0.131.0/go.mod h1:OD5fc5qphgy9UxCuM8NiOaCHxHOf7/25lXr+ZBJQ1gM= +go.opentelemetry.io/collector/exporter/exportertest v0.131.0 h1:gb/uM1Md/nsmUPf6POP8mLX3yog+9ZvberW6wDL8uw0= +go.opentelemetry.io/collector/exporter/exportertest v0.131.0/go.mod h1:j5r6by0ZOraHg43DkLEJkf/dodqDVwHKmzSH17gnuRY= +go.opentelemetry.io/collector/exporter/xexporter v0.131.0 h1:px+eJTiQOoiR1g8hQGmyPHoPHTUuBkIMY39IOOg7zC8= +go.opentelemetry.io/collector/exporter/xexporter v0.131.0/go.mod h1:R4r3/xWpTh9UTndnZW36nHkz/gVB1SezJfiup7pjspE= +go.opentelemetry.io/collector/extension v1.37.0 h1:K2srrCLyJ/Lp6mkerdmaEffhRj9P2GgbdxGvQq7klWc= +go.opentelemetry.io/collector/extension v1.37.0/go.mod h1:EF+cNmOt1KCVnUWECKDn0pDEmB4G7SreUwnRDdgdJew= +go.opentelemetry.io/collector/extension/extensiontest v0.131.0 h1:nd4SFLoCiwcbaNYkunIUl36BTMHKyxyF6jFD1p8wIUo= +go.opentelemetry.io/collector/extension/extensiontest v0.131.0/go.mod h1:GyPzWLgXr8avCoo5/MHtiY8oSICHu/WKyuIFMOPPTY4= +go.opentelemetry.io/collector/extension/xextension v0.131.0 h1:O0ZQgOhoW3GdYThLlnDvRQg6TmSiMWE9Pt1/dfvL794= +go.opentelemetry.io/collector/extension/xextension v0.131.0/go.mod h1:aoCi/HMAyeqPiG6ee+Cfq9x0J4MqdSFhZyVoPgIlfgY= +go.opentelemetry.io/collector/pdata/xpdata v0.131.0 h1:jCxncJWMNc65rWZ8QSLhaMdY+wZfbZJBmQWMemkDq34= +go.opentelemetry.io/collector/pdata/xpdata v0.131.0/go.mod h1:Wp5QttVjWAiB0kOba+y4hS+aVjie+aOACEkUFlvWz3A= +go.opentelemetry.io/collector/receiver v1.37.0 h1:rXe+tbhoC5lRv0zW39KKrmqTYPCl1XRglu40GOiXI24= +go.opentelemetry.io/collector/receiver v1.37.0/go.mod h1:+C8whk742qLVKsbZd/uvEBFv2T72s2FX0Xp4lPOICb4= +go.opentelemetry.io/collector/receiver/receivertest v0.131.0 h1:YauYAFjw10pCgAVbYRNI4pUs1e5pwIwFeO1hTk+rW88= +go.opentelemetry.io/collector/receiver/receivertest v0.131.0/go.mod h1:u0gax7qrs0dGQlPzXIjeciV/oawRq6/7THtrup+r06k= +go.opentelemetry.io/collector/receiver/xreceiver v0.131.0 h1:6PS72NU5QVNGz0yBshagolQC+ANqM16n9khn1Z1CTV4= +go.opentelemetry.io/collector/receiver/xreceiver v0.131.0/go.mod h1:7DZtSz6yQJMHO3EfqwUhAjc3XpCckwWqu4AIaqbpvtU= go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 h1:FGre0nZh5BSw7G73VpT3xs38HchsfPsa2aZtMp0NPOs= go.opentelemetry.io/contrib/bridges/otelzap v0.12.0/go.mod h1:X2PYPViI2wTPIMIOBjG17KNybTzsrATnvPJ02kkz7LM= go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=