From e057e7731312942ccbf1ff88c3079e777f9a8e8f Mon Sep 17 00:00:00 2001 From: Romain Dauby Date: Mon, 30 Mar 2026 20:19:58 -0400 Subject: [PATCH 1/4] Interval processor flush buffer on shutdown --- .../intervalprocessor-flush-on-shutdown.yaml | 10 ++++ processor/intervalprocessor/README.md | 2 + processor/intervalprocessor/processor.go | 13 +++-- processor/intervalprocessor/processor_test.go | 49 ++++++++++++++++++- 4 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 .chloggen/intervalprocessor-flush-on-shutdown.yaml diff --git a/.chloggen/intervalprocessor-flush-on-shutdown.yaml b/.chloggen/intervalprocessor-flush-on-shutdown.yaml new file mode 100644 index 0000000000000..fb1690f8e2305 --- /dev/null +++ b/.chloggen/intervalprocessor-flush-on-shutdown.yaml @@ -0,0 +1,10 @@ +# Use this changelog template to create an entry for release notes. +change_type: enhancement +component: processor/interval +note: Flush remaining buffered metrics on shutdown to prevent data loss during restarts and rollouts. +issues: [47238] +subtext: | + Previously, the interval processor would silently drop any metrics accumulated + in its buffer when receiving a shutdown signal. Now it flushes the buffer to + the next consumer before exiting, consistent with the batch processor behavior. +change_logs: [user] diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 761754eb5afeb..396ed887e3a2e 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -78,3 +78,5 @@ At the next `interval` (15s by default), the processor would pass the following > [!IMPORTANT] > After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing. + +On shutdown (e.g. when the collector receives a SIGHUP or SIGTERM), the processor flushes all buffered metrics to the next component before exiting, ensuring no data is lost. diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index e24406333236f..b4218c86d17ba 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -27,6 +27,7 @@ type intervalProcessor struct { cancel context.CancelFunc logger *zap.Logger + wg sync.WaitGroup stateLock sync.Mutex md pmetric.Metrics @@ -70,14 +71,19 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics func (p *intervalProcessor) Start(_ context.Context, _ component.Host) error { exportTicker := time.NewTicker(p.config.Interval) + p.wg.Add(1) go func() { + defer p.wg.Done() for { select { case <-p.ctx.Done(): exportTicker.Stop() + // Flush remaining buffered metrics before exiting. + // Use context.Background() since p.ctx is already cancelled. + p.exportMetrics(context.Background()) return case <-exportTicker.C: - p.exportMetrics() + p.exportMetrics(p.ctx) } } }() @@ -87,6 +93,7 @@ func (p *intervalProcessor) Start(_ context.Context, _ component.Host) error { func (p *intervalProcessor) Shutdown(_ context.Context) error { p.cancel() + p.wg.Wait() return nil } @@ -201,7 +208,7 @@ func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP } } -func (p *intervalProcessor) exportMetrics() { +func (p *intervalProcessor) exportMetrics(ctx context.Context) { md := func() pmetric.Metrics { p.stateLock.Lock() defer p.stateLock.Unlock() @@ -223,7 +230,7 @@ func (p *intervalProcessor) exportMetrics() { return out }() - if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil { + if err := p.nextConsumer.ConsumeMetrics(ctx, md); err != nil { p.logger.Error("Metrics export failed", zap.Error(err)) } } diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index 429d1629071b7..2a64c1494cf41 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor/processortest" @@ -70,7 +71,7 @@ func TestAggregation(t *testing.T) { processor := mgp.(*intervalProcessor) // Pretend we hit the interval timer and call export - processor.exportMetrics() + processor.exportMetrics(ctx) // All the lookup tables should now be empty require.Empty(t, processor.rmLookup) @@ -82,7 +83,7 @@ func TestAggregation(t *testing.T) { require.Empty(t, processor.summaryLookup) // Exporting again should return nothing - processor.exportMetrics() + processor.exportMetrics(ctx) // Next should have gotten three data sets: // 1. Anything left over from ConsumeMetrics() @@ -107,3 +108,47 @@ func TestAggregation(t *testing.T) { }) } } + +func TestFlushOnShutdown(t *testing.T) { + t.Parallel() + + // Use a very long interval so the ticker never fires during the test. + config := &Config{Interval: time.Hour} + next := &consumertest.MetricsSink{} + + factory := NewFactory() + mgp, err := factory.CreateMetrics( + t.Context(), + processortest.NewNopSettings(metadata.Type), + config, + next, + ) + require.NoError(t, err) + + // Start the processor. + err = mgp.Start(t.Context(), componenttest.NewNopHost()) + require.NoError(t, err) + + // Feed metrics into the processor. + dir := filepath.Join("testdata", "basic_aggregation") + md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) + require.NoError(t, err) + + err = mgp.ConsumeMetrics(t.Context(), md) + require.NoError(t, err) + + // Shutdown should flush the buffered metrics. + err = mgp.Shutdown(t.Context()) + require.NoError(t, err) + + // The next consumer should have received: + // 1. Pass-through metrics from ConsumeMetrics + // 2. Buffered metrics flushed on shutdown + allMetrics := next.AllMetrics() + require.Len(t, allMetrics, 2) + + // Verify the flushed data matches expected output. + expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml")) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(expectedExportData, allMetrics[1])) +} From 960c24bca3018ac31d5936cd8a789588f0d6b07d Mon Sep 17 00:00:00 2001 From: Romain Dauby Date: Mon, 30 Mar 2026 20:53:44 -0400 Subject: [PATCH 2/4] Fix linter --- processor/intervalprocessor/processor.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index b4218c86d17ba..92050e005de92 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -71,9 +71,7 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics func (p *intervalProcessor) Start(_ context.Context, _ component.Host) error { exportTicker := time.NewTicker(p.config.Interval) - p.wg.Add(1) - go func() { - defer p.wg.Done() + p.wg.Go(func() { for { select { case <-p.ctx.Done(): @@ -86,7 +84,7 @@ func (p *intervalProcessor) Start(_ context.Context, _ component.Host) error { p.exportMetrics(p.ctx) } } - }() + }) return nil } From 82723eb2d271162928f0e0d3c943a9fb798c00c6 Mon Sep 17 00:00:00 2001 From: Romain Dauby Date: Tue, 31 Mar 2026 16:25:22 -0400 Subject: [PATCH 3/4] Remove comments --- processor/intervalprocessor/README.md | 4 +--- processor/intervalprocessor/processor_test.go | 7 ------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 396ed887e3a2e..186c93803bba2 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -77,6 +77,4 @@ At the next `interval` (15s by default), the processor would pass the following | 10 | test_metric | Cumulative | labelA: bar | 6.4 | > [!IMPORTANT] -> After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing. - -On shutdown (e.g. when the collector receives a SIGHUP or SIGTERM), the processor flushes all buffered metrics to the next component before exiting, ensuring no data is lost. +> After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing. \ No newline at end of file diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index 2a64c1494cf41..5d6d0ff76c6d4 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -125,11 +125,9 @@ func TestFlushOnShutdown(t *testing.T) { ) require.NoError(t, err) - // Start the processor. err = mgp.Start(t.Context(), componenttest.NewNopHost()) require.NoError(t, err) - // Feed metrics into the processor. dir := filepath.Join("testdata", "basic_aggregation") md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) require.NoError(t, err) @@ -137,17 +135,12 @@ func TestFlushOnShutdown(t *testing.T) { err = mgp.ConsumeMetrics(t.Context(), md) require.NoError(t, err) - // Shutdown should flush the buffered metrics. err = mgp.Shutdown(t.Context()) require.NoError(t, err) - // The next consumer should have received: - // 1. Pass-through metrics from ConsumeMetrics - // 2. Buffered metrics flushed on shutdown allMetrics := next.AllMetrics() require.Len(t, allMetrics, 2) - // Verify the flushed data matches expected output. expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expectedExportData, allMetrics[1])) From 1e34b3f9d1ced5004c27a963fe4ab634f821f217 Mon Sep 17 00:00:00 2001 From: Romain Dauby Date: Tue, 31 Mar 2026 18:18:48 -0400 Subject: [PATCH 4/4] Revert README.md --- processor/intervalprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 186c93803bba2..761754eb5afeb 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -77,4 +77,4 @@ At the next `interval` (15s by default), the processor would pass the following | 10 | test_metric | Cumulative | labelA: bar | 6.4 | > [!IMPORTANT] -> After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing. \ No newline at end of file +> After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing.