Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Main (unreleased)

- Fix `prometheus.exporter.cloudwatch` to respect the `debug` property instead of always emitting debug logs. (@kalleep)

- Fix potential deadlock in `loki.source.journal` when stopping or reloading the component. (@thampiotr)

v1.11.0
-----------------

Expand Down
133 changes: 91 additions & 42 deletions internal/component/loki/source/journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,15 @@ var _ component.Component = (*Component)(nil)

// Component represents reading from a journal
type Component struct {
mut sync.RWMutex
t *target.JournalTarget
metrics *target.Metrics
o component.Options
handler chan loki.Entry
positions positions.Positions
receivers []loki.LogsReceiver
argsUpdated chan struct{}
args Arguments
healthErr error
mut sync.RWMutex
t *target.JournalTarget
metrics *target.Metrics
o component.Options
handler chan loki.Entry
positions positions.Positions
targetsUpdated chan struct{}
args Arguments
healthErr error
}

// New creates a new component.
Expand All @@ -73,13 +72,12 @@ func New(o component.Options, args Arguments) (*Component, error) {
}

c := &Component{
metrics: target.NewMetrics(o.Registerer),
o: o,
handler: make(chan loki.Entry),
positions: positionsFile,
receivers: args.Receivers,
argsUpdated: make(chan struct{}, 1),
args: args,
metrics: target.NewMetrics(o.Registerer),
o: o,
handler: make(chan loki.Entry),
positions: positionsFile,
targetsUpdated: make(chan struct{}, 1),
args: args,
}
err = c.Update(args)
return c, err
Expand All @@ -88,18 +86,25 @@ func New(o component.Options, args Arguments) (*Component, error) {
// Run starts the component.
func (c *Component) Run(ctx context.Context) error {
defer func() {
// Start draining routine to prevent potential deadlock if target attempts to send during Stop().
cancel := c.startDrainingRoutine()
defer cancel()

// Stop existing target
c.mut.RLock()
defer c.mut.RUnlock()
if c.t != nil {
level.Info(c.o.Logger).Log("msg", "loki.source.journal component shutting down, stopping journal target")
err := c.t.Stop()
if err != nil {
level.Warn(c.o.Logger).Log("msg", "error stopping journal target", "err", err)
}
}
c.mut.RUnlock()

}()
for {
select {
case <-c.targetsUpdated:
c.reloadTargets()
case <-ctx.Done():
return nil
case entry := <-c.handler:
Expand All @@ -108,31 +113,10 @@ func (c *Component) Run(ctx context.Context) error {
Labels: entry.Labels,
Entry: entry.Entry,
}
for _, r := range c.receivers {
for _, r := range c.args.Receivers {
r.Chan() <- lokiEntry
}
c.mut.RUnlock()
case <-c.argsUpdated:
c.mut.Lock()
if c.t != nil {
err := c.t.Stop()
if err != nil {
level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err)
}
c.t = nil
}
rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules)
entryHandler := loki.NewEntryHandler(c.handler, func() {})

newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, c.args))
if err != nil {
level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path)
c.healthErr = fmt.Errorf("error creating journal target: %w", err)
} else {
c.t = newTarget
c.healthErr = nil
}
c.mut.Unlock()
}
}
}
Expand All @@ -144,7 +128,7 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()
c.args = newArgs
select {
case c.argsUpdated <- struct{}{}:
case c.targetsUpdated <- struct{}{}:
default: // Update notification already sent
}
return nil
Expand All @@ -169,6 +153,71 @@ func (c *Component) CurrentHealth() component.Health {
}
}

// startDrainingRoutine launches a goroutine that keeps receiving from
// c.handler and forwards each entry to the receivers configured at the time
// of the call. It exists so that a target trying to send on c.handler while
// it is being stopped always finds a reader.
//
// The returned function cancels the goroutine's context; the goroutine exits
// the next time it observes the cancellation in its select.
func (c *Component) startDrainingRoutine() func() {
	// Snapshot the receivers under the read lock so the goroutine works on
	// its own copy and never reads c.args itself.
	c.mut.RLock()
	fanout := append([]loki.LogsReceiver(nil), c.args.Receivers...)
	c.mut.RUnlock()

	drainCtx, stop := context.WithCancel(context.Background())
	done := drainCtx.Done()

	go func() {
		for {
			select {
			case <-done:
				return
			case in := <-c.handler:
				out := loki.Entry{
					Labels: in.Labels,
					Entry:  in.Entry,
				}
				for i := range fanout {
					fanout[i].Chan() <- out
				}
			}
		}
	}()

	return stop
}

// reloadTargets stops the current journal target (if any) and creates a new
// one from the component's current arguments. On failure to create the new
// target, c.t is left nil and the error is recorded in c.healthErr; on
// success c.healthErr is cleared.
func (c *Component) reloadTargets() {
	// Keep c.handler drained while the old target shuts down, to prevent a
	// potential deadlock if the target attempts to send during Stop().
	stopDraining := c.startDrainingRoutine()

	// Capture the state we need under the read lock, then release it before
	// calling Stop() so the lock is not held across that call.
	c.mut.RLock()
	old := c.t
	relabelCfgs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules)
	c.mut.RUnlock()

	// Shut down the previous target, if there was one.
	if old != nil {
		if err := old.Stop(); err != nil {
			level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err)
		}
	}

	// The old target is stopped; the draining goroutine is no longer needed.
	stopDraining()

	// Build the replacement target under the write lock.
	c.mut.Lock()
	defer c.mut.Unlock()

	c.t = nil
	sink := loki.NewEntryHandler(c.handler, func() {})
	created, err := target.NewJournalTarget(c.metrics, c.o.Logger, sink, c.positions, c.o.ID, relabelCfgs, convertArgs(c.o.ID, c.args))
	if err != nil {
		level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path)
		c.healthErr = fmt.Errorf("error creating journal target: %w", err)
		return
	}

	c.t = created
	c.healthErr = nil
}

func convertArgs(job string, a Arguments) *scrapeconfig.JournalTargetConfig {
labels := model.LabelSet{
model.LabelName("job"): model.LabelValue(job),
Expand Down
11 changes: 6 additions & 5 deletions internal/component/loki/source/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,9 @@ func (c *Component) startDrainingRoutine() func() {
case <-readCtx.Done():
return
case entry := <-c.handler.Chan():
c.mut.RLock()
for _, receiver := range fanoutCopy {
receiver.Chan() <- entry
}
c.mut.RUnlock()
}
}
}()
Expand All @@ -152,20 +150,23 @@ func (c *Component) reloadTargets() {
// Start draining routine to prevent potential deadlock if targets attempt to send during Stop().
cancel := c.startDrainingRoutine()

// Stop all targets
// Grab current state
c.mut.RLock()
var rcs []*relabel.Config
if len(c.args.RelabelRules) > 0 {
rcs = alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules)
}
targetsToStop := make([]*st.SyslogTarget, len(c.targets))
copy(targetsToStop, c.targets)
c.mut.RUnlock()

for _, l := range c.targets {
// Stop existing targets
for _, l := range targetsToStop {
err := l.Stop()
if err != nil {
level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err)
}
}
c.mut.RUnlock()

// Stop draining routine
cancel()
Expand Down
Loading