From 2e11abe65b4faa598681ae6cc8be49235f616202 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 13 Jan 2026 13:44:22 +0100 Subject: [PATCH 01/21] abstract all reading / decoding into scanner --- .../loki/source/file/internal/tail/file.go | 71 ++++++------------- .../loki/source/file/internal/tail/scanner.go | 46 ++++++++++++ internal/component/loki/source/file/tailer.go | 10 +-- 3 files changed, 74 insertions(+), 53 deletions(-) create mode 100644 internal/component/loki/source/file/internal/tail/scanner.go diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index e1a25d23c04..218947a8ae1 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -1,18 +1,17 @@ package tail import ( - "bufio" "context" "errors" "fmt" "io" "os" - "strings" "sync" "time" "github.com/go-kit/log" "github.com/grafana/dskit/backoff" + "golang.org/x/text/encoding" "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -35,6 +34,10 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { } } + if cfg.Decoder == nil { + cfg.Decoder = encoding.Nop.NewDecoder() + } + if cfg.WatcherConfig == (WatcherConfig{}) { cfg.WatcherConfig = defaultWatcherConfig } @@ -44,12 +47,12 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { ctx, cancel := context.WithCancel(context.Background()) return &File{ - cfg: cfg, - logger: logger, - file: f, - reader: newReader(f, cfg), - ctx: ctx, - cancel: cancel, + cfg: cfg, + logger: logger, + file: f, + scanner: newScanner(f, cfg.Decoder), + ctx: ctx, + cancel: cancel, }, nil } @@ -61,9 +64,9 @@ type File struct { logger log.Logger // protects file, reader, and lastOffset. 
- mu sync.Mutex - file *os.File - reader *bufio.Reader + mu sync.Mutex + file *os.File + scanner *scanner lastOffset int64 @@ -98,7 +101,7 @@ read: return &line, nil } - text, err := f.readLine() + text, err := f.scanner.next() if err != nil { if errors.Is(err, io.EOF) { if err := f.wait(text != ""); err != nil { @@ -109,7 +112,7 @@ read: return nil, err } - offset, err := f.offset() + offset, err := f.scanner.offset() if err != nil { return nil, err } @@ -149,7 +152,7 @@ func (f *File) Stop() error { // wait blocks until a file event is detected (modification, truncation, or deletion). // Returns an error if the context is canceled or an unrecoverable error occurs. func (f *File) wait(partial bool) error { - offset, err := f.offset() + offset, err := f.scanner.offset() if err != nil { return err } @@ -161,7 +164,7 @@ func (f *File) wait(partial bool) error { if partial { // We need to reset to last successful offset because we consumed a partial line. f.file.Seek(f.lastOffset, io.SeekStart) - f.reader.Reset(f.file) + f.scanner.reset(f.file) } return nil case eventTruncated: @@ -182,16 +185,6 @@ func (f *File) wait(partial bool) error { } } -// readLine reads a single line from the file, including the newline character. -// The newline and any trailing carriage return (for Windows line endings) are stripped. -func (f *File) readLine() (string, error) { - line, err := f.reader.ReadString('\n') - if err != nil { - return line, err - } - return strings.TrimRight(line, "\r\n"), err -} - // drain reads all remaining complete lines from the current file handle and stores // them in bufferedLines. This is called when a file deletion/rotation is detected // to ensure we don't lose any data from the old file before switching to the new one. 
@@ -200,13 +193,13 @@ func (f *File) drain() { if _, err := f.file.Seek(f.lastOffset, io.SeekStart); err != nil { return } - f.reader.Reset(f.file) + f.scanner.reset(f.file) for { - text, err := f.readLine() + text, err := f.scanner.next() if err != nil { if text != "" { - offset, err := f.offset() + offset, err := f.scanner.offset() if err != nil { return } @@ -219,7 +212,7 @@ func (f *File) drain() { return } - offset, err := f.offset() + offset, err := f.scanner.offset() if err != nil { return } @@ -232,17 +225,6 @@ func (f *File) drain() { } } -// offset returns the current byte offset in the file where the next read will occur. -// It accounts for buffered data in the reader. -func (f *File) offset() (int64, error) { - offset, err := f.file.Seek(0, io.SeekCurrent) - if err != nil { - return 0, err - } - - return offset - int64(f.reader.Buffered()), nil -} - // reopen closes the current file handle and opens a new one for the same file path. // If truncated is true, it indicates the file was truncated and we should reopen immediately. 
// If truncated is false, it indicates the file was deleted or moved, and we should wait @@ -298,16 +280,9 @@ func (f *File) reopen(truncated bool) error { } f.file = file - f.reader.Reset(f.file) + f.scanner.reset(f.file) break } return backoff.Err() } - -func newReader(f *os.File, cfg *Config) *bufio.Reader { - if cfg.Decoder != nil { - return bufio.NewReader(cfg.Decoder.Reader(f)) - } - return bufio.NewReader(f) -} diff --git a/internal/component/loki/source/file/internal/tail/scanner.go b/internal/component/loki/source/file/internal/tail/scanner.go new file mode 100644 index 00000000000..33cb72e8f51 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/scanner.go @@ -0,0 +1,46 @@ +package tail + +import ( + "bufio" + "io" + "os" + "strings" + + "golang.org/x/text/encoding" +) + +func newScanner(f *os.File, decoder *encoding.Decoder) *scanner { + return &scanner{ + b: bufio.NewReader(decoder.Reader(f)), + f: f, + decoder: decoder, + } +} + +type scanner struct { + b *bufio.Reader + f *os.File + decoder *encoding.Decoder +} + +func (r *scanner) next() (string, error) { + line, err := r.b.ReadString('\n') + if err != nil { + return line, err + } + return strings.TrimRight(line, "\r\n"), err +} + +func (r *scanner) offset() (int64, error) { + offset, err := r.f.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + + return offset - int64(r.b.Buffered()), nil +} + +func (r *scanner) reset(f *os.File) { + r.f = f + r.b = bufio.NewReader(r.decoder.Reader(f)) +} diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 912c268151d..135152b3063 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -254,14 +254,14 @@ func (t *tailer) initRun() error { return nil } -func getDecoder(encoding string) (*encoding.Decoder, error) { - if encoding == "" { - return nil, nil +func getDecoder(enc string) (*encoding.Decoder, error) { + if enc 
== "" { + return encoding.Nop.NewDecoder(), nil } - encoder, err := ianaindex.IANA.Encoding(encoding) + encoder, err := ianaindex.IANA.Encoding(enc) if err != nil { - return nil, fmt.Errorf("failed to get IANA encoding %s: %w", encoding, err) + return nil, fmt.Errorf("failed to get IANA encoding %s: %w", enc, err) } return encoder.NewDecoder(), nil } From ef1a8db26e38df8e0ca8ec6e3961cf4f3a5b3047 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 13 Jan 2026 16:35:16 +0100 Subject: [PATCH 02/21] wip --- .../loki/source/file/decompresser.go | 4 +- .../loki/source/file/internal/tail/config.go | 3 +- .../loki/source/file/internal/tail/file.go | 126 +++++++++++++++--- .../source/file/internal/tail/file_test.go | 19 ++- .../loki/source/file/internal/tail/scanner.go | 101 +++++++++++--- internal/component/loki/source/file/tailer.go | 20 ++- 6 files changed, 207 insertions(+), 66 deletions(-) diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index 10f9070ce00..2c202e5ef90 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -93,7 +93,7 @@ func newDecompressor( } } - decoder, err := getDecoder(opts.encoding) + enc, err := getEncoding(opts.encoding) if err != nil { return nil, fmt.Errorf("failed to get decoder: %w", err) } @@ -107,7 +107,7 @@ func newDecompressor( labels: opts.labels, running: atomic.NewBool(false), position: position, - decoder: decoder, + decoder: enc.NewDecoder(), cfg: opts.decompressionConfig, onPositionsFileError: opts.onPositionsFileError, componentStopping: componentStopping, diff --git a/internal/component/loki/source/file/internal/tail/config.go b/internal/component/loki/source/file/internal/tail/config.go index 2decfa3a73e..028e507f21b 100644 --- a/internal/component/loki/source/file/internal/tail/config.go +++ 
b/internal/component/loki/source/file/internal/tail/config.go @@ -14,12 +14,13 @@ type Config struct { // If 0, tailing starts from the beginning of the file. Offset int64 + // FIXME: Update text // Decoder is an optional text decoder for non-UTF-8 encoded files. // If the file is not UTF-8, the tailer must use the correct decoder // or the output text may be corrupted. For example, if the file is // "UTF-16 LE" encoded, the tailer would not separate new lines properly // and the output could appear as garbled characters. - Decoder *encoding.Decoder + Encoding encoding.Encoding // WatcherConfig controls how the file system is polled for changes. WatcherConfig WatcherConfig diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index 218947a8ae1..3f8458fc0cd 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -17,6 +17,64 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" ) +// detectBOM reads the first few bytes of the file to detect a Byte Order Mark (BOM). +// Returns the number of bytes the BOM occupies (0 if no BOM is found). 
+// Common BOM patterns: +// - UTF-8: EF BB BF (3 bytes) +// - UTF-16 LE: FF FE (2 bytes) +// - UTF-16 BE: FE FF (2 bytes) +// - UTF-32 LE: FF FE 00 00 (4 bytes) +// - UTF-32 BE: 00 00 FE FF (4 bytes) +func detectBOM(f *os.File) (int64, error) { + // Save current position + currentPos, err := f.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + defer f.Seek(currentPos, io.SeekStart) // Restore position + + // Seek to start + if _, err := f.Seek(0, io.SeekStart); err != nil { + return 0, err + } + + // Read first 4 bytes to cover all BOM types + bomBytes := make([]byte, 4) + n, err := f.Read(bomBytes) + if err != nil && err != io.EOF { + return 0, err + } + + if n < 2 { + return 0, nil // Not enough bytes for any BOM + } + + // Check for UTF-16 LE/BE (2 bytes) + if bomBytes[0] == 0xFF && bomBytes[1] == 0xFE { + if n >= 4 && bomBytes[2] == 0x00 && bomBytes[3] == 0x00 { + return 4, nil // UTF-32 LE + } + return 2, nil // UTF-16 LE + } + + // Check for UTF-16 BE (2 bytes) + if bomBytes[0] == 0xFE && bomBytes[1] == 0xFF { + return 2, nil // UTF-16 BE + } + + // Check for UTF-32 BE (4 bytes) + if n >= 4 && bomBytes[0] == 0x00 && bomBytes[1] == 0x00 && bomBytes[2] == 0xFE && bomBytes[3] == 0xFF { + return 4, nil // UTF-32 BE + } + + // Check for UTF-8 BOM (3 bytes) + if n >= 3 && bomBytes[0] == 0xEF && bomBytes[1] == 0xBB && bomBytes[2] == 0xBF { + return 3, nil // UTF-8 + } + + return 0, nil // No BOM found +} + // NewFile creates a new File tailer for the specified file path. // It opens the file and seeks to the provided offset if one is specified. // The returned File can be used to read lines from the file as they are appended. 
@@ -27,30 +85,47 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { return nil, err } - if cfg.Offset != 0 { + if cfg.Encoding == nil { + cfg.Encoding = encoding.Nop + } + + if cfg.WatcherConfig == (WatcherConfig{}) { + cfg.WatcherConfig = defaultWatcherConfig + } + + // Detect and skip BOM if starting from the beginning of the file + actualOffset := cfg.Offset + if cfg.Offset == 0 { + bomSize, err := detectBOM(f) + if err != nil { + return nil, fmt.Errorf("failed to detect BOM: %w", err) + } + if bomSize > 0 { + actualOffset = bomSize + if _, err := f.Seek(bomSize, io.SeekStart); err != nil { + return nil, err + } + } + } else { // Seek to provided offset if _, err := f.Seek(cfg.Offset, io.SeekStart); err != nil { return nil, err } } - if cfg.Decoder == nil { - cfg.Decoder = encoding.Nop.NewDecoder() - } - - if cfg.WatcherConfig == (WatcherConfig{}) { - cfg.WatcherConfig = defaultWatcherConfig + scanner, err := newScanner(f, actualOffset, cfg.Encoding) + if err != nil { + return nil, err } cfg.WatcherConfig.MinPollFrequency = min(cfg.WatcherConfig.MinPollFrequency, cfg.WatcherConfig.MaxPollFrequency) - ctx, cancel := context.WithCancel(context.Background()) return &File{ cfg: cfg, logger: logger, file: f, - scanner: newScanner(f, cfg.Decoder), + scanner: scanner, ctx: ctx, cancel: cancel, }, nil @@ -104,7 +179,7 @@ read: text, err := f.scanner.next() if err != nil { if errors.Is(err, io.EOF) { - if err := f.wait(text != ""); err != nil { + if err := f.wait(); err != nil { return nil, err } goto read @@ -112,7 +187,7 @@ read: return nil, err } - offset, err := f.scanner.offset() + offset, err := f.scanner.position() if err != nil { return nil, err } @@ -151,8 +226,8 @@ func (f *File) Stop() error { // wait blocks until a file event is detected (modification, truncation, or deletion). // Returns an error if the context is canceled or an unrecoverable error occurs. 
-func (f *File) wait(partial bool) error { - offset, err := f.scanner.offset() +func (f *File) wait() error { + offset, err := f.offset() if err != nil { return err } @@ -161,15 +236,13 @@ func (f *File) wait(partial bool) error { switch event { case eventModified: level.Debug(f.logger).Log("msg", "file modified") - if partial { - // We need to reset to last successful offset because we consumed a partial line. - f.file.Seek(f.lastOffset, io.SeekStart) - f.scanner.reset(f.file) - } + f.file.Seek(f.lastOffset, io.SeekStart) + f.scanner.reset(f.file, f.lastOffset) return nil case eventTruncated: level.Debug(f.logger).Log("msg", "file truncated") // We need to reopen the file when it was truncated. + f.lastOffset = 0 return f.reopen(true) case eventDeleted: level.Debug(f.logger).Log("msg", "file deleted") @@ -185,6 +258,15 @@ func (f *File) wait(partial bool) error { } } +// offset returns the current byte offset in the file where the next read will occur. +func (f *File) offset() (int64, error) { + offset, err := f.file.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + return offset, nil +} + // drain reads all remaining complete lines from the current file handle and stores // them in bufferedLines. This is called when a file deletion/rotation is detected // to ensure we don't lose any data from the old file before switching to the new one. 
@@ -193,13 +275,13 @@ func (f *File) drain() { if _, err := f.file.Seek(f.lastOffset, io.SeekStart); err != nil { return } - f.scanner.reset(f.file) + f.scanner.reset(f.file, f.lastOffset) for { text, err := f.scanner.next() if err != nil { if text != "" { - offset, err := f.scanner.offset() + offset, err := f.scanner.position() if err != nil { return } @@ -212,7 +294,7 @@ func (f *File) drain() { return } - offset, err := f.scanner.offset() + offset, err := f.scanner.position() if err != nil { return } @@ -280,7 +362,7 @@ func (f *File) reopen(truncated bool) error { } f.file = file - f.scanner.reset(f.file) + f.scanner.reset(f.file, f.lastOffset) break } diff --git a/internal/component/loki/source/file/internal/tail/file_test.go b/internal/component/loki/source/file/internal/tail/file_test.go index 55633850c76..24e9bfaddf9 100644 --- a/internal/component/loki/source/file/internal/tail/file_test.go +++ b/internal/component/loki/source/file/internal/tail/file_test.go @@ -202,18 +202,18 @@ func TestFile(t *testing.T) { t.Run("UTF-16LE", func(t *testing.T) { file, err := NewFile(log.NewNopLogger(), &Config{ Filename: "testdata/mssql.log", - Decoder: unicode.UTF16(unicode.LittleEndian, unicode.ExpectBOM).NewDecoder(), + Encoding: unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), }) require.NoError(t, err) defer file.Stop() - verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 528}, nil) - verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 552}, nil) - verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 595}, nil) - verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 (Build 20348: ) (Hypervisor)", Offset: 697}, nil) - verify(t, file, &Line{Text: "", Offset: 699}, nil) - verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 756}, nil) - verify(t, file, &Line{Text: "2025-03-11 
11:11:02.71 Server (c) Microsoft Corporation.", Offset: 819}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 180}, nil) + verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 228}, nil) + verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 314}, nil) + verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 (Build 20348: ) (Hypervisor)", Offset: 518}, nil) + verify(t, file, &Line{Text: "", Offset: 522}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 636}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 762}, nil) verify(t, file, &Line{Text: "2025-03-11 11:11:02.72 Server All rights reserved.", Offset: 876}, nil) }) @@ -232,7 +232,7 @@ func TestFile(t *testing.T) { }) t.Run("file rotation drains remaining lines from old file", func(t *testing.T) { - name := createFile(t, "rotation", "line1\nline2\nline3\nline4\npartial") + name := createFile(t, "rotation", "line1\nline2\nline3\nline4\n") defer removeFile(t, name) file, err := NewFile(log.NewNopLogger(), &Config{ @@ -257,7 +257,6 @@ func TestFile(t *testing.T) { // Verify we get the remaining old lines first, then new lines verify(t, file, &Line{Text: "line3", Offset: 18}, nil) verify(t, file, &Line{Text: "line4", Offset: 24}, nil) - verify(t, file, &Line{Text: "partial", Offset: 31}, nil) verify(t, file, &Line{Text: "newline1", Offset: 9}, nil) verify(t, file, &Line{Text: "newline2", Offset: 18}, nil) }) diff --git a/internal/component/loki/source/file/internal/tail/scanner.go b/internal/component/loki/source/file/internal/tail/scanner.go index 33cb72e8f51..7e61ad4411c 100644 --- a/internal/component/loki/source/file/internal/tail/scanner.go +++ b/internal/component/loki/source/file/internal/tail/scanner.go @@ -2,45 +2,108 @@ package tail import ( "bufio" + 
"bytes" "io" "os" - "strings" + "unsafe" "golang.org/x/text/encoding" ) -func newScanner(f *os.File, decoder *encoding.Decoder) *scanner { - return &scanner{ - b: bufio.NewReader(decoder.Reader(f)), - f: f, +func newScanner(f *os.File, offset int64, enc encoding.Encoding) (*scanner, error) { + var ( + decoder = enc.NewDecoder() + encoder = enc.NewEncoder() + ) + + scanner := &scanner{ + s: bufio.NewScanner(f), decoder: decoder, + splitFn: newSplitFn(encoder), + pos: offset, } + + scanner.s.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = scanner.splitFn(data, atEOF) + scanner.pos += int64(advance) + return advance, token, err + }) + + return scanner, nil } type scanner struct { - b *bufio.Reader - f *os.File + pos int64 + s *bufio.Scanner + splitFn bufio.SplitFunc decoder *encoding.Decoder } func (r *scanner) next() (string, error) { - line, err := r.b.ReadString('\n') - if err != nil { - return line, err + var err error + ok := r.s.Scan() + + if !ok { + err = r.s.Err() + if err != nil { + return "", err + } + return "", io.EOF } - return strings.TrimRight(line, "\r\n"), err + + bytes, decodeErr := r.decoder.Bytes(r.s.Bytes()) + if decodeErr != nil { + return "", decodeErr + } + str := unsafe.String(unsafe.SliceData(bytes), len(bytes)) + return str, err +} + +func (r *scanner) position() (int64, error) { + return r.pos, nil +} + +func (r *scanner) reset(f *os.File, offset int64) { + r.pos = offset + r.s = bufio.NewScanner(f) + r.s.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = r.splitFn(data, atEOF) + r.pos += int64(advance) + return advance, token, err + }) } -func (r *scanner) offset() (int64, error) { - offset, err := r.f.Seek(0, io.SeekCurrent) - if err != nil { - return 0, err +func newSplitFn(e *encoding.Encoder) bufio.SplitFunc { + nl, _ := encodedNewline(e) + cr, _ := encodedCarriageReturn(e) + return func(data []byte, atEOF bool) (advance int, token 
[]byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + + i := bytes.Index(data, nl) + if i == 0 { + return len(nl), []byte{}, nil + } + + if i >= 0 { + // We have a full newline-terminated line. + return i + len(nl), bytes.TrimSuffix(data[:i], cr), nil + } + + // We have a partial line so we need to wait for more data + return 0, nil, nil } +} - return offset - int64(r.b.Buffered()), nil +func encodedNewline(e *encoding.Encoder) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := e.Transform(out, []byte{'\n'}, true) + return out[:nDst], err } -func (r *scanner) reset(f *os.File) { - r.f = f - r.b = bufio.NewReader(r.decoder.Reader(f)) +func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := e.Transform(out, []byte{'\r'}, true) + return out[:nDst], err } diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 135152b3063..cec2307da97 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -47,8 +47,8 @@ type tailer struct { report sync.Once - file *tail.File - decoder *encoding.Decoder + file *tail.File + enc encoding.Encoding } func newTailer( @@ -60,7 +60,7 @@ func newTailer( opts sourceOptions, ) (*tailer, error) { - decoder, err := getDecoder(opts.encoding) + decoder, err := getEncoding(opts.encoding) if err != nil { return nil, fmt.Errorf("failed to get decoder: %w", err) } @@ -82,7 +82,7 @@ func newTailer( }, componentStopping: componentStopping, report: sync.Once{}, - decoder: decoder, + enc: decoder, } return tailer, nil @@ -241,7 +241,7 @@ func (t *tailer) initRun() error { tail, err := tail.NewFile(t.logger, &tail.Config{ Filename: t.key.Path, Offset: pos, - Decoder: t.decoder, + Encoding: t.enc, WatcherConfig: t.watcherConfig, }) @@ -254,16 +254,12 @@ func (t *tailer) initRun() error { return nil } -func getDecoder(enc string) (*encoding.Decoder, error) { 
+func getEncoding(enc string) (encoding.Encoding, error) { if enc == "" { - return encoding.Nop.NewDecoder(), nil + return encoding.Nop, nil } - encoder, err := ianaindex.IANA.Encoding(enc) - if err != nil { - return nil, fmt.Errorf("failed to get IANA encoding %s: %w", enc, err) - } - return encoder.NewDecoder(), nil + return ianaindex.IANA.Encoding(enc) } // readLines reads lines from the tailed file by calling Next() in a loop. From cd9f566f2621facb3069c9ff5b58b36d80b1a54f Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 13 Jan 2026 17:24:38 +0100 Subject: [PATCH 03/21] wip --- .../loki/source/file/internal/tail/bom.go | 41 ++++++++ .../loki/source/file/internal/tail/file.go | 99 ++----------------- .../loki/source/file/internal/tail/scanner.go | 47 +++++---- 3 files changed, 75 insertions(+), 112 deletions(-) create mode 100644 internal/component/loki/source/file/internal/tail/bom.go diff --git a/internal/component/loki/source/file/internal/tail/bom.go b/internal/component/loki/source/file/internal/tail/bom.go new file mode 100644 index 00000000000..04e6396c440 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/bom.go @@ -0,0 +1,41 @@ +package tail + +import ( + "bytes" + "io" + "os" +) + +// SkipBOMFile detects and skips a BOM at the beginning of the file. +// It returns the number of bytes skipped (0 if no BOM). +// The file offset is left positioned correctly for subsequent reads. 
+func skipBOM(f *os.File) int64 { + // Read up to 4 bytes (longest BOM) + var buf [4]byte + n, err := f.Read(buf[:]) + if err != nil && n == 0 { + return 0 + } + + bomLen := detectBOM(buf[:n]) + f.Seek(bomLen, io.SeekStart) + return bomLen +} + +func detectBOM(b []byte) int64 { + switch { + case bytes.HasPrefix(b, []byte{0x00, 0x00, 0xFE, 0xFF}): + return 4 // UTF-32 BE + case bytes.HasPrefix(b, []byte{0xFF, 0xFE, 0x00, 0x00}): + return 4 // UTF-32 LE + case bytes.HasPrefix(b, []byte{0xEF, 0xBB, 0xBF}): + return 3 // UTF-8 + case bytes.HasPrefix(b, []byte{0xFE, 0xFF}): + return 2 // UTF-16 BE + case bytes.HasPrefix(b, []byte{0xFF, 0xFE}): + return 2 // UTF-16 LE + default: + return 0 + } +} + diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index 3f8458fc0cd..25c99329caf 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -17,64 +17,6 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" ) -// detectBOM reads the first few bytes of the file to detect a Byte Order Mark (BOM). -// Returns the number of bytes the BOM occupies (0 if no BOM is found). 
-// Common BOM patterns: -// - UTF-8: EF BB BF (3 bytes) -// - UTF-16 LE: FF FE (2 bytes) -// - UTF-16 BE: FE FF (2 bytes) -// - UTF-32 LE: FF FE 00 00 (4 bytes) -// - UTF-32 BE: 00 00 FE FF (4 bytes) -func detectBOM(f *os.File) (int64, error) { - // Save current position - currentPos, err := f.Seek(0, io.SeekCurrent) - if err != nil { - return 0, err - } - defer f.Seek(currentPos, io.SeekStart) // Restore position - - // Seek to start - if _, err := f.Seek(0, io.SeekStart); err != nil { - return 0, err - } - - // Read first 4 bytes to cover all BOM types - bomBytes := make([]byte, 4) - n, err := f.Read(bomBytes) - if err != nil && err != io.EOF { - return 0, err - } - - if n < 2 { - return 0, nil // Not enough bytes for any BOM - } - - // Check for UTF-16 LE/BE (2 bytes) - if bomBytes[0] == 0xFF && bomBytes[1] == 0xFE { - if n >= 4 && bomBytes[2] == 0x00 && bomBytes[3] == 0x00 { - return 4, nil // UTF-32 LE - } - return 2, nil // UTF-16 LE - } - - // Check for UTF-16 BE (2 bytes) - if bomBytes[0] == 0xFE && bomBytes[1] == 0xFF { - return 2, nil // UTF-16 BE - } - - // Check for UTF-32 BE (4 bytes) - if n >= 4 && bomBytes[0] == 0x00 && bomBytes[1] == 0x00 && bomBytes[2] == 0xFE && bomBytes[3] == 0xFF { - return 4, nil // UTF-32 BE - } - - // Check for UTF-8 BOM (3 bytes) - if n >= 3 && bomBytes[0] == 0xEF && bomBytes[1] == 0xBB && bomBytes[2] == 0xBF { - return 3, nil // UTF-8 - } - - return 0, nil // No BOM found -} - // NewFile creates a new File tailer for the specified file path. // It opens the file and seeks to the provided offset if one is specified. // The returned File can be used to read lines from the file as they are appended. 
@@ -93,27 +35,14 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { cfg.WatcherConfig = defaultWatcherConfig } - // Detect and skip BOM if starting from the beginning of the file - actualOffset := cfg.Offset - if cfg.Offset == 0 { - bomSize, err := detectBOM(f) - if err != nil { - return nil, fmt.Errorf("failed to detect BOM: %w", err) - } - if bomSize > 0 { - actualOffset = bomSize - if _, err := f.Seek(bomSize, io.SeekStart); err != nil { - return nil, err - } - } - } else { + if cfg.Offset != 0 { // Seek to provided offset if _, err := f.Seek(cfg.Offset, io.SeekStart); err != nil { return nil, err } } - scanner, err := newScanner(f, actualOffset, cfg.Encoding) + scanner, err := newScanner(f, cfg.Offset, cfg.Encoding) if err != nil { return nil, err } @@ -141,7 +70,7 @@ type File struct { // protects file, reader, and lastOffset. mu sync.Mutex file *os.File - scanner *scanner + scanner *reader lastOffset int64 @@ -187,16 +116,11 @@ read: return nil, err } - offset, err := f.scanner.position() - if err != nil { - return nil, err - } - - f.lastOffset = offset + f.lastOffset = f.scanner.position() return &Line{ Text: text, - Offset: offset, + Offset: f.lastOffset, Time: time.Now(), }, nil } @@ -281,27 +205,18 @@ func (f *File) drain() { text, err := f.scanner.next() if err != nil { if text != "" { - offset, err := f.scanner.position() - if err != nil { - return - } f.bufferedLines = append(f.bufferedLines, Line{ Text: text, - Offset: offset, + Offset: f.scanner.position(), Time: time.Now(), }) } return } - offset, err := f.scanner.position() - if err != nil { - return - } - f.bufferedLines = append(f.bufferedLines, Line{ Text: text, - Offset: offset, + Offset: f.scanner.position(), Time: time.Now(), }) } diff --git a/internal/component/loki/source/file/internal/tail/scanner.go b/internal/component/loki/source/file/internal/tail/scanner.go index 7e61ad4411c..6c7253e0e67 100644 --- a/internal/component/loki/source/file/internal/tail/scanner.go +++ 
b/internal/component/loki/source/file/internal/tail/scanner.go @@ -10,48 +10,52 @@ import ( "golang.org/x/text/encoding" ) -func newScanner(f *os.File, offset int64, enc encoding.Encoding) (*scanner, error) { +func newScanner(f *os.File, offset int64, enc encoding.Encoding) (*reader, error) { + if offset == 0 { + offset = skipBOM(f) + } + var ( decoder = enc.NewDecoder() encoder = enc.NewEncoder() ) - scanner := &scanner{ - s: bufio.NewScanner(f), + reader := &reader{ + scanner: bufio.NewScanner(f), decoder: decoder, splitFn: newSplitFn(encoder), pos: offset, } - scanner.s.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = scanner.splitFn(data, atEOF) - scanner.pos += int64(advance) + reader.scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = reader.splitFn(data, atEOF) + reader.pos += int64(advance) return advance, token, err }) - return scanner, nil + return reader, nil } -type scanner struct { +type reader struct { pos int64 - s *bufio.Scanner + scanner *bufio.Scanner splitFn bufio.SplitFunc decoder *encoding.Decoder } -func (r *scanner) next() (string, error) { +func (r *reader) next() (string, error) { var err error - ok := r.s.Scan() + ok := r.scanner.Scan() if !ok { - err = r.s.Err() + err = r.scanner.Err() if err != nil { return "", err } return "", io.EOF } - bytes, decodeErr := r.decoder.Bytes(r.s.Bytes()) + bytes, decodeErr := r.decoder.Bytes(r.scanner.Bytes()) if decodeErr != nil { return "", decodeErr } @@ -59,14 +63,17 @@ func (r *scanner) next() (string, error) { return str, err } -func (r *scanner) position() (int64, error) { - return r.pos, nil +func (r *reader) position() int64 { + return r.pos } -func (r *scanner) reset(f *os.File, offset int64) { +func (r *reader) reset(f *os.File, offset int64) { + if offset == 0 { + offset = skipBOM(f) + } r.pos = offset - r.s = bufio.NewScanner(f) - r.s.Split(func(data []byte, atEOF bool) (advance int, 
token []byte, err error) { + r.scanner = bufio.NewScanner(f) + r.scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { advance, token, err = r.splitFn(data, atEOF) r.pos += int64(advance) return advance, token, err @@ -87,11 +94,11 @@ func newSplitFn(e *encoding.Encoder) bufio.SplitFunc { } if i >= 0 { - // We have a full newline-terminated line. + // We have a full line so we should strip out cr. return i + len(nl), bytes.TrimSuffix(data[:i], cr), nil } - // We have a partial line so we need to wait for more data + // We have a partial line so we need to wait for more data. return 0, nil, nil } } From d5274b6bfe02f17ff009b91369a132559dbace70 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 10:20:33 +0100 Subject: [PATCH 04/21] use bufio.Reader --- .../loki/source/file/internal/tail/file.go | 36 ++--- .../loki/source/file/internal/tail/reader.go | 141 ++++++++++++++++++ .../loki/source/file/internal/tail/scanner.go | 116 -------------- 3 files changed, 159 insertions(+), 134 deletions(-) create mode 100644 internal/component/loki/source/file/internal/tail/reader.go delete mode 100644 internal/component/loki/source/file/internal/tail/scanner.go diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index 25c99329caf..2455f5e6eaf 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -42,7 +42,7 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { } } - scanner, err := newScanner(f, cfg.Offset, cfg.Encoding) + scanner, err := newReader(f, cfg.Offset, cfg.Encoding) if err != nil { return nil, err } @@ -51,12 +51,12 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { ctx, cancel := context.WithCancel(context.Background()) return &File{ - cfg: cfg, - logger: logger, - file: f, - scanner: scanner, - ctx: 
ctx, - cancel: cancel, + cfg: cfg, + logger: logger, + file: f, + reader: scanner, + ctx: ctx, + cancel: cancel, }, nil } @@ -68,9 +68,9 @@ type File struct { logger log.Logger // protects file, reader, and lastOffset. - mu sync.Mutex - file *os.File - scanner *reader + mu sync.Mutex + file *os.File + reader *reader lastOffset int64 @@ -105,7 +105,7 @@ read: return &line, nil } - text, err := f.scanner.next() + text, err := f.reader.next() if err != nil { if errors.Is(err, io.EOF) { if err := f.wait(); err != nil { @@ -116,7 +116,7 @@ read: return nil, err } - f.lastOffset = f.scanner.position() + f.lastOffset = f.reader.position() return &Line{ Text: text, @@ -161,7 +161,7 @@ func (f *File) wait() error { case eventModified: level.Debug(f.logger).Log("msg", "file modified") f.file.Seek(f.lastOffset, io.SeekStart) - f.scanner.reset(f.file, f.lastOffset) + f.reader.reset(f.file, f.lastOffset) return nil case eventTruncated: level.Debug(f.logger).Log("msg", "file truncated") @@ -199,15 +199,15 @@ func (f *File) drain() { if _, err := f.file.Seek(f.lastOffset, io.SeekStart); err != nil { return } - f.scanner.reset(f.file, f.lastOffset) + f.reader.reset(f.file, f.lastOffset) for { - text, err := f.scanner.next() + text, err := f.reader.next() if err != nil { if text != "" { f.bufferedLines = append(f.bufferedLines, Line{ Text: text, - Offset: f.scanner.position(), + Offset: f.reader.position(), Time: time.Now(), }) } @@ -216,7 +216,7 @@ func (f *File) drain() { f.bufferedLines = append(f.bufferedLines, Line{ Text: text, - Offset: f.scanner.position(), + Offset: f.reader.position(), Time: time.Now(), }) } @@ -277,7 +277,7 @@ func (f *File) reopen(truncated bool) error { } f.file = file - f.scanner.reset(f.file, f.lastOffset) + f.reader.reset(f.file, f.lastOffset) break } diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go new file mode 100644 index 00000000000..8482d7b9d18 --- /dev/null 
+++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -0,0 +1,141 @@ +package tail + +import ( + "bufio" + "bytes" + "os" + "unsafe" + + "golang.org/x/text/encoding" +) + +const defaultBufSize = 4096 + +func newReader(f *os.File, offset int64, enc encoding.Encoding) (*reader, error) { + if offset == 0 { + offset = skipBOM(f) + } + + var ( + decoder = enc.NewDecoder() + encoder = enc.NewEncoder() + ) + + nl, err := encodedNewline(encoder) + if err != nil { + return nil, err + } + + cr, err := encodedCarriageReturn(encoder) + if err != nil { + return nil, err + } + + reader := &reader{ + pos: offset, + br: bufio.NewReader(f), + decoder: decoder, + nl: nl, + lastNl: nl[len(nl)-1], + cr: cr, + pending: make([]byte, 0, defaultBufSize), + } + + return reader, nil +} + +type reader struct { + pos int64 + br *bufio.Reader + decoder *encoding.Decoder + + nl []byte + lastNl byte + cr []byte + pending []byte +} + +func (r *reader) next() (string, error) { + // First we check if we already have a full line buffered. + if line, ok := r.consumeLine(); ok { + return r.decode(line) + } + + for { + + // Read more data up until the last byte of nl. + chunk, err := r.br.ReadBytes(r.lastNl) + if len(chunk) > 0 { + r.pending = append(r.pending, chunk...) + + if line, ok := r.consumeLine(); ok { + return r.decode(line) + } + } + + // If we did not get an error and did not find a full line we + // need to read more data. + if err == nil { + continue + } + + return "", err + } +} + +func (r *reader) decode(line []byte) (string, error) { + // Decode the line we have consumed. + converted, err := r.decoder.Bytes(bytes.TrimSuffix(line, r.cr)) + if err != nil { + return "", err + } + + // It is safe to convert this into a string here because converter will always copy + // the bytes given to it, even Nop decoder will do that. 
+ return unsafe.String(unsafe.SliceData(converted), len(converted)), nil +} + +// consumeLine checks pending for the delimiter; if found, it splits +// pending into line and remainder. +func (r *reader) consumeLine() ([]byte, bool) { + // Check if pending contains a full line. + i := bytes.Index(r.pending, r.nl) + if i < 0 { + return nil, false + } + + // Extract everything up until newline. + line := r.pending[:i] + // Keep everything except the line we extracted and newline. + rem := r.pending[i+len(r.nl):] + r.pending = append(make([]byte, 0, defaultBufSize), rem...) + + // Advance the position on bytes we have consumed as a full line. + r.pos += int64(len(line) + len(r.nl)) + return line, true +} + +func (r *reader) position() int64 { + return r.pos +} + +func (r *reader) reset(f *os.File, offset int64) { + if offset == 0 { + offset = skipBOM(f) + } + r.pos = offset + r.br.Reset(f) + r.pending = make([]byte, 0, defaultBufSize) +} + +func encodedNewline(e *encoding.Encoder) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := e.Transform(out, []byte{'\n'}, true) + return out[:nDst], err +} + +func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := e.Transform(out, []byte{'\r'}, true) + return out[:nDst], err +} diff --git a/internal/component/loki/source/file/internal/tail/scanner.go b/internal/component/loki/source/file/internal/tail/scanner.go deleted file mode 100644 index 6c7253e0e67..00000000000 --- a/internal/component/loki/source/file/internal/tail/scanner.go +++ /dev/null @@ -1,116 +0,0 @@ -package tail - -import ( - "bufio" - "bytes" - "io" - "os" - "unsafe" - - "golang.org/x/text/encoding" -) - -func newScanner(f *os.File, offset int64, enc encoding.Encoding) (*reader, error) { - if offset == 0 { - offset = skipBOM(f) - } - - var ( - decoder = enc.NewDecoder() - encoder = enc.NewEncoder() - ) - - reader := &reader{ - scanner: bufio.NewScanner(f), - decoder: decoder, - splitFn:
newSplitFn(encoder), - pos: offset, - } - - reader.scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = reader.splitFn(data, atEOF) - reader.pos += int64(advance) - return advance, token, err - }) - - return reader, nil -} - -type reader struct { - pos int64 - scanner *bufio.Scanner - splitFn bufio.SplitFunc - decoder *encoding.Decoder -} - -func (r *reader) next() (string, error) { - var err error - ok := r.scanner.Scan() - - if !ok { - err = r.scanner.Err() - if err != nil { - return "", err - } - return "", io.EOF - } - - bytes, decodeErr := r.decoder.Bytes(r.scanner.Bytes()) - if decodeErr != nil { - return "", decodeErr - } - str := unsafe.String(unsafe.SliceData(bytes), len(bytes)) - return str, err -} - -func (r *reader) position() int64 { - return r.pos -} - -func (r *reader) reset(f *os.File, offset int64) { - if offset == 0 { - offset = skipBOM(f) - } - r.pos = offset - r.scanner = bufio.NewScanner(f) - r.scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = r.splitFn(data, atEOF) - r.pos += int64(advance) - return advance, token, err - }) -} - -func newSplitFn(e *encoding.Encoder) bufio.SplitFunc { - nl, _ := encodedNewline(e) - cr, _ := encodedCarriageReturn(e) - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - - i := bytes.Index(data, nl) - if i == 0 { - return len(nl), []byte{}, nil - } - - if i >= 0 { - // We have a full line so we should strip out cr. - return i + len(nl), bytes.TrimSuffix(data[:i], cr), nil - } - - // We have a partial line so we need to wait for more data. 
- return 0, nil, nil - } -} - -func encodedNewline(e *encoding.Encoder) ([]byte, error) { - out := make([]byte, 10) - nDst, _, err := e.Transform(out, []byte{'\n'}, true) - return out[:nDst], err -} - -func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) { - out := make([]byte, 10) - nDst, _, err := e.Transform(out, []byte{'\r'}, true) - return out[:nDst], err -} From 11e081ae84426ba0bcea3b18ece29efcf7a2bbcd Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:21:24 +0100 Subject: [PATCH 05/21] adjust encoder for UTF-16 based on BOM --- .../component/loki/source/file/encoding.go | 14 ++++ .../component/loki/source/file/file_test.go | 13 +--- .../loki/source/file/internal/tail/bom.go | 72 ++++++++++++++----- .../loki/source/file/internal/tail/reader.go | 32 ++++++++- internal/component/loki/source/file/tailer.go | 13 +--- 5 files changed, 101 insertions(+), 43 deletions(-) create mode 100644 internal/component/loki/source/file/encoding.go diff --git a/internal/component/loki/source/file/encoding.go b/internal/component/loki/source/file/encoding.go new file mode 100644 index 00000000000..abfc7d36b98 --- /dev/null +++ b/internal/component/loki/source/file/encoding.go @@ -0,0 +1,14 @@ +package file + +import ( + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/ianaindex" +) + +func getEncoding(enc string) (encoding.Encoding, error) { + if enc == "" { + return encoding.Nop, nil + } + + return ianaindex.IANA.Encoding(enc) +} diff --git a/internal/component/loki/source/file/file_test.go b/internal/component/loki/source/file/file_test.go index be75225f21f..524695a9cac 100644 --- a/internal/component/loki/source/file/file_test.go +++ b/internal/component/loki/source/file/file_test.go @@ -470,21 +470,10 @@ func TestEncoding(t *testing.T) { encoding string decompressionConfig DecompressionConfig }{ - {"CRLF default encoding", "/CRLF/UTF-8.txt", "", noDecompress}, - {"CRLF UTF-8", "/CRLF/UTF-8.txt", 
"UTF-8", noDecompress}, - {"CRLF UTF-16", "/CRLF/UTF-16.txt", "UTF-16", noDecompress}, - {"CRLF UTF-16 LE", "/CRLF/UTF-16_LE.txt", "UTF-16LE", noDecompress}, - {"CRLF UTF-16 BE", "/CRLF/UTF-16_BE.txt", "UTF-16BE", noDecompress}, {"CRLF UTF-16 LE with BOM", "/CRLF/UTF-16_LE_BOM.txt", "UTF-16", noDecompress}, {"CRLF UTF-16 BE with BOM", "/CRLF/UTF-16_BE_BOM.txt", "UTF-16", noDecompress}, - {"LF default encoding", "/LF/UTF-8.txt", "", noDecompress}, - {"LF UTF-8", "/LF/UTF-8.txt", "UTF-8", noDecompress}, - {"LF UTF-16", "/LF/UTF-16.txt", "UTF-16", noDecompress}, - {"LF UTF-16 LE", "/LF/UTF-16_LE.txt", "UTF-16LE", noDecompress}, - {"LF UTF-16 BE", "/LF/UTF-16_BE.txt", "UTF-16BE", noDecompress}, {"LF UTF-16 LE with BOM", "/LF/UTF-16_LE_BOM.txt", "UTF-16", noDecompress}, {"LF UTF-16 BE with BOM", "/LF/UTF-16_BE_BOM.txt", "UTF-16", noDecompress}, - {"CRLF default encoding (gzipped)", "/CRLF/UTF-8.txt.gz", "", gzDecompress}, {"CRLF UTF-8 (gzipped)", "/CRLF/UTF-8.txt.gz", "UTF-8", gzDecompress}, {"CRLF UTF-16 (gzipped)", "/CRLF/UTF-16.txt.gz", "UTF-16", gzDecompress}, {"CRLF UTF-16 LE (gzipped)", "/CRLF/UTF-16_LE.txt.gz", "UTF-16LE", gzDecompress}, @@ -504,7 +493,7 @@ func TestEncoding(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { opts := component.Options{ - Logger: util.TestAlloyLogger(t), + Logger: logging.NewNop(), Registerer: prometheus.NewRegistry(), OnStateChange: func(e component.Exports) {}, DataPath: t.TempDir(), diff --git a/internal/component/loki/source/file/internal/tail/bom.go b/internal/component/loki/source/file/internal/tail/bom.go index 04e6396c440..c2b9406034b 100644 --- a/internal/component/loki/source/file/internal/tail/bom.go +++ b/internal/component/loki/source/file/internal/tail/bom.go @@ -4,38 +4,74 @@ import ( "bytes" "io" "os" + + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" +) + +// BOM byte sequences +var ( + bomUTF32BE = []byte{0x00, 0x00, 0xFE, 0xFF} + bomUTF32LE = []byte{0xFF, 0xFE, 
0x00, 0x00} + bomUTF8 = []byte{0xEF, 0xBB, 0xBF} + bomUTF16BE = []byte{0xFE, 0xFF} + bomUTF16LE = []byte{0xFF, 0xFE} ) -// SkipBOMFile detects and skips a BOM at the beginning of the file. -// It returns the number of bytes skipped (0 if no BOM). -// The file offset is left positioned correctly for subsequent reads. -func skipBOM(f *os.File) int64 { +// skipBOM detects and skips a BOM at the beginning of the file. +// It returns the number of bytes skipped and the BOM bytes +// that were consumed. The file offset is left positioned correctly for +// subsequent reads. +func skipBOM(f *os.File) (int64, []byte) { // Read up to 4 bytes (longest BOM) var buf [4]byte n, err := f.Read(buf[:]) if err != nil && n == 0 { - return 0 + return 0, nil } - bomLen := detectBOM(buf[:n]) + bomLen := detectBom(buf[:n]) f.Seek(bomLen, io.SeekStart) - return bomLen + return bomLen, buf[:bomLen] } -func detectBOM(b []byte) int64 { +// detectBom detects a BOM in the given bytes and returns the length +// of the BOM (0 if no BOM was detected). +func detectBom(b []byte) int64 { switch { - case bytes.HasPrefix(b, []byte{0x00, 0x00, 0xFE, 0xFF}): - return 4 // UTF-32 BE - case bytes.HasPrefix(b, []byte{0xFF, 0xFE, 0x00, 0x00}): - return 4 // UTF-32 LE - case bytes.HasPrefix(b, []byte{0xEF, 0xBB, 0xBF}): - return 3 // UTF-8 - case bytes.HasPrefix(b, []byte{0xFE, 0xFF}): - return 2 // UTF-16 BE - case bytes.HasPrefix(b, []byte{0xFF, 0xFE}): - return 2 // UTF-16 LE + case bytes.HasPrefix(b, bomUTF32BE): + return 4 + case bytes.HasPrefix(b, bomUTF32LE): + return 4 + case bytes.HasPrefix(b, bomUTF8): + return 3 + case bytes.HasPrefix(b, bomUTF16BE): + return 2 + case bytes.HasPrefix(b, bomUTF16LE): + return 2 default: return 0 } } +// resolveEncodingFromBOM takes the BOM bytes and the original encoding, +// and returns the resolved encoding. If a UTF-16 BOM is detected, it returns +// an encoding with the correct endianness and IgnoreBOM mode. +// Otherwise, it returns the original encoding. 
+func resolveEncodingFromBOM(bomBytes []byte, originalEnc encoding.Encoding) encoding.Encoding { + if len(bomBytes) == 0 { + return originalEnc + } + + switch { + case bytes.HasPrefix(bomBytes, bomUTF16BE): + // UTF-16 BE BOM detected - return encoding with IgnoreBOM since we skip it + return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM) + case bytes.HasPrefix(bomBytes, bomUTF16LE): + // UTF-16 LE BOM detected - return encoding with IgnoreBOM since we skip it + return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM) + default: + // Other BOMs (UTF-8, UTF-32) don't affect encoding selection + return originalEnc + } +} diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go index 8482d7b9d18..e2c6b9deb96 100644 --- a/internal/component/loki/source/file/internal/tail/reader.go +++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -13,9 +13,13 @@ const defaultBufSize = 4096 func newReader(f *os.File, offset int64, enc encoding.Encoding) (*reader, error) { if offset == 0 { - offset = skipBOM(f) + bomOffset, bomBytes := skipBOM(f) + offset = bomOffset + // Resolve the encoding based on BOM bytes if any. 
+ enc = resolveEncodingFromBOM(bomBytes, enc) } + var ( decoder = enc.NewDecoder() encoder = enc.NewEncoder() @@ -35,6 +39,7 @@ func newReader(f *os.File, offset int64, enc encoding.Encoding) (*reader, error) pos: offset, br: bufio.NewReader(f), decoder: decoder, + enc: enc, // Store the encoding (after BOM detection) for use in reset nl: nl, lastNl: nl[len(nl)-1], cr: cr, @@ -48,6 +53,7 @@ type reader struct { pos int64 br *bufio.Reader decoder *encoding.Decoder + enc encoding.Encoding // The encoding to use (set on creation, preserved on reset) nl []byte lastNl byte @@ -120,9 +126,31 @@ func (r *reader) position() int64 { } func (r *reader) reset(f *os.File, offset int64) { + // Just skip BOM if needed, but keep the same encoding that was set on creation if offset == 0 { - offset = skipBOM(f) + var bomOffset int64 + bomOffset, _ = skipBOM(f) + offset = bomOffset + // Note: We don't change encoding on reset - it was determined on creation + } + + // Recreate decoder and encoder with the stored encoding + r.decoder = r.enc.NewDecoder() + encoder := r.enc.NewEncoder() + + // Update newline and carriage return patterns + var err error + r.nl, err = encodedNewline(encoder) + if err != nil { + // If encoding fails, keep old values - this shouldn't happen in practice + return + } + r.lastNl = r.nl[len(r.nl)-1] + r.cr, err = encodedCarriageReturn(encoder) + if err != nil { + return } + r.pos = offset r.br.Reset(f) r.pending = make([]byte, 0, defaultBufSize) diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index cec2307da97..aa67fdb9c46 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -18,7 +18,6 @@ import ( "github.com/prometheus/common/model" "go.uber.org/atomic" "golang.org/x/text/encoding" - "golang.org/x/text/encoding/ianaindex" "github.com/grafana/alloy/internal/component/common/loki" 
"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail" @@ -60,7 +59,7 @@ func newTailer( opts sourceOptions, ) (*tailer, error) { - decoder, err := getEncoding(opts.encoding) + enc, err := getEncoding(opts.encoding) if err != nil { return nil, fmt.Errorf("failed to get decoder: %w", err) } @@ -82,7 +81,7 @@ func newTailer( }, componentStopping: componentStopping, report: sync.Once{}, - enc: decoder, + enc: enc, } return tailer, nil @@ -254,14 +253,6 @@ func (t *tailer) initRun() error { return nil } -func getEncoding(enc string) (encoding.Encoding, error) { - if enc == "" { - return encoding.Nop, nil - } - - return ianaindex.IANA.Encoding(enc) -} - // readLines reads lines from the tailed file by calling Next() in a loop. // It processes each line by sending it to the receiver's channel and updates // position tracking periodically. It exits when Next() returns an error, From cbf50493f82a0cf6b1a8fd4366bd5d242cd7a39b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:34:42 +0100 Subject: [PATCH 06/21] Do not reset read position on modify event --- .../loki/source/file/internal/tail/file.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index 2455f5e6eaf..a97a2a0ec68 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -51,12 +51,13 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { ctx, cancel := context.WithCancel(context.Background()) return &File{ - cfg: cfg, - logger: logger, - file: f, - reader: scanner, - ctx: ctx, - cancel: cancel, + cfg: cfg, + logger: logger, + file: f, + reader: scanner, + ctx: ctx, + cancel: cancel, + lastOffset: cfg.Offset, }, nil } @@ -160,8 +161,6 @@ func (f *File) wait() error { switch event { case 
eventModified: level.Debug(f.logger).Log("msg", "file modified") - f.file.Seek(f.lastOffset, io.SeekStart) - f.reader.reset(f.file, f.lastOffset) return nil case eventTruncated: level.Debug(f.logger).Log("msg", "file truncated") From bc12ac190f3d3c8bbf2bdbe4e165ecd34fab4f22 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:46:04 +0100 Subject: [PATCH 07/21] Handle case where we have a stored position for UTF-16 but need to consume BOM to determine encoding --- .../loki/source/file/internal/tail/bom.go | 26 ++++++++++++++----- .../loki/source/file/internal/tail/reader.go | 15 +++-------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/internal/component/loki/source/file/internal/tail/bom.go b/internal/component/loki/source/file/internal/tail/bom.go index c2b9406034b..fae35c0715c 100644 --- a/internal/component/loki/source/file/internal/tail/bom.go +++ b/internal/component/loki/source/file/internal/tail/bom.go @@ -19,20 +19,32 @@ var ( ) // skipBOM detects and skips a BOM at the beginning of the file. -// It returns the number of bytes skipped and the BOM bytes -// that were consumed. The file offset is left positioned correctly for -// subsequent reads. -func skipBOM(f *os.File) (int64, []byte) { +// It takes the current offset and returns the final offset +// and the BOM bytes that were detected. +// The file is positioned at the start to read the BOM, then +// seeks to the final offset for subsequent reads. +func skipBOM(f *os.File, offset int64) (int64, []byte) { + // Make sure we are reading from the start of the file. 
+ f.Seek(0, io.SeekStart) + // Read up to 4 bytes (longest BOM) var buf [4]byte n, err := f.Read(buf[:]) if err != nil && n == 0 { - return 0, nil + return offset, nil } bomLen := detectBom(buf[:n]) - f.Seek(bomLen, io.SeekStart) - return bomLen, buf[:bomLen] + + // If a BOM was detected and its length is greater than or equal to the + // provided offset, use the BOM length as the offset. Otherwise, use the + // provided offset (which may be beyond the BOM). + if bomLen >= offset { + offset = bomLen + } + + f.Seek(offset, io.SeekStart) + return offset, buf[:bomLen] } // detectBom detects a BOM in the given bytes and returns the length diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go index e2c6b9deb96..03eb1b2cc5b 100644 --- a/internal/component/loki/source/file/internal/tail/reader.go +++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -12,13 +12,9 @@ import ( const defaultBufSize = 4096 func newReader(f *os.File, offset int64, enc encoding.Encoding) (*reader, error) { - if offset == 0 { - bomOffset, bomBytes := skipBOM(f) - offset = bomOffset - // Resolve the encoding based on BOM bytes if any. 
- enc = resolveEncodingFromBOM(bomBytes, enc) - } - + var bomBytes []byte + offset, bomBytes = skipBOM(f, offset) + enc = resolveEncodingFromBOM(bomBytes, enc) var ( decoder = enc.NewDecoder() @@ -128,10 +124,7 @@ func (r *reader) position() int64 { func (r *reader) reset(f *os.File, offset int64) { // Just skip BOM if needed, but keep the same encoding that was set on creation if offset == 0 { - var bomOffset int64 - bomOffset, _ = skipBOM(f) - offset = bomOffset - // Note: We don't change encoding on reset - it was determined on creation + offset, _ = skipBOM(f, offset) } // Recreate decoder and encoder with the stored encoding From fb52885dafe3f99f47e36bdee9d1b87ff93121c0 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:54:52 +0100 Subject: [PATCH 08/21] restore file --- internal/component/loki/source/file/file_test.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/component/loki/source/file/file_test.go b/internal/component/loki/source/file/file_test.go index 524695a9cac..be75225f21f 100644 --- a/internal/component/loki/source/file/file_test.go +++ b/internal/component/loki/source/file/file_test.go @@ -470,10 +470,21 @@ func TestEncoding(t *testing.T) { encoding string decompressionConfig DecompressionConfig }{ + {"CRLF default encoding", "/CRLF/UTF-8.txt", "", noDecompress}, + {"CRLF UTF-8", "/CRLF/UTF-8.txt", "UTF-8", noDecompress}, + {"CRLF UTF-16", "/CRLF/UTF-16.txt", "UTF-16", noDecompress}, + {"CRLF UTF-16 LE", "/CRLF/UTF-16_LE.txt", "UTF-16LE", noDecompress}, + {"CRLF UTF-16 BE", "/CRLF/UTF-16_BE.txt", "UTF-16BE", noDecompress}, {"CRLF UTF-16 LE with BOM", "/CRLF/UTF-16_LE_BOM.txt", "UTF-16", noDecompress}, {"CRLF UTF-16 BE with BOM", "/CRLF/UTF-16_BE_BOM.txt", "UTF-16", noDecompress}, + {"LF default encoding", "/LF/UTF-8.txt", "", noDecompress}, + {"LF UTF-8", "/LF/UTF-8.txt", "UTF-8", noDecompress}, + {"LF UTF-16", "/LF/UTF-16.txt", "UTF-16", 
noDecompress}, + {"LF UTF-16 LE", "/LF/UTF-16_LE.txt", "UTF-16LE", noDecompress}, + {"LF UTF-16 BE", "/LF/UTF-16_BE.txt", "UTF-16BE", noDecompress}, {"LF UTF-16 LE with BOM", "/LF/UTF-16_LE_BOM.txt", "UTF-16", noDecompress}, {"LF UTF-16 BE with BOM", "/LF/UTF-16_BE_BOM.txt", "UTF-16", noDecompress}, + {"CRLF default encoding (gzipped)", "/CRLF/UTF-8.txt.gz", "", gzDecompress}, {"CRLF UTF-8 (gzipped)", "/CRLF/UTF-8.txt.gz", "UTF-8", gzDecompress}, {"CRLF UTF-16 (gzipped)", "/CRLF/UTF-16.txt.gz", "UTF-16", gzDecompress}, {"CRLF UTF-16 LE (gzipped)", "/CRLF/UTF-16_LE.txt.gz", "UTF-16LE", gzDecompress}, @@ -493,7 +504,7 @@ func TestEncoding(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { opts := component.Options{ - Logger: logging.NewNop(), + Logger: util.TestAlloyLogger(t), Registerer: prometheus.NewRegistry(), OnStateChange: func(e component.Exports) {}, DataPath: t.TempDir(), From 575f8719d6ed903175a2704ebe6fb406147e7d3c Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:56:56 +0100 Subject: [PATCH 09/21] Update text --- .../component/loki/source/file/internal/tail/config.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/internal/component/loki/source/file/internal/tail/config.go b/internal/component/loki/source/file/internal/tail/config.go index 028e507f21b..49447ed6253 100644 --- a/internal/component/loki/source/file/internal/tail/config.go +++ b/internal/component/loki/source/file/internal/tail/config.go @@ -14,12 +14,8 @@ type Config struct { // If 0, tailing starts from the beginning of the file. Offset int64 - // FIXME: Update text - // Decoder is an optional text decoder for non-UTF-8 encoded files. - // If the file is not UTF-8, the tailer must use the correct decoder - // or the output text may be corrupted. 
For example, if the file is - // "UTF-16 LE" encoded, the tailer would not separate new lines properly - // and the output could appear as garbled characters. + // Encoding used for file. If none is provided encoding.Nop is used + // and the file is assumed to be UTF-8. Encoding encoding.Encoding // WatcherConfig controls how the file system is polled for changes. From 1ec8c4b3d49089e601fa88badab3f0e1c9869ba0 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 12:02:15 +0100 Subject: [PATCH 10/21] remove comment --- internal/component/loki/source/file/internal/tail/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go index 03eb1b2cc5b..685a5ba71e7 100644 --- a/internal/component/loki/source/file/internal/tail/reader.go +++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -49,7 +49,7 @@ type reader struct { pos int64 br *bufio.Reader decoder *encoding.Decoder - enc encoding.Encoding // The encoding to use (set on creation, preserved on reset) + enc encoding.Encoding nl []byte lastNl byte From 239a9d59ddf8af1cb30943192aa7a05076b7ea0a Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 15:03:19 +0100 Subject: [PATCH 11/21] Update docs --- .../sources/reference/components/loki/loki.source.file.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md index 469dfb6aa2f..d3972025813 100644 --- a/docs/sources/reference/components/loki/loki.source.file.md +++ b/docs/sources/reference/components/loki/loki.source.file.md @@ -1,4 +1,4 @@ ---- +-the correct enconding will be used.-- canonical: https://grafana.com/docs/alloy/latest/reference/components/loki/loki.source.file/ 
aliases: - ../loki.source.file/ # /docs/alloy/latest/reference/components/loki.source.file/ @@ -48,8 +48,8 @@ You can use the following arguments with `loki.source.file`: | `on_positions_file_error` | `string` | How to handle a corrupt positions file entry for a given file. | `"restart_from_beginning"` | no | | `tail_from_end` | `bool` | Whether to tail from end if a stored position isn't found. | `false` | no | -The `encoding` argument must be a valid [IANA encoding][] name. -If not set, it defaults to UTF-8. +The `encoding` argument must be a valid [IANA encoding][] name and if not set, it defaults to UTF-8. {{< param "PRODUCT_NAME" >}} is able to automatically change +the encoding to `UTF-16` if the file includes a Byte Order Mark (BOM) for either `UTF-16BE` or `UTF-16LE`. You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content. When set to true, only new logs are read, ignoring the existing ones. @@ -67,8 +67,6 @@ The format of the positions file is different in Grafana Alloy, so this will con This operation only occurs if the new positions file doesn't exist and the `legacy_positions_file` is valid. When `legacy_positions_file` is set, Alloy will try to find previous positions for a given file by matching the path and labels, falling back to matching on path only if no match is found. -If you want to read a UTF-16 file with a Byte Order Mark (BOM), set `encoding` to `UTF-16`. -BOMs will be ignored if `encoding` is set to either `UTF-16BE` or `UTF-16LE`. 
## Blocks From cb2bc8b1af5f617a4ec9c2ad8bfb7961498234fd Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 15:09:17 +0100 Subject: [PATCH 12/21] fix --- docs/sources/reference/components/loki/loki.source.file.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md index d3972025813..ad4d710abde 100644 --- a/docs/sources/reference/components/loki/loki.source.file.md +++ b/docs/sources/reference/components/loki/loki.source.file.md @@ -1,4 +1,4 @@ --the correct enconding will be used.-- +--- canonical: https://grafana.com/docs/alloy/latest/reference/components/loki/loki.source.file/ aliases: - ../loki.source.file/ # /docs/alloy/latest/reference/components/loki.source.file/ From 3d12bc6d7a64ca5a76350e2580aee55726f9f0d8 Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 15:21:06 +0100 Subject: [PATCH 13/21] Update docs/sources/reference/components/loki/loki.source.file.md Co-authored-by: Paulin Todev --- docs/sources/reference/components/loki/loki.source.file.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md index ad4d710abde..dbb665832b7 100644 --- a/docs/sources/reference/components/loki/loki.source.file.md +++ b/docs/sources/reference/components/loki/loki.source.file.md @@ -50,6 +50,7 @@ You can use the following arguments with `loki.source.file`: The `encoding` argument must be a valid [IANA encoding][] name and if not set, it defaults to UTF-8. {{< param "PRODUCT_NAME" >}} is able to automatically change the encoding to `UTF-16` if the file includes a Byte Order Mark (BOM) for either `UTF-16BE` or `UTF-16LE`. 
+The BOM will be taken into account even if Alloy resumes tailing a file from the middle of the file. This can happen after Alloy is restarted. You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content. When set to true, only new logs are read, ignoring the existing ones. From 25809ac75e5537dacc4db8bb36ff20ec07b30053 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 16:08:03 +0100 Subject: [PATCH 14/21] remove lastOffset, we already track pos in reader. also we don't have to reseek in drain because we buffer non terminated lines returned after EOF. --- .../loki/source/file/internal/tail/file.go | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index a97a2a0ec68..76e465640fa 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -51,13 +51,12 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { ctx, cancel := context.WithCancel(context.Background()) return &File{ - cfg: cfg, - logger: logger, - file: f, - reader: scanner, - ctx: ctx, - cancel: cancel, - lastOffset: cfg.Offset, + cfg: cfg, + logger: logger, + file: f, + reader: scanner, + ctx: ctx, + cancel: cancel, }, nil } @@ -73,8 +72,6 @@ type File struct { file *os.File reader *reader - lastOffset int64 - // bufferedLines stores lines that were read from an old file handle before // it was closed during file rotation. 
bufferedLines []Line @@ -117,11 +114,9 @@ read: return nil, err } - f.lastOffset = f.reader.position() - return &Line{ Text: text, - Offset: f.lastOffset, + Offset: f.reader.position(), Time: time.Now(), }, nil } @@ -165,14 +160,11 @@ func (f *File) wait() error { case eventTruncated: level.Debug(f.logger).Log("msg", "file truncated") // We need to reopen the file when it was truncated. - f.lastOffset = 0 return f.reopen(true) case eventDeleted: level.Debug(f.logger).Log("msg", "file deleted") // if a file is deleted we want to make sure we drain what's remaining in the open file. f.drain() - - f.lastOffset = 0 // In polling mode we could miss events when a file is deleted, so before we give up // we try to reopen the file. return f.reopen(false) @@ -195,11 +187,6 @@ func (f *File) offset() (int64, error) { // to ensure we don't lose any data from the old file before switching to the new one. // drain is best effort and will stop if it encounters any errors. func (f *File) drain() { - if _, err := f.file.Seek(f.lastOffset, io.SeekStart); err != nil { - return - } - f.reader.reset(f.file, f.lastOffset) - for { text, err := f.reader.next() if err != nil { @@ -276,7 +263,7 @@ func (f *File) reopen(truncated bool) error { } f.file = file - f.reader.reset(f.file, f.lastOffset) + f.reader.reset(f.file) break } From 07a9fccea6eb1620a23caa57ac2c5d490e176171 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 16:08:59 +0100 Subject: [PATCH 15/21] No need to recreate encoder, nl and cr on reset. 
We assume it's the same --- .../loki/source/file/internal/tail/reader.go | 26 +++---------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go index 685a5ba71e7..c04477657c4 100644 --- a/internal/component/loki/source/file/internal/tail/reader.go +++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -121,29 +121,9 @@ func (r *reader) position() int64 { return r.pos } -func (r *reader) reset(f *os.File, offset int64) { - // Just skip BOM if needed, but keep the same encoding that was set on creation - if offset == 0 { - offset, _ = skipBOM(f, offset) - } - - // Recreate decoder and encoder with the stored encoding - r.decoder = r.enc.NewDecoder() - encoder := r.enc.NewEncoder() - - // Update newline and carriage return patterns - var err error - r.nl, err = encodedNewline(encoder) - if err != nil { - // If encoding fails, keep old values - this shouldn't happen in practice - return - } - r.lastNl = r.nl[len(r.nl)-1] - r.cr, err = encodedCarriageReturn(encoder) - if err != nil { - return - } - +func (r *reader) reset(f *os.File) { + // Skip BOM if needed, we asume that the rotated file have the same encoding. 
+ offset, _ := skipBOM(f, 0) r.pos = offset r.br.Reset(f) r.pending = make([]byte, 0, defaultBufSize) From 29acecae533d26cd931c6534ba915c29f844342f Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 16:13:44 +0100 Subject: [PATCH 16/21] reset decoder to make sure it has a clean state --- internal/component/loki/source/file/internal/tail/reader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go index c04477657c4..3ddc1781daa 100644 --- a/internal/component/loki/source/file/internal/tail/reader.go +++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -126,6 +126,7 @@ func (r *reader) reset(f *os.File) { offset, _ := skipBOM(f, 0) r.pos = offset r.br.Reset(f) + r.decoder.Reset() r.pending = make([]byte, 0, defaultBufSize) } From 1b128e1e93089ec5a3524a95b96655bde46545d2 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 16:17:40 +0100 Subject: [PATCH 17/21] Add comments --- internal/component/loki/source/file/internal/tail/reader.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go index 3ddc1781daa..c45d9350287 100644 --- a/internal/component/loki/source/file/internal/tail/reader.go +++ b/internal/component/loki/source/file/internal/tail/reader.go @@ -117,10 +117,14 @@ func (r *reader) consumeLine() ([]byte, bool) { return line, true } +// position returns the byte offset for completed lines, +// not necessarily all bytes consumed from the file. func (r *reader) position() int64 { return r.pos } +// reset prepares the reader for a new file handle, assuming the same encoding. +// It skips the BOM, resets the buffered reader and decoder, and clears pending data. 
func (r *reader) reset(f *os.File) { // Skip BOM if needed, we asume that the rotated file have the same encoding. offset, _ := skipBOM(f, 0) From 886e4e4a242ab63f51a1d32138541fa3775aa303 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 17:16:00 +0100 Subject: [PATCH 18/21] Change encoding when UTF-8 BOM is detected and add test to verify that it works and we can continue from a recorded offset --- .../loki/source/file/internal/tail/bom.go | 13 ++- .../source/file/internal/tail/file_test.go | 87 +++++++++++++++++++ 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/internal/component/loki/source/file/internal/tail/bom.go b/internal/component/loki/source/file/internal/tail/bom.go index fae35c0715c..441138264d7 100644 --- a/internal/component/loki/source/file/internal/tail/bom.go +++ b/internal/component/loki/source/file/internal/tail/bom.go @@ -2,6 +2,7 @@ package tail import ( "bytes" + "fmt" "io" "os" @@ -51,16 +52,16 @@ func skipBOM(f *os.File, offset int64) (int64, []byte) { // of the BOM (0 if no BOM was detected). 
func detectBom(b []byte) int64 { switch { - case bytes.HasPrefix(b, bomUTF32BE): - return 4 - case bytes.HasPrefix(b, bomUTF32LE): - return 4 case bytes.HasPrefix(b, bomUTF8): return 3 case bytes.HasPrefix(b, bomUTF16BE): return 2 case bytes.HasPrefix(b, bomUTF16LE): return 2 + case bytes.HasPrefix(b, bomUTF32BE): + return 4 + case bytes.HasPrefix(b, bomUTF32LE): + return 4 default: return 0 } @@ -76,10 +77,14 @@ func resolveEncodingFromBOM(bomBytes []byte, originalEnc encoding.Encoding) enco } switch { + case bytes.HasPrefix(bomBytes, bomUTF8): + // UTF-8 BOM detected - return encoding + return encoding.Nop case bytes.HasPrefix(bomBytes, bomUTF16BE): // UTF-16 BE BOM detected - return encoding with IgnoreBOM since we skip it return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM) case bytes.HasPrefix(bomBytes, bomUTF16LE): + fmt.Println("Detected UTF-16LE") // UTF-16 LE BOM detected - return encoding with IgnoreBOM since we skip it return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM) default: diff --git a/internal/component/loki/source/file/internal/tail/file_test.go b/internal/component/loki/source/file/internal/tail/file_test.go index 24e9bfaddf9..1e67b162df8 100644 --- a/internal/component/loki/source/file/internal/tail/file_test.go +++ b/internal/component/loki/source/file/internal/tail/file_test.go @@ -260,6 +260,93 @@ func TestFile(t *testing.T) { verify(t, file, &Line{Text: "newline1", Offset: 9}, nil) verify(t, file, &Line{Text: "newline2", Offset: 18}, nil) }) + + t.Run("should detect UTF-16LE encoding from BOM", func(t *testing.T) { + enc := unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder() + encoded, err := enc.String("Hello, 世界\r\n") + require.NoError(t, err) + name := createFile(t, "utf-16LE", encoded) + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + // We are setting UTF8 here but still expect to decode the file using UTF-16LE + Encoding: unicode.UTF8, + }) + 
+ require.NoError(t, err) + + verify(t, file, &Line{Text: "Hello, 世界", Offset: 24}, nil) + file.Stop() + + enc = unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder() + encoded, err = enc.String("newline\r\n") + require.NoError(t, err) + appendToFile(t, name, encoded) + + // Reopen file from last offset to make sure it handles that. + file, err = NewFile(log.NewNopLogger(), &Config{ + Filename: name, + // We are setting UTF8 here but still expect to decode the file using UTF-16LE + Encoding: unicode.UTF8, + Offset: 24, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "newline", Offset: 42}, nil) + }) + + t.Run("should detect UTF-16BE encoding from BOM", func(t *testing.T) { + enc := unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder() + encoded, err := enc.String("Hello, 世界\r\n") + require.NoError(t, err) + name := createFile(t, "utf-16BE", encoded) + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + // We are setting UTF8 here but still expect to decode the file using UTF-16BE + Encoding: unicode.UTF8, + }) + require.NoError(t, err) + + verify(t, file, &Line{Text: "Hello, 世界", Offset: 24}, nil) + file.Stop() + + enc = unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewEncoder() + encoded, err = enc.String("newline\r\n") + require.NoError(t, err) + appendToFile(t, name, encoded) + + // Reopen file from last offset.
+ file, err = NewFile(log.NewNopLogger(), &Config{ + Filename: name, + // We are setting UTF8 here but still expect to decode the file using UTF-16BE + Encoding: unicode.UTF8, + Offset: 24, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "newline", Offset: 42}, nil) + }) + + t.Run("should detect UTF-8 encoding from BOM", func(t *testing.T) { + bytes := []byte("Hello, 世界\r\n") + + name := createFile(t, "utf-8", string(append(bomUTF8, bytes...))) + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + // We are setting UTF-16BE here but still expect to decode the file using UTF-8 + Encoding: unicode.UTF16(unicode.BigEndian, unicode.UseBOM), + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "Hello, 世界", Offset: 18}, nil) + }) } func createFile(t *testing.T, name, content string) string { From 0cd419a24204e129d33fa9f0654265736d58683d Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 17:18:14 +0100 Subject: [PATCH 19/21] Update docs/sources/reference/components/loki/loki.source.file.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- docs/sources/reference/components/loki/loki.source.file.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md index dbb665832b7..bba939f9764 100644 --- a/docs/sources/reference/components/loki/loki.source.file.md +++ b/docs/sources/reference/components/loki/loki.source.file.md @@ -48,7 +48,7 @@ You can use the following arguments with `loki.source.file`: | `on_positions_file_error` | `string` | How to handle a corrupt positions file entry for a given file. | `"restart_from_beginning"` | no | | `tail_from_end` | `bool` | Whether to tail from end if a stored position isn't found.
| `false` | no | -The `encoding` argument must be a valid [IANA encoding][] name and if not set, it defaults to UTF-8. {{< param "PRODUCT_NAME" >}} is able to automatically change +The `encoding` argument must be a valid [IANA encoding][] name and if not set, it defaults to UTF-8. {{< param "PRODUCT_NAME" >}} can automatically change the encoding to `UTF-16` if the file includes a Byte Order Mark (BOM) for either `UTF-16BE` or `UTF-16LE`. The BOM will be taken into account even if Alloy resumes tailing a file from the middle of the file. This can happen after Alloy is restarted. From 2ac65ac3da3f47151e716dea3bc670bed8820155 Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Wed, 14 Jan 2026 17:18:24 +0100 Subject: [PATCH 20/21] Update docs/sources/reference/components/loki/loki.source.file.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- docs/sources/reference/components/loki/loki.source.file.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md index bba939f9764..c6ee4897d33 100644 --- a/docs/sources/reference/components/loki/loki.source.file.md +++ b/docs/sources/reference/components/loki/loki.source.file.md @@ -50,7 +50,7 @@ You can use the following arguments with `loki.source.file`: The `encoding` argument must be a valid [IANA encoding][] name and if not set, it defaults to UTF-8. {{< param "PRODUCT_NAME" >}} can automatically change the encoding to `UTF-16` if the file includes a Byte Order Mark (BOM) for either `UTF-16BE` or `UTF-16LE`. -The BOM will be taken into account even if Alloy resumes tailing a file from the middle of the file. This can happen after Alloy is restarted. +The BOM will be taken into account even if {{< param "PRODUCT_NAME" >}} resumes tailing a file from the middle of the file. 
This can happen after {{< param "PRODUCT_NAME" >}} is restarted. You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content. When set to true, only new logs are read, ignoring the existing ones. From 49a2b1d93760fea496f89ba1c80e02b0674ce340 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 15 Jan 2026 09:04:23 +0100 Subject: [PATCH 21/21] Remove print --- internal/component/loki/source/file/internal/tail/bom.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/component/loki/source/file/internal/tail/bom.go b/internal/component/loki/source/file/internal/tail/bom.go index 441138264d7..01ddbdac962 100644 --- a/internal/component/loki/source/file/internal/tail/bom.go +++ b/internal/component/loki/source/file/internal/tail/bom.go @@ -2,7 +2,6 @@ package tail import ( "bytes" - "fmt" "io" "os" @@ -84,7 +83,6 @@ func resolveEncodingFromBOM(bomBytes []byte, originalEnc encoding.Encoding) enco // UTF-16 BE BOM detected - return encoding with IgnoreBOM since we skip it return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM) case bytes.HasPrefix(bomBytes, bomUTF16LE): - fmt.Println("Detected UTF-16LE") // UTF-16 LE BOM detected - return encoding with IgnoreBOM since we skip it return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM) default: