diff --git a/CHANGELOG.md b/CHANGELOG.md index 48a14027f8b..7ab30194274 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,8 @@ Main (unreleased) - Fix `prometheus.exporter.cloudwatch` to not always emit debug logs but respect debug property. (@kalleep) +- Fix potential deadlock in `loki.source.journal` when stopping or reloading the component. (@thampiotr) + v1.11.0 ----------------- diff --git a/internal/component/loki/source/journal/journal.go b/internal/component/loki/source/journal/journal.go index f46e4e73795..0826412b5f2 100644 --- a/internal/component/loki/source/journal/journal.go +++ b/internal/component/loki/source/journal/journal.go @@ -38,16 +38,15 @@ var _ component.Component = (*Component)(nil) // Component represents reading from a journal type Component struct { - mut sync.RWMutex - t *target.JournalTarget - metrics *target.Metrics - o component.Options - handler chan loki.Entry - positions positions.Positions - receivers []loki.LogsReceiver - argsUpdated chan struct{} - args Arguments - healthErr error + mut sync.RWMutex + t *target.JournalTarget + metrics *target.Metrics + o component.Options + handler chan loki.Entry + positions positions.Positions + targetsUpdated chan struct{} + args Arguments + healthErr error } // New creates a new component. @@ -73,13 +72,12 @@ func New(o component.Options, args Arguments) (*Component, error) { } c := &Component{ - metrics: target.NewMetrics(o.Registerer), - o: o, - handler: make(chan loki.Entry), - positions: positionsFile, - receivers: args.Receivers, - argsUpdated: make(chan struct{}, 1), - args: args, + metrics: target.NewMetrics(o.Registerer), + o: o, + handler: make(chan loki.Entry), + positions: positionsFile, + targetsUpdated: make(chan struct{}, 1), + args: args, } err = c.Update(args) return c, err @@ -88,18 +86,25 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run starts the component. func (c *Component) Run(ctx context.Context) error { defer func() { + // Start draining routine to prevent potential deadlock if target attempts to send during Stop(). + cancel := c.startDrainingRoutine() + defer cancel() + + // Stop existing target c.mut.RLock() + defer c.mut.RUnlock() if c.t != nil { + level.Info(c.o.Logger).Log("msg", "loki.source.journal component shutting down, stopping journal target") err := c.t.Stop() if err != nil { level.Warn(c.o.Logger).Log("msg", "error stopping journal target", "err", err) } } - c.mut.RUnlock() - }() for { select { + case <-c.targetsUpdated: + c.reloadTargets() case <-ctx.Done(): return nil case entry := <-c.handler: @@ -108,31 +113,10 @@ func (c *Component) Run(ctx context.Context) error { Labels: entry.Labels, Entry: entry.Entry, } - for _, r := range c.receivers { + for _, r := range c.args.Receivers { r.Chan() <- lokiEntry } c.mut.RUnlock() - case <-c.argsUpdated: - c.mut.Lock() - if c.t != nil { - err := c.t.Stop() - if err != nil { - level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err) - } - c.t = nil - } - rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules) - entryHandler := loki.NewEntryHandler(c.handler, func() {}) - - newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, c.args)) - if err != nil { - level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path) - c.healthErr = fmt.Errorf("error creating journal target: %w", err) - } else { - c.t = newTarget - c.healthErr = nil - } - c.mut.Unlock() } } } @@ -144,7 +128,7 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() c.args = newArgs select { - case c.argsUpdated <- struct{}{}: + case c.targetsUpdated <- struct{}{}: default: // Update notification already sent } return nil @@ -169,6 +153,71 @@ func (c *Component) CurrentHealth() component.Health { } } +func (c *Component) startDrainingRoutine() func() { + readCtx, cancel := context.WithCancel(context.Background()) + c.mut.RLock() + defer c.mut.RUnlock() + receiversCopy := make([]loki.LogsReceiver, len(c.args.Receivers)) + copy(receiversCopy, c.args.Receivers) + go func() { + for { + select { + case <-readCtx.Done(): + return + case entry := <-c.handler: + lokiEntry := loki.Entry{ + Labels: entry.Labels, + Entry: entry.Entry, + } + for _, r := range receiversCopy { + r.Chan() <- lokiEntry + } + } + } + }() + return cancel +} + +func (c *Component) reloadTargets() { + // Start draining routine to prevent potential deadlock if target attempts to send during Stop(). + cancel := c.startDrainingRoutine() + + // Grab current state + c.mut.RLock() + var targetToStop *target.JournalTarget + if c.t != nil { + targetToStop = c.t + } + rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules) + c.mut.RUnlock() + + // Stop existing target + if targetToStop != nil { + err := targetToStop.Stop() + if err != nil { + level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err) + } + } + + // Stop draining routine + cancel() + + // Create new target + c.mut.Lock() + defer c.mut.Unlock() + c.t = nil + entryHandler := loki.NewEntryHandler(c.handler, func() {}) + + newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, c.args)) + if err != nil { + level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path) + c.healthErr = fmt.Errorf("error creating journal target: %w", err) + } else { + c.t = newTarget + c.healthErr = nil + } +} + func convertArgs(job string, a Arguments) *scrapeconfig.JournalTargetConfig { labels := model.LabelSet{ model.LabelName("job"): model.LabelValue(job), diff --git a/internal/component/loki/source/syslog/syslog.go b/internal/component/loki/source/syslog/syslog.go index b18ccec03e8..915a0be9202 100644 --- a/internal/component/loki/source/syslog/syslog.go +++ b/internal/component/loki/source/syslog/syslog.go @@ -137,11 +137,9 @@ func (c *Component) startDrainingRoutine() func() { case <-readCtx.Done(): return case entry := <-c.handler.Chan(): - c.mut.RLock() for _, receiver := range fanoutCopy { receiver.Chan() <- entry } - c.mut.RUnlock() } } }() @@ -152,20 +150,23 @@ func (c *Component) reloadTargets() { // Start draining routine to prevent potential deadlock if targets attempt to send during Stop(). cancel := c.startDrainingRoutine() - // Stop all targets + // Grab current state c.mut.RLock() var rcs []*relabel.Config if len(c.args.RelabelRules) > 0 { rcs = alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules) } + targetsToStop := make([]*st.SyslogTarget, len(c.targets)) + copy(targetsToStop, c.targets) + c.mut.RUnlock() - for _, l := range c.targets { + // Stop existing targets + for _, l := range targetsToStop { err := l.Stop() if err != nil { level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err) } } - c.mut.RUnlock() // Stop draining routine cancel()