Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .chloggen/intervalprocessor-flush-on-shutdown.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Use this changelog template to create an entry for release notes.
change_type: enhancement
component: processor/interval
note: Flush remaining buffered metrics on shutdown to prevent data loss during restarts and rollouts.
issues: [47238]
subtext: |
Previously, the interval processor would silently drop any metrics accumulated
in its buffer when receiving a shutdown signal. Now it flushes the buffer to
the next consumer before exiting, consistent with the batch processor behavior.
change_logs: [user]
15 changes: 10 additions & 5 deletions processor/intervalprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type intervalProcessor struct {
cancel context.CancelFunc
logger *zap.Logger

wg sync.WaitGroup
stateLock sync.Mutex

md pmetric.Metrics
Expand Down Expand Up @@ -70,23 +71,27 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics

func (p *intervalProcessor) Start(_ context.Context, _ component.Host) error {
exportTicker := time.NewTicker(p.config.Interval)
go func() {
p.wg.Go(func() {
for {
select {
case <-p.ctx.Done():
exportTicker.Stop()
// Flush remaining buffered metrics before exiting.
// Use context.Background() since p.ctx is already cancelled.
p.exportMetrics(context.Background())
Comment thread
edmocosta marked this conversation as resolved.
return
case <-exportTicker.C:
p.exportMetrics()
p.exportMetrics(p.ctx)
}
}
}()
})

return nil
}

func (p *intervalProcessor) Shutdown(_ context.Context) error {
p.cancel()
p.wg.Wait()
return nil
}

Expand Down Expand Up @@ -201,7 +206,7 @@ func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP
}
}

func (p *intervalProcessor) exportMetrics() {
func (p *intervalProcessor) exportMetrics(ctx context.Context) {
md := func() pmetric.Metrics {
p.stateLock.Lock()
defer p.stateLock.Unlock()
Expand All @@ -223,7 +228,7 @@ func (p *intervalProcessor) exportMetrics() {
return out
}()

if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil {
if err := p.nextConsumer.ConsumeMetrics(ctx, md); err != nil {
p.logger.Error("Metrics export failed", zap.Error(err))
}
}
Expand Down
42 changes: 40 additions & 2 deletions processor/intervalprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor/processortest"
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestAggregation(t *testing.T) {
processor := mgp.(*intervalProcessor)

// Pretend we hit the interval timer and call export
processor.exportMetrics()
processor.exportMetrics(ctx)

// All the lookup tables should now be empty
require.Empty(t, processor.rmLookup)
Expand All @@ -82,7 +83,7 @@ func TestAggregation(t *testing.T) {
require.Empty(t, processor.summaryLookup)

// Exporting again should return nothing
processor.exportMetrics()
processor.exportMetrics(ctx)

// Next should have gotten three data sets:
// 1. Anything left over from ConsumeMetrics()
Expand All @@ -107,3 +108,40 @@ func TestAggregation(t *testing.T) {
})
}
}

func TestFlushOnShutdown(t *testing.T) {
t.Parallel()

// Use a very long interval so the ticker never fires during the test.
config := &Config{Interval: time.Hour}
next := &consumertest.MetricsSink{}

factory := NewFactory()
mgp, err := factory.CreateMetrics(
t.Context(),
processortest.NewNopSettings(metadata.Type),
config,
next,
)
require.NoError(t, err)

err = mgp.Start(t.Context(), componenttest.NewNopHost())
require.NoError(t, err)

dir := filepath.Join("testdata", "basic_aggregation")
md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
require.NoError(t, err)

err = mgp.ConsumeMetrics(t.Context(), md)
require.NoError(t, err)

err = mgp.Shutdown(t.Context())
require.NoError(t, err)

allMetrics := next.AllMetrics()
require.Len(t, allMetrics, 2)

expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml"))
require.NoError(t, err)
require.NoError(t, pmetrictest.CompareMetrics(expectedExportData, allMetrics[1]))
}
Loading