diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index cf26f176853..32bc90a4c63 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -2056,6 +2056,7 @@ The following fields are exported and can be referenced by other components: ## Debug metrics +* `loki_fanout_latency` (histogram): Write latency for sending to components. * `loki_process_dropped_lines_total` (counter): Number of lines dropped as part of a processing stage. * `loki_process_dropped_lines_by_label_total` (counter): Number of lines dropped when `by_label_name` is non-empty in [stage.limit][]. * `loki_process_truncated_fields_total` (counter): Number of lines, label values, extracted field values, and structured metadata values truncated as part of a `truncate` stage. diff --git a/docs/sources/reference/components/loki/loki.source.api.md b/docs/sources/reference/components/loki/loki.source.api.md index 29330111ba2..9f81aa0ce78 100644 --- a/docs/sources/reference/components/loki/loki.source.api.md +++ b/docs/sources/reference/components/loki/loki.source.api.md @@ -101,6 +101,7 @@ The `tls` block configures TLS for the HTTP server. The following are some of the metrics that are exposed when this component is used. The metrics include labels such as `status_code` where relevant, which can be used to measure request success rates. +* `loki_fanout_latency` (histogram): Write latency for sending to components. * `loki_source_api_request_duration_seconds` (histogram): Time (in seconds) spent serving HTTP requests. * `loki_source_api_request_message_bytes` (histogram): Size (in bytes) of messages received in the request. * `loki_source_api_response_message_bytes` (histogram): Size (in bytes) of messages sent in response. 
diff --git a/docs/sources/reference/components/loki/loki.source.cloudflare.md b/docs/sources/reference/components/loki/loki.source.cloudflare.md index 9848ed97c21..38e968e6733 100644 --- a/docs/sources/reference/components/loki/loki.source.cloudflare.md +++ b/docs/sources/reference/components/loki/loki.source.cloudflare.md @@ -236,6 +236,7 @@ The `loki.source.cloudflare` component doesn't support any blocks. You can confi ## Debug metrics +* `loki_fanout_latency` (histogram): Write latency for sending to components. * `loki_source_cloudflare_target_entries_total` (counter): Total number of successful entries sent via the cloudflare target. * `loki_source_cloudflare_target_last_requested_end_timestamp` (gauge): The last cloudflare request end timestamp fetched, for calculating how far behind the target is. diff --git a/docs/sources/reference/components/loki/loki.source.docker.md b/docs/sources/reference/components/loki/loki.source.docker.md index 6a7e3cd4d52..527d2d51a83 100644 --- a/docs/sources/reference/components/loki/loki.source.docker.md +++ b/docs/sources/reference/components/loki/loki.source.docker.md @@ -113,6 +113,7 @@ The `tls_config` block configures TLS settings for connecting to HTTPS Docker da ## Debug metrics +* `loki_fanout_latency` (histogram): Write latency for sending to components. * `loki_source_docker_target_entries_total` (gauge): Total number of successful entries sent to the Docker target. * `loki_source_docker_target_parsing_errors_total` (gauge): Total number of parsing errors while receiving Docker messages. 
diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md index d11b99819fb..b3963b1df17 100644 --- a/docs/sources/reference/components/loki/loki.source.file.md +++ b/docs/sources/reference/components/loki/loki.source.file.md @@ -165,6 +165,7 @@ The glob patterns support the `{a,b,c}` syntax for matching multiple alternative ## Debug metrics +* `loki_fanout_latency` (histogram): Write latency for sending to components. * `loki_source_file_file_bytes_total` (gauge): Number of bytes total. * `loki_source_file_files_active_total` (gauge): Number of active files. * `loki_source_file_read_bytes_total` (gauge): Number of bytes read. diff --git a/docs/sources/reference/components/loki/loki.source.journal.md b/docs/sources/reference/components/loki/loki.source.journal.md index 1b3b33924e2..c0073957c7b 100644 --- a/docs/sources/reference/components/loki/loki.source.journal.md +++ b/docs/sources/reference/components/loki/loki.source.journal.md @@ -142,6 +142,7 @@ The translation of legacy position file will happens if there is no position fil ## Debug Metrics +* `loki_fanout_latency` (histogram): Write latency for sending to components. * `loki_source_journal_target_parsing_errors_total` (counter): Total number of parsing errors while reading journal messages. * `loki_source_journal_target_lines_total` (counter): Total number of successful journal lines read. diff --git a/docs/sources/reference/components/loki/loki.source.kubernetes_events.md b/docs/sources/reference/components/loki/loki.source.kubernetes_events.md index fd8b754fba4..a8a8672b591 100644 --- a/docs/sources/reference/components/loki/loki.source.kubernetes_events.md +++ b/docs/sources/reference/components/loki/loki.source.kubernetes_events.md @@ -145,7 +145,7 @@ The following arguments are supported: ## Debug metrics -`loki.source.kubernetes_events` doesn't expose any component-specific debug metrics. 
+* `loki_fanout_latency` (histogram): Write latency for sending to components. ## Component behavior diff --git a/internal/component/common/loki/fanout.go b/internal/component/common/loki/fanout.go index 1551bd8fd92..42cc6f1423e 100644 --- a/internal/component/common/loki/fanout.go +++ b/internal/component/common/loki/fanout.go @@ -4,12 +4,23 @@ import ( "context" "reflect" "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" ) // NewFanout creates a new Fanout that will send log entries to the provided // list of LogsReceivers. -func NewFanout(children []LogsReceiver) *Fanout { +func NewFanout(children []LogsReceiver, register prometheus.Registerer) *Fanout { + wl := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_fanout_latency", + Help: "Write latency for sending to components", + Buckets: []float64{0.0002, 0.001, 0.005, 0.02, 0.1, 1.0}, + }) + _ = register.Register(wl) // NOTE(review): registration error is discarded; wl is still observed by Send/SendBatch even if it was not registered - confirm this best-effort behavior is intended. + return &Fanout{ + wl: wl, children: children, } } @@ -18,12 +29,16 @@ func NewFanout(children []LogsReceiver) *Fanout { // It is thread-safe and allows the list of receivers to be updated dynamically type Fanout struct { mut sync.RWMutex + wl prometheus.Histogram children []LogsReceiver } // Send forwards a log entry to all registered receivers. It returns an error // if the context is cancelled while sending. func (f *Fanout) Send(ctx context.Context, entry Entry) error { + start := time.Now() + defer func() { f.wl.Observe(time.Since(start).Seconds()) }() + // NOTE: It's important that we hold a read lock for the duration of Send // rather than making a copy of children and releasing the lock early. // @@ -50,6 +65,9 @@ func (f *Fanout) Send(ctx context.Context, entry Entry) error { // SendBatch forwards a batch of entires to all registered receivers. It returns an error // if the context is cancelled while sending. 
func (f *Fanout) SendBatch(ctx context.Context, batch []Entry) error { + start := time.Now() + defer func() { f.wl.Observe(time.Since(start).Seconds()) }() + // NOTE: It's important that we hold a read lock for the duration of SendBatch // rather than making a copy of children and releasing the lock early. // diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index 211c05327ef..3e6f46d83cb 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -73,7 +73,7 @@ func New(o component.Options, args Arguments) (*Component, error) { processOut: loki.NewLogsReceiver(), processIn: loki.NewLogsReceiver(), receiver: loki.NewLogsReceiver(loki.WithComponentID(o.ID)), - fanout: loki.NewFanout(args.ForwardTo), + fanout: loki.NewFanout(args.ForwardTo, o.Registerer), debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 867f3d2a8ed..de0d459ede2 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "slices" "strings" "sync" "testing" @@ -13,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/push" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -47,13 +49,12 @@ func TestComponent(t *testing.T) { require.NoError(t, ctrl.WaitExports(time.Minute)) recv := ctrl.Exports().(Exports).Receiver - fanout := loki.NewFanout([]loki.LogsReceiver{recv}) - wg.Go(func() { for { - // We get error if context is canceled - if err := fanout.Send(ctx, loki.Entry{}); err != nil { + select { + case <-ctx.Done(): return + case recv.Chan() <- loki.Entry{}: } 
} }) @@ -813,9 +814,12 @@ func TestMetricsStageRefresh(t *testing.T) { // The component will be reconfigured so that it has a metric. t.Run("config with a metric", func(t *testing.T) { - tester.updateAndTest(numLogsToSend, cfg, + tester.updateAndTest( + numLogsToSend, + cfg, "", - fmt.Sprintf(expectedMetrics, numLogsToSend, numLogsToSend)) + fmt.Sprintf(expectedMetrics, numLogsToSend, numLogsToSend), + ) }) // The component will be "updated" with the same config. @@ -825,16 +829,24 @@ func TestMetricsStageRefresh(t *testing.T) { // which reloads the collector config every X seconds. // Those users wouldn't expect their metrics to be reset every time the config is reloaded. t.Run("config with the same metric", func(t *testing.T) { - tester.updateAndTest(numLogsToSend, cfg, + tester.updateAndTest( + numLogsToSend, + cfg, fmt.Sprintf(expectedMetrics, numLogsToSend, numLogsToSend), - fmt.Sprintf(expectedMetrics, 2*numLogsToSend, 2*numLogsToSend)) + fmt.Sprintf(expectedMetrics, 2*numLogsToSend, 2*numLogsToSend), + ) }) // Use a config which has no metrics stage. // This should cause the metric to disappear. 
cfgWithNoStages := forwardArgs t.Run("config with no metrics stage", func(t *testing.T) { - tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "") + tester.updateAndTest( + numLogsToSend, + cfgWithNoStages, + "", + "", + ) }) // Use a config which has a metric with a different name, @@ -881,16 +893,19 @@ func TestMetricsStageRefresh(t *testing.T) { ` t.Run("config with a new and old metric", func(t *testing.T) { - tester.updateAndTest(numLogsToSend, updatedCfg, + tester.updateAndTest( + numLogsToSend, + updatedCfg, "", - fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend, numLogsToSend)) + fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend, numLogsToSend), + ) }) } type tester struct { t *testing.T component *Component - registry *prometheus.Registry + registry *ignoringRegistry cancelFunc context.CancelFunc logReceiver loki.LogsReceiver logTimestamp time.Time @@ -900,7 +915,10 @@ type tester struct { // Create the component, so that it can process and forward logs. func newTester(t *testing.T) *tester { - reg := prometheus.NewRegistry() + reg := newIgnoringRegistry( + prometheus.NewRegistry(), + "loki_fanout_latency", + ) opts := component.Options{ Logger: util.TestAlloyLogger(t), @@ -960,10 +978,7 @@ func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSend t.component.Update(args) // Check the component metrics. - if err := testutil.GatherAndCompare(t.registry, - strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil { - require.NoError(t.t, err) - } + require.NoError(t.t, testutil.GatherAndCompare(t.registry, strings.NewReader(expectedMetricsBeforeSendingLogs))) // Send logs. for i := 0; i < numLogsToSend; i++ { @@ -983,8 +998,34 @@ func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSend } // Check the component metrics. 
- if err := testutil.GatherAndCompare(t.registry, - strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil { - require.NoError(t.t, err) + require.NoError(t.t, testutil.GatherAndCompare(t.registry, strings.NewReader(expectedMetricsAfterSendingLogs))) +} + +func newIgnoringRegistry(reg *prometheus.Registry, ignore ...string) *ignoringRegistry { + return &ignoringRegistry{ + Registry: reg, + ignore: ignore, } } + +type ignoringRegistry struct { + *prometheus.Registry + ignore []string +} + +func (r *ignoringRegistry) Gather() ([]*dto.MetricFamily, error) { + mfs, err := r.Registry.Gather() + if err != nil { + return nil, err + } + + filtered := make([]*dto.MetricFamily, 0, len(mfs)) + for _, mf := range mfs { + if slices.Contains(r.ignore, mf.GetName()) { + continue + } + filtered = append(filtered, mf) + } + + return filtered, nil +} diff --git a/internal/component/loki/source/api/api.go b/internal/component/loki/source/api/api.go index b5f2b6ff3a6..05d37aa33bb 100644 --- a/internal/component/loki/source/api/api.go +++ b/internal/component/loki/source/api/api.go @@ -73,7 +73,7 @@ func New(opts component.Options, args Arguments) (*Component, error) { handler: loki.NewLogsBatchReceiver(), uncheckedCollector: util.NewUncheckedCollector(nil), - fanout: loki.NewFanout(args.ForwardTo), + fanout: loki.NewFanout(args.ForwardTo, opts.Registerer), } opts.Registerer.MustRegister(c.uncheckedCollector) err := c.Update(args) diff --git a/internal/component/loki/source/cloudflare/cloudflare.go b/internal/component/loki/source/cloudflare/cloudflare.go index 0417efcba44..e64351bbb85 100644 --- a/internal/component/loki/source/cloudflare/cloudflare.go +++ b/internal/component/loki/source/cloudflare/cloudflare.go @@ -120,7 +120,7 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: newMetrics(o.Registerer), handler: loki.NewLogsReceiver(), - fanout: loki.NewFanout(args.ForwardTo), + fanout: loki.NewFanout(args.ForwardTo, o.Registerer), 
posFile: positionsFile, } diff --git a/internal/component/loki/source/consume_test.go b/internal/component/loki/source/consume_test.go index eca02596f39..d42a276ec79 100644 --- a/internal/component/loki/source/consume_test.go +++ b/internal/component/loki/source/consume_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/grafana/loki/pkg/push" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component/common/loki" @@ -14,7 +15,7 @@ import ( func TestConsume(t *testing.T) { consumer := loki.NewLogsReceiver() producer := loki.NewLogsReceiver() - fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + fanout := loki.NewFanout([]loki.LogsReceiver{consumer}, prometheus.NewRegistry()) t.Run("should fanout any consumed entries", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -47,7 +48,7 @@ func TestConsume(t *testing.T) { func TestConsumeAndProcess(t *testing.T) { consumer := loki.NewLogsReceiver() producer := loki.NewLogsReceiver() - fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + fanout := loki.NewFanout([]loki.LogsReceiver{consumer}, prometheus.NewRegistry()) t.Run("should process and fanout any consumed entries", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -88,7 +89,7 @@ func TestConsumeAndProcess(t *testing.T) { func TestConsumeBatch(t *testing.T) { consumer := loki.NewLogsReceiver() producer := loki.NewLogsBatchReceiver() - fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + fanout := loki.NewFanout([]loki.LogsReceiver{consumer}, prometheus.NewRegistry()) t.Run("should fanout any consumed entries", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 369ac661a18..74dad8d0ae7 100644 --- a/internal/component/loki/source/docker/docker.go +++ 
b/internal/component/loki/source/docker/docker.go @@ -142,7 +142,7 @@ func New(o component.Options, args Arguments) (*Component, error) { exited: atomic.NewBool(false), handler: loki.NewLogsReceiver(), scheduler: source.NewScheduler[string](), - fanout: loki.NewFanout(args.ForwardTo), + fanout: loki.NewFanout(args.ForwardTo, o.Registerer), posFile: positionsFile, } diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 4c23db283f5..a7671cd79cd 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -234,7 +234,7 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: newMetrics(o.Registerer), handler: loki.NewLogsReceiver(), - fanout: loki.NewFanout(args.ForwardTo), + fanout: loki.NewFanout(args.ForwardTo, o.Registerer), posFile: positionsFile, scheduler: source.NewScheduler[positions.Entry](), watcher: time.NewTicker(args.FileMatch.SyncPeriod), diff --git a/internal/component/loki/source/journal/journal.go b/internal/component/loki/source/journal/journal.go index 06cec76b83c..bed9b43c61d 100644 --- a/internal/component/loki/source/journal/journal.go +++ b/internal/component/loki/source/journal/journal.go @@ -79,7 +79,7 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, recv: loki.NewLogsReceiver(), positions: positionsFile, - fanout: loki.NewFanout(args.ForwardTo), + fanout: loki.NewFanout(args.ForwardTo, o.Registerer), targetsUpdated: make(chan struct{}, 1), args: args, } diff --git a/internal/component/loki/source/kubernetes_events/kubernetes_events.go b/internal/component/loki/source/kubernetes_events/kubernetes_events.go index d266ed300c2..647a6add11f 100644 --- a/internal/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/internal/component/loki/source/kubernetes_events/kubernetes_events.go @@ -117,7 +117,7 @@ func New(o component.Options, args Arguments) (*Component, error) { 
positions: positionsFile, handler: loki.NewLogsReceiver(), scheduler: source.NewScheduler[string](), - fanout: loki.NewFanout(args.ForwardTo), + fanout: loki.NewFanout(args.ForwardTo, o.Registerer), } if err := c.Update(args); err != nil { return nil, err