# Patch summary. Everything before the first "diff --git" header is preamble and is
# not part of any hunk.
#
# internal/component/loki/source/docker/tailer.go
#   - extractTsFromBytes: removes the local timestampLayout constant and parses with
#     time.RFC3339Nano instead (the same layout string). The guard changes from
#     "spaceIdx >= len(line)-1" to "spaceIdx >= len(line)", so a timestamp whose
#     separating space is the last byte of the line is no longer rejected with
#     "could not find timestamp in bytes". The explanatory comment above the
#     unsafe.String call is shortened accordingly.
#   - process: declares maxCapacity before constructing the bufio.Scanner, and adds a
#     "len(content) == 0" check that logs "empty log, skipping line" at debug level
#     and continues, so such a line is not sent on t.recv.Chan().
#
# internal/component/loki/source/docker/tailer_test.go
#   - imports github.com/docker/docker/pkg/stdcopy.
#   - adds stringReader (a *strings.Reader with a Close method that returns nil) and
#     newStringReader, with a compile-time io.ReadCloser assertion.
#   - adds TestTailerConsumeLines ("skip empty line"): writes one timestamp-only line and
#     one "line" entry through a stdcopy stdout writer, runs processLoop, and expects
#     exactly one received entry with Line "line" and the parsed timestamp.
#   - adds TestExtractTsFromBytes: invalid timestamp -> error; valid timestamp followed
#     by " \n" -> no error and the expected time; valid timestamp with no space -> error.
#
# NOTE(review): bytes.IndexByte returns -1 or an index strictly below len(line), so the
# "spaceIdx >= len(line)" half of the new guard looks unreachable - confirm and consider
# reducing the condition to "spaceIdx == -1".
# NOTE(review): TestTailerConsumeLines registers metrics on prometheus.DefaultRegisterer,
# calls tailer.wg.Add(3) without the count being explained in the visible code, and never
# waits for the goroutine it starts - verify against newMetrics/processLoop.
# NOTE(review): the line structure of this patch appears to have been collapsed onto a few
# long lines; restore the hunk line breaks from the original commit before applying it.
diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index 05b6df2a200..6c2e547fbf2 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -242,19 +242,15 @@ func (t *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser // extractTsFromBytes parses an RFC3339Nano timestamp from the byte slice. func extractTsFromBytes(line []byte) (time.Time, []byte, error) { - const timestampLayout = "2006-01-02T15:04:05.999999999Z07:00" - spaceIdx := bytes.IndexByte(line, ' ') - if spaceIdx == -1 || spaceIdx >= len(line)-1 { + if spaceIdx == -1 || spaceIdx >= len(line) { return time.Time{}, nil, fmt.Errorf("could not find timestamp in bytes") } - // The unsafe.String is used here to avoid allocation and string conversion when parsing the timestamp - // This is safe because: - // 1. spaceIdx > 0 and spaceIdx < len(line)-1 is guaranteed by the check above - // 2. time.Parse doesn't retain the string after returning - // 3. The underlying bytes aren't modified during parsing - ts, err := time.Parse(timestampLayout, unsafe.String(&line[0], spaceIdx)) + // The unsafe.String is used here to avoid allocation and string conversion when parsing the timestamp. + // This is safe because time.Parse doesn't retain the string after returning and + // the underlying bytes aren't modified during parsing. 
+ ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&line[0], spaceIdx)) if err != nil { return time.Time{}, nil, fmt.Errorf("could not parse timestamp: %w", err) } @@ -264,8 +260,9 @@ func extractTsFromBytes(line []byte) (time.Time, []byte, error) { func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) { defer t.wg.Done() - scanner := bufio.NewScanner(r) const maxCapacity = dockerMaxChunkSize * 64 + + scanner := bufio.NewScanner(r) buf := make([]byte, 0, maxCapacity) scanner.Buffer(buf, maxCapacity) for scanner.Scan() { @@ -278,6 +275,11 @@ func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) { continue } + if len(content) == 0 { + level.Debug(t.logger).Log("msg", "empty log, skipping line") + continue + } + t.recv.Chan() <- loki.Entry{ Labels: logStreamLset, Entry: push.Entry{ diff --git a/internal/component/loki/source/docker/tailer_test.go b/internal/component/loki/source/docker/tailer_test.go index 86aff4d4d7f..965ed07548e 100644 --- a/internal/component/loki/source/docker/tailer_test.go +++ b/internal/component/loki/source/docker/tailer_test.go @@ -19,6 +19,7 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -216,6 +217,61 @@ func TestTailerNeverStarted(t *testing.T) { require.NotPanics(t, func() { cancel() }) } +var _ io.ReadCloser = (*stringReader)(nil) + +func newStringReader(s string) *stringReader { + return &stringReader{Reader: strings.NewReader(s)} +} + +type stringReader struct { + *strings.Reader +} + +func (s *stringReader) Close() error { + return nil +} + +func TestTailerConsumeLines(t *testing.T) { + t.Run("skip empty line", func(t *testing.T) { + collector := loki.NewCollectingHandler() + tailer := &tailer{ + logger: log.NewNopLogger(), + recv: collector.Receiver(), + positions: positions.NewNop(), + containerID: 
"test", + metrics: newMetrics(prometheus.DefaultRegisterer), + running: true, + wg: sync.WaitGroup{}, + last: atomic.NewInt64(0), + since: atomic.NewInt64(0), + componentStopping: func() bool { return false }, + } + + bb := &bytes.Buffer{} + writer := stdcopy.NewStdWriter(bb, stdcopy.Stdout) + _, err := writer.Write([]byte("2023-12-09T12:00:00.000000000Z \n2023-12-09T12:00:00.000000000Z line\n")) + require.NoError(t, err) + + tailer.wg.Add(3) + go func() { + tailer.processLoop(t.Context(), false, newStringReader(bb.String())) + }() + + require.Eventually(t, func() bool { + return len(collector.Received()) == 1 + }, 2*time.Second, 50*time.Millisecond) + + entry := collector.Received()[0] + + expectedLine := "line" + expectedTimestamp, err := time.Parse(time.RFC3339Nano, "2023-12-09T12:00:00.000000000Z") + require.NoError(t, err) + + require.Equal(t, expectedLine, entry.Line) + require.Equal(t, expectedTimestamp, entry.Timestamp) + }) +} + func TestChunkWriter(t *testing.T) { logger := log.NewNopLogger() var buf bytes.Buffer @@ -254,6 +310,25 @@ func TestChunkWriter(t *testing.T) { assert.Equal(t, expected, buf.Bytes()) } +func TestExtractTsFromBytes(t *testing.T) { + t.Run("invalid timestamp", func(t *testing.T) { + _, _, err := extractTsFromBytes([]byte("123 test\n")) + require.Error(t, err) + }) + + t.Run("valid timestamp empty line", func(t *testing.T) { + ts, _, err := extractTsFromBytes([]byte("2024-05-02T13:11:55.879889Z \n")) + require.NoError(t, err) + expectedTs, err := time.Parse(time.RFC3339Nano, "2024-05-02T13:11:55.879889Z") + require.NoError(t, err) + require.Equal(t, expectedTs, ts) + }) + t.Run("valid timestamp no space", func(t *testing.T) { + _, _, err := extractTsFromBytes([]byte("2024-05-02T13:11:55.879889Z\n")) + require.Error(t, err) + }) +} + func newDockerServer(t *testing.T) *httptest.Server { h := func(w http.ResponseWriter, r *http.Request) { path := r.URL.Path