diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index 20c9068fbb6..cf26f176853 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -2059,6 +2059,8 @@ The following fields are exported and can be referenced by other components: * `loki_process_dropped_lines_total` (counter): Number of lines dropped as part of a processing stage. * `loki_process_dropped_lines_by_label_total` (counter): Number of lines dropped when `by_label_name` is non-empty in [stage.limit][]. * `loki_process_truncated_fields_total` (counter): Number of lines, label values, extracted field values, and structured metadata values truncated as part of a `truncate` stage. +* `loki_process_cri_partial_lines_flushed_total` (counter): Number of partial lines flushed prematurely due to `max_partial_lines` limit being exceeded in [stage.cri][]. +* `loki_process_cri_lines_truncated_total` (counter): Number of lines truncated due to `max_partial_line_size` limit in [stage.cri][]. ## Example diff --git a/internal/component/loki/process/stages/cri.go b/internal/component/loki/process/stages/cri.go index c7e680aaadc..2cb82cd0c69 100644 --- a/internal/component/loki/process/stages/cri.go +++ b/internal/component/loki/process/stages/cri.go @@ -12,6 +12,7 @@ import ( crip "github.com/grafana/alloy/internal/component/loki/process/stages/cri" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" ) @@ -49,20 +50,42 @@ func (args *CRIConfig) Validate() error { return nil } -func NewCRI(logger log.Logger, cfg CRIConfig, _ prometheus.Registerer, _ featuregate.Stability) (Stage, error) { +func NewCRI(logger log.Logger, cfg CRIConfig, registerer prometheus.Registerer, _ featuregate.Stability) (Stage, error) { + partialLinesFlushedMetric := getPartialLinesFlushedMetric(registerer) + linesTruncatedMetric := getLinesTruncatedMetric(registerer) return &cri{ - logger: logger, - cfg: cfg, - partialLines: make(map[model.Fingerprint]Entry, cfg.MaxPartialLines), + logger: logger, + cfg: cfg, + partialLines: make(map[model.Fingerprint]Entry, cfg.MaxPartialLines), + partialLinesFlushedMetric: partialLinesFlushedMetric, + linesTruncatedMetric: linesTruncatedMetric, }, nil } +func getPartialLinesFlushedMetric(registerer prometheus.Registerer) prometheus.Counter { + metric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_process_cri_partial_lines_flushed_total", + Help: "A count of partial lines that were flushed prematurely due to the max_partial_lines limit being exceeded", + }) + return util.MustRegisterOrGet(registerer, metric).(prometheus.Counter) +} + +func getLinesTruncatedMetric(registerer prometheus.Registerer) prometheus.Counter { + metric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_process_cri_lines_truncated_total", + Help: "A count of lines that were truncated due to the max_partial_line_size limit", + }) + return util.MustRegisterOrGet(registerer, metric).(prometheus.Counter) +} + var _ Stage = (*cri)(nil) type cri struct { - logger log.Logger - cfg CRIConfig - partialLines map[model.Fingerprint]Entry + logger log.Logger + cfg CRIConfig + partialLines map[model.Fingerprint]Entry + partialLinesFlushedMetric prometheus.Counter + linesTruncatedMetric prometheus.Counter } const ( @@ -103,6 +126,9 @@ func (c *cri) Run(in chan Entry) chan Entry { if parsed.Flag == crip.FlagPartial { if len(c.partialLines) >= c.cfg.MaxPartialLines { level.Warn(c.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", c.cfg.MaxPartialLines) + if c.partialLinesFlushedMetric != nil { + c.partialLinesFlushedMetric.Add(float64(len(c.partialLines))) + } // Merge existing partialLines entries := make([]Entry, 0, len(c.partialLines)) @@ -151,6 +177,9 @@ func (c *cri) Run(in chan Entry) chan Entry { func (c *cri) ensureTruncateIfRequired(e *Entry) { if c.cfg.MaxPartialLineSizeTruncate && len(e.Line) > int(c.cfg.MaxPartialLineSize) { e.Line = e.Line[:c.cfg.MaxPartialLineSize] + if c.linesTruncatedMetric != nil { + c.linesTruncatedMetric.Inc() + } } } diff --git a/internal/component/loki/process/stages/cri_test.go b/internal/component/loki/process/stages/cri_test.go index 16d6710b933..95f0fc07397 100644 --- a/internal/component/loki/process/stages/cri_test.go +++ b/internal/component/loki/process/stages/cri_test.go @@ -1,11 +1,14 @@ package stages import ( + "fmt" + "strings" "testing" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -92,12 +95,14 @@ func TestCRI_tags(t *testing.T) { } type testCase struct { - name string - expected []string - maxPartialLines int - maxPartialLineSize uint64 - maxPartialLineSizeTruncate bool - entries []testEntry + name string + expected []string + maxPartialLines int + maxPartialLineSize uint64 + maxPartialLineSizeTruncate bool + entries []testEntry + expectedPartialLinesFlushed int // expected value of the partial lines flushed metric + expectedLinesTruncated int // expected value of the lines truncated metric } cases := []testCase{ @@ -108,7 +113,9 @@ func TestCRI_tags(t *testing.T) { {line: "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", labels: model.LabelSet{"foo": "bar"}}, {line: "2019-05-07T18:57:55.904275087+00:00 stdout F log", labels: model.LabelSet{"foo": "bar"}}, }, - expected: []string{"some full line", "log"}, + expected: []string{"some full line", "log"}, + expectedPartialLinesFlushed: 0, + expectedLinesTruncated: 0, }, { name: "tag P multi-stream", @@ -123,6 +130,8 @@ func TestCRI_tags(t *testing.T) { "partial line 1 log finished", // belongs to stream `{foo="bar"}` "partial line 2 another full log", // belongs to stream `{foo="bar2"} }, + expectedPartialLinesFlushed: 0, + expectedLinesTruncated: 0, }, { name: "tag P multi-stream with maxPartialLines exceeded", @@ -145,6 +154,8 @@ func TestCRI_tags(t *testing.T) { "another full log", "partial line 5 yet an another full log", }, + expectedPartialLinesFlushed: 3, // 3 partial lines were flushed when limit was exceeded + expectedLinesTruncated: 0, }, { name: "tag P single stream", @@ -161,6 +172,8 @@ func TestCRI_tags(t *testing.T) { "partial line 1 partial line 2 partial line 3 partial line 4 log finished", "another full log", }, + expectedPartialLinesFlushed: 0, // single stream, no flush due to limit (partial lines merge within same stream) + expectedLinesTruncated: 0, }, { name: "tag P multi-stream with truncation", @@ -177,17 +190,20 @@ func TestCRI_tags(t *testing.T) { "partial lin", "partialfull", }, + expectedPartialLinesFlushed: 0, + expectedLinesTruncated: 2, // 2 lines were truncated due to max_partial_line_size }, } for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { + registry := prometheus.NewRegistry() cfg := CRIConfig{ MaxPartialLines: tt.maxPartialLines, MaxPartialLineSize: tt.maxPartialLineSize, MaxPartialLineSizeTruncate: tt.maxPartialLineSizeTruncate, } - p, err := NewCRI(log.NewNopLogger(), cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) + p, err := NewCRI(log.NewNopLogger(), cfg, registry, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) got := make([]string, 0) @@ -212,6 +228,17 @@ func TestCRI_tags(t *testing.T) { } assert.Equal(t, expectedMap, gotMap) + + // Verify the metrics + expectedMetrics := fmt.Sprintf(` +# HELP loki_process_cri_lines_truncated_total A count of lines that were truncated due to the max_partial_line_size limit +# TYPE loki_process_cri_lines_truncated_total counter +loki_process_cri_lines_truncated_total %d +# HELP loki_process_cri_partial_lines_flushed_total A count of partial lines that were flushed prematurely due to the max_partial_lines limit being exceeded +# TYPE loki_process_cri_partial_lines_flushed_total counter +loki_process_cri_partial_lines_flushed_total %d +`, tt.expectedLinesTruncated, tt.expectedPartialLinesFlushed) + require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics))) }) } } diff --git a/internal/component/loki/process/stages/match.go b/internal/component/loki/process/stages/match.go index b9c61d3bdc5..da51cbc43e6 100644 --- a/internal/component/loki/process/stages/match.go +++ b/internal/component/loki/process/stages/match.go @@ -104,6 +104,8 @@ func getDropCountMetric(registerer prometheus.Registerer) *prometheus.CounterVec }, []string{"reason"}) err := registerer.Register(dropCount) if err != nil { + // TODO: This code should neither panic nor use AlreadyRegisteredError. + // Register it without these, and return error if it fails. if existing, ok := err.(prometheus.AlreadyRegisteredError); ok { dropCount = existing.ExistingCollector.(*prometheus.CounterVec) } else {