Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions internal/component/common/loki/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,51 @@ package loki

import (
"context"
"sync"
"time"
)

// Drain consumes log entries from recv in a background goroutine while f executes.
// This prevents deadlocks that can occur when stopping components that may still be
// sending entries to the receiver channel. The draining goroutine will continue
// consuming entries until f returns, at which point the context is cancelled and
// the goroutine exits.
const DefaultDrainTimeout = 2 * time.Minute

// Drain forwards log entries from recv to fanout in a background goroutine while
// fn executes. It will continue to forward up to the timeout and then falls back
// to discarding entries from recv until fn returns. This prevents deadlocks in
// shutdown paths where component may still send to recv while fn is stopping them.
Comment thread
kalleep marked this conversation as resolved.
//
// This is typically used during component shutdown to drain any remaining entries
// from a receiver channel while performing cleanup operations.
func Drain(recv LogsReceiver, f func()) {
func Drain(recv LogsReceiver, fanout *Fanout, timeout time.Duration, fn func()) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
var wg sync.WaitGroup

defer func() {
cancel()
wg.Wait()
}()

wg.Go(func() {
consumeCtx, consumeCancel := context.WithTimeout(ctx, timeout)
defer consumeCancel()
Consume(consumeCtx, recv, fanout)

// NOTE: If we could not forward entries during the configured timeout we discard entries.
// This is just to guard against deadlock. When fn finishes successfully this will stop.
discard(ctx, recv)
Comment thread
kalleep marked this conversation as resolved.
})
Comment thread
kalleep marked this conversation as resolved.

fn()
}

func discard(ctx context.Context, recv LogsReceiver) {
for {
select {
case <-ctx.Done():
return
case _, ok := <-recv.Chan():
// Consume and discard entries to prevent channel blocking
if !ok {
return
case _, ok := <-recv.Chan():
// Consume and discard entries to prevent channel blocking
if !ok {
return
}
}
}
}()

f()
}
}
103 changes: 89 additions & 14 deletions internal/component/common/loki/drain_test.go
Comment thread
kalleep marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,38 +1,113 @@
package loki

import (
"strconv"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestDrain(t *testing.T) {
recv := NewLogsReceiver()
defer goleak.VerifyNone(t)

var wg sync.WaitGroup
wg.Go(func() {
for range 10 {
entry := Entry{
t.Run("forwards while fn runs", func(t *testing.T) {
Comment thread
kalleep marked this conversation as resolved.
recv := NewLogsReceiver()
collector := NewCollectingHandler()
defer collector.Stop()

var producer sync.WaitGroup
producer.Go(func() {
recv.Chan() <- Entry{
Labels: model.LabelSet{"test": "label"},
Entry: push.Entry{
Timestamp: time.Now(),
Line: "test log entry",
Line: "forwarded",
},
}
recv.Chan() <- entry
}
})

Drain(recv, NewFanout([]LogsReceiver{collector.Receiver()}), time.Second, func() {
require.Eventually(t, func() bool {
return len(collector.Received()) == 1
}, time.Second, 10*time.Millisecond)
})

producer.Wait()
require.Len(t, collector.Received(), 1)
require.Equal(t, "forwarded", collector.Received()[0].Line)
})

completed := false
Drain(recv, func() {
time.Sleep(100 * time.Millisecond)
completed = true
t.Run("falls back to discard when forwarding blocks", func(t *testing.T) {
recv := NewLogsReceiver()
blockedRecv := NewLogsReceiver()

var producer sync.WaitGroup
producer.Go(func() {
for range 2 {
recv.Chan() <- Entry{
Labels: model.LabelSet{"test": "label"},
Entry: push.Entry{
Timestamp: time.Now(),
Line: "blocked",
},
}
}
})

Drain(recv, NewFanout([]LogsReceiver{blockedRecv}), 20*time.Millisecond, func() {
time.Sleep(100 * time.Millisecond)
})

producer.Wait()
})

wg.Wait()
require.True(t, completed, "Drain should complete without deadlock")
t.Run("forwards one entry and discard rest", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
recv := NewLogsReceiver()
// Use a buffered channel so the first entry can always be forwarded to the fanout.
consumer := NewLogsReceiver(WithChannel(make(chan Entry, 1)))

var producerWG sync.WaitGroup
producerWG.Go(func() {
for i := range 3 {
recv.Chan() <- Entry{
Entry: push.Entry{
Timestamp: time.Now(),
Line: strconv.Itoa(i),
},
}
}
})

var wg sync.WaitGroup
wg.Go(func() {
Drain(recv, NewFanout([]LogsReceiver{consumer}), 100*time.Millisecond, func() {
// Wait until the producer has finished sending all entries.
producerWG.Wait()
})
})

// Wait until all go routines are blocked and advance time.
synctest.Wait()
time.Sleep(101 * time.Millisecond)
wg.Wait()

// Make sure we only get the first entry.
entry := <-consumer.Chan()
require.Equal(t, "0", entry.Line)
synctest.Wait()

select {
case extra := <-consumer.Chan():
t.Fatalf("unexpected extra forwarded entry: %q", extra.Line)
default:
}
})
})
}
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
defer func() {
loki.Drain(c.processOut, func() {
loki.Drain(c.processOut, c.fanout, loki.DefaultDrainTimeout, func() {
c.mut.Lock()
defer c.mut.Unlock()
if c.entryHandler != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/source/cloudflare/cloudflare.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *Component) Run(ctx context.Context) error {

// NOTE: We need to stop posFile first so we don't record entries we are draining.
c.posFile.Stop()
loki.Drain(c.handler, func() {
loki.Drain(c.handler, c.fanout, loki.DefaultDrainTimeout, func() {
c.mut.Lock()
defer c.mut.Unlock()
c.tailer.stop()
Expand Down
3 changes: 1 addition & 2 deletions internal/component/loki/source/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ func (c *Component) Run(ctx context.Context) error {
c.exited.Store(true)
c.posFile.Stop()

// Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop().
loki.Drain(c.handler, func() {
loki.Drain(c.handler, c.fanout, loki.DefaultDrainTimeout, func() {
c.mut.Lock()
defer c.mut.Unlock()
c.scheduler.Stop()
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/source/journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *Component) Run(ctx context.Context) error {
// We need to stop posFile first so we don't record entries we are draining
c.positions.Stop()

loki.Drain(c.recv, func() {
loki.Drain(c.recv, c.fanout, loki.DefaultDrainTimeout, func() {
c.mut.Lock()
defer c.mut.Unlock()
if c.tailer != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
func (c *Component) Run(ctx context.Context) error {
defer func() {
c.positions.Stop()
loki.Drain(c.handler, func() {
loki.Drain(c.handler, c.fanout, loki.DefaultDrainTimeout, func() {
c.mut.Lock()
defer c.mut.Unlock()
c.scheduler.Stop()
Expand Down
Loading