Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions internal/component/loki/source/file/internal/tail/bom.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package tail

import (
"bufio"
"bytes"
"io"

"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
Expand Down Expand Up @@ -30,15 +30,18 @@ const (

// detectBOM tries to detect a BOM from reader. It is important that the reader
// and underlying file are positioned at the beginning of the file
// when calling this function, as it peeks at the first bytes to detect the BOM.
func detectBOM(br *bufio.Reader, offset int64) (int64, BOM) {
// Peek up to 4 bytes (longest BOM)
buf, err := br.Peek(4)
// when calling this function, as it reads the first 4 bytes to detect the BOM.
func detectBOM(r io.Reader, offset int64) (int64, BOM) {
buf := make([]byte, 4)

n, err := r.Read(buf)
if err != nil {
return offset, bomUNKNOWN
}

var bom BOM
buf = buf[:n]

switch {
case bytes.HasPrefix(buf, bomUTF8Bytes):
bom = bomUTF8
Expand Down
4 changes: 4 additions & 0 deletions internal/component/loki/source/file/internal/tail/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
type Config struct {
// Filename is the path to the file to tail.
Filename string

// Offset is the byte offset in the file where tailing should start.
// If 0, tailing starts from the beginning of the file.
Offset int64

// StartFromEnd will read from the end of the file if true and Offset is 0.
StartFromEnd bool

// Encoding used for file. If none is provided no encoding is used
// and the file is assumed to be UTF-8.
Encoding string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,11 @@ func getEncoding(enc string) (encoding.Encoding, error) {

return ianaindex.IANA.Encoding(enc)
}

// encodedNewline returns the byte sequence representing '\n' in the
// target encoding of e.
func encodedNewline(e *encoding.Encoder) ([]byte, error) {
	return e.Bytes([]byte("\n"))
}

// encodedCarriageReturn returns the byte sequence representing '\r' in
// the target encoding of e.
func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) {
	return e.Bytes([]byte("\r"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) {
return nil, err
}

reader, err := newReader(f, cfg.Offset, encoding, cfg.Compression)
reader, err := newReader(logger, f, cfg.Offset, encoding, cfg.Compression, cfg.StartFromEnd)
if err != nil {
f.Close()
return nil, err
Expand Down
98 changes: 92 additions & 6 deletions internal/component/loki/source/file/internal/tail/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,16 +418,57 @@ func TestFile(t *testing.T) {
utf16offsets = [3]int64{14, 26, 38}
)

var (
nopEncoder = encoding.Nop.NewEncoder()
utf16beEncoder = unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewEncoder()
utf16beBOMEncoder = unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder()
utf16leEncoder = unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder()
utf16leBOMEncoder = unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder()
)

t.Run("read gzip", func(t *testing.T) {
compressionTest(t, "plain", "gz", encoding.Nop.NewEncoder(), utf8offsets)
compressionTest(t, "utf-16be", "gz", unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder(), utf16offsets)
compressionTest(t, "utf-16le", "gz", unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder(), utf16offsets)
compressionTest(t, "plain", "gz", nopEncoder, utf8offsets)
compressionTest(t, "utf-16be", "gz", utf16beBOMEncoder, utf16offsets)
compressionTest(t, "utf-16le", "gz", utf16leBOMEncoder, utf16offsets)
})

t.Run("read zlib", func(t *testing.T) {
compressionTest(t, "plain", "z", encoding.Nop.NewEncoder(), utf8offsets)
compressionTest(t, "utf-16be", "z", unicode.UTF16(unicode.BigEndian, unicode.UseBOM).NewEncoder(), utf16offsets)
compressionTest(t, "utf-16le", "z", unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder(), utf16offsets)
compressionTest(t, "plain", "z", nopEncoder, utf8offsets)
compressionTest(t, "utf-16be", "z", utf16beBOMEncoder, utf16offsets)
compressionTest(t, "utf-16le", "z", utf16leBOMEncoder, utf16offsets)
})

t.Run("start from end", func(t *testing.T) {
startFromEndTest(t, "utf-8", nopEncoder, nopEncoder, false, 0, []Line{{Text: "line3", Offset: 18}})
startFromEndTest(t, "utf-16be", utf16beBOMEncoder, utf16beEncoder, false, 0, []Line{{Text: "line3", Offset: 38}})
startFromEndTest(t, "utf-16le", utf16leBOMEncoder, utf16leEncoder, false, 0, []Line{{Text: "line3", Offset: 38}})
})

t.Run("start from end with start offset", func(t *testing.T) {
startFromEndTest(t, "utf-8", nopEncoder, nopEncoder, false, 6, []Line{
{Text: "line2", Offset: 12},
{Text: "line3", Offset: 18},
})
startFromEndTest(t, "utf-8-cr", nopEncoder, nopEncoder, true, 7, []Line{
{Text: "line2", Offset: 14},
{Text: "line3", Offset: 21},
})
startFromEndTest(t, "utf-16be", utf16beBOMEncoder, utf16beEncoder, false, 14, []Line{
{Text: "line2", Offset: 26},
{Text: "line3", Offset: 38},
})
startFromEndTest(t, "utf-16be-cr", utf16beBOMEncoder, utf16beEncoder, true, 16, []Line{
{Text: "line2", Offset: 30},
{Text: "line3", Offset: 44},
})
startFromEndTest(t, "utf-16le", utf16leBOMEncoder, utf16leEncoder, false, 14, []Line{
{Text: "line2", Offset: 26},
{Text: "line3", Offset: 38},
})
startFromEndTest(t, "utf-16le", utf16leBOMEncoder, utf16leEncoder, true, 16, []Line{
{Text: "line2", Offset: 30},
{Text: "line3", Offset: 44},
})
})
}

Expand Down Expand Up @@ -473,6 +514,51 @@ func compressionTest(t *testing.T, name, compression string, enc *encoding.Encod
})
}

// startFromEndTest verifies tailing with StartFromEnd enabled: a file is
// created with two pre-existing lines, a third line is appended shortly
// after the tailer starts, and the lines actually read must match expected
// (only content past the end / the given start offset should be seen).
// encoder encodes the initial file content (may emit a BOM); appendEncoder
// encodes the appended line (no BOM). useCR switches line endings to \r\n.
func startFromEndTest(t *testing.T, name string, encoder, appendEncoder *encoding.Encoder, useCR bool, offset int64, expected []Line) {
	t.Run(name, func(t *testing.T) {
		initial, appended := "line1\nline2\n", "line3\n"
		if useCR {
			initial, appended = "line1\r\nline2\r\n", "line3\r\n"
		}

		content, err := encoder.String(initial)
		require.NoError(t, err)

		toAppend, err := appendEncoder.String(appended)
		require.NoError(t, err)

		path := createFile(t, name, content)
		defer removeFile(t, path)

		file, err := NewFile(log.NewNopLogger(), &Config{
			Filename:     path,
			Offset:       offset,
			StartFromEnd: true,
		})
		require.NoError(t, err)
		defer file.Stop()

		// Append the extra line after the tailer has had time to start.
		go func() {
			time.Sleep(100 * time.Millisecond)
			appendToFile(t, path, toAppend)
		}()

		for _, exp := range expected {
			verifyResult(t, file, &exp, nil)
		}
	})
}

func createFile(tb testing.TB, name, content string) string {
path := tb.TempDir() + "/" + name
require.NoError(tb, os.WriteFile(path, []byte(content), 0600))
Expand Down
61 changes: 26 additions & 35 deletions internal/component/loki/source/file/internal/tail/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ import (
"os"
"unsafe"

"github.com/go-kit/log"
"golang.org/x/text/encoding"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

const defaultBufSize = 4096

// newReader creates a new reader that is used to read from file.
// It is important that the provided file is positioned at the start of the file.
func newReader(f *os.File, offset int64, enc encoding.Encoding, compression string) (*reader, error) {
func newReader(logger log.Logger, f *os.File, offset int64, enc encoding.Encoding, compression string, startFromEnd bool) (*reader, error) {
rr, err := newReaderAt(f, compression, 0)
if err != nil {
return nil, err
}

br := bufio.NewReader(rr)

var bom BOM
offset, bom = detectBOM(br, offset)
offsetAfterBOM, bom := detectBOM(rr, offset)
enc = resolveEncodingFromBOM(bom, enc)

var (
Expand All @@ -45,17 +45,25 @@ func newReader(f *os.File, offset int64, enc encoding.Encoding, compression stri
return nil, err
}

if offset != 0 {
rr, err = newReaderAt(f, compression, offset)
if offset == 0 && startFromEnd {
offset, err = lastNewline(f, nl)
if err != nil {
return nil, err
level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", "error", err)
}
br.Reset(rr)
}

if offsetAfterBOM > offset {
offset = offsetAfterBOM
}

rr, err = newReaderAt(f, compression, offset)
if err != nil {
return nil, err
}

return &reader{
pos: offset,
br: br,
br: bufio.NewReader(rr),
decoder: decoder,
nl: nl,
lastNl: nl[len(nl)-1],
Expand Down Expand Up @@ -159,37 +167,22 @@ func (r *reader) reset(f *os.File, offset int64) error {
if err != nil {
return err
}
r.br.Reset(rr)

offset, _ = detectBOM(r.br, offset)
if offset != 0 {
rr, err = newReaderAt(f, r.compression, offset)
if err != nil {
return nil
}
r.br.Reset(rr)
offset, _ = detectBOM(rr, offset)
rr, err = newReaderAt(f, r.compression, offset)
if err != nil {
return err
}

r.br.Reset(rr)
r.pos = offset
r.pending = make([]byte, 0, defaultBufSize)
return nil
}

func encodedNewline(e *encoding.Encoder) ([]byte, error) {
out := make([]byte, 10)
nDst, _, err := e.Transform(out, []byte{'\n'}, true)
return out[:nDst], err
}

func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) {
out := make([]byte, 10)
nDst, _, err := e.Transform(out, []byte{'\r'}, true)
return out[:nDst], err
}

func newReaderAt(f *os.File, compression string, offset int64) (io.Reader, error) {
// NOTE: If compression is used we always need to read from the beginning.
if compression != "" && offset != 0 {
if compression != "" {
if _, err := f.Seek(0, io.SeekStart); err != nil {
return nil, err
}
Expand All @@ -208,10 +201,8 @@ func newReaderAt(f *os.File, compression string, offset int64) (io.Reader, error
case "bz2":
reader = bzip2.NewReader(f)
default:
if offset != 0 {
if _, err := f.Seek(offset, io.SeekStart); err != nil {
return nil, err
}
if _, err := f.Seek(offset, io.SeekStart); err != nil {
return nil, err
}

reader = f
Expand Down
54 changes: 54 additions & 0 deletions internal/component/loki/source/file/internal/tail/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package tail

import (
"bytes"
"io"
"os"
)

// lastNewline returns the offset of the start of the last line in the file.
func lastNewline(file *os.File, nl []byte) (int64, error) {
fi, err := file.Stat()
if err != nil {
return 0, err
}

n := fi.Size()
if n == 0 {
return 0, nil
}

const chunkSize = 1024
buf := make([]byte, chunkSize)

var pos = n - chunkSize
if pos < 0 {
pos = 0
}

for {
_, err = file.Seek(pos, io.SeekStart)
if err != nil {
return 0, err
}

bytesRead, err := file.Read(buf)
if err != nil {
return 0, err
}

i := bytes.LastIndex(buf[:bytesRead], nl)
if i != -1 {
return pos + int64(i) + int64(len(nl)), nil
}

if pos == 0 {
return 0, nil
}

pos -= chunkSize
if pos < 0 {
pos = 0
}
}
}
Loading
Loading