diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 5d35134c6ae..4c23db283f5 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -252,8 +252,6 @@ func New(o component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) error { defer func() { level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping sources and positions file") - // We need to stop posFile first so we don't record entries we are draining - c.posFile.Stop() // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). source.Drain(c.handler, func() { @@ -264,6 +262,8 @@ func (c *Component) Run(ctx context.Context) error { close(c.handler.Chan()) c.mut.Unlock() }) + + c.posFile.Stop() }() var wg sync.WaitGroup diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index b02237412ce..5a9e6bd902f 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -145,7 +145,7 @@ func (t *tailer) Run(ctx context.Context) { default: } - err := t.initRun() + pos, err := t.initRun() if err != nil { // We are retrying tailers until the target has disappeared. // We are mostly interested in this log if this happens directly when @@ -165,7 +165,7 @@ func (t *tailer) Run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) go func() { // readLines closes done on exit - t.readLines(done) + t.readLines(pos, done) cancel() }() @@ -176,21 +176,21 @@ func (t *tailer) Run(ctx context.Context) { t.stop(done) } -func (t *tailer) initRun() error { +func (t *tailer) initRun() (int64, error) { fi, err := os.Stat(t.key.Path) if err != nil { - return fmt.Errorf("failed to tail file: %w", err) + return 0, fmt.Errorf("failed to tail file: %w", err) } pos, err := t.positions.Get(t.key.Path, t.key.Labels) if err != nil { switch t.onPositionsFileError { case OnPositionsFileErrorSkip: - return fmt.Errorf("failed to get file position: %w", err) + return 0, fmt.Errorf("failed to get file position: %w", err) case OnPositionsFileErrorRestartEnd: pos, err = getLastLinePosition(t.key.Path) if err != nil { - return fmt.Errorf("failed to get last line position after positions error: %w", err) + return 0, fmt.Errorf("failed to get last line position after positions error: %w", err) } level.Info(t.logger).Log("msg", "retrieved the position of the last line after positions error") default: @@ -207,7 +207,7 @@ func (t *tailer) initRun() error { if pos == 0 && t.legacyPositionUsed { pos, err = t.positions.Get(t.key.Path, "{}") if err != nil { - return fmt.Errorf("failed to get file position with empty labels: %w", err) + return 0, fmt.Errorf("failed to get file position with empty labels: %w", err) } } @@ -240,19 +240,19 @@ func (t *tailer) initRun() error { }) if err != nil { - return fmt.Errorf("failed to tail the file: %w", err) + return pos, fmt.Errorf("failed to tail the file: %w", err) } t.file = tail - return nil + return pos, nil } // readLines reads lines from the tailed file by calling Next() in a loop. // It processes each line by sending it to the receiver's channel and updates // position tracking periodically. It exits when Next() returns an error, // this happens when the tail.File is stopped or or we have a unrecoverable error. -func (t *tailer) readLines(done chan struct{}) { +func (t *tailer) readLines(pos int64, done chan struct{}) { level.Info(t.logger).Log("msg", "tail routine started") if t.decompression.Enabled && t.decompression.InitialDelay > 0 { @@ -261,8 +261,8 @@ func (t *tailer) readLines(done chan struct{}) { } var ( + lastOffset = pos entries = t.receiver.Chan() - lastOffset = int64(0) positionInterval = t.positions.SyncPeriod() lastUpdatedPosition = time.Time{} )