diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 0ff4f515194..404b94b4c34 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -265,9 +265,7 @@ func (t *tailer) stop(done chan struct{}) { // We need to cleanup created metrics t.cleanupMetrics() - // If the component is not stopping, then it means that the target for this component is gone and that - // we should clear the entry from the positions file. - if !t.componentStopping() { + if !t.shouldKeepPosition() { t.positions.Remove(t.key.Path, t.key.Labels) } } @@ -276,6 +274,14 @@ func (t *tailer) Key() positions.Entry { return t.key } +func (t *tailer) shouldKeepPosition() bool { + // NOTE: We want to keep position if component is stopping or decompression is enabled. + // If component is not stopping that means that target is gone and we should no longer tail the file. + // If decompression is enabled we read file until we reach EOF and stop so tailer will exit, but we need + // to remember the position so that we don't re-ingest it on restart. + return t.componentStopping() || t.decompression.Enabled +} + // cleanupMetrics removes all metrics exported by this tailer func (t *tailer) cleanupMetrics() { // When we stop tailing the file, also un-export metrics related to the file diff --git a/internal/component/loki/source/file/tailer_test.go b/internal/component/loki/source/file/tailer_test.go index e4829fa7428..3d48ffd798c 100644 --- a/internal/component/loki/source/file/tailer_test.go +++ b/internal/component/loki/source/file/tailer_test.go @@ -361,7 +361,8 @@ func TestTailer_Compressions(t *testing.T) { logger, handler.Receiver(), positionsFile, - func() bool { return true }, + // We return false here to verify that position is kept and we don't re-ingest the file. + func() bool { return false }, sourceOptions{ path: filename, labels: labels,