Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions internal/component/loki/process/stages/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,36 @@ const (
)

func (d *DockerStage) Process(labels model.LabelSet, extracted map[string]any, t *time.Time, entry *string) {
var log DockerLog

if err := json.Unmarshal([]byte(*entry), &log); err != nil {
var parsed DockerLog
if err := json.Unmarshal([]byte(*entry), &parsed); err != nil {
if Debug {
level.Debug(d.logger).Log("msg", "failed to parse docker log", "err", err)
}
return
}

// NOTE: json.Unmarshal will happily parse any JSON and produce a zero-value struct.
// To protect against incorrect usage, validate that the log field is present.
if parsed.Log == "" {
if Debug {
level.Debug(d.logger).Log("msg", "not valid docker format")
}
return
}

// NOTE: Previous implementation used a "sub-pipeline"
// to parse docker logs where the json stage added these fields
// as "extracted" values so the other stages could operate on them.
// We don't need this anymore but it would be a breaking change to
// no longer set these.
extracted[dockerOutput] = log.Log
extracted[dockerStream] = log.Stream
extracted[dockerTimestamp] = log.Time
extracted[dockerOutput] = parsed.Log
extracted[dockerStream] = parsed.Stream
extracted[dockerTimestamp] = parsed.Time

*entry = log.Log
labels["stream"] = model.LabelValue(log.Stream)
*entry = parsed.Log
labels["stream"] = model.LabelValue(parsed.Stream)

ts, err := time.Parse(time.RFC3339Nano, log.Time)
ts, err := time.Parse(time.RFC3339Nano, parsed.Time)
if err == nil {
*t = ts
}
Expand Down
116 changes: 73 additions & 43 deletions internal/component/loki/process/stages/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/loki/pkg/push"
)

var (
Expand All @@ -22,63 +24,91 @@ var (
)

func TestDocker(t *testing.T) {
loc, err := time.LoadLocation("UTC")
if err != nil {
t.Fatal("could not parse timezone", err)
type testCase struct {
name string
input loki.Entry
expected loki.Entry
}

tests := map[string]struct {
entry string
expectedEntry string
t time.Time
expectedT time.Time
labels map[string]string
expectedLabels map[string]string
}{
"happy path": {
dockerRaw,
dockerProcessed,
time.Now(),
time.Date(2019, 4, 30, 02, 12, 41, 844351500, loc),
map[string]string{},
map[string]string{
"stream": "stderr",
tests := []testCase{
{
name: "happy path",
input: loki.Entry{
Entry: push.Entry{
Line: dockerRaw,
Timestamp: time.Now(),
},
},
expected: loki.Entry{
Labels: model.LabelSet{"stream": "stderr"},
Entry: push.Entry{
Line: dockerProcessed,
Timestamp: time.Date(2019, 4, 30, 02, 12, 41, 844351500, time.UTC),
},
},
},
{
name: "invalid timestamp",
input: loki.Entry{
Entry: push.Entry{
Line: dockerInvalidTimestampRaw,
Timestamp: dockerTestTimeNow,
},
},
expected: loki.Entry{
Labels: model.LabelSet{"stream": "stderr"},
Entry: push.Entry{
Line: "log message\n",
Timestamp: dockerTestTimeNow,
},
},
},
"invalid timestamp": {
dockerInvalidTimestampRaw,
"log message\n",
dockerTestTimeNow,
dockerTestTimeNow,
map[string]string{},
map[string]string{
"stream": "stderr",
{
name: "not json",
input: loki.Entry{
Entry: push.Entry{
Line: "i'm not json!",
Timestamp: dockerTestTimeNow,
},
},
expected: loki.Entry{
Labels: model.LabelSet{},
Entry: push.Entry{
Line: "i'm not json!",
Timestamp: dockerTestTimeNow,
},
},
},
"invalid json": {
"i'm not json!",
"i'm not json!",
dockerTestTimeNow,
dockerTestTimeNow,
map[string]string{},
map[string]string{},
{
name: "json but not docker format",
input: loki.Entry{
Entry: push.Entry{
Line: `{"msg": "test"}`,
Timestamp: dockerTestTimeNow,
},
},
expected: loki.Entry{
Labels: model.LabelSet{},
Entry: push.Entry{
Line: `{"msg": "test"}`,
Timestamp: dockerTestTimeNow,
},
},
},
}

for tName, tt := range tests {
t.Run(tName, func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(log.NewNopLogger(), prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
out := processEntries(p, newEntry(nil, toLabelSet(tt.labels), tt.entry, tt.t))[0]
out := processEntries(p, newEntry(nil, tt.input.Labels, tt.input.Line, tt.input.Timestamp))[0]

assertLabels(t, tt.expectedLabels, out.Labels)
assert.Equal(t, tt.expectedEntry, out.Line, "did not receive expected log entry")
if out.Timestamp.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
require.EqualValues(t, tt.expected.Labels, out.Entry.Labels)
require.Equal(t, tt.expected.Entry.Line, out.Entry.Line)
require.Equal(t, tt.expected.Entry.Timestamp, out.Entry.Timestamp)
})
}
}
Expand Down
Loading