From df014295ed0f7827de4a86fd5fe42df1b71cbd98 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:26:51 +0100 Subject: [PATCH 1/5] fix(loki): Make drain forward entries with fallback timeout --- internal/component/common/loki/drain.go | 57 +++++++++++------ internal/component/common/loki/drain_test.go | 62 ++++++++++++++----- internal/component/loki/process/process.go | 2 +- .../loki/source/cloudflare/cloudflare.go | 2 +- .../component/loki/source/docker/docker.go | 3 +- .../component/loki/source/journal/journal.go | 2 +- .../kubernetes_events/kubernetes_events.go | 2 +- 7 files changed, 90 insertions(+), 40 deletions(-) diff --git a/internal/component/common/loki/drain.go b/internal/component/common/loki/drain.go index 612c3d7100b..26579c99df9 100644 --- a/internal/component/common/loki/drain.go +++ b/internal/component/common/loki/drain.go @@ -2,32 +2,51 @@ package loki import ( "context" + "sync" + "time" ) -// Drain consumes log entries from recv in a background goroutine while f executes. -// This prevents deadlocks that can occur when stopping components that may still be -// sending entries to the receiver channel. The draining goroutine will continue -// consuming entries until f returns, at which point the context is cancelled and -// the goroutine exits. +const DefaultDrainTimeout = 2 * time.Minute + +// Drain forwards log entries from recv to fanout in a background goroutine while +// fn executes. If forwarding blocks for longer than timeout, Drain falls back +// to discarding entries from recv until fn returns. This prevents deadlocks in +// shutdown paths where component may still send to recv while fn is stopping them. // // This is typically used during component shutdown to drain any remaining entries // from a receiver channel while performing cleanup operations. -func Drain(recv LogsReceiver, f func()) { +func Drain(recv LogsReceiver, fanout *Fanout, timeout time.Duration, fn func()) { ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - for { - select { - case <-ctx.Done(): + var wg sync.WaitGroup + + defer func() { + cancel() + wg.Wait() + }() + + wg.Go(func() { + consumeCtx, consumeCancel := context.WithTimeout(ctx, timeout) + Consume(consumeCtx, recv, fanout) + consumeCancel() + + // NOTE: If we could not forward entries within fallbackDuration we drain to nothing. + // This is just to gaurd against deadlock. If/when fn finish sucessfully this will stop. + discard(ctx, recv) + }) + + fn() +} + +func discard(ctx context.Context, recv LogsReceiver) { + for { + select { + case <-ctx.Done(): + return + case _, ok := <-recv.Chan(): + // Consume and discard entries to prevent channel blocking + if !ok { return - case _, ok := <-recv.Chan(): - // Consume and discard entries to prevent channel blocking - if !ok { - return - } } } - }() - - f() + } } diff --git a/internal/component/common/loki/drain_test.go b/internal/component/common/loki/drain_test.go index b153691cca7..54ac2726653 100644 --- a/internal/component/common/loki/drain_test.go +++ b/internal/component/common/loki/drain_test.go @@ -11,28 +11,60 @@ import ( ) func TestDrain(t *testing.T) { - recv := NewLogsReceiver() + t.Run("forwards while fn runs", func(t *testing.T) { + recv := NewLogsReceiver() + collector := NewCollectingHandler() + defer collector.Stop() - var wg sync.WaitGroup - wg.Go(func() { - for range 10 { - entry := Entry{ + var producer sync.WaitGroup + producer.Go(func() { + recv.Chan() <- Entry{ Labels: model.LabelSet{"test": "label"}, Entry: push.Entry{ Timestamp: time.Now(), - Line: "test log entry", + Line: "forwarded", }, } - recv.Chan() <- entry - } - }) + }) + + completed := false + Drain(recv, NewFanout([]LogsReceiver{collector.Receiver()}), time.Second, func() { + require.Eventually(t, func() bool { + return len(collector.Received()) == 1 + }, time.Second, 10*time.Millisecond) + completed = true + }) - completed := false - Drain(recv, func() { - time.Sleep(100 * time.Millisecond) - completed = true + producer.Wait() + require.True(t, completed) + require.Len(t, collector.Received(), 1) + require.Equal(t, "forwarded", collector.Received()[0].Line) }) - wg.Wait() - require.True(t, completed, "Drain should complete without deadlock") + t.Run("falls back to discard when forwarding blocks", func(t *testing.T) { + recv := NewLogsReceiver() + blockedRecv := NewLogsReceiver() + + var producer sync.WaitGroup + producer.Go(func() { + for range 2 { + recv.Chan() <- Entry{ + Labels: model.LabelSet{"test": "label"}, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "blocked", + }, + } + } + }) + + completed := false + Drain(recv, NewFanout([]LogsReceiver{blockedRecv}), 20*time.Millisecond, func() { + time.Sleep(100 * time.Millisecond) + completed = true + }) + + producer.Wait() + require.True(t, completed) + }) } diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index 3b86d5b8a98..79186913184 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -89,7 +89,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { defer func() { - loki.Drain(c.processOut, func() { + loki.Drain(c.processOut, c.fanout, loki.DefaultDrainTimeout, func() { c.mut.Lock() defer c.mut.Unlock() if c.entryHandler != nil { diff --git a/internal/component/loki/source/cloudflare/cloudflare.go b/internal/component/loki/source/cloudflare/cloudflare.go index df311cb4cd3..64e6f0f2ed6 100644 --- a/internal/component/loki/source/cloudflare/cloudflare.go +++ b/internal/component/loki/source/cloudflare/cloudflare.go @@ -138,7 +138,7 @@ func (c *Component) Run(ctx context.Context) error { // NOTE: We need to stop posFile first so we don't record entries we are draining. c.posFile.Stop() - loki.Drain(c.handler, func() { + loki.Drain(c.handler, c.fanout, loki.DefaultDrainTimeout, func() { c.mut.Lock() defer c.mut.Unlock() c.tailer.stop() diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 4b8b37489df..25b47525f27 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -160,8 +160,7 @@ func (c *Component) Run(ctx context.Context) error { c.exited.Store(true) c.posFile.Stop() - // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). - loki.Drain(c.handler, func() { + loki.Drain(c.handler, c.fanout, loki.DefaultDrainTimeout, func() { c.mut.Lock() defer c.mut.Unlock() c.scheduler.Stop() diff --git a/internal/component/loki/source/journal/journal.go b/internal/component/loki/source/journal/journal.go index da8ff942c73..6b2a2ee3b60 100644 --- a/internal/component/loki/source/journal/journal.go +++ b/internal/component/loki/source/journal/journal.go @@ -93,7 +93,7 @@ func (c *Component) Run(ctx context.Context) error { // We need to stop posFile first so we don't record entries we are draining c.positions.Stop() - loki.Drain(c.recv, func() { + loki.Drain(c.recv, c.fanout, loki.DefaultDrainTimeout, func() { c.mut.Lock() defer c.mut.Unlock() if c.tailer != nil { diff --git a/internal/component/loki/source/kubernetes_events/kubernetes_events.go b/internal/component/loki/source/kubernetes_events/kubernetes_events.go index 6319e41ddc4..fde8ffe37ea 100644 --- a/internal/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/internal/component/loki/source/kubernetes_events/kubernetes_events.go @@ -129,7 +129,7 @@ func New(o component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) error { defer func() { c.positions.Stop() - loki.Drain(c.handler, func() { + loki.Drain(c.handler, c.fanout, loki.DefaultDrainTimeout, func() { c.mut.Lock() defer c.mut.Unlock() c.scheduler.Stop() From 31558aa698a314c48a043938ef8b33ef3bb23850 Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:52:04 +0100 Subject: [PATCH 2/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- internal/component/common/loki/drain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/common/loki/drain.go b/internal/component/common/loki/drain.go index 26579c99df9..55ceff45b15 100644 --- a/internal/component/common/loki/drain.go +++ b/internal/component/common/loki/drain.go @@ -30,7 +30,7 @@ func Drain(recv LogsReceiver, fanout *Fanout, timeout time.Duration, fn func()) consumeCancel() // NOTE: If we could not forward entries within fallbackDuration we drain to nothing. - // This is just to gaurd against deadlock. If/when fn finish sucessfully this will stop. + // This is just to guard against deadlock. If/when fn finish sucessfully this will stop. discard(ctx, recv) }) From d3b29c919b0016666cac26eb0adeda4b1f975831 Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Thu, 19 Mar 2026 23:21:49 +0100 Subject: [PATCH 3/5] Update internal/component/common/loki/drain.go Co-authored-by: Kyle Eckhart --- internal/component/common/loki/drain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/common/loki/drain.go b/internal/component/common/loki/drain.go index 55ceff45b15..5c1410fffec 100644 --- a/internal/component/common/loki/drain.go +++ b/internal/component/common/loki/drain.go @@ -9,7 +9,7 @@ import ( const DefaultDrainTimeout = 2 * time.Minute // Drain forwards log entries from recv to fanout in a background goroutine while -// fn executes. If forwarding blocks for longer than timeout, Drain falls back +// fn executes. It will continue to forward up to the timeout and then falls back // to discarding entries from recv until fn returns. This prevents deadlocks in // shutdown paths where component may still send to recv while fn is stopping them. // From d76b2adf958a9e5a79abeac421c9b71d5fe942fb Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 19 Mar 2026 23:27:11 +0100 Subject: [PATCH 4/5] add leak detection and call cancel in defer --- internal/component/common/loki/drain.go | 2 +- internal/component/common/loki/drain_test.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/component/common/loki/drain.go b/internal/component/common/loki/drain.go index 5c1410fffec..5ee8b0c2bbe 100644 --- a/internal/component/common/loki/drain.go +++ b/internal/component/common/loki/drain.go @@ -26,8 +26,8 @@ func Drain(recv LogsReceiver, fanout *Fanout, timeout time.Duration, fn func()) wg.Go(func() { consumeCtx, consumeCancel := context.WithTimeout(ctx, timeout) + defer consumeCancel() Consume(consumeCtx, recv, fanout) - consumeCancel() // NOTE: If we could not forward entries within fallbackDuration we drain to nothing. // This is just to guard against deadlock. If/when fn finish sucessfully this will stop. diff --git a/internal/component/common/loki/drain_test.go b/internal/component/common/loki/drain_test.go index 54ac2726653..2a3ec36ea89 100644 --- a/internal/component/common/loki/drain_test.go +++ b/internal/component/common/loki/drain_test.go @@ -8,9 +8,12 @@ import ( "github.com/grafana/loki/pkg/push" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "go.uber.org/goleak" ) func TestDrain(t *testing.T) { + defer goleak.VerifyNone(t) + t.Run("forwards while fn runs", func(t *testing.T) { recv := NewLogsReceiver() collector := NewCollectingHandler() From 9657ebda32414502cb2da74e3981a80240e4d993 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Fri, 20 Mar 2026 09:17:17 +0100 Subject: [PATCH 5/5] Add test to make sure we forward one entry and discard the rest --- internal/component/common/loki/drain.go | 4 +- internal/component/common/loki/drain_test.go | 52 +++++++++++++++++--- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/internal/component/common/loki/drain.go b/internal/component/common/loki/drain.go index 5ee8b0c2bbe..101c2f14bef 100644 --- a/internal/component/common/loki/drain.go +++ b/internal/component/common/loki/drain.go @@ -29,8 +29,8 @@ func Drain(recv LogsReceiver, fanout *Fanout, timeout time.Duration, fn func()) defer consumeCancel() Consume(consumeCtx, recv, fanout) - // NOTE: If we could not forward entries within fallbackDuration we drain to nothing. - // This is just to guard against deadlock. If/when fn finish sucessfully this will stop. + // NOTE: If we could not forward entries during the configured timeout we discard entries. + // This is just to guard against deadlock. When fn finishes successfully this will stop. discard(ctx, recv) }) diff --git a/internal/component/common/loki/drain_test.go b/internal/component/common/loki/drain_test.go index 2a3ec36ea89..a47ddb68387 100644 --- a/internal/component/common/loki/drain_test.go +++ b/internal/component/common/loki/drain_test.go @@ -1,8 +1,10 @@ package loki import ( + "strconv" "sync" "testing" + "testing/synctest" "time" "github.com/grafana/loki/pkg/push" @@ -30,16 +32,13 @@ func TestDrain(t *testing.T) { } }) - completed := false Drain(recv, NewFanout([]LogsReceiver{collector.Receiver()}), time.Second, func() { require.Eventually(t, func() bool { return len(collector.Received()) == 1 }, time.Second, 10*time.Millisecond) - completed = true }) producer.Wait() - require.True(t, completed) require.Len(t, collector.Received(), 1) require.Equal(t, "forwarded", collector.Received()[0].Line) }) @@ -61,13 +60,54 @@ func TestDrain(t *testing.T) { } }) - completed := false Drain(recv, NewFanout([]LogsReceiver{blockedRecv}), 20*time.Millisecond, func() { time.Sleep(100 * time.Millisecond) - completed = true }) producer.Wait() - require.True(t, completed) + }) + + t.Run("forwards one entry and discard rest", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + recv := NewLogsReceiver() + // Use a buffered channel so the first entry can always be forwarded to the fanout. + consumer := NewLogsReceiver(WithChannel(make(chan Entry, 1))) + + var producerWG sync.WaitGroup + producerWG.Go(func() { + for i := range 3 { + recv.Chan() <- Entry{ + Entry: push.Entry{ + Timestamp: time.Now(), + Line: strconv.Itoa(i), + }, + } + } + }) + + var wg sync.WaitGroup + wg.Go(func() { + Drain(recv, NewFanout([]LogsReceiver{consumer}), 100*time.Millisecond, func() { + // Wait until the producer has finished sending all entries. + producerWG.Wait() + }) + }) + + // Wait until all go routines are blocked and advance time. + synctest.Wait() + time.Sleep(101 * time.Millisecond) + wg.Wait() + + // Make sure we only get the first entry. + entry := <-consumer.Chan() + require.Equal(t, "0", entry.Line) + synctest.Wait() + + select { + case extra := <-consumer.Chan(): + t.Fatalf("unexpected extra forwarded entry: %q", extra.Line) + default: + } + }) }) }