Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package source
package loki

import (
"context"

"github.com/grafana/alloy/internal/component/common/loki"
)

// Consume continuously reads log entries from recv and forwards them to the fanout f.
Expand All @@ -12,8 +10,8 @@ import (
// This function is typically used in component Run methods to handle the forwarding
// of log entries from a component's internal handler to downstream receivers.
// The fanout allows entries to be sent to multiple receivers concurrently.
func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout) {
consume(ctx, recv, f, func(e loki.Entry) loki.Entry { return e })
func Consume(ctx context.Context, recv LogsReceiver, f *Fanout) {
consume(ctx, recv, f, func(e Entry) Entry { return e })
}

// ConsumeAndProcess continuously reads log entries from recv, processes them using processFn,
Expand All @@ -27,19 +25,19 @@ func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout) {
// or enrichment of log entries.
func ConsumeAndProcess(
ctx context.Context,
recv loki.LogsReceiver,
f *loki.Fanout,
processFn func(e loki.Entry) loki.Entry,
recv LogsReceiver,
f *Fanout,
processFn func(e Entry) Entry,
) {

consume(ctx, recv, f, processFn)
}

func consume(
ctx context.Context,
recv loki.LogsReceiver,
f *loki.Fanout,
processFn func(e loki.Entry) loki.Entry,
recv LogsReceiver,
f *Fanout,
processFn func(e Entry) Entry,
) {

for {
Expand All @@ -61,7 +59,7 @@ func consume(
// This function is typically used in component Run methods to handle the forwarding
// of log entries from a component's internal handler to downstream receivers.
// The fanout allows entries to be sent to multiple receivers concurrently.
func ConsumeBatch(ctx context.Context, recv loki.LogsBatchReceiver, f *loki.Fanout) {
func ConsumeBatch(ctx context.Context, recv LogsBatchReceiver, f *Fanout) {
for {
select {
case <-ctx.Done():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package source
package loki

import (
"context"
Expand All @@ -7,14 +7,12 @@ import (

"github.com/grafana/loki/pkg/push"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component/common/loki"
)

func TestConsume(t *testing.T) {
consumer := loki.NewLogsReceiver()
producer := loki.NewLogsReceiver()
fanout := loki.NewFanout([]loki.LogsReceiver{consumer})
consumer := NewLogsReceiver()
producer := NewLogsReceiver()
fanout := NewFanout([]LogsReceiver{consumer})

t.Run("should fanout any consumed entries", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -24,7 +22,7 @@ func TestConsume(t *testing.T) {
Consume(ctx, producer, fanout)
})

producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}}
producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}}
e := <-consumer.Chan()
require.Equal(t, "1", e.Entry.Line)
cancel()
Expand All @@ -38,21 +36,21 @@ func TestConsume(t *testing.T) {
Consume(ctx, producer, fanout)
})

producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}}
producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}}
cancel()
wg.Wait()
})
}

func TestConsumeAndProcess(t *testing.T) {
consumer := loki.NewLogsReceiver()
producer := loki.NewLogsReceiver()
fanout := loki.NewFanout([]loki.LogsReceiver{consumer})
consumer := NewLogsReceiver()
producer := NewLogsReceiver()
fanout := NewFanout([]LogsReceiver{consumer})

t.Run("should process and fanout any consumed entries", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

processFn := func(e loki.Entry) loki.Entry {
processFn := func(e Entry) Entry {
e.Entry.Line = "processed: " + e.Entry.Line
return e
}
Expand All @@ -62,7 +60,7 @@ func TestConsumeAndProcess(t *testing.T) {
ConsumeAndProcess(ctx, producer, fanout, processFn)
})

producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}}
producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}}
e := <-consumer.Chan()
require.Equal(t, "processed: 1", e.Entry.Line)
cancel()
Expand All @@ -71,24 +69,24 @@ func TestConsumeAndProcess(t *testing.T) {

t.Run("should stop if context is canceled while trying to fanout", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
processFn := func(e loki.Entry) loki.Entry {
processFn := func(e Entry) Entry {
return e
}
wg := sync.WaitGroup{}
wg.Go(func() {
ConsumeAndProcess(ctx, producer, fanout, processFn)
})

producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}}
producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}}
cancel()
wg.Wait()
})
}

func TestConsumeBatch(t *testing.T) {
consumer := loki.NewLogsReceiver()
producer := loki.NewLogsBatchReceiver()
fanout := loki.NewFanout([]loki.LogsReceiver{consumer})
consumer := NewLogsReceiver()
producer := NewLogsBatchReceiver()
fanout := NewFanout([]LogsReceiver{consumer})

t.Run("should fanout any consumed entries", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -98,7 +96,7 @@ func TestConsumeBatch(t *testing.T) {
ConsumeBatch(ctx, producer, fanout)
})

producer.Chan() <- []loki.Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}}
producer.Chan() <- []Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}}
e := <-consumer.Chan()
require.Equal(t, "1", e.Entry.Line)
e = <-consumer.Chan()
Expand All @@ -114,7 +112,7 @@ func TestConsumeBatch(t *testing.T) {
ConsumeBatch(ctx, producer, fanout)
})

producer.Chan() <- []loki.Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}}
producer.Chan() <- []Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}}
cancel()
wg.Wait()
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package source
package loki

import (
"context"

"github.com/grafana/alloy/internal/component/common/loki"
)

// Drain consumes log entries from recv in a background goroutine while f executes.
Expand All @@ -14,7 +12,7 @@ import (
//
// This is typically used during component shutdown to drain any remaining entries
// from a receiver channel while performing cleanup operations.
func Drain(recv loki.LogsReceiver, f func()) {
func Drain(recv LogsReceiver, f func()) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
package source
package loki

import (
"sync"
"testing"
"time"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func TestDrain(t *testing.T) {
recv := loki.NewLogsReceiver()
recv := NewLogsReceiver()

var wg sync.WaitGroup
wg.Go(func() {
for range 10 {
entry := loki.Entry{
entry := Entry{
Labels: model.LabelSet{"test": "label"},
Entry: push.Entry{
Timestamp: time.Now(),
Expand Down
Loading
Loading