Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions internal/component/loki/source/docker/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,15 @@ func (t *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser

// extractTsFromBytes parses an RFC3339Nano timestamp from the byte slice.
func extractTsFromBytes(line []byte) (time.Time, []byte, error) {
const timestampLayout = "2006-01-02T15:04:05.999999999Z07:00"

spaceIdx := bytes.IndexByte(line, ' ')
if spaceIdx == -1 || spaceIdx >= len(line)-1 {
if spaceIdx == -1 || spaceIdx >= len(line) {
return time.Time{}, nil, fmt.Errorf("could not find timestamp in bytes")
}

// The unsafe.String is used here to avoid allocation and string conversion when parsing the timestamp
// This is safe because:
// 1. spaceIdx > 0 and spaceIdx < len(line)-1 is guaranteed by the check above
// 2. time.Parse doesn't retain the string after returning
// 3. The underlying bytes aren't modified during parsing
ts, err := time.Parse(timestampLayout, unsafe.String(&line[0], spaceIdx))
// The unsafe.String is used here to avoid allocation and string conversion when parsing the timestamp.
// This is safe because time.Parse doesn't retain the string after returning and
// the underlying bytes aren't modified during parsing.
ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&line[0], spaceIdx))
if err != nil {
return time.Time{}, nil, fmt.Errorf("could not parse timestamp: %w", err)
}
Expand All @@ -264,8 +260,9 @@ func extractTsFromBytes(line []byte) (time.Time, []byte, error) {
func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) {
defer t.wg.Done()

scanner := bufio.NewScanner(r)
const maxCapacity = dockerMaxChunkSize * 64

scanner := bufio.NewScanner(r)
buf := make([]byte, 0, maxCapacity)
scanner.Buffer(buf, maxCapacity)
for scanner.Scan() {
Expand All @@ -278,6 +275,11 @@ func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) {
continue
}

if len(content) == 0 {
level.Debug(t.logger).Log("msg", "empty log, skipping line")
continue
}

t.recv.Chan() <- loki.Entry{
Labels: logStreamLset,
Entry: push.Entry{
Expand Down
75 changes: 75 additions & 0 deletions internal/component/loki/source/docker/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -216,6 +217,61 @@ func TestTailerNeverStarted(t *testing.T) {
require.NotPanics(t, func() { cancel() })
}

var _ io.ReadCloser = (*stringReader)(nil)

func newStringReader(s string) *stringReader {
return &stringReader{Reader: strings.NewReader(s)}
}

type stringReader struct {
*strings.Reader
}

func (s *stringReader) Close() error {
return nil
}

// TestTailerConsumeLines feeds a canned Docker log stream through the tailer's
// read loop and checks which lines are forwarded to the receiver.
func TestTailerConsumeLines(t *testing.T) {
	t.Run("skip empty line", func(t *testing.T) {
		collector := loki.NewCollectingHandler()
		tl := &tailer{
			logger:            log.NewNopLogger(),
			recv:              collector.Receiver(),
			positions:         positions.NewNop(),
			containerID:       "test",
			metrics:           newMetrics(prometheus.DefaultRegisterer),
			running:           true,
			wg:                sync.WaitGroup{},
			last:              atomic.NewInt64(0),
			since:             atomic.NewInt64(0),
			componentStopping: func() bool { return false },
		}

		// Build a Docker-multiplexed stdout stream containing one line with
		// an empty payload and one line with content, both timestamped.
		var logStream bytes.Buffer
		stdoutWriter := stdcopy.NewStdWriter(&logStream, stdcopy.Stdout)
		_, err := stdoutWriter.Write([]byte("2023-12-09T12:00:00.000000000Z \n2023-12-09T12:00:00.000000000Z line\n"))
		require.NoError(t, err)

		tl.wg.Add(3)
		go func() {
			tl.processLoop(t.Context(), false, newStringReader(logStream.String()))
		}()

		// Only the non-empty line should reach the collector.
		require.Eventually(t, func() bool {
			return len(collector.Received()) == 1
		}, 2*time.Second, 50*time.Millisecond)

		got := collector.Received()[0]

		wantTimestamp, err := time.Parse(time.RFC3339Nano, "2023-12-09T12:00:00.000000000Z")
		require.NoError(t, err)

		require.Equal(t, "line", got.Line)
		require.Equal(t, wantTimestamp, got.Timestamp)
	})
}

func TestChunkWriter(t *testing.T) {
logger := log.NewNopLogger()
var buf bytes.Buffer
Expand Down Expand Up @@ -254,6 +310,25 @@ func TestChunkWriter(t *testing.T) {
assert.Equal(t, expected, buf.Bytes())
}

// TestExtractTsFromBytes covers timestamp extraction for malformed input,
// a timestamp followed by an empty payload, and a timestamp with no
// trailing space separator.
func TestExtractTsFromBytes(t *testing.T) {
	t.Run("invalid timestamp", func(t *testing.T) {
		_, _, err := extractTsFromBytes([]byte("123 test\n"))
		require.Error(t, err)
	})

	t.Run("valid timestamp empty line", func(t *testing.T) {
		got, _, err := extractTsFromBytes([]byte("2024-05-02T13:11:55.879889Z \n"))
		require.NoError(t, err)

		want, err := time.Parse(time.RFC3339Nano, "2024-05-02T13:11:55.879889Z")
		require.NoError(t, err)
		require.Equal(t, want, got)
	})

	t.Run("valid timestamp no space", func(t *testing.T) {
		_, _, err := extractTsFromBytes([]byte("2024-05-02T13:11:55.879889Z\n"))
		require.Error(t, err)
	})
}

func newDockerServer(t *testing.T) *httptest.Server {
h := func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
Expand Down
Loading