Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/sources/reference/components/loki/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,8 @@ The following fields are exported and can be referenced by other components:
* `loki_process_dropped_lines_total` (counter): Number of lines dropped as part of a processing stage.
* `loki_process_dropped_lines_by_label_total` (counter): Number of lines dropped when `by_label_name` is non-empty in [stage.limit][].
* `loki_process_truncated_fields_total` (counter): Number of lines, label values, extracted field values, and structured metadata values truncated as part of a `truncate` stage.
* `loki_process_cri_partial_lines_flushed_total` (counter): Number of partial lines flushed prematurely due to `max_partial_lines` limit being exceeded in [stage.cri][].
* `loki_process_cri_lines_truncated_total` (counter): Number of lines truncated due to `max_partial_line_size` limit in [stage.cri][].

## Example

Expand Down
43 changes: 36 additions & 7 deletions internal/component/loki/process/stages/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
crip "github.com/grafana/alloy/internal/component/loki/process/stages/cri"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
)

Expand Down Expand Up @@ -49,20 +50,42 @@ func (args *CRIConfig) Validate() error {
return nil
}

func NewCRI(logger log.Logger, cfg CRIConfig, _ prometheus.Registerer, _ featuregate.Stability) (Stage, error) {
func NewCRI(logger log.Logger, cfg CRIConfig, registerer prometheus.Registerer, _ featuregate.Stability) (Stage, error) {
partialLinesFlushedMetric := getPartialLinesFlushedMetric(registerer)
linesTruncatedMetric := getLinesTruncatedMetric(registerer)
return &cri{
logger: logger,
cfg: cfg,
partialLines: make(map[model.Fingerprint]Entry, cfg.MaxPartialLines),
logger: logger,
cfg: cfg,
partialLines: make(map[model.Fingerprint]Entry, cfg.MaxPartialLines),
partialLinesFlushedMetric: partialLinesFlushedMetric,
linesTruncatedMetric: linesTruncatedMetric,
}, nil
}

func getPartialLinesFlushedMetric(registerer prometheus.Registerer) prometheus.Counter {
metric := prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_process_cri_partial_lines_flushed_total",
Help: "A count of partial lines that were flushed prematurely due to the max_partial_lines limit being exceeded",
})
return util.MustRegisterOrGet(registerer, metric).(prometheus.Counter)
}

func getLinesTruncatedMetric(registerer prometheus.Registerer) prometheus.Counter {
metric := prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_process_cri_lines_truncated_total",
Help: "A count of lines that were truncated due to the max_partial_line_size limit",
})
return util.MustRegisterOrGet(registerer, metric).(prometheus.Counter)
}

var _ Stage = (*cri)(nil)

type cri struct {
logger log.Logger
cfg CRIConfig
partialLines map[model.Fingerprint]Entry
logger log.Logger
cfg CRIConfig
partialLines map[model.Fingerprint]Entry
partialLinesFlushedMetric prometheus.Counter
linesTruncatedMetric prometheus.Counter
}

