diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md
index 469dfb6aa2f..c6ee4897d33 100644
--- a/docs/sources/reference/components/loki/loki.source.file.md
+++ b/docs/sources/reference/components/loki/loki.source.file.md
@@ -48,8 +48,9 @@ You can use the following arguments with `loki.source.file`:
 | `on_positions_file_error` | `string` | How to handle a corrupt positions file entry for a given file. | `"restart_from_beginning"` | no |
 | `tail_from_end` | `bool` | Whether to tail from end if a stored position isn't found. | `false` | no |
 
-The `encoding` argument must be a valid [IANA encoding][] name.
-If not set, it defaults to UTF-8.
+The `encoding` argument must be a valid [IANA encoding][] name. If not set, it defaults to UTF-8. {{< param "PRODUCT_NAME" >}} can automatically change
+the encoding to `UTF-16` if the file includes a Byte Order Mark (BOM) for either `UTF-16BE` or `UTF-16LE`.
+The BOM will be taken into account even if {{< param "PRODUCT_NAME" >}} resumes tailing a file from the middle of the file. This can happen after {{< param "PRODUCT_NAME" >}} is restarted.
 
 You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content.
 When set to true, only new logs are read, ignoring the existing ones.
@@ -67,8 +68,6 @@ The format of the positions file is different in Grafana Alloy, so this will con
 This operation only occurs if the new positions file doesn't exist and the `legacy_positions_file` is valid.
 When `legacy_positions_file` is set, Alloy will try to find previous positions for a given file by matching the path and labels, falling back to matching on path only if no match is found.
 
