Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions internal/component/loki/source/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/text/encoding/unicode"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -600,6 +601,172 @@ func TestEncoding(t *testing.T) {
})
}

// TestEncodingWithLogRotation verifies that a file tailed with a non-UTF-8
// encoding (UTF-16 LE here) keeps producing correctly decoded lines across
// the full lifecycle of a log file: initial read, appends, rotation (the
// file being replaced wholesale), and appends after rotation.
func TestEncodingWithLogRotation(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

	runTests(t, func(t *testing.T, match FileMatch) {
		logDir := t.TempDir()

		// File that will be tailed.
		dstFile := filepath.Join(logDir, "UTF-16_LE.txt")

		// Expected lines from the original file.
		originalExpectedLines := []string{
			"Log entry 1",
			"Log entry 2",
			"Log entry 3",
			"Log entry 4",
			"Log entry 5",
		}

		// Expected lines from the rotated file.
		rotatedExpectedLines := []string{
			"New log entry A",
			"New log entry B",
			"New log entry C",
		}

		// Create the original file with UTF-16 LE encoded content and
		// Windows (\r\n) line endings.
		var originalData []byte
		for _, line := range originalExpectedLines {
			originalData = append(originalData, encodeUTF16LE(line+"\r\n")...)
		}
		require.NoError(t, os.WriteFile(dstFile, originalData, 0644))

		// Lines to append (used both pre and post rotation).
		appendLines := []string{
			"Appended entry X",
		}

		opts := component.Options{
			Logger:        util.TestAlloyLogger(t),
			Registerer:    prometheus.NewRegistry(),
			OnStateChange: func(e component.Exports) {},
			DataPath:      t.TempDir(),
		}

		ch1 := loki.NewLogsReceiver()
		args := Arguments{
			Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{
				"__path__": dstFile,
				"source":   "rotation_test",
			})},
			FileMatch: match,
			Encoding:  "UTF-16LE",
			ForwardTo: []loki.LogsReceiver{ch1},
		}

		// Create and run the component.
		c, err := New(opts, args)
		require.NoError(t, err)

		ctx, cancel := context.WithCancel(componenttest.TestContext(t))
		defer cancel()

		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Use t.Errorf here instead of require: require.NoError calls
			// t.FailNow, which must only be called from the goroutine
			// running the test function.
			if err := c.Run(ctx); err != nil {
				t.Errorf("component run returned an error: %v", err)
			}
		}()

		expectedLabelSet := model.LabelSet{
			"filename": model.LabelValue(dstFile),
			"source":   "rotation_test",
		}

		// collectLines drains ch1 until expectedCount lines arrive or the
		// timeout elapses, returning whatever was received.
		collectLines := func(expectedCount int, timeout time.Duration) []string {
			receivedLines := make([]string, 0, expectedCount)
			timeoutCh := time.After(timeout)
			for len(receivedLines) < expectedCount {
				select {
				case logEntry := <-ch1.Chan():
					require.Equal(t, expectedLabelSet, logEntry.Labels)
					receivedLines = append(receivedLines, logEntry.Line)
					t.Logf("Received log line %d: %q", len(receivedLines), logEntry.Line)
				case <-timeoutCh:
					t.Logf("Timeout reached, received %d log lines total", len(receivedLines))
					return receivedLines
				}
			}
			return receivedLines
		}

		// requireLines asserts that exactly the expected lines arrive, in order.
		requireLines := func(expected []string, what string) {
			received := collectLines(len(expected), 10*time.Second)
			require.Len(t, received, len(expected), "should receive all %s lines", what)
			for i := range expected {
				require.Equal(t, expected[i], received[i], "%s line %d should match", what, i+1)
			}
		}

		// appendEncoded appends the given lines (UTF-16 LE, \r\n-terminated)
		// to the tailed file, syncing and closing it afterwards.
		appendEncoded := func(lines []string) {
			f, err := os.OpenFile(dstFile, os.O_APPEND|os.O_WRONLY, 0644)
			require.NoError(t, err)
			for _, line := range lines {
				_, err := f.Write(encodeUTF16LE(line + "\r\n"))
				require.NoError(t, err)
			}
			require.NoError(t, f.Sync())
			require.NoError(t, f.Close())
		}

		// Step 1: Read the original file content.
		requireLines(originalExpectedLines, "original")

		// Step 2: Append lines before rotation.
		t.Log("Appending lines before rotation")
		appendEncoded(appendLines)
		requireLines(appendLines, "pre-rotation appended")

		// Step 3: Replace the file with rotated content (simulating log rotation).
		var rotatedData []byte
		for _, line := range rotatedExpectedLines {
			rotatedData = append(rotatedData, encodeUTF16LE(line+"\r\n")...)
		}
		t.Log("Replacing file with rotated content")
		require.NoError(t, os.WriteFile(dstFile, rotatedData, 0644))
		requireLines(rotatedExpectedLines, "rotated")

		// Step 4: Append more lines after rotation.
		t.Log("Appending lines after rotation")
		appendEncoded(appendLines)
		requireLines(appendLines, "post-rotation appended")

		// Shut down the component and wait for its goroutine to exit.
		cancel()
		wg.Wait()
	})
}