const (
Expand Down Expand Up @@ -103,6 +126,9 @@ func (c *cri) Run(in chan Entry) chan Entry {
if parsed.Flag == crip.FlagPartial {
if len(c.partialLines) >= c.cfg.MaxPartialLines {
level.Warn(c.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", c.cfg.MaxPartialLines)
if c.partialLinesFlushedMetric != nil {
c.partialLinesFlushedMetric.Add(float64(len(c.partialLines)))
Comment thread
ptodev marked this conversation as resolved.
}

// Merge existing partialLines
entries := make([]Entry, 0, len(c.partialLines))
Expand Down Expand Up @@ -151,6 +177,9 @@ func (c *cri) Run(in chan Entry) chan Entry {
func (c *cri) ensureTruncateIfRequired(e *Entry) {
if c.cfg.MaxPartialLineSizeTruncate && len(e.Line) > int(c.cfg.MaxPartialLineSize) {
e.Line = e.Line[:c.cfg.MaxPartialLineSize]
if c.linesTruncatedMetric != nil {
c.linesTruncatedMetric.Inc()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered adding the log labels as metric labels, to make it easier to identify log streams with long lines. I suspect most of the time it'll be a particular stream. But for now I don't want to make changes that could lead to too many metrics.

}
}
}

Expand Down
43 changes: 35 additions & 8 deletions internal/component/loki/process/stages/cri_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package stages

import (
"fmt"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,12 +95,14 @@ func TestCRI_tags(t *testing.T) {
}

type testCase struct {
name string
expected []string
maxPartialLines int
maxPartialLineSize uint64
maxPartialLineSizeTruncate bool
entries []testEntry
name string
expected []string
maxPartialLines int
maxPartialLineSize uint64
maxPartialLineSizeTruncate bool
entries []testEntry
expectedPartialLinesFlushed int // expected value of the partial lines flushed metric
expectedLinesTruncated int // expected value of the lines truncated metric
}

cases := []testCase{
Expand All @@ -108,7 +113,9 @@ func TestCRI_tags(t *testing.T) {
{line: "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log", labels: model.LabelSet{"foo": "bar"}},
},
expected: []string{"some full line", "log"},
expected: []string{"some full line", "log"},
expectedPartialLinesFlushed: 0,
expectedLinesTruncated: 0,
},
{
name: "tag P multi-stream",
Expand All @@ -123,6 +130,8 @@ func TestCRI_tags(t *testing.T) {
"partial line 1 log finished", // belongs to stream `{foo="bar"}`
"partial line 2 another full log", // belongs to stream `{foo="bar2"}
},
expectedPartialLinesFlushed: 0,
expectedLinesTruncated: 0,
},
{
name: "tag P multi-stream with maxPartialLines exceeded",
Expand All @@ -145,6 +154,8 @@ func TestCRI_tags(t *testing.T) {
"another full log",
"partial line 5 yet an another full log",
},
expectedPartialLinesFlushed: 3, // 3 partial lines were flushed when limit was exceeded
expectedLinesTruncated: 0,
},
{
name: "tag P single stream",
Expand All @@ -161,6 +172,8 @@ func TestCRI_tags(t *testing.T) {
"partial line 1 partial line 2 partial line 3 partial line 4 log finished",
"another full log",
},
expectedPartialLinesFlushed: 0, // single stream, no flush due to limit (partial lines merge within same stream)
expectedLinesTruncated: 0,
},
{
name: "tag P multi-stream with truncation",
Expand All @@ -177,17 +190,20 @@ func TestCRI_tags(t *testing.T) {
"partial lin",
"partialfull",
},
expectedPartialLinesFlushed: 0,
expectedLinesTruncated: 2, // 2 lines were truncated due to max_partial_line_size
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := CRIConfig{
MaxPartialLines: tt.maxPartialLines,
MaxPartialLineSize: tt.maxPartialLineSize,
MaxPartialLineSizeTruncate: tt.maxPartialLineSizeTruncate,
}
p, err := NewCRI(log.NewNopLogger(), cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
p, err := NewCRI(log.NewNopLogger(), cfg, registry, featuregate.StabilityGenerallyAvailable)
require.NoError(t, err)

got := make([]string, 0)
Expand All @@ -212,6 +228,17 @@ func TestCRI_tags(t *testing.T) {
}

assert.Equal(t, expectedMap, gotMap)

// Verify the metrics
expectedMetrics := fmt.Sprintf(`
# HELP loki_process_cri_lines_truncated_total A count of lines that were truncated due to the max_partial_line_size limit
# TYPE loki_process_cri_lines_truncated_total counter
loki_process_cri_lines_truncated_total %d
# HELP loki_process_cri_partial_lines_flushed_total A count of partial lines that were flushed prematurely due to the max_partial_lines limit being exceeded
# TYPE loki_process_cri_partial_lines_flushed_total counter
loki_process_cri_partial_lines_flushed_total %d
`, tt.expectedLinesTruncated, tt.expectedPartialLinesFlushed)
require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics)))
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/component/loki/process/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func getDropCountMetric(registerer prometheus.Registerer) *prometheus.CounterVec
}, []string{"reason"})
err := registerer.Register(dropCount)
if err != nil {
// TODO: This code should neither panic nor use AlreadyRegisteredError.
// Register it without these, and return error if it fails.
if existing, ok := err.(prometheus.AlreadyRegisteredError); ok {
dropCount = existing.ExistingCollector.(*prometheus.CounterVec)
} else {
Expand Down
Loading