-If you want to read a UTF-16 file with a Byte Order Mark (BOM), set `encoding` to `UTF-16`.
-BOMs will be ignored if `encoding` is set to either `UTF-16BE` or `UTF-16LE`.
## Blocks diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index 10f9070ce00..2c202e5ef90 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -93,7 +93,7 @@ func newDecompressor( } } - decoder, err := getDecoder(opts.encoding) + enc, err := getEncoding(opts.encoding) if err != nil { return nil, fmt.Errorf("failed to get decoder: %w", err) } @@ -107,7 +107,7 @@ func newDecompressor( labels: opts.labels, running: atomic.NewBool(false), position: position, - decoder: decoder, + decoder: enc.NewDecoder(), cfg: opts.decompressionConfig, onPositionsFileError: opts.onPositionsFileError, componentStopping: componentStopping, diff --git a/internal/component/loki/source/file/encoding.go b/internal/component/loki/source/file/encoding.go new file mode 100644 index 00000000000..abfc7d36b98 --- /dev/null +++ b/internal/component/loki/source/file/encoding.go @@ -0,0 +1,14 @@ +package file + +import ( + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/ianaindex" +) + +func getEncoding(enc string) (encoding.Encoding, error) { + if enc == "" { + return encoding.Nop, nil + } + + return ianaindex.IANA.Encoding(enc) +} diff --git a/internal/component/loki/source/file/internal/tail/bom.go b/internal/component/loki/source/file/internal/tail/bom.go new file mode 100644 index 00000000000..01ddbdac962 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/bom.go @@ -0,0 +1,92 @@ +package tail + +import ( + "bytes" + "io" + "os" + + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" +) + +// BOM byte sequences +var ( + bomUTF32BE = []byte{0x00, 0x00, 0xFE, 0xFF} + bomUTF32LE = []byte{0xFF, 0xFE, 0x00, 0x00} + bomUTF8 = []byte{0xEF, 0xBB, 0xBF} + bomUTF16BE = []byte{0xFE, 0xFF} + bomUTF16LE = []byte{0xFF, 0xFE} +) + +// skipBOM detects and skips a BOM at the beginning of the file. 
+// It takes the current offset and returns the final offset +// and the BOM bytes that were detected. +// The file is positioned at the start to read the BOM, then +// seeks to the final offset for subsequent reads. +func skipBOM(f *os.File, offset int64) (int64, []byte) { + // Make sure we are reading from the start of the file. + f.Seek(0, io.SeekStart) + + // Read up to 4 bytes (longest BOM) + var buf [4]byte + n, err := f.Read(buf[:]) + if err != nil && n == 0 { + return offset, nil + } + + bomLen := detectBom(buf[:n]) + + // If a BOM was detected and its length is greater than or equal to the + // provided offset, use the BOM length as the offset. Otherwise, use the + // provided offset (which may be beyond the BOM). + if bomLen >= offset { + offset = bomLen + } + + f.Seek(offset, io.SeekStart) + return offset, buf[:bomLen] +} + +// detectBom detects a BOM in the given bytes and returns the length +// of the BOM (0 if no BOM was detected). +func detectBom(b []byte) int64 { + switch { + case bytes.HasPrefix(b, bomUTF8): + return 3 + case bytes.HasPrefix(b, bomUTF16BE): + return 2 + case bytes.HasPrefix(b, bomUTF16LE): + return 2 + case bytes.HasPrefix(b, bomUTF32BE): + return 4 + case bytes.HasPrefix(b, bomUTF32LE): + return 4 + default: + return 0 + } +} + +// resolveEncodingFromBOM takes the BOM bytes and the original encoding, +// and returns the resolved encoding. If a UTF-16 BOM is detected, it returns +// an encoding with the correct endianness and IgnoreBOM mode. +// Otherwise, it returns the original encoding. 
+func resolveEncodingFromBOM(bomBytes []byte, originalEnc encoding.Encoding) encoding.Encoding {
+	if len(bomBytes) == 0 {
+		return originalEnc
+	}
+
+	switch {
+	case bytes.HasPrefix(bomBytes, bomUTF8):
+		// UTF-8 BOM detected - return the Nop encoding since the content is already UTF-8
+		return encoding.Nop
+	case bytes.HasPrefix(bomBytes, bomUTF16BE):
+		// UTF-16 BE BOM detected - return encoding with IgnoreBOM since we skip it
+		return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM)
+	case bytes.HasPrefix(bomBytes, bomUTF16LE):
+		// UTF-16 LE BOM detected - return encoding with IgnoreBOM since we skip it
+		return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM)
+	default:
+		// Other BOMs (UTF-32) don't affect encoding selection
+		return originalEnc
+	}
+}
diff --git a/internal/component/loki/source/file/internal/tail/config.go b/internal/component/loki/source/file/internal/tail/config.go
index 2decfa3a73e..49447ed6253 100644
--- a/internal/component/loki/source/file/internal/tail/config.go
+++ b/internal/component/loki/source/file/internal/tail/config.go
@@ -14,12 +14,9 @@ type Config struct {
 	// If 0, tailing starts from the beginning of the file.
 	Offset int64
 
-	// Decoder is an optional text decoder for non-UTF-8 encoded files.
-	// If the file is not UTF-8, the tailer must use the correct decoder
-	// or the output text may be corrupted. For example, if the file is
-	// "UTF-16 LE" encoded, the tailer would not separate new lines properly
-	// and the output could appear as garbled characters.
-	Decoder *encoding.Decoder
+	// Encoding used for file. If none is provided encoding.Nop is used
+	// and the file is assumed to be UTF-8.
+	Encoding encoding.Encoding
 
 	// WatcherConfig controls how the file system is polled for changes.
WatcherConfig WatcherConfig diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go index e1a25d23c04..76e465640fa 100644 --- a/internal/component/loki/source/file/internal/tail/file.go +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -1,18 +1,17 @@ package tail import ( - "bufio" "context" "errors" "fmt" "io" "os" - "strings" "sync" "time" "github.com/go-kit/log" "github.com/grafana/dskit/backoff" + "golang.org/x/text/encoding" "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -28,6 +27,14 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { return nil, err } + if cfg.Encoding == nil { + cfg.Encoding = encoding.Nop + } + + if cfg.WatcherConfig == (WatcherConfig{}) { + cfg.WatcherConfig = defaultWatcherConfig + } + if cfg.Offset != 0 { // Seek to provided offset if _, err := f.Seek(cfg.Offset, io.SeekStart); err != nil { @@ -35,19 +42,19 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) { } } - if cfg.WatcherConfig == (WatcherConfig{}) { - cfg.WatcherConfig = defaultWatcherConfig + scanner, err := newReader(f, cfg.Offset, cfg.Encoding) + if err != nil { + return nil, err } cfg.WatcherConfig.MinPollFrequency = min(cfg.WatcherConfig.MinPollFrequency, cfg.WatcherConfig.MaxPollFrequency) - ctx, cancel := context.WithCancel(context.Background()) return &File{ cfg: cfg, logger: logger, file: f, - reader: newReader(f, cfg), + reader: scanner, ctx: ctx, cancel: cancel, }, nil @@ -63,9 +70,7 @@ type File struct { // protects file, reader, and lastOffset. mu sync.Mutex file *os.File - reader *bufio.Reader - - lastOffset int64 + reader *reader // bufferedLines stores lines that were read from an old file handle before // it was closed during file rotation. 
@@ -98,10 +103,10 @@ read: return &line, nil } - text, err := f.readLine() + text, err := f.reader.next() if err != nil { if errors.Is(err, io.EOF) { - if err := f.wait(text != ""); err != nil { + if err := f.wait(); err != nil { return nil, err } goto read @@ -109,16 +114,9 @@ read: return nil, err } - offset, err := f.offset() - if err != nil { - return nil, err - } - - f.lastOffset = offset - return &Line{ Text: text, - Offset: offset, + Offset: f.reader.position(), Time: time.Now(), }, nil } @@ -148,7 +146,7 @@ func (f *File) Stop() error { // wait blocks until a file event is detected (modification, truncation, or deletion). // Returns an error if the context is canceled or an unrecoverable error occurs. -func (f *File) wait(partial bool) error { +func (f *File) wait() error { offset, err := f.offset() if err != nil { return err @@ -158,11 +156,6 @@ func (f *File) wait(partial bool) error { switch event { case eventModified: level.Debug(f.logger).Log("msg", "file modified") - if partial { - // We need to reset to last successful offset because we consumed a partial line. - f.file.Seek(f.lastOffset, io.SeekStart) - f.reader.Reset(f.file) - } return nil case eventTruncated: level.Debug(f.logger).Log("msg", "file truncated") @@ -172,8 +165,6 @@ func (f *File) wait(partial bool) error { level.Debug(f.logger).Log("msg", "file deleted") // if a file is deleted we want to make sure we drain what's remaining in the open file. f.drain() - - f.lastOffset = 0 // In polling mode we could miss events when a file is deleted, so before we give up // we try to reopen the file. return f.reopen(false) @@ -182,14 +173,13 @@ func (f *File) wait(partial bool) error { } } -// readLine reads a single line from the file, including the newline character. -// The newline and any trailing carriage return (for Windows line endings) are stripped. 
-func (f *File) readLine() (string, error) { - line, err := f.reader.ReadString('\n') +// offset returns the current byte offset in the file where the next read will occur. +func (f *File) offset() (int64, error) { + offset, err := f.file.Seek(0, io.SeekCurrent) if err != nil { - return line, err + return 0, err } - return strings.TrimRight(line, "\r\n"), err + return offset, nil } // drain reads all remaining complete lines from the current file handle and stores @@ -197,52 +187,27 @@ func (f *File) readLine() (string, error) { // to ensure we don't lose any data from the old file before switching to the new one. // drain is best effort and will stop if it encounters any errors. func (f *File) drain() { - if _, err := f.file.Seek(f.lastOffset, io.SeekStart); err != nil { - return - } - f.reader.Reset(f.file) - for { - text, err := f.readLine() + text, err := f.reader.next() if err != nil { if text != "" { - offset, err := f.offset() - if err != nil { - return - } f.bufferedLines = append(f.bufferedLines, Line{ Text: text, - Offset: offset, + Offset: f.reader.position(), Time: time.Now(), }) } return } - offset, err := f.offset() - if err != nil { - return - } - f.bufferedLines = append(f.bufferedLines, Line{ Text: text, - Offset: offset, + Offset: f.reader.position(), Time: time.Now(), }) } } -// offset returns the current byte offset in the file where the next read will occur. -// It accounts for buffered data in the reader. -func (f *File) offset() (int64, error) { - offset, err := f.file.Seek(0, io.SeekCurrent) - if err != nil { - return 0, err - } - - return offset - int64(f.reader.Buffered()), nil -} - // reopen closes the current file handle and opens a new one for the same file path. // If truncated is true, it indicates the file was truncated and we should reopen immediately. 
// If truncated is false, it indicates the file was deleted or moved, and we should wait @@ -298,16 +263,9 @@ func (f *File) reopen(truncated bool) error { } f.file = file - f.reader.Reset(f.file) + f.reader.reset(f.file) break } return backoff.Err() } - -func newReader(f *os.File, cfg *Config) *bufio.Reader { - if cfg.Decoder != nil { - return bufio.NewReader(cfg.Decoder.Reader(f)) - } - return bufio.NewReader(f) -} diff --git a/internal/component/loki/source/file/internal/tail/file_test.go b/internal/component/loki/source/file/internal/tail/file_test.go index 55633850c76..1e67b162df8 100644 --- a/internal/component/loki/source/file/internal/tail/file_test.go +++ b/internal/component/loki/source/file/internal/tail/file_test.go @@ -202,18 +202,18 @@ func TestFile(t *testing.T) { t.Run("UTF-16LE", func(t *testing.T) { file, err := NewFile(log.NewNopLogger(), &Config{ Filename: "testdata/mssql.log", - Decoder: unicode.UTF16(unicode.LittleEndian, unicode.ExpectBOM).NewDecoder(), + Encoding: unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), }) require.NoError(t, err) defer file.Stop() - verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 528}, nil) - verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 552}, nil) - verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 595}, nil) - verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 (Build 20348: ) (Hypervisor)", Offset: 697}, nil) - verify(t, file, &Line{Text: "", Offset: 699}, nil) - verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 756}, nil) - verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 819}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 180}, nil) + verify(t, file, &Line{Text: " Sep 24 2019 
13:48:23 ", Offset: 228}, nil) + verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 314}, nil) + verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 (Build 20348: ) (Hypervisor)", Offset: 518}, nil) + verify(t, file, &Line{Text: "", Offset: 522}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 636}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 762}, nil) verify(t, file, &Line{Text: "2025-03-11 11:11:02.72 Server All rights reserved.", Offset: 876}, nil) }) @@ -232,7 +232,7 @@ func TestFile(t *testing.T) { }) t.Run("file rotation drains remaining lines from old file", func(t *testing.T) { - name := createFile(t, "rotation", "line1\nline2\nline3\nline4\npartial") + name := createFile(t, "rotation", "line1\nline2\nline3\nline4\n") defer removeFile(t, name) file, err := NewFile(log.NewNopLogger(), &Config{ @@ -257,10 +257,96 @@ func TestFile(t *testing.T) { // Verify we get the remaining old lines first, then new lines verify(t, file, &Line{Text: "line3", Offset: 18}, nil) verify(t, file, &Line{Text: "line4", Offset: 24}, nil) - verify(t, file, &Line{Text: "partial", Offset: 31}, nil) verify(t, file, &Line{Text: "newline1", Offset: 9}, nil) verify(t, file, &Line{Text: "newline2", Offset: 18}, nil) }) + + t.Run("should detect UTF-16LE encoding from BOM", func(t *testing.T) { + enc := unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder() + encoded, err := enc.String("Hello, 世界\r\n") + require.NoError(t, err) + name := createFile(t, "utf-16LE", encoded) + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + // We are setting UTF8 here but still expect to decode the file using UTF-16LE + Encoding: unicode.UTF8, + }) + require.NoError(t, err) + + verify(t, file, &Line{Text: "Hello, 世界", Offset: 24}, nil) + file.Stop() + + enc = 
unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder()
+		encoded, err = enc.String("newline\r\n")
+		require.NoError(t, err)
+		appendToFile(t, name, encoded)
+
+		// Reopen file from last offset to make sure it handles that.
+		file, err = NewFile(log.NewNopLogger(), &Config{
+			Filename: name,
+			// We are setting UTF8 here but still expect to decode the file using UTF-16LE
+			Encoding: unicode.UTF8,
+			Offset:   24,
+		})
+		require.NoError(t, err)
+		defer file.Stop()
+
+		verify(t, file, &Line{Text: "newline", Offset: 42}, nil)
+	})
+
+	t.Run("should detect UTF-16BE encoding from BOM", func(t *testing.T) {
+		enc := unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder()
+		encoded, err := enc.String("Hello, 世界\r\n")
+		require.NoError(t, err)
+		name := createFile(t, "utf-16LE", encoded)
+		defer removeFile(t, name)
+
+		file, err := NewFile(log.NewNopLogger(), &Config{
+			Filename: name,
+			// We are setting UTF8 here but still expect to decode the file using UTF-16BE
+			Encoding: unicode.UTF8,
+		})
+		require.NoError(t, err)
+
+		verify(t, file, &Line{Text: "Hello, 世界", Offset: 24}, nil)
+		file.Stop()
+
+		enc = unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewEncoder()
+		encoded, err = enc.String("newline\r\n")
+		require.NoError(t, err)
+		appendToFile(t, name, encoded)
+
+		// Reopen file from last offset.
+		file, err = NewFile(log.NewNopLogger(), &Config{
+			Filename: name,
+			// We are setting UTF8 here but still expect to decode the file using UTF-16BE
+			Encoding: unicode.UTF8,
+			Offset:   24,
+		})
+		require.NoError(t, err)
+		defer file.Stop()
+
+		verify(t, file, &Line{Text: "newline", Offset: 42}, nil)
+	})
+
+	t.Run("should detect UTF-8 encoding from BOM", func(t *testing.T) {
+		bytes := []byte("Hello, 世界\r\n")
+
+		name := createFile(t, "utf-8", string(append(bomUTF8, bytes...)))
+		defer removeFile(t, name)
+
+		file, err := NewFile(log.NewNopLogger(), &Config{
+			Filename: name,
+			// We are setting UTF-16BE here but still expect to decode the file using UTF-8
+			Encoding: unicode.UTF16(unicode.BigEndian, unicode.UseBOM),
+		})
+		require.NoError(t, err)
+		defer file.Stop()
+
+		verify(t, file, &Line{Text: "Hello, 世界", Offset: 18}, nil)
+	})
 }
 
 func createFile(t *testing.T, name, content string) string {
diff --git a/internal/component/loki/source/file/internal/tail/reader.go b/internal/component/loki/source/file/internal/tail/reader.go
new file mode 100644
index 00000000000..c45d9350287
--- /dev/null
+++ b/internal/component/loki/source/file/internal/tail/reader.go
@@ -0,0 +1,147 @@
+package tail
+
+import (
+	"bufio"
+	"bytes"
+	"os"
+	"unsafe"
+
+	"golang.org/x/text/encoding"
+)
+
+const defaultBufSize = 4096
+
+func newReader(f *os.File, offset int64, enc encoding.Encoding) (*reader, error) {
+	var bomBytes []byte
+	offset, bomBytes = skipBOM(f, offset)
+	enc = resolveEncodingFromBOM(bomBytes, enc)
+
+	var (
+		decoder = enc.NewDecoder()
+		encoder = enc.NewEncoder()
+	)
+
+	nl, err := encodedNewline(encoder)
+	if err != nil {
+		return nil, err
+	}
+
+	cr, err := encodedCarriageReturn(encoder)
+	if err != nil {
+		return nil, err
+	}
+
+	reader := &reader{
+		pos:     offset,
+		br:      bufio.NewReader(f),
+		decoder: decoder,
+		enc:     enc, // Store the encoding (after BOM detection) for use in reset
+		nl:      nl,
+		lastNl:  nl[len(nl)-1],
+		cr:      cr,
+		pending: make([]byte, 0, 
defaultBufSize),
+	}
+
+	return reader, nil
+}
+
+type reader struct {
+	pos     int64
+	br      *bufio.Reader
+	decoder *encoding.Decoder
+	enc     encoding.Encoding
+
+	nl      []byte
+	lastNl  byte
+	cr      []byte
+	pending []byte
+}
+
+func (r *reader) next() (string, error) {
+	// First we check if we already have a full line buffered.
+	if line, ok := r.consumeLine(); ok {
+		return r.decode(line)
+	}
+
+	for {
+
+		// Read more data up until the last byte of nl.
+		chunk, err := r.br.ReadBytes(r.lastNl)
+		if len(chunk) > 0 {
+			r.pending = append(r.pending, chunk...)
+
+			if line, ok := r.consumeLine(); ok {
+				return r.decode(line)
+			}
+		}
+
+		// If we did not get an error and did not find a full line we
+		// need to read more data.
+		if err == nil {
+			continue
+		}
+
+		return "", err
+	}
+}
+
+func (r *reader) decode(line []byte) (string, error) {
+	// Decode the line we have consumed.
+	converted, err := r.decoder.Bytes(bytes.TrimSuffix(line, r.cr))
+	if err != nil {
+		return "", err
+	}
+
+	// It is safe to convert this into a string here because converter will always copy
+	// the bytes given to it, even Nop decoder will do that.
+	return unsafe.String(unsafe.SliceData(converted), len(converted)), nil
+}
+
+// consumeLine checks pending for the delimiter; if found, it splits
+// pending into line and remainder.
+func (r *reader) consumeLine() ([]byte, bool) {
+	// Check if pending contains a full line.
+	i := bytes.Index(r.pending, r.nl)
+	if i < 0 {
+		return nil, false
+	}
+
+	// Extract everything up until newline.
+	line := r.pending[:i]
+	// Keep everything except the line we extracted and newline.
+	rem := r.pending[i+len(r.nl):]
+	r.pending = append(make([]byte, 0, defaultBufSize), rem...)
+
+	// Advance the position on bytes we have consumed as a full line.
+	r.pos += int64(len(line) + len(r.nl))
+	return line, true
+}
+
+// position returns the byte offset for completed lines,
+// not necessarily all bytes consumed from the file.
+func (r *reader) position() int64 {
+	return r.pos
+}
+
+// reset prepares the reader for a new file handle, assuming the same encoding.
+// It skips the BOM, resets the buffered reader and decoder, and clears pending data.
+func (r *reader) reset(f *os.File) {
+	// Skip BOM if needed, we assume that the rotated file has the same encoding.
+	offset, _ := skipBOM(f, 0)
+	r.pos = offset
+	r.br.Reset(f)
+	r.decoder.Reset()
+	r.pending = make([]byte, 0, defaultBufSize)
+}
+
+func encodedNewline(e *encoding.Encoder) ([]byte, error) {
+	out := make([]byte, 10)
+	nDst, _, err := e.Transform(out, []byte{'\n'}, true)
+	return out[:nDst], err
+}
+
+func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) {
+	out := make([]byte, 10)
+	nDst, _, err := e.Transform(out, []byte{'\r'}, true)
+	return out[:nDst], err
+}
diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go
index 912c268151d..aa67fdb9c46 100644
--- a/internal/component/loki/source/file/tailer.go
+++ b/internal/component/loki/source/file/tailer.go
@@ -18,7 +18,6 @@ import (
 	"github.com/prometheus/common/model"
 	"go.uber.org/atomic"
 	"golang.org/x/text/encoding"
-	"golang.org/x/text/encoding/ianaindex"
 
 	"github.com/grafana/alloy/internal/component/common/loki"
 	"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail"
@@ -47,8 +46,8 @@ type tailer struct {
 
 	report sync.Once
 
-	file    *tail.File
-	decoder *encoding.Decoder
+	file *tail.File
+	enc  encoding.Encoding
 }
 
 func newTailer(
@@ -60,7 +59,7 @@ func newTailer(
 	opts sourceOptions,
 ) (*tailer, error) {
 
-	decoder, err := getDecoder(opts.encoding)
+	enc, err := getEncoding(opts.encoding)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get decoder: %w", err)
 	}
@@ -82,7 +81,7 @@ func newTailer(
 		},
 		componentStopping: componentStopping,
 		report:            sync.Once{},
-		decoder:           decoder,
+		enc:               enc,
 	}
 
 	return tailer, nil
@@ -241,7 +240,7 @@ func (t *tailer) initRun() error {
 	tail, err := tail.NewFile(t.logger, 
&tail.Config{ Filename: t.key.Path, Offset: pos, - Decoder: t.decoder, + Encoding: t.enc, WatcherConfig: t.watcherConfig, }) @@ -254,18 +253,6 @@ func (t *tailer) initRun() error { return nil } -func getDecoder(encoding string) (*encoding.Decoder, error) { - if encoding == "" { - return nil, nil - } - - encoder, err := ianaindex.IANA.Encoding(encoding) - if err != nil { - return nil, fmt.Errorf("failed to get IANA encoding %s: %w", encoding, err) - } - return encoder.NewDecoder(), nil -} - // readLines reads lines from the tailed file by calling Next() in a loop. // It processes each line by sending it to the receiver's channel and updates // position tracking periodically. It exits when Next() returns an error,