From 3b838db42d95602bde06cd93198bdfd4a5db0fd4 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 18 Mar 2026 15:03:56 +0100 Subject: [PATCH 1/4] Move Consume and Drain functions to shared loki package --- .../{loki/source => common/loki}/consume.go | 22 +++++------ .../source => common/loki}/consume_test.go | 38 +++++++++---------- .../{loki/source => common/loki}/drain.go | 6 +-- .../source => common/loki}/drain_test.go | 7 ++-- internal/component/loki/process/process.go | 5 +-- internal/component/loki/source/api/api.go | 3 +- .../loki/source/aws_firehose/component.go | 3 +- .../loki/source/cloudflare/cloudflare.go | 5 +-- .../component/loki/source/docker/docker.go | 4 +- internal/component/loki/source/file/file.go | 2 +- .../component/loki/source/gcplog/gcplog.go | 3 +- .../component/loki/source/journal/journal.go | 4 +- .../kubernetes_events/kubernetes_events.go | 4 +- 13 files changed, 47 insertions(+), 59 deletions(-) rename internal/component/{loki/source => common/loki}/consume.go (81%) rename internal/component/{loki/source => common/loki}/consume_test.go (67%) rename internal/component/{loki/source => common/loki}/drain.go (87%) rename internal/component/{loki/source => common/loki}/drain_test.go (82%) diff --git a/internal/component/loki/source/consume.go b/internal/component/common/loki/consume.go similarity index 81% rename from internal/component/loki/source/consume.go rename to internal/component/common/loki/consume.go index 2c7d178ec1a..f97f05230e8 100644 --- a/internal/component/loki/source/consume.go +++ b/internal/component/common/loki/consume.go @@ -1,9 +1,7 @@ -package source +package loki import ( "context" - - "github.com/grafana/alloy/internal/component/common/loki" ) // Consume continuously reads log entries from recv and forwards them to the fanout f. @@ -12,8 +10,8 @@ import ( // This function is typically used in component Run methods to handle the forwarding // of log entries from a component's internal handler to downstream receivers. // The fanout allows entries to be sent to multiple receivers concurrently. -func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout) { - consume(ctx, recv, f, func(e loki.Entry) loki.Entry { return e }) +func Consume(ctx context.Context, recv LogsReceiver, f *Fanout) { + consume(ctx, recv, f, func(e Entry) Entry { return e }) } // ConsumeAndProcess continuously reads log entries from recv, processes them using processFn, @@ -27,9 +25,9 @@ func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout) { // or enrichment of log entries. func ConsumeAndProcess( ctx context.Context, - recv loki.LogsReceiver, - f *loki.Fanout, - processFn func(e loki.Entry) loki.Entry, + recv LogsReceiver, + f *Fanout, + processFn func(e Entry) Entry, ) { consume(ctx, recv, f, processFn) @@ -37,9 +35,9 @@ func ConsumeAndProcess( func consume( ctx context.Context, - recv loki.LogsReceiver, - f *loki.Fanout, - processFn func(e loki.Entry) loki.Entry, + recv LogsReceiver, + f *Fanout, + processFn func(e Entry) Entry, ) { for { @@ -61,7 +59,7 @@ func consume( // This function is typically used in component Run methods to handle the forwarding // of log entries from a component's internal handler to downstream receivers. // The fanout allows entries to be sent to multiple receivers concurrently. -func ConsumeBatch(ctx context.Context, recv loki.LogsBatchReceiver, f *loki.Fanout) { +func ConsumeBatch(ctx context.Context, recv LogsBatchReceiver, f *Fanout) { for { select { case <-ctx.Done(): diff --git a/internal/component/loki/source/consume_test.go b/internal/component/common/loki/consume_test.go similarity index 67% rename from internal/component/loki/source/consume_test.go rename to internal/component/common/loki/consume_test.go index eca02596f39..27bbb2b1177 100644 --- a/internal/component/loki/source/consume_test.go +++ b/internal/component/common/loki/consume_test.go @@ -1,4 +1,4 @@ -package source +package loki import ( "context" @@ -7,14 +7,12 @@ import ( "github.com/grafana/loki/pkg/push" "github.com/stretchr/testify/require" - - "github.com/grafana/alloy/internal/component/common/loki" ) func TestConsume(t *testing.T) { - consumer := loki.NewLogsReceiver() - producer := loki.NewLogsReceiver() - fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + consumer := NewLogsReceiver() + producer := NewLogsReceiver() + fanout := NewFanout([]LogsReceiver{consumer}) t.Run("should fanout any consumed entries", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -24,7 +22,7 @@ func TestConsume(t *testing.T) { Consume(ctx, producer, fanout) }) - producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}} e := <-consumer.Chan() require.Equal(t, "1", e.Entry.Line) cancel() @@ -38,21 +36,21 @@ func TestConsume(t *testing.T) { Consume(ctx, producer, fanout) }) - producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}} cancel() wg.Wait() }) } func TestConsumeAndProcess(t *testing.T) { - consumer := loki.NewLogsReceiver() - producer := loki.NewLogsReceiver() - fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + consumer := NewLogsReceiver() + producer := NewLogsReceiver() + fanout := NewFanout([]LogsReceiver{consumer}) t.Run("should process and fanout any consumed entries", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - processFn := func(e loki.Entry) loki.Entry { + processFn := func(e Entry) Entry { e.Entry.Line = "processed: " + e.Entry.Line return e } @@ -62,7 +60,7 @@ func TestConsumeAndProcess(t *testing.T) { ConsumeAndProcess(ctx, producer, fanout, processFn) }) - producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}} e := <-consumer.Chan() require.Equal(t, "processed: 1", e.Entry.Line) cancel() @@ -71,7 +69,7 @@ func TestConsumeAndProcess(t *testing.T) { t.Run("should stop if context is canceled while trying to fanout", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - processFn := func(e loki.Entry) loki.Entry { + processFn := func(e Entry) Entry { return e } wg := sync.WaitGroup{} @@ -79,16 +77,16 @@ func TestConsumeAndProcess(t *testing.T) { ConsumeAndProcess(ctx, producer, fanout, processFn) }) - producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + producer.Chan() <- Entry{Entry: push.Entry{Line: "1"}} cancel() wg.Wait() }) } func TestConsumeBatch(t *testing.T) { - consumer := loki.NewLogsReceiver() - producer := loki.NewLogsBatchReceiver() - fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + consumer := NewLogsReceiver() + producer := NewLogsBatchReceiver() + fanout := NewFanout([]LogsReceiver{consumer}) t.Run("should fanout any consumed entries", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -98,7 +96,7 @@ func TestConsumeBatch(t *testing.T) { ConsumeBatch(ctx, producer, fanout) }) - producer.Chan() <- []loki.Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}} + producer.Chan() <- []Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}} e := <-consumer.Chan() require.Equal(t, "1", e.Entry.Line) e = <-consumer.Chan() @@ -114,7 +112,7 @@ func TestConsumeBatch(t *testing.T) { ConsumeBatch(ctx, producer, fanout) }) - producer.Chan() <- []loki.Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}} + producer.Chan() <- []Entry{{Entry: push.Entry{Line: "1"}}, {Entry: push.Entry{Line: "2"}}} cancel() wg.Wait() }) diff --git a/internal/component/loki/source/drain.go b/internal/component/common/loki/drain.go similarity index 87% rename from internal/component/loki/source/drain.go rename to internal/component/common/loki/drain.go index acca25d41a9..612c3d7100b 100644 --- a/internal/component/loki/source/drain.go +++ b/internal/component/common/loki/drain.go @@ -1,9 +1,7 @@ -package source +package loki import ( "context" - - "github.com/grafana/alloy/internal/component/common/loki" ) // Drain consumes log entries from recv in a background goroutine while f executes. @@ -14,7 +12,7 @@ import ( // // This is typically used during component shutdown to drain any remaining entries // from a receiver channel while performing cleanup operations. -func Drain(recv loki.LogsReceiver, f func()) { +func Drain(recv LogsReceiver, f func()) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { diff --git a/internal/component/loki/source/drain_test.go b/internal/component/common/loki/drain_test.go similarity index 82% rename from internal/component/loki/source/drain_test.go rename to internal/component/common/loki/drain_test.go index a29749ce89b..b153691cca7 100644 --- a/internal/component/loki/source/drain_test.go +++ b/internal/component/common/loki/drain_test.go @@ -1,23 +1,22 @@ -package source +package loki import ( "sync" "testing" "time" - "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/loki/pkg/push" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) func TestDrain(t *testing.T) { - recv := loki.NewLogsReceiver() + recv := NewLogsReceiver() var wg sync.WaitGroup wg.Go(func() { for range 10 { - entry := loki.Entry{ + entry := Entry{ Labels: model.LabelSet{"test": "label"}, Entry: push.Entry{ Timestamp: time.Now(), diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index 211c05327ef..88600acf4d3 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -10,7 +10,6 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/loki/process/stages" - "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/service/livedebugging" @@ -90,7 +89,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { defer func() { - source.Drain(c.processOut, func() { + loki.Drain(c.processOut, func() { c.mut.Lock() defer c.mut.Unlock() if c.entryHandler != nil { @@ -103,7 +102,7 @@ func (c *Component) Run(ctx context.Context) error { wg.Go(func() { c.handleIn(ctx) }) wg.Go(func() { - source.ConsumeAndProcess(ctx, c.processOut, c.fanout, func(e loki.Entry) loki.Entry { + loki.ConsumeAndProcess(ctx, c.processOut, c.fanout, func(e loki.Entry) loki.Entry { // The log entry is the same for every fanout, // so we can publish it only once. c.debugDataPublisher.PublishIfActive(livedebugging.NewData( diff --git a/internal/component/loki/source/api/api.go b/internal/component/loki/source/api/api.go index b5f2b6ff3a6..8f1dc827d2d 100644 --- a/internal/component/loki/source/api/api.go +++ b/internal/component/loki/source/api/api.go @@ -14,7 +14,6 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" fnet "github.com/grafana/alloy/internal/component/common/net" "github.com/grafana/alloy/internal/component/common/relabel" - "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/api/internal/lokipush" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" @@ -94,7 +93,7 @@ func (c *Component) Run(ctx context.Context) (err error) { } }() - source.ConsumeBatch(ctx, c.handler, c.fanout) + loki.ConsumeBatch(ctx, c.handler, c.fanout) return } diff --git a/internal/component/loki/source/aws_firehose/component.go b/internal/component/loki/source/aws_firehose/component.go index af1c2741082..0acdaf9ed45 100644 --- a/internal/component/loki/source/aws_firehose/component.go +++ b/internal/component/loki/source/aws_firehose/component.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" fnet "github.com/grafana/alloy/internal/component/common/net" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" - "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/aws_firehose/internal" "github.com/grafana/alloy/internal/util" ) @@ -96,7 +95,7 @@ func (c *Component) Run(ctx context.Context) error { } }() - source.Consume(ctx, c.handler, c.fanout) + loki.Consume(ctx, c.handler, c.fanout) return nil } diff --git a/internal/component/loki/source/cloudflare/cloudflare.go b/internal/component/loki/source/cloudflare/cloudflare.go index 0417efcba44..df311cb4cd3 100644 --- a/internal/component/loki/source/cloudflare/cloudflare.go +++ b/internal/component/loki/source/cloudflare/cloudflare.go @@ -17,7 +17,6 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" - "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -139,14 +138,14 @@ func (c *Component) Run(ctx context.Context) error { // NOTE: We need to stop posFile first so we don't record entries we are draining. c.posFile.Stop() - source.Drain(c.handler, func() { + loki.Drain(c.handler, func() { c.mut.Lock() defer c.mut.Unlock() c.tailer.stop() }) }() - source.Consume(ctx, c.handler, c.fanout) + loki.Consume(ctx, c.handler, c.fanout) return nil } diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 369ac661a18..4b8b37489df 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -161,7 +161,7 @@ func (c *Component) Run(ctx context.Context) error { c.posFile.Stop() // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). - source.Drain(c.handler, func() { + loki.Drain(c.handler, func() { c.mut.Lock() defer c.mut.Unlock() c.scheduler.Stop() @@ -169,7 +169,7 @@ func (c *Component) Run(ctx context.Context) error { }() // Start consume and fanout loop - source.Consume(ctx, c.handler, c.fanout) + loki.Consume(ctx, c.handler, c.fanout) return nil } diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 037624163e8..7e89f4fdf28 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -264,7 +264,7 @@ func (c *Component) Run(ctx context.Context) error { var wg sync.WaitGroup // Start consume and fanout loop - wg.Go(func() { source.Consume(ctx, c.handler, c.fanout) }) + wg.Go(func() { loki.Consume(ctx, c.handler, c.fanout) }) wg.Go(func() { for { diff --git a/internal/component/loki/source/gcplog/gcplog.go b/internal/component/loki/source/gcplog/gcplog.go index 329aac5d7b5..62beac09f7a 100644 --- a/internal/component/loki/source/gcplog/gcplog.go +++ b/internal/component/loki/source/gcplog/gcplog.go @@ -13,7 +13,6 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" - "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/gcplog/gcptypes" gt "github.com/grafana/alloy/internal/component/loki/source/gcplog/internal/gcplogtarget" "github.com/grafana/alloy/internal/util" @@ -99,7 +98,7 @@ func (c *Component) Run(ctx context.Context) error { } }() - source.Consume(ctx, c.handler, c.fanout) + loki.Consume(ctx, c.handler, c.fanout) return nil } diff --git a/internal/component/loki/source/journal/journal.go b/internal/component/loki/source/journal/journal.go index 06cec76b83c..238805fa7a8 100644 --- a/internal/component/loki/source/journal/journal.go +++ b/internal/component/loki/source/journal/journal.go @@ -94,7 +94,7 @@ func (c *Component) Run(ctx context.Context) error { // We need to stop posFile first so we don't record entries we are draining c.positions.Stop() - source.Drain(c.recv, func() { + loki.Drain(c.recv, func() { c.mut.Lock() defer c.mut.Unlock() if c.tailer != nil { @@ -106,7 +106,7 @@ func (c *Component) Run(ctx context.Context) error { }() var wg sync.WaitGroup - wg.Go(func() { source.Consume(ctx, c.recv, c.fanout) }) + wg.Go(func() { loki.Consume(ctx, c.recv, c.fanout) }) wg.Go(func() { for { select { diff --git a/internal/component/loki/source/kubernetes_events/kubernetes_events.go b/internal/component/loki/source/kubernetes_events/kubernetes_events.go index d266ed300c2..6319e41ddc4 100644 --- a/internal/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/internal/component/loki/source/kubernetes_events/kubernetes_events.go @@ -129,14 +129,14 @@ func New(o component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) error { defer func() { c.positions.Stop() - source.Drain(c.handler, func() { + loki.Drain(c.handler, func() { c.mut.Lock() defer c.mut.Unlock() c.scheduler.Stop() }) }() - source.Consume(ctx, c.handler, c.fanout) + loki.Consume(ctx, c.handler, c.fanout) return nil } From 376678f062ef6d06338ed426be5bcb89cb0d19c6 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 18 Mar 2026 15:38:17 +0100 Subject: [PATCH 2/4] chore: Use shared functions for loki components and fix locking --- internal/component/loki/enrich/enrich.go | 131 +++++-------- internal/component/loki/enrich/enrich_test.go | 172 ++++++++++-------- 2 files changed, 146 insertions(+), 157 deletions(-) diff --git a/internal/component/loki/enrich/enrich.go b/internal/component/loki/enrich/enrich.go index 43f6977f25a..ba0f633d69f 100644 --- a/internal/component/loki/enrich/enrich.go +++ b/internal/component/loki/enrich/enrich.go @@ -5,8 +5,6 @@ import ( "context" "sync" - "github.com/go-kit/log/level" - "github.com/grafana/loki/pkg/push" "github.com/prometheus/common/model" "github.com/grafana/alloy/internal/component" @@ -51,14 +49,13 @@ type Exports struct { } type Component struct { - opts component.Options - args Arguments - exports Exports + opts component.Options + receiver loki.LogsReceiver + fanout *loki.Fanout mut sync.RWMutex - receiver loki.LogsReceiver + args Arguments targetsCache map[string]model.LabelSet - cacheMutex sync.RWMutex } func New(opts component.Options, args Arguments) (*Component, error) { @@ -67,123 +64,81 @@ func New(opts component.Options, args Arguments) (*Component, error) { args: args, targetsCache: make(map[string]model.LabelSet), receiver: loki.NewLogsReceiver(loki.WithComponentID(opts.ID)), + fanout: loki.NewFanout(args.ForwardTo), } - // Initialize the cache with provided targets - c.refreshCacheFromTargets(args.Targets) + opts.OnStateChange(Exports{Receiver: c.receiver}) - // Create and immediately export the receiver - c.exports.Receiver = c.receiver - opts.OnStateChange(c.exports) + if err := c.Update(args); err != nil { + return nil, err + } return c, nil } // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return nil - case entry := <-c.receiver.Chan(): - if err := c.processLog(&entry.Entry, entry.Labels); err != nil { - level.Error(c.opts.Logger).Log("msg", "failed to process log", "err", err) - } - } - } + loki.ConsumeAndProcess(ctx, c.receiver, c.fanout, func(e loki.Entry) loki.Entry { + return c.processLog(e) + }) + return nil } -func (c *Component) refreshCacheFromTargets(targets []discovery.Target) { - newCache := make(map[string]model.LabelSet) - - for _, target := range targets { - labelSet := make(model.LabelSet) - // Copy both own and group labels - target.ForEachLabel(func(k, v string) bool { - labelSet[model.LabelName(k)] = model.LabelValue(v) - return true - }) - if matchValue := string(labelSet[model.LabelName(c.args.TargetMatchLabel)]); matchValue != "" { - newCache[matchValue] = labelSet - } - } +func (c *Component) processLog(entry loki.Entry) loki.Entry { + c.mut.RLock() + defer c.mut.RUnlock() - c.cacheMutex.Lock() - c.targetsCache = newCache - c.cacheMutex.Unlock() -} + targetMatchLabel := c.args.TargetMatchLabel + logsMatchLabel := c.args.LogsMatchLabel + labelsToCopy := append([]string(nil), c.args.LabelsToCopy...) -func (c *Component) processLog(entry *push.Entry, labels model.LabelSet) error { // Determine which label to use for matching - matchLabel := c.args.LogsMatchLabel + matchLabel := logsMatchLabel if matchLabel == "" { - matchLabel = c.args.TargetMatchLabel + matchLabel = targetMatchLabel } // Get the source value to match against discovered targets - sourceValue := string(labels[model.LabelName(matchLabel)]) + sourceValue := string(entry.Labels[model.LabelName(matchLabel)]) if sourceValue == "" { // No match label, forward as-is - return c.forwardLog(entry, labels) + return entry } // Look up matching target - c.cacheMutex.RLock() targetLabels, found := c.targetsCache[sourceValue] - c.cacheMutex.RUnlock() if !found { // No matching target, forward as-is - return c.forwardLog(entry, labels) + return entry } - // Copy labels from target to log labels - newLabels := labels.Clone() - if len(c.args.LabelsToCopy) == 0 { + // Copy entry in case it was forwarded to several components. + newEntry := entry.Clone() + if len(labelsToCopy) == 0 { // If no specific labels are requested, copy all labels for k, v := range targetLabels { - newLabels[k] = v + newEntry.Labels[k] = v } } else { // Copy only requested labels - for _, label := range c.args.LabelsToCopy { + for _, label := range labelsToCopy { if value := targetLabels[model.LabelName(label)]; value != "" { - newLabels[model.LabelName(label)] = value + newEntry.Labels[model.LabelName(label)] = value } } } - return c.forwardLog(entry, newLabels) -} - -func (c *Component) forwardLog(entry *push.Entry, labels model.LabelSet) error { - c.mut.RLock() - fanout := c.args.ForwardTo - c.mut.RUnlock() - - for _, receiver := range fanout { - receiver.Chan() <- loki.Entry{ - Labels: labels, - Entry: *entry, - } - } - return nil -} - -func (c *Component) Name() string { - return "loki.enrich" -} - -func (c *Component) Ready() bool { - return true + return newEntry } func (c *Component) Update(args component.Arguments) error { - newArgs := args.(Arguments) - c.mut.Lock() defer c.mut.Unlock() + + newArgs := args.(Arguments) c.args = newArgs + c.fanout.UpdateChildren(newArgs.ForwardTo) // Update the targets cache with new targets c.refreshCacheFromTargets(newArgs.Targets) @@ -191,6 +146,20 @@ func (c *Component) Update(args component.Arguments) error { return nil } -func (c *Component) Exports() component.Exports { - return &c.exports +func (c *Component) refreshCacheFromTargets(targets []discovery.Target) { + newCache := make(map[string]model.LabelSet) + + for _, target := range targets { + labelSet := make(model.LabelSet) + // Copy both own and group labels + target.ForEachLabel(func(k, v string) bool { + labelSet[model.LabelName(k)] = model.LabelValue(v) + return true + }) + if matchValue := string(labelSet[model.LabelName(c.args.TargetMatchLabel)]); matchValue != "" { + newCache[matchValue] = labelSet + } + } + + c.targetsCache = newCache } diff --git a/internal/component/loki/enrich/enrich_test.go b/internal/component/loki/enrich/enrich_test.go index 9ce7ed0d64a..4de4881a808 100644 --- a/internal/component/loki/enrich/enrich_test.go +++ b/internal/component/loki/enrich/enrich_test.go @@ -1,6 +1,7 @@ package enrich import ( + "context" "testing" "time" @@ -22,11 +23,10 @@ func TestEnricher(t *testing.T) { } tests := []struct { - name string - args Arguments - inputLog *push.Entry - inputLabels model.LabelSet - expectedLabels model.LabelSet + name string + args Arguments + input loki.Entry + expected loki.Entry }{ { name: "label enrichment with target_labels and logs_match_label", @@ -43,18 +43,25 @@ func TestEnricher(t *testing.T) { LogsMatchLabel: "service_name", LabelsToCopy: []string{"env", "owner"}, }, - inputLog: &push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, - inputLabels: model.LabelSet{ - "service_name": "test-service", + input: loki.Entry{ + Labels: model.LabelSet{ + "service_name": "test-service", + }, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "test log", + }, }, // foo:bar is not added as it is not in the target labels. - expectedLabels: model.LabelSet{ - "service_name": "test-service", - "env": "prod", - "owner": "team-a", + expected: loki.Entry{ + Labels: model.LabelSet{ + "service_name": "test-service", + "env": "prod", + "owner": "team-a", + }, + Entry: push.Entry{ + Line: "test log", + }, }, }, { @@ -70,17 +77,24 @@ func TestEnricher(t *testing.T) { LogsMatchLabel: "service_name", LabelsToCopy: []string{"env"}, }, - inputLog: &push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, - inputLabels: model.LabelSet{ - "service_name": "test-service", - "foo": "bar", + input: loki.Entry{ + Labels: model.LabelSet{ + "service_name": "test-service", + "foo": "bar", + }, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "test log", + }, }, - expectedLabels: model.LabelSet{ - "service_name": "test-service", - "foo": "bar", + expected: loki.Entry{ + Labels: model.LabelSet{ + "service_name": "test-service", + "foo": "bar", + }, + Entry: push.Entry{ + Line: "test log", + }, }, }, { @@ -97,18 +111,25 @@ func TestEnricher(t *testing.T) { TargetMatchLabel: "service", // LogsMatchLabel intentionally omitted as 'service' label exists in both. }, - inputLog: &push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, - inputLabels: model.LabelSet{ - "service": "test-service", + input: loki.Entry{ + Labels: model.LabelSet{ + "service": "test-service", + }, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "test log", + }, }, - expectedLabels: model.LabelSet{ - "service": "test-service", - "env": "prod", - "owner": "team-b", - "region": "us-west", + expected: loki.Entry{ + Labels: model.LabelSet{ + "service": "test-service", + "env": "prod", + "owner": "team-b", + "region": "us-west", + }, + Entry: push.Entry{ + Line: "test log", + }, }, }, { @@ -125,53 +146,61 @@ func TestEnricher(t *testing.T) { // LogsMatchLabel intentionally omitted as 'service' label exists in both. LabelsToCopy: []string{"env", "owner"}, }, - inputLog: &push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, - inputLabels: model.LabelSet{ - "service": "test-service", // matches target_match_label - "original": "label", + input: loki.Entry{ + Labels: model.LabelSet{ + "service": "test-service", // matches target_match_label + "original": "label", + }, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "test log", + }, }, - expectedLabels: model.LabelSet{ - "service": "test-service", - "original": "label", - "env": "prod", - "owner": "team-c", + expected: loki.Entry{ + Labels: model.LabelSet{ + "service": "test-service", + "original": "label", + "env": "prod", + "owner": "team-c", + }, + Entry: push.Entry{ + Line: "test log", + }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Create a channel to receive enriched logs - receivedCh := make(chan loki.Entry, 1) - receiver := loki.NewLogsReceiver() + collector := loki.NewCollectingHandler() + defer collector.Stop() + + var exports Exports // Create the component - tt.args.ForwardTo = []loki.LogsReceiver{receiver} + tt.args.ForwardTo = []loki.LogsReceiver{collector.Receiver()} + opts.OnStateChange = func(e component.Exports) { + exports = e.(Exports) + } comp, err := New(opts, tt.args) require.NoError(t, err) + require.NotNil(t, exports.Receiver) - // Start a goroutine to forward logs to our test channel + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() go func() { - for entry := range receiver.Chan() { - receivedCh <- entry - } + _ = comp.Run(ctx) }() - // Process a log entry - err = comp.processLog(tt.inputLog, tt.inputLabels) - require.NoError(t, err) + exports.Receiver.Chan() <- tt.input - // Verify the enriched log - select { - case received := <-receivedCh: - require.Equal(t, tt.expectedLabels, received.Labels) - require.Equal(t, tt.inputLog.Line, received.Entry.Line) - case <-time.After(time.Second): - t.Fatal("timeout waiting for log entry") - } + require.Eventually(t, func() bool { + return len(collector.Received()) == 1 + }, time.Second, 10*time.Millisecond) + + received := collector.Received()[0] + require.Equal(t, tt.expected.Labels, received.Labels) + require.Equal(t, tt.expected.Line, received.Line) }) } } @@ -199,12 +228,3 @@ func TestUpdate(t *testing.T) { }) require.NoError(t, err) } - -func TestName(t *testing.T) { - comp, err := New(component.Options{ - Logger: log.NewNopLogger(), - OnStateChange: func(e component.Exports) {}, - }, Arguments{}) - require.NoError(t, err) - require.Equal(t, "loki.enrich", comp.Name()) -} From 901b1f496f0a276c7aad19a6ec01577a86778bf8 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:01:38 +0100 Subject: [PATCH 3/4] fix tests --- internal/component/loki/enrich/enrich_test.go | 63 +++++++++---------- .../component/loki/source/journal/journal.go | 1 - 2 files changed, 28 insertions(+), 36 deletions(-) diff --git a/internal/component/loki/enrich/enrich_test.go b/internal/component/loki/enrich/enrich_test.go index 4de4881a808..4f6b76cd978 100644 --- a/internal/component/loki/enrich/enrich_test.go +++ b/internal/component/loki/enrich/enrich_test.go @@ -16,18 +16,26 @@ import ( ) func TestEnricher(t *testing.T) { - // Create basic component options - opts := component.Options{ - Logger: log.NewNopLogger(), - OnStateChange: func(e component.Exports) {}, - } + var ( + now = time.Now() + inputEntry = push.Entry{ + Timestamp: now, + Line: "test log", + } + expectedEntry = push.Entry{ + Line: "test log", + Timestamp: now, + } + ) - tests := []struct { + type testCase struct { name string args Arguments input loki.Entry expected loki.Entry - }{ + } + + tests := []testCase{ { name: "label enrichment with target_labels and logs_match_label", args: Arguments{ @@ -47,10 +55,7 @@ func TestEnricher(t *testing.T) { Labels: model.LabelSet{ "service_name": "test-service", }, - Entry: push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, + Entry: inputEntry, }, // foo:bar is not added as it is not in the target labels. expected: loki.Entry{ @@ -59,9 +64,7 @@ func TestEnricher(t *testing.T) { "env": "prod", "owner": "team-a", }, - Entry: push.Entry{ - Line: "test log", - }, + Entry: expectedEntry, }, }, { @@ -82,19 +85,14 @@ func TestEnricher(t *testing.T) { "service_name": "test-service", "foo": "bar", }, - Entry: push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, + Entry: inputEntry, }, expected: loki.Entry{ Labels: model.LabelSet{ "service_name": "test-service", "foo": "bar", }, - Entry: push.Entry{ - Line: "test log", - }, + Entry: expectedEntry, }, }, { @@ -115,10 +113,7 @@ func TestEnricher(t *testing.T) { Labels: model.LabelSet{ "service": "test-service", }, - Entry: push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, + Entry: inputEntry, }, expected: loki.Entry{ Labels: model.LabelSet{ @@ -127,9 +122,7 @@ func TestEnricher(t *testing.T) { "owner": "team-b", "region": "us-west", }, - Entry: push.Entry{ - Line: "test log", - }, + Entry: expectedEntry, }, }, { @@ -151,10 +144,7 @@ func TestEnricher(t *testing.T) { "service": "test-service", // matches target_match_label "original": "label", }, - Entry: push.Entry{ - Timestamp: time.Now(), - Line: "test log", - }, + Entry: inputEntry, }, expected: loki.Entry{ Labels: model.LabelSet{ @@ -163,9 +153,7 @@ func TestEnricher(t *testing.T) { "env": "prod", "owner": "team-c", }, - Entry: push.Entry{ - Line: "test log", - }, + Entry: expectedEntry, }, }, } @@ -179,6 +167,11 @@ func TestEnricher(t *testing.T) { // Create the component tt.args.ForwardTo = []loki.LogsReceiver{collector.Receiver()} + + opts := component.Options{ + Logger: log.NewNopLogger(), + OnStateChange: func(e component.Exports) {}, + } opts.OnStateChange = func(e component.Exports) { exports = e.(Exports) } diff --git a/internal/component/loki/source/journal/journal.go b/internal/component/loki/source/journal/journal.go index 238805fa7a8..da8ff942c73 100644 --- a/internal/component/loki/source/journal/journal.go +++ b/internal/component/loki/source/journal/journal.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" - "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" From 04755d3a2cd03afc6d9764fd18aa740f3c2812a3 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:11:54 +0100 Subject: [PATCH 4/4] fix --- internal/component/loki/enrich/enrich_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/component/loki/enrich/enrich_test.go b/internal/component/loki/enrich/enrich_test.go index 4f6b76cd978..b7d2124a8ba 100644 --- a/internal/component/loki/enrich/enrich_test.go +++ b/internal/component/loki/enrich/enrich_test.go @@ -2,6 +2,7 @@ package enrich import ( "context" + "sync" "testing" "time" @@ -180,10 +181,10 @@ func TestEnricher(t *testing.T) { require.NotNil(t, exports.Receiver) ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - go func() { + var wg sync.WaitGroup + wg.Go(func() { _ = comp.Run(ctx) - }() + }) exports.Receiver.Chan() <- tt.input @@ -194,6 +195,9 @@ func TestEnricher(t *testing.T) { received := collector.Received()[0] require.Equal(t, tt.expected.Labels, received.Labels) require.Equal(t, tt.expected.Line, received.Line) + + cancel() + wg.Wait() }) } }