// encodeUTF16LE converts a UTF-8 string into its UTF-16 little-endian byte
// encoding (no byte order mark).
func encodeUTF16LE(s string) []byte {
	enc := unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder()
	// Encoding well-formed UTF-8 into UTF-16 cannot fail; the error is
	// deliberately ignored.
	out, _ := enc.Bytes([]byte(s))
	return out
}

func TestDeleteRecreateFile(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

Expand Down
31 changes: 26 additions & 5 deletions internal/component/loki/source/file/internal/tail/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,30 @@ func (f *File) Stop() error {
// wait blocks until a file event is detected (modification, truncation, or deletion).
// Returns an error if the context is canceled or an unrecoverable error occurs.
func (f *File) wait(partial bool) error {
offset, err := f.offset()
// Use file size instead of calculated offset for blockUntilEvent.
// This is important when using a decoder (e.g., UTF-16) because the offset()
// calculation mixes raw file bytes with decoded bytes, which are incompatible
// units and can cause incorrect comparisons.
fi, err := f.file.Stat()
if err != nil {
return err
}
fileSize := fi.Size()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is correct. We used offset because we are interested to get events from where we last read.

If new data has come in between us trying to read a line and calling wait we would miss that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I think you're right 🤔 I'll see if I can get offset() to only use raw bytes.


event, err := blockUntilEvent(f.ctx, f.file, offset, f.cfg)
event, err := blockUntilEvent(f.ctx, f.file, fileSize, f.cfg)
switch event {
case eventModified:
level.Debug(f.logger).Log("msg", "file modified")
if partial {
// We need to reset to last successful offset because we consumed a partial line.
// TODO: Return error from Seek()?
Comment thread
blewis12 marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to return an error here!

f.file.Seek(f.lastOffset, io.SeekStart)
f.reader.Reset(f.file)
f.resetReader()
} else if f.cfg.Decoder != nil {
// Reset the reader when a decoder is configured. Once a decoder's underlying
// transform.Reader returns EOF, it won't read more data even if new content
// is appended. Resetting creates a fresh decoder that can read the new content.
f.resetReader()
}
return nil
case eventTruncated:
Expand All @@ -186,9 +197,11 @@ func (f *File) wait(partial bool) error {
// The newline and any trailing carriage return (for Windows line endings) are stripped.
func (f *File) readLine() (string, error) {
	raw, err := f.reader.ReadString('\n')
	if err != nil {
		// Return the partial data alongside the error so callers can decide
		// how to handle an incomplete line.
		return raw, err
	}
	// Strip the newline and any trailing carriage return (Windows line endings).
	return strings.TrimRight(raw, "\r\n"), nil
}

Expand All @@ -200,7 +213,7 @@ func (f *File) drain() {
if _, err := f.file.Seek(f.lastOffset, io.SeekStart); err != nil {
return
}
f.reader.Reset(f.file)
f.resetReader()

for {
text, err := f.readLine()
Expand Down Expand Up @@ -298,13 +311,21 @@ func (f *File) reopen(truncated bool) error {
}

f.file = file
f.reader.Reset(f.file)
f.resetReader()
break
}

return backoff.Err()
}

// resetReader recreates the reader chain, preserving the decoder if one is
// configured. This is necessary when seeking to a new position in the file,
// as the decoder may have internal state that needs to be reset. Simply
// calling bufio.Reader.Reset() would bypass the decoder and read raw bytes
// instead of decoded content.
func (f *File) resetReader() {
	f.reader = newReader(f.file, f.cfg)
}

func newReader(f *os.File, cfg *Config) *bufio.Reader {
if cfg.Decoder != nil {
return bufio.NewReader(cfg.Decoder.Reader(f))
Expand Down
Loading