From fc95aab439dc5ea51b016e9503fc4759092b84d4 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:24:18 +0100 Subject: [PATCH 1/4] loki.source.journal: fix deadlock --- .../component/loki/source/journal/journal.go | 134 ++++++++++++------ .../component/loki/source/syslog/syslog.go | 11 +- 2 files changed, 99 insertions(+), 46 deletions(-) diff --git a/internal/component/loki/source/journal/journal.go b/internal/component/loki/source/journal/journal.go index f46e4e73795..7197b620b66 100644 --- a/internal/component/loki/source/journal/journal.go +++ b/internal/component/loki/source/journal/journal.go @@ -38,16 +38,16 @@ var _ component.Component = (*Component)(nil) // Component represents reading from a journal type Component struct { - mut sync.RWMutex - t *target.JournalTarget - metrics *target.Metrics - o component.Options - handler chan loki.Entry - positions positions.Positions - receivers []loki.LogsReceiver - argsUpdated chan struct{} - args Arguments - healthErr error + mut sync.RWMutex + t *target.JournalTarget + metrics *target.Metrics + o component.Options + handler chan loki.Entry + positions positions.Positions + receivers []loki.LogsReceiver + targetsUpdated chan struct{} + args Arguments + healthErr error } // New creates a new component. @@ -73,13 +73,13 @@ func New(o component.Options, args Arguments) (*Component, error) { } c := &Component{ - metrics: target.NewMetrics(o.Registerer), - o: o, - handler: make(chan loki.Entry), - positions: positionsFile, - receivers: args.Receivers, - argsUpdated: make(chan struct{}, 1), - args: args, + metrics: target.NewMetrics(o.Registerer), + o: o, + handler: make(chan loki.Entry), + positions: positionsFile, + receivers: args.Receivers, + targetsUpdated: make(chan struct{}, 1), + args: args, } err = c.Update(args) return c, err @@ -88,18 +88,25 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run starts the component. func (c *Component) Run(ctx context.Context) error { defer func() { + // Start draining routine to prevent potential deadlock if target attempts to send during Stop(). + cancel := c.startDrainingRoutine() + defer cancel() + + // Stop existing target c.mut.RLock() + defer c.mut.RUnlock() if c.t != nil { + level.Info(c.o.Logger).Log("msg", "loki.source.journal component shutting down, stopping journal target") err := c.t.Stop() if err != nil { level.Warn(c.o.Logger).Log("msg", "error stopping journal target", "err", err) } } - c.mut.RUnlock() - }() for { select { + case <-c.targetsUpdated: + c.reloadTargets() case <-ctx.Done(): return nil case entry := <-c.handler: @@ -112,27 +119,6 @@ func (c *Component) Run(ctx context.Context) error { r.Chan() <- lokiEntry } c.mut.RUnlock() - case <-c.argsUpdated: - c.mut.Lock() - if c.t != nil { - err := c.t.Stop() - if err != nil { - level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err) - } - c.t = nil - } - rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules) - entryHandler := loki.NewEntryHandler(c.handler, func() {}) - - newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, c.args)) - if err != nil { - level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path) - c.healthErr = fmt.Errorf("error creating journal target: %w", err) - } else { - c.t = newTarget - c.healthErr = nil - } - c.mut.Unlock() } } } @@ -143,8 +129,9 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() c.args = newArgs + c.receivers = newArgs.Receivers select { - case c.argsUpdated <- struct{}{}: + case c.targetsUpdated <- struct{}{}: default: // Update notification already sent } return nil @@ -169,6 +156,71 @@ func (c *Component) CurrentHealth() component.Health { } } +func (c *Component) startDrainingRoutine() func() { + readCtx, cancel := context.WithCancel(context.Background()) + c.mut.RLock() + defer c.mut.RUnlock() + receiversCopy := make([]loki.LogsReceiver, len(c.receivers)) + copy(receiversCopy, c.receivers) + go func() { + for { + select { + case <-readCtx.Done(): + return + case entry := <-c.handler: + lokiEntry := loki.Entry{ + Labels: entry.Labels, + Entry: entry.Entry, + } + for _, r := range receiversCopy { + r.Chan() <- lokiEntry + } + } + } + }() + return cancel +} + +func (c *Component) reloadTargets() { + // Start draining routine to prevent potential deadlock if target attempts to send during Stop(). + cancel := c.startDrainingRoutine() + + // Grab current state + c.mut.RLock() + var targetToStop *target.JournalTarget + if c.t != nil { + targetToStop = c.t + } + rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules) + c.mut.RUnlock() + + // Stop existing target + if targetToStop != nil { + err := targetToStop.Stop() + if err != nil { + level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err) + } + } + + // Stop draining routine + cancel() + + // Create new target + c.mut.Lock() + defer c.mut.Unlock() + c.t = nil + entryHandler := loki.NewEntryHandler(c.handler, func() {}) + + newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, c.args)) + if err != nil { + level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path) + c.healthErr = fmt.Errorf("error creating journal target: %w", err) + } else { + c.t = newTarget + c.healthErr = nil + } +} + func convertArgs(job string, a Arguments) *scrapeconfig.JournalTargetConfig { labels := model.LabelSet{ model.LabelName("job"): model.LabelValue(job), diff --git a/internal/component/loki/source/syslog/syslog.go b/internal/component/loki/source/syslog/syslog.go index b18ccec03e8..915a0be9202 100644 --- a/internal/component/loki/source/syslog/syslog.go +++ b/internal/component/loki/source/syslog/syslog.go @@ -137,11 +137,9 @@ func (c *Component) startDrainingRoutine() func() { case <-readCtx.Done(): return case entry := <-c.handler.Chan(): - c.mut.RLock() for _, receiver := range fanoutCopy { receiver.Chan() <- entry } - c.mut.RUnlock() } } }() @@ -152,20 +150,23 @@ func (c *Component) reloadTargets() { // Start draining routine to prevent potential deadlock if targets attempt to send during Stop(). cancel := c.startDrainingRoutine() - // Stop all targets + // Grab current state c.mut.RLock() var rcs []*relabel.Config if len(c.args.RelabelRules) > 0 { rcs = alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules) } + targetsToStop := make([]*st.SyslogTarget, len(c.targets)) + copy(targetsToStop, c.targets) + c.mut.RUnlock() - for _, l := range c.targets { + // Stop existing targets + for _, l := range targetsToStop { err := l.Stop() if err != nil { level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err) } } - c.mut.RUnlock() // Stop draining routine cancel() From 93c77d1904d67189d0736fd77e0c8e89a2dfb7b8 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:26:17 +0100 Subject: [PATCH 2/4] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48a14027f8b..ec796e8454b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,8 @@ Main (unreleased) - Fix `prometheus.exporter.cloudwatch` to not always emit debug logs but respect debug property. (@kalleep) +- Fix potential deadlock in `loki.source.journal` when stopping the component. (@thampiotr) + v1.11.0 ----------------- From a9c6f607c5c69683bf3a8c43bc96fda48c3d707a Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:44:13 +0100 Subject: [PATCH 3/4] remove redundant field --- internal/component/loki/source/journal/journal.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/internal/component/loki/source/journal/journal.go b/internal/component/loki/source/journal/journal.go index 7197b620b66..0826412b5f2 100644 --- a/internal/component/loki/source/journal/journal.go +++ b/internal/component/loki/source/journal/journal.go @@ -44,7 +44,6 @@ type Component struct { o component.Options handler chan loki.Entry positions positions.Positions - receivers []loki.LogsReceiver targetsUpdated chan struct{} args Arguments healthErr error @@ -77,7 +76,6 @@ func New(o component.Options, args Arguments) (*Component, error) { o: o, handler: make(chan loki.Entry), positions: positionsFile, - receivers: args.Receivers, targetsUpdated: make(chan struct{}, 1), args: args, } @@ -115,7 +113,7 @@ func (c *Component) Run(ctx context.Context) error { Labels: entry.Labels, Entry: entry.Entry, } - for _, r := range c.receivers { + for _, r := range c.args.Receivers { r.Chan() <- lokiEntry } c.mut.RUnlock() @@ -129,7 +127,6 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() c.args = newArgs - c.receivers = newArgs.Receivers select { case c.targetsUpdated <- struct{}{}: default: // Update notification already sent @@ -160,8 +157,8 @@ func (c *Component) startDrainingRoutine() func() { readCtx, cancel := context.WithCancel(context.Background()) c.mut.RLock() defer c.mut.RUnlock() - receiversCopy := make([]loki.LogsReceiver, len(c.receivers)) - copy(receiversCopy, c.receivers) + receiversCopy := make([]loki.LogsReceiver, len(c.args.Receivers)) + copy(receiversCopy, c.args.Receivers) go func() { for { select { From e865f02bce35e9e8fcb8653d0950f956b5cb669c Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:46:36 +0100 Subject: [PATCH 4/4] rephrase changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec796e8454b..7ab30194274 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,7 +43,7 @@ Main (unreleased) - Fix `prometheus.exporter.cloudwatch` to not always emit debug logs but respect debug property. (@kalleep) -- Fix potential deadlock in `loki.source.journal` when stopping the component. (@thampiotr) +- Fix potential deadlock in `loki.source.journal` when stopping or reloading the component. (@thampiotr) v1.11.0 -----------------