From aef3664286e280d0d778da38c8d3264306efd794 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Thu, 12 Mar 2026 12:24:05 +0000 Subject: [PATCH 1/2] Support converting Promtail limits_config --- .../internal/build/global_context.go | 2 + .../internal/build/limits_config.go | 63 +++++++++++++++++++ .../internal/build/scrape_builder.go | 14 +++-- .../promtailconvert/promtailconvert.go | 1 + .../testdata/limits_config.alloy | 43 +++++++++++++ .../testdata/limits_config.diags | 3 + .../testdata/limits_config.yaml | 19 ++++++ .../testdata/unsupported.diags | 2 +- .../internal/promtailconvert/validate.go | 18 ++++-- 9 files changed, 154 insertions(+), 11 deletions(-) create mode 100644 internal/converter/internal/promtailconvert/internal/build/limits_config.go create mode 100644 internal/converter/internal/promtailconvert/testdata/limits_config.alloy create mode 100644 internal/converter/internal/promtailconvert/testdata/limits_config.diags create mode 100644 internal/converter/internal/promtailconvert/testdata/limits_config.yaml diff --git a/internal/converter/internal/promtailconvert/internal/build/global_context.go b/internal/converter/internal/promtailconvert/internal/build/global_context.go index 1589ce2884a..9363d571196 100644 --- a/internal/converter/internal/promtailconvert/internal/build/global_context.go +++ b/internal/converter/internal/promtailconvert/internal/build/global_context.go @@ -4,10 +4,12 @@ import ( "time" "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/loki/promtail/limit" ) type GlobalContext struct { WriteReceivers []loki.LogsReceiver TargetSyncPeriod time.Duration LabelPrefix string + LimitsConfig limit.Config } diff --git a/internal/converter/internal/promtailconvert/internal/build/limits_config.go b/internal/converter/internal/promtailconvert/internal/build/limits_config.go new file mode 100644 index 00000000000..400763ce50b --- /dev/null +++ b/internal/converter/internal/promtailconvert/internal/build/limits_config.go @@ -0,0 +1,63 @@ +package build + +import ( + "fmt" + + "github.com/alecthomas/units" + + "github.com/grafana/alloy/internal/component/loki/process/stages" + "github.com/grafana/alloy/internal/loki/promtail/limit" +) + +// buildLimitsConfigStages converts the global Promtail limits_config into +// equivalent loki.process pipeline stages. +// +// Note: limits_config.max_streams has no equivalent in Alloy's pipeline stages +// and is silently omitted. A diagnostic warning is emitted by the top-level +// validator when max_streams is set. +// +// The conversion is inherently approximate: Promtail's limits apply globally +// across all pipelines, whereas the returned stages are injected into each +// per-scrape-config loki.process component individually. +func buildLimitsConfigStages(cfg limit.Config) []stages.StageConfig { + var result []stages.StageConfig + + if cfg.ReadlineRateEnabled { + result = append(result, stages.StageConfig{ + LimitConfig: &stages.LimitConfig{ + Rate: cfg.ReadlineRate, + Burst: cfg.ReadlineBurst, + Drop: cfg.ReadlineRateDrop, + }, + }) + } + + if cfg.MaxLineSize > 0 { + lineSizeBytes, err := units.ParseBase2Bytes(fmt.Sprintf("%dB", cfg.MaxLineSize.Val())) + if err != nil { + // MaxLineSize.Val() returns an int of raw bytes, so "%dB" is always valid. + // This branch is unreachable in practice. + return result + } + if cfg.MaxLineSizeTruncate { + result = append(result, stages.StageConfig{ + TruncateConfig: &stages.TruncateConfig{ + Rules: []*stages.RuleConfig{ + { + Limit: lineSizeBytes, + SourceType: stages.TruncateSourceLine, + }, + }, + }, + }) + } else { + result = append(result, stages.StageConfig{ + DropConfig: &stages.DropConfig{ + LongerThan: lineSizeBytes, + }, + }) + } + } + + return result +} diff --git a/internal/converter/internal/promtailconvert/internal/build/scrape_builder.go b/internal/converter/internal/promtailconvert/internal/build/scrape_builder.go index c15172f8060..e0f27158cbe 100644 --- a/internal/converter/internal/promtailconvert/internal/build/scrape_builder.go +++ b/internal/converter/internal/promtailconvert/internal/build/scrape_builder.go @@ -123,15 +123,21 @@ func (s *ScrapeConfigBuilder) getOrNewProcessStageReceivers() []loki.LogsReceive if s.processStageReceivers != nil { return s.processStageReceivers } - if len(s.cfg.PipelineStages) == 0 { + + globalLimitStages := buildLimitsConfigStages(s.globalCtx.LimitsConfig) + + if len(s.cfg.PipelineStages) == 0 && len(globalLimitStages) == 0 { s.processStageReceivers = s.globalCtx.WriteReceivers return s.processStageReceivers } - alloyStages := make([]stages.StageConfig, len(s.cfg.PipelineStages)) - for i, ps := range s.cfg.PipelineStages { + // Global limit stages are prepended so they apply before any per-scrape-config + // pipeline stages, matching Promtail's behavior of applying limits before processing. + alloyStages := make([]stages.StageConfig, 0, len(globalLimitStages)+len(s.cfg.PipelineStages)) + alloyStages = append(alloyStages, globalLimitStages...) + for _, ps := range s.cfg.PipelineStages { if fs, ok := convertStage(ps, s.diags); ok { - alloyStages[i] = fs + alloyStages = append(alloyStages, fs) } } args := process.Arguments{ diff --git a/internal/converter/internal/promtailconvert/promtailconvert.go b/internal/converter/internal/promtailconvert/promtailconvert.go index 1c5779f3434..4c6e6e74364 100644 --- a/internal/converter/internal/promtailconvert/promtailconvert.go +++ b/internal/converter/internal/promtailconvert/promtailconvert.go @@ -115,6 +115,7 @@ func AppendAll(f *builder.File, cfg *promtailcfg.Config, labelPrefix string, dia WriteReceivers: writeReceivers, TargetSyncPeriod: cfg.TargetConfig.SyncPeriod, LabelPrefix: labelPrefix, + LimitsConfig: cfg.LimitsConfig, } for _, sc := range cfg.ScrapeConfig { diff --git a/internal/converter/internal/promtailconvert/testdata/limits_config.alloy b/internal/converter/internal/promtailconvert/testdata/limits_config.alloy new file mode 100644 index 00000000000..798d0dc6509 --- /dev/null +++ b/internal/converter/internal/promtailconvert/testdata/limits_config.alloy @@ -0,0 +1,43 @@ +discovery.kubernetes "example" { + role = "pod" + kubeconfig_file = "/home/toby/.kube/config" + + selectors { + role = "pod" + field = "spec.nodeName=" + coalesce(sys.env("HOSTNAME"), constants.hostname) + } +} + +loki.process "example" { + forward_to = [loki.write.default.receiver] + + stage.limit { + rate = 1000.5 + burst = 1000 + drop = true + } + + stage.truncate { + rule { + limit = "256KiB" + source_type = "line" + } + } +} + +loki.source.file "example" { + targets = discovery.kubernetes.example.targets + forward_to = [loki.process.example.receiver] + + file_match { + enabled = true + } + legacy_positions_file = "/var/log/positions.yaml" +} + +loki.write "default" { + endpoint { + url = "http://localhost/loki/api/v1/push" + } + external_labels = {} +} diff --git a/internal/converter/internal/promtailconvert/testdata/limits_config.diags b/internal/converter/internal/promtailconvert/testdata/limits_config.diags new file mode 100644 index 00000000000..1faaf976a15 --- /dev/null +++ b/internal/converter/internal/promtailconvert/testdata/limits_config.diags @@ -0,0 +1,3 @@ +(Warning) limits_config.max_streams is not supported in Alloy and will be ignored in the converted configuration. +(Warning) limits_config rate limiting settings (readline_rate, readline_burst, readline_rate_drop) have been converted to stage.limit stages in each loki.process component. Unlike Promtail's global rate limiter, these limits apply independently per pipeline rather than across all pipelines combined. +(Warning) If you have a tracing set up for Promtail, it cannot be migrated to Alloy automatically. Refer to the documentation on how to configure tracing in Alloy. diff --git a/internal/converter/internal/promtailconvert/testdata/limits_config.yaml b/internal/converter/internal/promtailconvert/testdata/limits_config.yaml new file mode 100644 index 00000000000..999aaa543bb --- /dev/null +++ b/internal/converter/internal/promtailconvert/testdata/limits_config.yaml @@ -0,0 +1,19 @@ +limits_config: + max_line_size: 256KB + max_line_size_truncate: true + max_streams: 100 + readline_rate_enabled: true + readline_rate: 1000.5 + readline_burst: 1000 + readline_rate_drop: true + +clients: + - url: http://localhost/loki/api/v1/push + +scrape_configs: + - job_name: example + kubernetes_sd_configs: + - role: pod + kubeconfig_file: /home/toby/.kube/config + +server: { register_instrumentation: false } diff --git a/internal/converter/internal/promtailconvert/testdata/unsupported.diags b/internal/converter/internal/promtailconvert/testdata/unsupported.diags index 06cd17ce5a7..ecec5ecac54 100644 --- a/internal/converter/internal/promtailconvert/testdata/unsupported.diags +++ b/internal/converter/internal/promtailconvert/testdata/unsupported.diags @@ -1,5 +1,5 @@ (Error) Promtail's WAL is currently not supported in Alloy -(Error) limits_config is not yet supported in Alloy +(Warning) limits_config rate limiting settings (readline_rate, readline_burst, readline_rate_drop) have been converted to stage.limit stages in each loki.process component. Unlike Promtail's global rate limiter, these limits apply independently per pipeline rather than across all pipelines combined. (Warning) If you have a tracing set up for Promtail, it cannot be migrated to Alloy automatically. Refer to the documentation on how to configure tracing in Alloy. (Error) reading targets from stdin is not supported in Alloy configuration file (Warning) server.profiling_enabled is not supported - use Alloy's main HTTP server's profiling endpoints instead diff --git a/internal/converter/internal/promtailconvert/validate.go b/internal/converter/internal/promtailconvert/validate.go index 331a63146b1..b41e181655b 100644 --- a/internal/converter/internal/promtailconvert/validate.go +++ b/internal/converter/internal/promtailconvert/validate.go @@ -17,13 +17,19 @@ func validateTopLevelConfig(cfg *promtailcfg.Config, diags *diag.Diagnostics) { ) } - // Not yet supported, see https://github.com/grafana/agent/issues/4342. It's an error since we want to - // err on the safe side. - //TODO(thampiotr): seems like it's possible to support this using loki.process component - if cfg.LimitsConfig != DefaultLimitsConfig() { + if cfg.LimitsConfig.MaxStreams > 0 { diags.Add( - diag.SeverityLevelError, - "limits_config is not yet supported in Alloy", + diag.SeverityLevelWarn, + "limits_config.max_streams is not supported in Alloy and will be ignored in the converted configuration.", + ) + } + + if cfg.LimitsConfig.ReadlineRateEnabled { + diags.Add( + diag.SeverityLevelWarn, + "limits_config rate limiting settings (readline_rate, readline_burst, readline_rate_drop) have been "+ + "converted to stage.limit stages in each loki.process component. Unlike Promtail's global rate "+ + "limiter, these limits apply independently per pipeline rather than across all pipelines combined.", ) } From 3316006417516303122d71097ab4410e33e9c680 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Mon, 16 Mar 2026 11:22:15 +0100 Subject: [PATCH 2/2] Convert max_streams --- .../promtailconvert/internal/build/limits_config.go | 5 ++--- .../internal/promtailconvert/internal/build/loki_write.go | 7 ++++--- .../converter/internal/promtailconvert/promtailconvert.go | 2 +- .../internal/promtailconvert/testdata/limits_config.alloy | 1 + .../internal/promtailconvert/testdata/limits_config.diags | 1 - internal/converter/internal/promtailconvert/validate.go | 7 ------- 6 files changed, 8 insertions(+), 15 deletions(-) diff --git a/internal/converter/internal/promtailconvert/internal/build/limits_config.go b/internal/converter/internal/promtailconvert/internal/build/limits_config.go index 400763ce50b..8b018935803 100644 --- a/internal/converter/internal/promtailconvert/internal/build/limits_config.go +++ b/internal/converter/internal/promtailconvert/internal/build/limits_config.go @@ -12,9 +12,8 @@ import ( // buildLimitsConfigStages converts the global Promtail limits_config into // equivalent loki.process pipeline stages. // -// Note: limits_config.max_streams has no equivalent in Alloy's pipeline stages -// and is silently omitted. A diagnostic warning is emitted by the top-level -// validator when max_streams is set. +// Note: limits_config.max_streams is converted separately into the max_streams +// argument on each loki.write component. // // The conversion is inherently approximate: Promtail's limits apply globally // across all pipelines, whereas the returned stages are injected into each diff --git a/internal/converter/internal/promtailconvert/internal/build/loki_write.go b/internal/converter/internal/promtailconvert/internal/build/loki_write.go index 3912b6b61f3..5dca611bfed 100644 --- a/internal/converter/internal/promtailconvert/internal/build/loki_write.go +++ b/internal/converter/internal/promtailconvert/internal/build/loki_write.go @@ -14,7 +14,7 @@ import ( "github.com/grafana/alloy/syntax/token/builder" ) -func NewLokiWrite(client *client.Config, diags *diag.Diagnostics, index int, labelPrefix string) (*builder.Block, loki.LogsReceiver) { +func NewLokiWrite(client *client.Config, diags *diag.Diagnostics, index int, labelPrefix string, maxStreams int) (*builder.Block, loki.LogsReceiver) { label := "default" if labelPrefix != "" { label = labelPrefix @@ -22,14 +22,14 @@ func NewLokiWrite(client *client.Config, diags *diag.Diagnostics, index int, lab lokiWriteLabel := common.LabelWithIndex(index, label) - lokiWriteArgs := toLokiWriteArguments(client, diags) + lokiWriteArgs := toLokiWriteArguments(client, diags, maxStreams) block := common.NewBlockWithOverride([]string{"loki", "write"}, lokiWriteLabel, lokiWriteArgs) return block, common.ConvertLogsReceiver{ Expr: fmt.Sprintf("loki.write.%s.receiver", lokiWriteLabel), } } -func toLokiWriteArguments(config *client.Config, diags *diag.Diagnostics) *lokiwrite.Arguments { +func toLokiWriteArguments(config *client.Config, diags *diag.Diagnostics, maxStreams int) *lokiwrite.Arguments { batchSize, err := units.ParseBase2Bytes(fmt.Sprintf("%dB", config.BatchSize)) if err != nil { diags.Add( @@ -57,6 +57,7 @@ func toLokiWriteArguments(config *client.Config, diags *diag.Diagnostics) *lokiw }, }, ExternalLabels: convertFlagLabels(config.ExternalLabels), + MaxStreams: maxStreams, } } diff --git a/internal/converter/internal/promtailconvert/promtailconvert.go b/internal/converter/internal/promtailconvert/promtailconvert.go index 4c6e6e74364..5ac246f1060 100644 --- a/internal/converter/internal/promtailconvert/promtailconvert.go +++ b/internal/converter/internal/promtailconvert/promtailconvert.go @@ -108,7 +108,7 @@ func AppendAll(f *builder.File, cfg *promtailcfg.Config, labelPrefix string, dia // Each client config needs to be a separate remote_write, // because they may have different ExternalLabels fields. for i, cc := range cfg.ClientConfigs { - writeBlocks[i], writeReceivers[i] = build.NewLokiWrite(&cc, &diags, i, labelPrefix) + writeBlocks[i], writeReceivers[i] = build.NewLokiWrite(&cc, &diags, i, labelPrefix, cfg.LimitsConfig.MaxStreams) } gc := &build.GlobalContext{ diff --git a/internal/converter/internal/promtailconvert/testdata/limits_config.alloy b/internal/converter/internal/promtailconvert/testdata/limits_config.alloy index 798d0dc6509..9f23b05894d 100644 --- a/internal/converter/internal/promtailconvert/testdata/limits_config.alloy +++ b/internal/converter/internal/promtailconvert/testdata/limits_config.alloy @@ -40,4 +40,5 @@ loki.write "default" { url = "http://localhost/loki/api/v1/push" } external_labels = {} + max_streams = 100 } diff --git a/internal/converter/internal/promtailconvert/testdata/limits_config.diags b/internal/converter/internal/promtailconvert/testdata/limits_config.diags index 1faaf976a15..e4b36a192c2 100644 --- a/internal/converter/internal/promtailconvert/testdata/limits_config.diags +++ b/internal/converter/internal/promtailconvert/testdata/limits_config.diags @@ -1,3 +1,2 @@ -(Warning) limits_config.max_streams is not supported in Alloy and will be ignored in the converted configuration. (Warning) limits_config rate limiting settings (readline_rate, readline_burst, readline_rate_drop) have been converted to stage.limit stages in each loki.process component. Unlike Promtail's global rate limiter, these limits apply independently per pipeline rather than across all pipelines combined. (Warning) If you have a tracing set up for Promtail, it cannot be migrated to Alloy automatically. Refer to the documentation on how to configure tracing in Alloy. diff --git a/internal/converter/internal/promtailconvert/validate.go b/internal/converter/internal/promtailconvert/validate.go index b41e181655b..ea3f2f304c6 100644 --- a/internal/converter/internal/promtailconvert/validate.go +++ b/internal/converter/internal/promtailconvert/validate.go @@ -17,13 +17,6 @@ func validateTopLevelConfig(cfg *promtailcfg.Config, diags *diag.Diagnostics) { ) } - if cfg.LimitsConfig.MaxStreams > 0 { - diags.Add( - diag.SeverityLevelWarn, - "limits_config.max_streams is not supported in Alloy and will be ignored in the converted configuration.", - ) - } - if cfg.LimitsConfig.ReadlineRateEnabled { diags.Add( diag.SeverityLevelWarn,