Closed
1 change: 1 addition & 0 deletions docs/sources/reference/components/loki/loki.process.md
Expand Up @@ -2056,6 +2056,7 @@ The following fields are exported and can be referenced by other components:

## Debug metrics

* `loki_fanout_latency` (histogram): Write latency for sending to components.
* `loki_process_dropped_lines_total` (counter): Number of lines dropped as part of a processing stage.
* `loki_process_dropped_lines_by_label_total` (counter): Number of lines dropped when `by_label_name` is non-empty in [stage.limit][].
* `loki_process_truncated_fields_total` (counter): Number of lines, label values, extracted field values, and structured metadata values truncated as part of a `truncate` stage.
Expand Up @@ -101,6 +101,7 @@ The `tls` block configures TLS for the HTTP server.
The following are some of the metrics that are exposed when this component is used.
The metrics include labels such as `status_code` where relevant, which can be used to measure request success rates.

* `loki_fanout_latency` (histogram): Write latency for sending to components.
* `loki_source_api_request_duration_seconds` (histogram): Time (in seconds) spent serving HTTP requests.
* `loki_source_api_request_message_bytes` (histogram): Size (in bytes) of messages received in the request.
* `loki_source_api_response_message_bytes` (histogram): Size (in bytes) of messages sent in response.
Expand Up @@ -236,6 +236,7 @@ The `loki.source.cloudflare` component doesn't support any blocks. You can confi

## Debug metrics

* `loki_fanout_latency` (histogram): Write latency for sending to components.
* `loki_source_cloudflare_target_entries_total` (counter): Total number of successful entries sent via the cloudflare target.
* `loki_source_cloudflare_target_last_requested_end_timestamp` (gauge): The last cloudflare request end timestamp fetched, for calculating how far behind the target is.

Expand Up @@ -113,6 +113,7 @@ The `tls_config` block configures TLS settings for connecting to HTTPS Docker da

## Debug metrics

* `loki_fanout_latency` (histogram): Write latency for sending to components.
* `loki_source_docker_target_entries_total` (gauge): Total number of successful entries sent to the Docker target.
* `loki_source_docker_target_parsing_errors_total` (gauge): Total number of parsing errors while receiving Docker messages.

Expand Up @@ -165,6 +165,7 @@ The glob patterns support the `{a,b,c}` syntax for matching multiple alternative

## Debug metrics

* `loki_fanout_latency` (histogram): Write latency for sending to components.
* `loki_source_file_file_bytes_total` (gauge): Number of bytes total.
* `loki_source_file_files_active_total` (gauge): Number of active files.
* `loki_source_file_read_bytes_total` (gauge): Number of bytes read.
Expand Up @@ -142,6 +142,7 @@ The translation of legacy position file will happens if there is no position fil

## Debug Metrics

* `loki_fanout_latency` (histogram): Write latency for sending to components.
* `loki_source_journal_target_parsing_errors_total` (counter): Total number of parsing errors while reading journal messages.
* `loki_source_journal_target_lines_total` (counter): Total number of successful journal lines read.

Expand Up @@ -145,7 +145,7 @@ The following arguments are supported:

## Debug metrics

`loki.source.kubernetes_events` doesn't expose any component-specific debug metrics.
* `loki_fanout_latency` (histogram): Write latency for sending to components.

## Component behavior

20 changes: 19 additions & 1 deletion internal/component/common/loki/fanout.go
Expand Up @@ -4,12 +4,23 @@ import (
"context"
"reflect"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// NewFanout creates a new Fanout that will send log entries to the provided
// list of LogsReceivers.
func NewFanout(children []LogsReceiver) *Fanout {
func NewFanout(children []LogsReceiver, register prometheus.Registerer) *Fanout {
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_fanout_latency",
Help: "Write latency for sending to components",
Buckets: []float64{0.0002, 0.001, 0.005, 0.02, 0.1, 1.0},
})
_ = register.Register(wl)

return &Fanout{
wl: wl,
children: children,
}
}
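A note on the `Buckets` values above: each entry is an inclusive upper bound in seconds, and anything slower than the last bound is counted only in the implicit `+Inf` bucket. The sketch below is a simplified, standard-library-only model of that mapping, not the `client_golang` implementation; `fanoutBuckets` and `bucketFor` are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// fanoutBuckets mirrors the upper bounds (in seconds) passed to
// prometheus.HistogramOpts in NewFanout. The variable itself is hypothetical.
var fanoutBuckets = []float64{0.0002, 0.001, 0.005, 0.02, 0.1, 1.0}

// bucketFor is a hypothetical helper, not part of client_golang. It returns
// the smallest upper bound that is >= seconds, or false when the observation
// would only be counted in the implicit +Inf bucket.
func bucketFor(seconds float64) (float64, bool) {
	// SearchFloat64s returns the first index i with fanoutBuckets[i] >= seconds,
	// which matches the inclusive "le" semantics of histogram buckets.
	i := sort.SearchFloat64s(fanoutBuckets, seconds)
	if i == len(fanoutBuckets) {
		return 0, false
	}
	return fanoutBuckets[i], true
}

func main() {
	for _, s := range []float64{0.00015, 0.003, 0.1, 2.5} {
		if le, ok := bucketFor(s); ok {
			fmt.Printf("%gs -> le=%g\n", s, le)
		} else {
			fmt.Printf("%gs -> le=+Inf\n", s)
		}
	}
}
```

With these bounds, any send slower than one second is indistinguishable from any other slow send, which may be worth keeping in mind if receivers can block for longer than that.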
Expand All @@ -18,12 +29,16 @@ func NewFanout(children []LogsReceiver) *Fanout {
// It is thread-safe and allows the list of receivers to be updated dynamically
type Fanout struct {
mut sync.RWMutex
wl prometheus.Histogram
children []LogsReceiver
}

// Send forwards a log entry to all registered receivers. It returns an error
// if the context is cancelled while sending.
func (f *Fanout) Send(ctx context.Context, entry Entry) error {
start := time.Now()
defer func() { f.wl.Observe(float64(time.Since(start).Seconds())) }()

// NOTE: It's important that we hold a read lock for the duration of Send
// rather than making a copy of children and releasing the lock early.
//
Expand All @@ -50,6 +65,9 @@ func (f *Fanout) Send(ctx context.Context, entry Entry) error {
// SendBatch forwards a batch of entries to all registered receivers. It returns an error
// if the context is cancelled while sending.
func (f *Fanout) SendBatch(ctx context.Context, batch []Entry) error {
start := time.Now()
defer func() { f.wl.Observe(float64(time.Since(start).Seconds())) }()

// NOTE: It's important that we hold a read lock for the duration of SendBatch
// rather than making a copy of children and releasing the lock early.
//
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Expand Up @@ -73,7 +73,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
processOut: loki.NewLogsReceiver(),
processIn: loki.NewLogsReceiver(),
receiver: loki.NewLogsReceiver(loki.WithComponentID(o.ID)),
fanout: loki.NewFanout(args.ForwardTo),
fanout: loki.NewFanout(args.ForwardTo, o.Registerer),
Review comment (Contributor): Is this registerer scoped to the component?


debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}

81 changes: 61 additions & 20 deletions internal/component/loki/process/process_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"slices"
"strings"
"sync"
"testing"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/push"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
Expand Down Expand Up @@ -47,13 +49,12 @@ func TestComponent(t *testing.T) {
require.NoError(t, ctrl.WaitExports(time.Minute))

recv := ctrl.Exports().(Exports).Receiver
fanout := loki.NewFanout([]loki.LogsReceiver{recv})

wg.Go(func() {
for {
// We get error if context is canceled
if err := fanout.Send(ctx, loki.Entry{}); err != nil {
select {
case <-ctx.Done():
return
case recv.Chan() <- loki.Entry{}:
}
}
})
Expand Down Expand Up @@ -813,9 +814,12 @@ func TestMetricsStageRefresh(t *testing.T) {

// The component will be reconfigured so that it has a metric.
t.Run("config with a metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfg,
tester.updateAndTest(
numLogsToSend,
cfg,
"",
fmt.Sprintf(expectedMetrics, numLogsToSend, numLogsToSend))
fmt.Sprintf(expectedMetrics, numLogsToSend, numLogsToSend),
)
})

// The component will be "updated" with the same config.
Expand All @@ -825,16 +829,24 @@ func TestMetricsStageRefresh(t *testing.T) {
// which reloads the collector config every X seconds.
// Those users wouldn't expect their metrics to be reset every time the config is reloaded.
t.Run("config with the same metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfg,
tester.updateAndTest(
numLogsToSend,
cfg,
fmt.Sprintf(expectedMetrics, numLogsToSend, numLogsToSend),
fmt.Sprintf(expectedMetrics, 2*numLogsToSend, 2*numLogsToSend))
fmt.Sprintf(expectedMetrics, 2*numLogsToSend, 2*numLogsToSend),
)
})

// Use a config which has no metrics stage.
// This should cause the metric to disappear.
cfgWithNoStages := forwardArgs
t.Run("config with no metrics stage", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "")
tester.updateAndTest(
numLogsToSend,
cfgWithNoStages,
"",
"",
)
})

// Use a config which has a metric with a different name,
Expand Down Expand Up @@ -881,16 +893,19 @@ func TestMetricsStageRefresh(t *testing.T) {
`

t.Run("config with a new and old metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, updatedCfg,
tester.updateAndTest(
numLogsToSend,
updatedCfg,
"",
fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend, numLogsToSend))
fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend, numLogsToSend),
)
})
}

type tester struct {
t *testing.T
component *Component
registry *prometheus.Registry
registry *ignoringRegistry
cancelFunc context.CancelFunc
logReceiver loki.LogsReceiver
logTimestamp time.Time
Expand All @@ -900,7 +915,10 @@ type tester struct {

// Create the component, so that it can process and forward logs.
func newTester(t *testing.T) *tester {
reg := prometheus.NewRegistry()
reg := newIgnoringRegistry(
prometheus.NewRegistry(),
"loki_fanout_latency",
)

opts := component.Options{
Logger: util.TestAlloyLogger(t),
Expand Down Expand Up @@ -960,10 +978,7 @@ func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSend
t.component.Update(args)

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil {
require.NoError(t.t, err)
}
require.NoError(t.t, testutil.GatherAndCompare(t.registry, strings.NewReader(expectedMetricsBeforeSendingLogs)))

// Send logs.
for i := 0; i < numLogsToSend; i++ {
Expand All @@ -983,8 +998,34 @@ func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSend
}

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil {
require.NoError(t.t, err)
require.NoError(t.t, testutil.GatherAndCompare(t.registry, strings.NewReader(expectedMetricsAfterSendingLogs)))
}

func newIgnoringRegistry(reg *prometheus.Registry, ignore ...string) *ignoringRegistry {
return &ignoringRegistry{
Registry: reg,
ignore: ignore,
}
}

type ignoringRegistry struct {
*prometheus.Registry
ignore []string
}

func (r *ignoringRegistry) Gather() ([]*dto.MetricFamily, error) {
mfs, err := r.Registry.Gather()
if err != nil {
return nil, err
}

filtered := make([]*dto.MetricFamily, 0, len(mfs))
for _, mf := range mfs {
if slices.Contains(r.ignore, mf.GetName()) {
continue
}
filtered = append(filtered, mf)
}

return filtered, nil
}
2 changes: 1 addition & 1 deletion internal/component/loki/source/api/api.go
Expand Up @@ -73,7 +73,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
handler: loki.NewLogsBatchReceiver(),
uncheckedCollector: util.NewUncheckedCollector(nil),

fanout: loki.NewFanout(args.ForwardTo),
fanout: loki.NewFanout(args.ForwardTo, opts.Registerer),
}
opts.Registerer.MustRegister(c.uncheckedCollector)
err := c.Update(args)
2 changes: 1 addition & 1 deletion internal/component/loki/source/cloudflare/cloudflare.go
Expand Up @@ -120,7 +120,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
opts: o,
metrics: newMetrics(o.Registerer),
handler: loki.NewLogsReceiver(),
fanout: loki.NewFanout(args.ForwardTo),
fanout: loki.NewFanout(args.ForwardTo, o.Registerer),
posFile: positionsFile,
}

7 changes: 4 additions & 3 deletions internal/component/loki/source/consume_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component/common/loki"
Expand All @@ -14,7 +15,7 @@ import (
func TestConsume(t *testing.T) {
consumer := loki.NewLogsReceiver()
producer := loki.NewLogsReceiver()
fanout := loki.NewFanout([]loki.LogsReceiver{consumer})
fanout := loki.NewFanout([]loki.LogsReceiver{consumer}, prometheus.NewRegistry())

t.Run("should fanout any consumed entries", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -47,7 +48,7 @@ func TestConsume(t *testing.T) {
func TestConsumeAndProcess(t *testing.T) {
consumer := loki.NewLogsReceiver()
producer := loki.NewLogsReceiver()
fanout := loki.NewFanout([]loki.LogsReceiver{consumer})
fanout := loki.NewFanout([]loki.LogsReceiver{consumer}, prometheus.NewRegistry())

t.Run("should process and fanout any consumed entries", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -88,7 +89,7 @@ func TestConsumeAndProcess(t *testing.T) {
func TestConsumeBatch(t *testing.T) {
consumer := loki.NewLogsReceiver()
producer := loki.NewLogsBatchReceiver()
fanout := loki.NewFanout([]loki.LogsReceiver{consumer})
fanout := loki.NewFanout([]loki.LogsReceiver{consumer}, prometheus.NewRegistry())

t.Run("should fanout any consumed entries", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
2 changes: 1 addition & 1 deletion internal/component/loki/source/docker/docker.go
Expand Up @@ -142,7 +142,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
exited: atomic.NewBool(false),
handler: loki.NewLogsReceiver(),
scheduler: source.NewScheduler[string](),
fanout: loki.NewFanout(args.ForwardTo),
fanout: loki.NewFanout(args.ForwardTo, o.Registerer),
posFile: positionsFile,
}

2 changes: 1 addition & 1 deletion internal/component/loki/source/file/file.go
Expand Up @@ -234,7 +234,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
opts: o,
metrics: newMetrics(o.Registerer),
handler: loki.NewLogsReceiver(),
fanout: loki.NewFanout(args.ForwardTo),
fanout: loki.NewFanout(args.ForwardTo, o.Registerer),
posFile: positionsFile,
scheduler: source.NewScheduler[positions.Entry](),
watcher: time.NewTicker(args.FileMatch.SyncPeriod),
2 changes: 1 addition & 1 deletion internal/component/loki/source/journal/journal.go
Expand Up @@ -79,7 +79,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
opts: o,
recv: loki.NewLogsReceiver(),
positions: positionsFile,
fanout: loki.NewFanout(args.ForwardTo),
fanout: loki.NewFanout(args.ForwardTo, o.Registerer),
targetsUpdated: make(chan struct{}, 1),
args: args,
}
Expand Up @@ -117,7 +117,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
positions: positionsFile,
handler: loki.NewLogsReceiver(),
scheduler: source.NewScheduler[string](),
fanout: loki.NewFanout(args.ForwardTo),
fanout: loki.NewFanout(args.ForwardTo, o.Registerer),
}
if err := c.Update(args); err != nil {
return nil, err