diff --git a/internal/component/loki/process/stages/multiline.go b/internal/component/loki/process/stages/multiline.go index 429694ac838..a3c09a5c617 100644 --- a/internal/component/loki/process/stages/multiline.go +++ b/internal/component/loki/process/stages/multiline.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "regexp" + "slices" "strings" "sync" "time" @@ -12,7 +13,9 @@ import ( "github.com/go-kit/log" "github.com/prometheus/common/model" + "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/loki/pkg/push" ) // Configuration errors. @@ -78,9 +81,9 @@ type multilineState struct { } func (s *multilineState) Reset() { + // We don't reset startLineEntry here to keep old behaviour. s.buffer.Reset() s.currentLines = 0 - s.startLineEntry = Entry{} } // newMultilineStage creates a MulitlineStage from config @@ -193,11 +196,23 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) { return } - entry := s.startLineEntry - entry.Line = s.buffer.String() + // copy extracted data. + extracted := make(map[string]any, len(s.startLineEntry.Extracted)) + for k, v := range s.startLineEntry.Extracted { + extracted[k] = v + } + collapsed := Entry{ + Extracted: extracted, + Entry: loki.NewEntryWithCreatedUnixMicro(s.startLineEntry.Entry.Labels.Clone(), s.startLineEntry.Created(), push.Entry{ + Timestamp: s.startLineEntry.Entry.Entry.Timestamp, + Line: s.buffer.String(), + StructuredMetadata: slices.Clone(s.startLineEntry.Entry.Entry.StructuredMetadata), + }), + } s.Reset() - out <- entry + + out <- collapsed } // Cleanup implements Stage. diff --git a/internal/component/loki/process/stages/multiline_test.go b/internal/component/loki/process/stages/multiline_test.go index eac42aa1310..58a53e5c73d 100644 --- a/internal/component/loki/process/stages/multiline_test.go +++ b/internal/component/loki/process/stages/multiline_test.go @@ -6,16 +6,15 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/grafana/loki/pkg/push" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component/common/loki" - "github.com/grafana/alloy/internal/util" ) func TestMultilineStageProcess(t *testing.T) { - logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 3 * time.Second, TrimNewlines: true} regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) @@ -23,7 +22,7 @@ func TestMultilineStageProcess(t *testing.T) { stage := &multilineStage{ cfg: mcfg, regex: regex, - logger: logger, + logger: log.NewNopLogger(), } out := processEntries(stage, @@ -43,7 +42,6 @@ func TestMultilineStageProcess(t *testing.T) { } func TestMultilineStageMultiStreams(t *testing.T) { - logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 3 * time.Second, TrimNewlines: true} regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) @@ -51,7 +49,7 @@ func TestMultilineStageMultiStreams(t *testing.T) { stage := &multilineStage{ cfg: mcfg, regex: regex, - logger: logger, + logger: log.NewNopLogger(), } out := processEntries(stage, @@ -84,7 +82,6 @@ func TestMultilineStageMultiStreams(t *testing.T) { } func TestMultilineStageProcessLeaveNewlines(t *testing.T) { - logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 3 * time.Second, TrimNewlines: false} regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) @@ -92,7 +89,7 @@ func TestMultilineStageProcessLeaveNewlines(t *testing.T) { stage := &multilineStage{ cfg: mcfg, regex: regex, - logger: logger, + logger: log.NewNopLogger(), } out := processEntries(stage, @@ -112,7 +109,6 @@ func TestMultilineStageProcessLeaveNewlines(t *testing.T) { } func TestMultilineStageMaxWaitTime(t *testing.T) { - logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 100 * time.Millisecond, TrimNewlines: true} regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) @@ -120,7 +116,7 @@ func TestMultilineStageMaxWaitTime(t *testing.T) { stage := &multilineStage{ cfg: mcfg, regex: regex, - logger: logger, + logger: log.NewNopLogger(), } in := make(chan Entry, 2) @@ -156,6 +152,50 @@ func TestMultilineStageMaxWaitTime(t *testing.T) { require.Equal(t, "not a start line hitting timeout", res[1].Line) } +func TestMultilineStageStartLineFlushedBeforeNew(t *testing.T) { + mcfg := MultilineConfig{ + Expression: "^START", + MaxLines: 2, + MaxWaitTime: 3 * time.Second, + TrimNewlines: true, + } + regex, err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + regex: regex, + logger: log.NewNopLogger(), + } + + startTs := time.Now() + lset := model.LabelSet{"value": "label"} + + out := processEntries(stage, + Entry{ + Extracted: map[string]any{}, + Entry: loki.NewEntry(lset.Clone(), push.Entry{Timestamp: startTs, Line: "START line 1"}), + }, + Entry{ + Extracted: map[string]any{}, + Entry: loki.NewEntry(lset.Clone(), push.Entry{Timestamp: startTs.Add(1 * time.Second), Line: "continuation line 1"}), + }, + Entry{ + Extracted: map[string]any{}, + Entry: loki.NewEntry(lset.Clone(), push.Entry{Timestamp: startTs.Add(2 * time.Second), Line: "continuation line 2"}), + }, + ) + + require.Len(t, out, 2) + require.Equal(t, lset, out[0].Labels) + require.Equal(t, startTs, out[0].Timestamp) + require.Equal(t, "START line 1\ncontinuation line 1", out[0].Line) + + require.Equal(t, lset, out[1].Labels) + require.Equal(t, startTs, out[1].Timestamp) + require.Equal(t, "continuation line 2", out[1].Line) +} + func simpleEntry(line, label string) Entry { // We're adding a small wait time here, because on Windows, timers have a // smaller resolution than on Linux. This can mess with the ordering of log @@ -174,7 +214,6 @@ func simpleEntry(line, label string) Entry { } func TestMultilineStageKeepingStructuredMetadata(t *testing.T) { - logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 3 * time.Second, TrimNewlines: true} regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) @@ -182,7 +221,7 @@ func TestMultilineStageKeepingStructuredMetadata(t *testing.T) { stage := &multilineStage{ cfg: mcfg, regex: regex, - logger: logger, + logger: log.NewNopLogger(), } line1 := Entry{