From 52473075d32d0715dca4d5afaceb6babaa2adc93 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Wed, 7 Jan 2026 18:06:23 +0100 Subject: [PATCH 1/3] test: context cancel not being propagated --- reporter/collector_reporter_test.go | 52 +++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/reporter/collector_reporter_test.go b/reporter/collector_reporter_test.go index d1effae07..81d0a88b3 100644 --- a/reporter/collector_reporter_test.go +++ b/reporter/collector_reporter_test.go @@ -3,14 +3,18 @@ package reporter import ( "context" "errors" + "sync/atomic" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/reporter/samples" + "go.opentelemetry.io/ebpf-profiler/support" ) func TestCollectorReporterReportTraceEvent(t *testing.T) { @@ -60,3 +64,51 @@ func TestCollectorReporterReportTraceEvent(t *testing.T) { }) } } + +func TestCollectorReporterShutdown(t *testing.T) { + var cancelled atomic.Bool + consumerStarted := make(chan struct{}) + next, err := xconsumer.NewProfiles(func(ctx context.Context, _ pprofile.Profiles) error { + close(consumerStarted) + select { + case <-ctx.Done(): + cancelled.Store(true) + return nil + } + }) + require.NoError(t, err) + + r, err := NewCollector(&Config{ + ReportInterval: 10 * time.Millisecond, + }, next) + require.NoError(t, err) + + traceEventsPtr := r.traceEvents.WLock() + tree := (*traceEventsPtr) + tree[libpf.NullString] = map[libpf.Origin]samples.KeyToEventMapping{ + support.TraceOriginProbe: map[samples.TraceAndMetaKey]*samples.TraceEvents{ + {Pid: 1}: { + Frames: func() libpf.Frames { + frames := make(libpf.Frames, 0, 1) + frames.Append(&libpf.Frame{ + Type: libpf.KernelFrame, + AddressOrLineno: 0xef, + FunctionName: libpf.Intern("func1"), + }) + return frames + }(), + Timestamps: []uint64{1, 2, 3, 4}, + }, + }, + } + r.traceEvents.WUnlock(&traceEventsPtr) + + ctx, cancelFn := context.WithCancel(t.Context()) + require.NoError(t, r.Start(ctx)) + // BLOCK until the consumer is actually running + <-consumerStarted + cancelFn() + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.True(collect, cancelled.Load()) + }, 300*time.Millisecond, 50*time.Millisecond, "failed to cancel consumer") +} From 5b5bb3cc6cd496758d1cd31fa24d8880ec0a7ee0 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Thu, 8 Jan 2026 09:15:02 +0100 Subject: [PATCH 2/3] fix(collector): propagate context to reportProfile for graceful shutdown --- reporter/collector_reporter.go | 2 +- reporter/collector_reporter_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/reporter/collector_reporter.go b/reporter/collector_reporter.go index 178289c6a..a320a3e4f 100644 --- a/reporter/collector_reporter.go +++ b/reporter/collector_reporter.go @@ -57,7 +57,7 @@ func (r *CollectorReporter) Start(ctx context.Context) error { ctx, cancelReporting := context.WithCancel(ctx) r.runLoop.Start(ctx, r.cfg.ReportInterval, func() { - if err := r.reportProfile(context.Background()); err != nil { + if err := r.reportProfile(ctx); err != nil { log.Errorf("Request failed: %v", err) } }, func() { diff --git a/reporter/collector_reporter_test.go b/reporter/collector_reporter_test.go index 81d0a88b3..3ca0bdf64 100644 --- a/reporter/collector_reporter_test.go +++ b/reporter/collector_reporter_test.go @@ -110,5 +110,5 @@ func TestCollectorReporterShutdown(t *testing.T) { cancelFn() require.EventuallyWithT(t, func(collect *assert.CollectT) { assert.True(collect, cancelled.Load()) - }, 300*time.Millisecond, 50*time.Millisecond, "failed to cancel consumer") + }, 300*time.Millisecond, 50*time.Millisecond, "consumer did not exit after context cancellation") } From f0a5f347fefa7b4471f743e4131647da314ab13f Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Thu, 8 Jan 2026 10:19:42 +0100 Subject: [PATCH 3/3] Update reporter/collector_reporter_test.go Co-authored-by: Christos Kalkanis --- reporter/collector_reporter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reporter/collector_reporter_test.go b/reporter/collector_reporter_test.go index 3ca0bdf64..56185416c 100644 --- a/reporter/collector_reporter_test.go +++ b/reporter/collector_reporter_test.go @@ -110,5 +110,5 @@ func TestCollectorReporterShutdown(t *testing.T) { cancelFn() require.EventuallyWithT(t, func(collect *assert.CollectT) { assert.True(collect, cancelled.Load()) - }, 300*time.Millisecond, 50*time.Millisecond, "consumer did not exit after context cancellation") + }, 5*time.Second, 100*time.Millisecond, "consumer did not exit after context cancellation") }