Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions internal/component/loki/source/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,7 @@ func (t *tailer) stop(done chan struct{}) {
// We need to cleanup created metrics
t.cleanupMetrics()

// If the component is not stopping, then it means that the target for this component is gone and that
// we should clear the entry from the positions file.
if !t.componentStopping() {
if !t.shouldKeepPosition() {
t.positions.Remove(t.key.Path, t.key.Labels)
}
}
Expand All @@ -276,6 +274,14 @@ func (t *tailer) Key() positions.Entry {
return t.key
}

func (t *tailer) shouldKeepPosition() bool {
// NOTE: We want to keep position if component is stopping or decompression is enabled.
// If component is not stopping that means that target is gone and we should no longer tail the file.
// If decompression is enabled we read file until we reach EOF and stop so tailer will exit, but we need
// to remember the position so that we don't re-ingest it on restart.
return t.componentStopping() || t.decompression.Enabled
}

// cleanupMetrics removes all metrics exported by this tailer
func (t *tailer) cleanupMetrics() {
// When we stop tailing the file, also un-export metrics related to the file
Expand Down
3 changes: 2 additions & 1 deletion internal/component/loki/source/file/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ func TestTailer_Compressions(t *testing.T) {
logger,
handler.Receiver(),
positionsFile,
func() bool { return true },
// We return false here to verify that position is kept and we don't re-ingest the file.
func() bool { return false },
sourceOptions{
path: filename,
labels: labels,
Expand Down
Loading