Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"time"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/loki/promtail/limit"
)

// GlobalContext carries the file-wide settings that every per-scrape-config
// builder needs during the Promtail to Alloy conversion.
type GlobalContext struct {
	// WriteReceivers are the receivers of the generated loki.write components.
	// Pipelines with no processing stages forward directly to these.
	WriteReceivers []loki.LogsReceiver
	// TargetSyncPeriod is copied from Promtail's target_config.sync_period.
	TargetSyncPeriod time.Duration
	// LabelPrefix, when non-empty, is used instead of "default" as the base
	// label for generated components.
	LabelPrefix string
	// LimitsConfig is Promtail's global limits_config. It is converted into
	// per-pipeline loki.process stages (see buildLimitsConfigStages), and its
	// max_streams field into the max_streams argument of each loki.write.
	LimitsConfig limit.Config
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package build

import (
"fmt"

"github.com/alecthomas/units"

"github.com/grafana/alloy/internal/component/loki/process/stages"
"github.com/grafana/alloy/internal/loki/promtail/limit"
)

// buildLimitsConfigStages translates the global Promtail limits_config into a
// slice of equivalent loki.process pipeline stages.
//
// limits_config.max_streams is not handled here; it is mapped separately onto
// the max_streams argument of every loki.write component.
//
// The translation is only an approximation: Promtail enforces these limits
// globally across all pipelines, while the stages produced here are embedded
// into each per-scrape-config loki.process component and therefore apply
// independently per pipeline.
func buildLimitsConfigStages(cfg limit.Config) []stages.StageConfig {
	var out []stages.StageConfig

	// Readline rate limiting maps onto a stage.limit block.
	if cfg.ReadlineRateEnabled {
		out = append(out, stages.StageConfig{
			LimitConfig: &stages.LimitConfig{
				Rate:  cfg.ReadlineRate,
				Burst: cfg.ReadlineBurst,
				Drop:  cfg.ReadlineRateDrop,
			},
		})
	}

	// Line-size enforcement maps onto either stage.truncate or stage.drop,
	// depending on max_line_size_truncate.
	if cfg.MaxLineSize > 0 {
		sizeLimit, err := units.ParseBase2Bytes(fmt.Sprintf("%dB", cfg.MaxLineSize.Val()))
		if err != nil {
			// cfg.MaxLineSize.Val() is a raw byte count, so "%dB" always
			// parses; this branch cannot be hit in practice.
			return out
		}

		var sizeStage stages.StageConfig
		if cfg.MaxLineSizeTruncate {
			sizeStage = stages.StageConfig{
				TruncateConfig: &stages.TruncateConfig{
					Rules: []*stages.RuleConfig{{
						Limit:      sizeLimit,
						SourceType: stages.TruncateSourceLine,
					}},
				},
			}
		} else {
			sizeStage = stages.StageConfig{
				DropConfig: &stages.DropConfig{
					LongerThan: sizeLimit,
				},
			}
		}
		out = append(out, sizeStage)
	}

	return out
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ import (
"github.com/grafana/alloy/syntax/token/builder"
)

func NewLokiWrite(client *client.Config, diags *diag.Diagnostics, index int, labelPrefix string) (*builder.Block, loki.LogsReceiver) {
func NewLokiWrite(client *client.Config, diags *diag.Diagnostics, index int, labelPrefix string, maxStreams int) (*builder.Block, loki.LogsReceiver) {
label := "default"
if labelPrefix != "" {
label = labelPrefix
}

lokiWriteLabel := common.LabelWithIndex(index, label)

lokiWriteArgs := toLokiWriteArguments(client, diags)
lokiWriteArgs := toLokiWriteArguments(client, diags, maxStreams)
block := common.NewBlockWithOverride([]string{"loki", "write"}, lokiWriteLabel, lokiWriteArgs)
return block, common.ConvertLogsReceiver{
Expr: fmt.Sprintf("loki.write.%s.receiver", lokiWriteLabel),
}
}

func toLokiWriteArguments(config *client.Config, diags *diag.Diagnostics) *lokiwrite.Arguments {
func toLokiWriteArguments(config *client.Config, diags *diag.Diagnostics, maxStreams int) *lokiwrite.Arguments {
batchSize, err := units.ParseBase2Bytes(fmt.Sprintf("%dB", config.BatchSize))
if err != nil {
diags.Add(
Expand Down Expand Up @@ -57,6 +57,7 @@ func toLokiWriteArguments(config *client.Config, diags *diag.Diagnostics) *lokiw
},
},
ExternalLabels: convertFlagLabels(config.ExternalLabels),
MaxStreams: maxStreams,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,21 @@ func (s *ScrapeConfigBuilder) getOrNewProcessStageReceivers() []loki.LogsReceive
if s.processStageReceivers != nil {
return s.processStageReceivers
}
if len(s.cfg.PipelineStages) == 0 {

globalLimitStages := buildLimitsConfigStages(s.globalCtx.LimitsConfig)

if len(s.cfg.PipelineStages) == 0 && len(globalLimitStages) == 0 {
s.processStageReceivers = s.globalCtx.WriteReceivers
return s.processStageReceivers
}

alloyStages := make([]stages.StageConfig, len(s.cfg.PipelineStages))
for i, ps := range s.cfg.PipelineStages {
// Global limit stages are prepended so they apply before any per-scrape-config
// pipeline stages, matching Promtail's behavior of applying limits before processing.
alloyStages := make([]stages.StageConfig, 0, len(globalLimitStages)+len(s.cfg.PipelineStages))
alloyStages = append(alloyStages, globalLimitStages...)
for _, ps := range s.cfg.PipelineStages {
if fs, ok := convertStage(ps, s.diags); ok {
alloyStages[i] = fs
alloyStages = append(alloyStages, fs)
}
}
args := process.Arguments{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ func AppendAll(f *builder.File, cfg *promtailcfg.Config, labelPrefix string, dia
// Each client config needs to be a separate remote_write,
// because they may have different ExternalLabels fields.
for i, cc := range cfg.ClientConfigs {
writeBlocks[i], writeReceivers[i] = build.NewLokiWrite(&cc, &diags, i, labelPrefix)
writeBlocks[i], writeReceivers[i] = build.NewLokiWrite(&cc, &diags, i, labelPrefix, cfg.LimitsConfig.MaxStreams)
}

gc := &build.GlobalContext{
WriteReceivers: writeReceivers,
TargetSyncPeriod: cfg.TargetConfig.SyncPeriod,
LabelPrefix: labelPrefix,
LimitsConfig: cfg.LimitsConfig,
}

for _, sc := range cfg.ScrapeConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
discovery.kubernetes "example" {
role = "pod"
kubeconfig_file = "/home/toby/.kube/config"

selectors {
role = "pod"
field = "spec.nodeName=" + coalesce(sys.env("HOSTNAME"), constants.hostname)
}
}

loki.process "example" {
forward_to = [loki.write.default.receiver]

stage.limit {
rate = 1000.5
burst = 1000
drop = true
}

stage.truncate {
rule {
limit = "256KiB"
source_type = "line"
}
}
}

loki.source.file "example" {
targets = discovery.kubernetes.example.targets
forward_to = [loki.process.example.receiver]

file_match {
enabled = true
}
legacy_positions_file = "/var/log/positions.yaml"
}

loki.write "default" {
endpoint {
url = "http://localhost/loki/api/v1/push"
}
external_labels = {}
max_streams = 100
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
(Warning) limits_config rate limiting settings (readline_rate, readline_burst, readline_rate_drop) have been converted to stage.limit stages in each loki.process component. Unlike Promtail's global rate limiter, these limits apply independently per pipeline rather than across all pipelines combined.
(Warning) If you have a tracing set up for Promtail, it cannot be migrated to Alloy automatically. Refer to the documentation on how to configure tracing in Alloy.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
limits_config:
max_line_size: 256KB
max_line_size_truncate: true
max_streams: 100
readline_rate_enabled: true
readline_rate: 1000.5
readline_burst: 1000
readline_rate_drop: true

clients:
- url: http://localhost/loki/api/v1/push

scrape_configs:
- job_name: example
kubernetes_sd_configs:
- role: pod
kubeconfig_file: /home/toby/.kube/config

server: { register_instrumentation: false }
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(Error) Promtail's WAL is currently not supported in Alloy
(Error) limits_config is not yet supported in Alloy
(Warning) limits_config rate limiting settings (readline_rate, readline_burst, readline_rate_drop) have been converted to stage.limit stages in each loki.process component. Unlike Promtail's global rate limiter, these limits apply independently per pipeline rather than across all pipelines combined.
(Warning) If you have a tracing set up for Promtail, it cannot be migrated to Alloy automatically. Refer to the documentation on how to configure tracing in Alloy.
(Error) reading targets from stdin is not supported in Alloy configuration file
(Warning) server.profiling_enabled is not supported - use Alloy's main HTTP server's profiling endpoints instead
Expand Down
11 changes: 5 additions & 6 deletions internal/converter/internal/promtailconvert/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ func validateTopLevelConfig(cfg *promtailcfg.Config, diags *diag.Diagnostics) {
)
}

// Not yet supported, see https://github.com/grafana/agent/issues/4342. It's an error since we want to
// err on the safe side.
//TODO(thampiotr): seems like it's possible to support this using loki.process component
if cfg.LimitsConfig != DefaultLimitsConfig() {
if cfg.LimitsConfig.ReadlineRateEnabled {
diags.Add(
diag.SeverityLevelError,
"limits_config is not yet supported in Alloy",
diag.SeverityLevelWarn,
"limits_config rate limiting settings (readline_rate, readline_burst, readline_rate_drop) have been "+
"converted to stage.limit stages in each loki.process component. Unlike Promtail's global rate "+
"limiter, these limits apply independently per pipeline rather than across all pipelines combined.",
)
}

Expand Down
Loading