Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
//
// This method is safe to call concurrently.
func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
// Only check if context is already cancelled before starting, not inside or after callback loops.
// If this method returns after executing some callbacks but before running all aggregations,
// internal aggregation state can be corrupted and result in incorrect data returned
// by future produce calls.
if err := ctx.Err(); err != nil {
return err
}

p.Lock()
defer p.Unlock()

Expand All @@ -130,26 +138,13 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
if e := c(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
rm.Resource = nil
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
}
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(multiCallback)
if e := f(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
rm.Resource = nil
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
}

rm.Resource = p.resource
Expand Down
127 changes: 127 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,130 @@ func TestPipelineWithMultipleReaders(t *testing.T) {
assert.Equal(t, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)
}
}

func TestPipelineProduceErrors(t *testing.T) {
Comment thread
dmathieu marked this conversation as resolved.
// Create a test pipeline with aggregations
pipeReader := NewManualReader()
pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter)

// Set up an observable with callbacks
var testObsID observableID[int64]
aggBuilder := aggregate.Builder[int64]{Temporality: metricdata.CumulativeTemporality}
measure, _ := aggBuilder.Sum(true)
pipe.addInt64Measure(testObsID, []aggregate.Measure[int64]{measure})

// Add an aggregation that just sets the data point value to the number of times the aggregation is invoked
aggCallCount := 0
inst := instrumentSync{
name: "test-metric",
description: "test description",
unit: "test unit",
compAgg: func(dest *metricdata.Aggregation) int {
aggCallCount++

*dest = metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{{Value: int64(aggCallCount)}},
}
return aggCallCount
},
}
pipe.addSync(instrumentation.Scope{Name: "test"}, inst)

ctx, cancelCtx := context.WithCancel(context.Background())
var shouldCancelContext bool // When true, the second callback cancels ctx
var shouldReturnError bool // When true, the third callback returns an error
var callbackCounts [3]int

// Callback 1: cancels the context during execution but continues to populate data
pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error {
callbackCounts[0]++
for _, m := range pipe.int64Measures[testObsID] {
m(ctx, 123, *attribute.EmptySet())
}
return nil
})

// Callback 2: populates int64 observable data
pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error {
callbackCounts[1]++
if shouldCancelContext {
cancelCtx()
}
return nil
})

// Callback 3: return an error
pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error {
callbackCounts[2]++
if shouldReturnError {
return fmt.Errorf("test callback error")
}
return nil
})

assertMetrics := func(rm *metricdata.ResourceMetrics, expectVal int64) {
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: inst.name,
Description: inst.description,
Unit: inst.unit,
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{{Value: expectVal}},
},
}, rm.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
}

t.Run("no errors", func(t *testing.T) {
var rm metricdata.ResourceMetrics
err := pipe.produce(ctx, &rm)
require.NoError(t, err)

assert.Equal(t, [3]int{1, 1, 1}, callbackCounts)
assert.Equal(t, 1, aggCallCount)

assertMetrics(&rm, 1)
})

t.Run("callback returns error", func(t *testing.T) {
shouldReturnError = true

var rm metricdata.ResourceMetrics
err := pipe.produce(ctx, &rm)
require.ErrorContains(t, err, "test callback error")

// Even though a callback returned an error, the agg function is still called
assert.Equal(t, [3]int{2, 2, 2}, callbackCounts)
assert.Equal(t, 2, aggCallCount)

assertMetrics(&rm, 2)
})

t.Run("context canceled during produce", func(t *testing.T) {
shouldCancelContext = true

var rm metricdata.ResourceMetrics
err := pipe.produce(ctx, &rm)
require.ErrorContains(t, err, "test callback error")

// Even though the context was canceled midway through invoking callbacks,
// all remaining callbacks and agg functions are still called
assert.Equal(t, [3]int{3, 3, 3}, callbackCounts)
assert.Equal(t, 3, aggCallCount)
})

t.Run("context already cancelled", func(t *testing.T) {
var output metricdata.ResourceMetrics
err := pipe.produce(ctx, &output)
require.ErrorIs(t, err, context.Canceled)

// No callbacks or agg functions are called since the context was canceled prior to invoking
// the produce method
assert.Equal(t, [3]int{3, 3, 3}, callbackCounts)
assert.Equal(t, 3, aggCallCount)
})
}
Loading