diff --git a/internal/component/loki/source/file/internal/tail/bom.go b/internal/component/loki/source/file/internal/tail/bom.go index 295751a619a..7e4edb92bff 100644 --- a/internal/component/loki/source/file/internal/tail/bom.go +++ b/internal/component/loki/source/file/internal/tail/bom.go @@ -1,8 +1,8 @@ package tail import ( - "bufio" "bytes" + "io" "golang.org/x/text/encoding" "golang.org/x/text/encoding/unicode" @@ -30,15 +30,18 @@ const ( // detectBOM tries to detect a BOM from reader. It is important that the reader // and underlying file are positioned at the beginning of the file -// when calling this function, as it peeks at the first bytes to detect the BOM. -func detectBOM(br *bufio.Reader, offset int64) (int64, BOM) { - // Peek up to 4 bytes (longest BOM) - buf, err := br.Peek(4) +// when calling this function, as it reads the first 4 bytes to detect the BOM. +func detectBOM(r io.Reader, offset int64) (int64, BOM) { + buf := make([]byte, 4) + + n, err := r.Read(buf) if err != nil { return offset, bomUNKNOWN } var bom BOM + buf = buf[:n] + switch { case bytes.HasPrefix(buf, bomUTF8Bytes): bom = bomUTF8 diff --git a/internal/component/loki/source/file/internal/tail/config.go b/internal/component/loki/source/file/internal/tail/config.go index bdbee142a2d..d3699feaea4 100644 --- a/internal/component/loki/source/file/internal/tail/config.go +++ b/internal/component/loki/source/file/internal/tail/config.go @@ -8,10 +8,14 @@ import ( type Config struct { // Filename is the path to the file to tail. Filename string + // Offset is the byte offset in the file where tailing should start. // If 0, tailing starts from the beginning of the file. Offset int64 + // StartFromEnd will read from the end of the file if true and Offset is 0. + StartFromEnd bool + // Encoding used for file. If none is provided no encoding is used // and the file is assumed to be UTF-8. 
Encoding string diff --git a/internal/component/loki/source/file/internal/tail/encoding.go b/internal/component/loki/source/file/internal/tail/encoding.go index d2eb4ff6c84..c114f3f7900 100644 --- a/internal/component/loki/source/file/internal/tail/encoding.go +++ b/internal/component/loki/source/file/internal/tail/encoding.go @@ -12,3 +12,11 @@ func getEncoding(enc string) (encoding.Encoding, error) { return ianaindex.IANA.Encoding(enc) } + +func encodedNewline(e *encoding.Encoder) ([]byte, error) { + return e.Bytes([]byte{'\n'}) +} + +func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) { + return e.Bytes([]byte{'\r'}) +} diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index 8ecb389624d..058998cf49c 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -41,7 +41,7 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { return nil, err } - reader, err := newReader(f, cfg.Offset, encoding, cfg.Compression) + reader, err := newReader(logger, f, cfg.Offset, encoding, cfg.Compression, cfg.StartFromEnd) if err != nil { f.Close() return nil, err diff --git a/internal/component/loki/source/file/internal/tail/file_test.go b/internal/component/loki/source/file/internal/tail/file_test.go index 2f276cf3aa1..4c9ae6dcabe 100644 --- a/internal/component/loki/source/file/internal/tail/file_test.go +++ b/internal/component/loki/source/file/internal/tail/file_test.go @@ -418,16 +418,57 @@ func TestFile(t *testing.T) { utf16offsets = [3]int64{14, 26, 38} ) + var ( + nopEncoder = encoding.Nop.NewEncoder() + utf16beEncoder = unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewEncoder() + utf16beBOMEncoder = unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder() + utf16leEncoder = unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder() + utf16leBOMEncoder = 
unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder() + ) + t.Run("read gzip", func(t *testing.T) { - compressionTest(t, "plain", "gz", encoding.Nop.NewEncoder(), utf8offsets) - compressionTest(t, "utf-16be", "gz", unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder(), utf16offsets) - compressionTest(t, "utf-16le", "gz", unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder(), utf16offsets) + compressionTest(t, "plain", "gz", nopEncoder, utf8offsets) + compressionTest(t, "utf-16be", "gz", utf16beBOMEncoder, utf16offsets) + compressionTest(t, "utf-16le", "gz", utf16leBOMEncoder, utf16offsets) }) t.Run("read zlib", func(t *testing.T) { - compressionTest(t, "plain", "z", encoding.Nop.NewEncoder(), utf8offsets) - compressionTest(t, "utf-16be", "z", unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder(), utf16offsets) - compressionTest(t, "utf-16le", "z", unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder(), utf16offsets) + compressionTest(t, "plain", "z", nopEncoder, utf8offsets) + compressionTest(t, "utf-16be", "z", utf16beBOMEncoder, utf16offsets) + compressionTest(t, "utf-16le", "z", utf16leBOMEncoder, utf16offsets) + }) + + t.Run("start from end", func(t *testing.T) { + startFromEndTest(t, "utf-8", nopEncoder, nopEncoder, false, 0, []Line{{Text: "line3", Offset: 18}}) + startFromEndTest(t, "utf-16be", utf16beBOMEncoder, utf16beEncoder, false, 0, []Line{{Text: "line3", Offset: 38}}) + startFromEndTest(t, "utf-16le", utf16leBOMEncoder, utf16leEncoder, false, 0, []Line{{Text: "line3", Offset: 38}}) + }) + + t.Run("start from end with start offset", func(t *testing.T) { + startFromEndTest(t, "utf-8", nopEncoder, nopEncoder, false, 6, []Line{ + {Text: "line2", Offset: 12}, + {Text: "line3", Offset: 18}, + }) + startFromEndTest(t, "utf-8-cr", nopEncoder, nopEncoder, true, 7, []Line{ + {Text: "line2", Offset: 14}, + {Text: "line3", Offset: 21}, + }) + startFromEndTest(t, "utf-16be", utf16beBOMEncoder, utf16beEncoder, false, 
14, []Line{ + {Text: "line2", Offset: 26}, + {Text: "line3", Offset: 38}, + }) + startFromEndTest(t, "utf-16be-cr", utf16beBOMEncoder, utf16beEncoder, true, 16, []Line{ + {Text: "line2", Offset: 30}, + {Text: "line3", Offset: 44}, + }) + startFromEndTest(t, "utf-16le", utf16leBOMEncoder, utf16leEncoder, false, 14, []Line{ + {Text: "line2", Offset: 26}, + {Text: "line3", Offset: 38}, + }) + startFromEndTest(t, "utf-16le", utf16leBOMEncoder, utf16leEncoder, true, 16, []Line{ + {Text: "line2", Offset: 30}, + {Text: "line3", Offset: 44}, + }) }) } @@ -473,6 +514,51 @@ func compressionTest(t *testing.T, name, compression string, enc *encoding.Encod }) } +func startFromEndTest(t *testing.T, name string, encoder, appendEncoder *encoding.Encoder, useCR bool, offset int64, expected []Line) { + t.Run(name, func(t *testing.T) { + var ( + content string + err error + ) + + if useCR { + content, err = encoder.String("line1\r\nline2\r\n") + } else { + content, err = encoder.String("line1\nline2\n") + } + require.NoError(t, err) + + var toAppend string + + if useCR { + toAppend, err = appendEncoder.String("line3\r\n") + } else { + toAppend, err = appendEncoder.String("line3\n") + } + require.NoError(t, err) + + name := createFile(t, name, content) + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + Offset: offset, + StartFromEnd: true, + }) + require.NoError(t, err) + defer file.Stop() + + go func() { + time.Sleep(100 * time.Millisecond) + appendToFile(t, name, toAppend) + }() + + for _, line := range expected { + verifyResult(t, file, &line, nil) + } + }) +} + func createFile(tb testing.TB, name, content string) string { path := tb.TempDir() + "/" + name require.NoError(tb, os.WriteFile(path, []byte(content), 0600)) diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go index 052c2f7217b..096cecea673 100644 --- 
a/internal/component/loki/source/file/internal/tail/reader.go +++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -11,23 +11,23 @@ import ( "os" "unsafe" + "github.com/go-kit/log" "golang.org/x/text/encoding" + + "github.com/grafana/alloy/internal/runtime/logging/level" ) const defaultBufSize = 4096 // newReader creates a new reader that is used to read from file. // It is important that the provided file is positioned at the start of the file. -func newReader(f *os.File, offset int64, enc encoding.Encoding, compression string) (*reader, error) { +func newReader(logger log.Logger, f *os.File, offset int64, enc encoding.Encoding, compression string, startFromEnd bool) (*reader, error) { rr, err := newReaderAt(f, compression, 0) if err != nil { return nil, err } - br := bufio.NewReader(rr) - - var bom BOM - offset, bom = detectBOM(br, offset) + offsetAfterBOM, bom := detectBOM(rr, offset) enc = resolveEncodingFromBOM(bom, enc) var ( @@ -45,17 +45,25 @@ func newReader(f *os.File, offset int64, enc encoding.Encoding, compression stri return nil, err } - if offset != 0 { - rr, err = newReaderAt(f, compression, offset) + if offset == 0 && startFromEnd { + offset, err = lastNewline(f, nl) if err != nil { - return nil, err + level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", "error", err) } - br.Reset(rr) + } + + if offsetAfterBOM > offset { + offset = offsetAfterBOM + } + + rr, err = newReaderAt(f, compression, offset) + if err != nil { + return nil, err } return &reader{ pos: offset, - br: br, + br: bufio.NewReader(rr), decoder: decoder, nl: nl, lastNl: nl[len(nl)-1], @@ -159,37 +167,22 @@ func (r *reader) reset(f *os.File, offset int64) error { if err != nil { return err } - r.br.Reset(rr) - offset, _ = detectBOM(r.br, offset) - if offset != 0 { - rr, err = newReaderAt(f, r.compression, offset) - if err != nil { - return nil - } - r.br.Reset(rr) + offset, _ = detectBOM(rr, offset) + rr, err = 
newReaderAt(f, r.compression, offset) + if err != nil { + return err } + r.br.Reset(rr) r.pos = offset r.pending = make([]byte, 0, defaultBufSize) return nil } -func encodedNewline(e *encoding.Encoder) ([]byte, error) { - out := make([]byte, 10) - nDst, _, err := e.Transform(out, []byte{'\n'}, true) - return out[:nDst], err -} - -func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) { - out := make([]byte, 10) - nDst, _, err := e.Transform(out, []byte{'\r'}, true) - return out[:nDst], err -} - func newReaderAt(f *os.File, compression string, offset int64) (io.Reader, error) { // NOTE: If compression is used we always need to read from the beginning. - if compression != "" && offset != 0 { + if compression != "" { if _, err := f.Seek(0, io.SeekStart); err != nil { return nil, err } @@ -208,10 +201,8 @@ func newReaderAt(f *os.File, compression string, offset int64) (io.Reader, error case "bz2": reader = bzip2.NewReader(f) default: - if offset != 0 { - if _, err := f.Seek(offset, io.SeekStart); err != nil { - return nil, err - } + if _, err := f.Seek(offset, io.SeekStart); err != nil { + return nil, err } reader = f diff --git a/internal/component/loki/source/file/internal/tail/util.go b/internal/component/loki/source/file/internal/tail/util.go new file mode 100644 index 00000000000..89d56e44ff9 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/util.go @@ -0,0 +1,54 @@ +package tail + +import ( + "bytes" + "io" + "os" +) + +// lastNewline returns the offset of the start of the last line in the file. 
+func lastNewline(file *os.File, nl []byte) (int64, error) { + fi, err := file.Stat() + if err != nil { + return 0, err + } + + n := fi.Size() + if n == 0 { + return 0, nil + } + + const chunkSize = 1024 + buf := make([]byte, chunkSize) + + var pos = n - chunkSize + if pos < 0 { + pos = 0 + } + + for { + _, err = file.Seek(pos, io.SeekStart) + if err != nil { + return 0, err + } + + bytesRead, err := file.Read(buf) + if err != nil { + return 0, err + } + + i := bytes.LastIndex(buf[:bytesRead], nl) + if i != -1 { + return pos + int64(i) + int64(len(nl)), nil + } + + if pos == 0 { + return 0, nil + } + + pos -= chunkSize + if pos < 0 { + pos = 0 + } + } +} diff --git a/internal/component/loki/source/file/internal/tail/util_test.go b/internal/component/loki/source/file/internal/tail/util_test.go new file mode 100644 index 00000000000..c35c17c2a1e --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/util_test.go @@ -0,0 +1,102 @@ +package tail + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" +) + +func TestLastNewline(t *testing.T) { + encoder := encoding.Nop.NewEncoder() + t.Run("empty file", func(t *testing.T) { + lastNewlineTest(t, "empty", encoder, "", 0) + }) + + t.Run("UTF-8 no newline", func(t *testing.T) { + lastNewlineTest(t, "no-nl", encoder, "line1", 0) + }) + + t.Run("UTF-8 single newline at end", func(t *testing.T) { + lastNewlineTest(t, "end", encoder, "line1\n", 6) + }) + + t.Run("UTF-8 newline in middle", func(t *testing.T) { + lastNewlineTest(t, "middle", encoder, "line1\nline2", 6) + }) + + t.Run("UTF-8 last", func(t *testing.T) { + lastNewlineTest(t, "last", encoder, "line1\nline2\nline3\n", 18) + }) + + encoder = unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder() + t.Run("UTF-16LE empty", func(t *testing.T) { + lastNewlineTest(t, "empty", encoder, "", 0) + }) + + t.Run("UTF-16LE no newline", func(t 
*testing.T) { + lastNewlineTest(t, "no-nl", encoder, "line1", 0) + }) + + t.Run("UTF-16LE single newline at end", func(t *testing.T) { + lastNewlineTest(t, "end", encoder, "line1\n", 12) + }) + + t.Run("UTF-16LE newline in middle", func(t *testing.T) { + lastNewlineTest(t, "middle", encoder, "line1\nline2", 12) + }) + + t.Run("UTF-16LE last", func(t *testing.T) { + lastNewlineTest(t, "last", encoder, "line1\nline2\nline3\n", 36) + }) + + encoder = unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewEncoder() + t.Run("UTF-16BE empty", func(t *testing.T) { + lastNewlineTest(t, "empty", encoder, "", 0) + }) + + t.Run("UTF-16BE no newline", func(t *testing.T) { + lastNewlineTest(t, "no-nl", encoder, "line1", 0) + }) + + t.Run("UTF-16BE single newline at end", func(t *testing.T) { + lastNewlineTest(t, "end", encoder, "line1\n", 12) + }) + + t.Run("UTF-16BE newline in middle", func(t *testing.T) { + lastNewlineTest(t, "middle", encoder, "line1\nline2", 12) + }) + + t.Run("UTF-16BE last", func(t *testing.T) { + lastNewlineTest(t, "last", encoder, "line1\nline2\nline3\n", 36) + }) +} + +func lastNewlineTest(t *testing.T, name string, encoder *encoding.Encoder, content string, expectedPos int64) { + encoded, err := encoder.String(content) + require.NoError(t, err) + + f := createTempFile(t, []byte(encoded)) + defer os.Remove(f.Name()) + defer f.Close() + + nl, err := encodedNewline(encoder) + require.NoError(t, err) + + got, err := lastNewline(f, nl) + require.NoError(t, err) + require.Equal(t, expectedPos, got) +} + +// createTempFile creates a temp file with content and returns the open file (read-only seekable).
+func createTempFile(t *testing.T, content []byte) *os.File { + t.Helper() + path := filepath.Join(t.TempDir(), "seektest") + require.NoError(t, os.WriteFile(path, content, 0600)) + f, err := os.Open(path) + require.NoError(t, err) + return f +} diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 5a9e6bd902f..ae853127f59 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -4,7 +4,6 @@ package file // tailer implements the reader interface by using the github.com/grafana/tail package to tail files. import ( - "bytes" "context" "errors" "fmt" @@ -81,62 +80,6 @@ func newTailer( } } -// getLastLinePosition returns the offset of the start of the last line in the file at the given path. -// It will read chunks of bytes starting from the end of the file to return the position of the last '\n' + 1. -// If it cannot find any '\n' it will return 0. -func getLastLinePosition(path string) (int64, error) { - file, err := os.Open(path) - if err != nil { - return 0, err - } - defer file.Close() - - const chunkSize = 1024 - - buf := make([]byte, chunkSize) - fi, err := file.Stat() - if err != nil { - return 0, err - } - - if fi.Size() == 0 { - return 0, nil - } - - var pos = fi.Size() - chunkSize - if pos < 0 { - pos = 0 - } - - for { - _, err = file.Seek(pos, io.SeekStart) - if err != nil { - return 0, err - } - - bytesRead, err := file.Read(buf) - if err != nil { - return 0, err - } - - idx := bytes.LastIndexByte(buf[:bytesRead], '\n') - // newline found - if idx != -1 { - return pos + int64(idx) + 1, nil - } - - // no newline found in the entire file - if pos == 0 { - return 0, nil - } - - pos -= chunkSize - if pos < 0 { - pos = 0 - } - } -} - func (t *tailer) Run(ctx context.Context) { // Check if context was canceled between two calls to Run. 
select { @@ -182,17 +125,16 @@ func (t *tailer) initRun() (int64, error) { return 0, fmt.Errorf("failed to tail file: %w", err) } + startFromEnd := t.tailFromEnd + pos, err := t.positions.Get(t.key.Path, t.key.Labels) if err != nil { switch t.onPositionsFileError { case OnPositionsFileErrorSkip: return 0, fmt.Errorf("failed to get file position: %w", err) case OnPositionsFileErrorRestartEnd: - pos, err = getLastLinePosition(t.key.Path) - if err != nil { - return 0, fmt.Errorf("failed to get last line position after positions error: %w", err) - } - level.Info(t.logger).Log("msg", "retrieved the position of the last line after positions error") + startFromEnd = true + level.Info(t.logger).Log("msg", "reset position to end of file after position error") default: level.Debug(t.logger).Log("msg", "unrecognized `on_positions_file_error` option, defaulting to `restart_from_beginning`", "option", t.onPositionsFileError) fallthrough @@ -220,20 +162,10 @@ func (t *tailer) initRun() (int64, error) { t.positions.Remove(t.key.Path, t.key.Labels) } - // If no cached position is found and the tailFromEnd option is enabled. 
- if pos == 0 && t.tailFromEnd { - pos, err = getLastLinePosition(t.key.Path) - if err != nil { - level.Error(t.logger).Log("msg", "failed to get a position from the end of the file, default to start of file", "error", err) - } else { - t.positions.Put(t.key.Path, t.key.Labels, pos) - level.Info(t.logger).Log("msg", "retrieved and stored the position of the last line") - } - } - tail, err := tail.NewFile(t.logger, &tail.Config{ Filename: t.key.Path, Offset: pos, + StartFromEnd: startFromEnd, Encoding: t.encoding, Compression: t.decompression.GetFormat(), WatcherConfig: t.watcherConfig, diff --git a/internal/component/loki/source/file/tailer_test.go b/internal/component/loki/source/file/tailer_test.go index cf00c9243f5..e4829fa7428 100644 --- a/internal/component/loki/source/file/tailer_test.go +++ b/internal/component/loki/source/file/tailer_test.go @@ -1,7 +1,6 @@ package file import ( - "bytes" "context" "os" "path/filepath" @@ -21,79 +20,6 @@ import ( "github.com/grafana/alloy/internal/util" ) -func createTempFileWithContent(t *testing.T, content []byte) string { - t.Helper() - tmpfile, err := os.CreateTemp(t.TempDir(), "testfile") - if err != nil { - t.Fatalf("Failed to create temp file: %v", err) - } - - _, err = tmpfile.Write(content) - if err != nil { - tmpfile.Close() - t.Fatalf("Failed to write to temp file: %v", err) - } - - tmpfile.Close() - return tmpfile.Name() -} - -func TestGetLastLinePosition(t *testing.T) { - tests := []struct { - name string - content []byte - expected int64 - }{ - { - name: "File ending with newline", - content: []byte("Hello, World!\n"), - expected: 14, // Position after last '\n' - }, - { - name: "Newline in the middle", - content: []byte("Hello\nWorld"), - expected: 6, // Position after the '\n' in "Hello\n" - }, - { - name: "File not ending with newline", - content: []byte("Hello, World!"), - expected: 0, - }, - { - name: "File bigger than chunkSize without newline", - content: bytes.Repeat([]byte("A"), 1025), - expected: 0, 
- }, - { - name: "File bigger than chunkSize with newline in between", - content: append([]byte("Hello\n"), bytes.Repeat([]byte("A"), 1025)...), - expected: 6, // Position after the "Hello\n" - }, - { - name: "Empty file", - content: []byte(""), - expected: 0, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - filename := createTempFileWithContent(t, tt.content) - defer os.Remove(filename) - - got, err := getLastLinePosition(filename) - if err != nil { - t.Errorf("unexpected error: %v", err) - return - } - - if got != tt.expected { - t.Errorf("for content %q, expected position %d but got %d", tt.content, tt.expected, got) - } - }) - } -} - func TestTailer(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) l := logging.NewNop()