diff --git a/docs/sources/reference/components/loki/loki.source.gcplog.md b/docs/sources/reference/components/loki/loki.source.gcplog.md index 03b51772f5c..c40823ba6fa 100644 --- a/docs/sources/reference/components/loki/loki.source.gcplog.md +++ b/docs/sources/reference/components/loki/loki.source.gcplog.md @@ -47,6 +47,7 @@ You can use the following blocks with `loki.source.gcplog`: | Name | Description | Required | | ------------------------------ | ----------------------------------------------------------------------------- | -------- | | [`pull`][pull] | Configures a target to pull logs from a GCP Pub/Sub subscription. | no | +| `pull` > [`limit`][limit] | Configures Pub/Sub flow-control limits for in-flight message processing. | no | | [`push`][push] | Configures a server to receive logs as GCP Pub/Sub push requests. | no | | `push` > [`grpc`][grpc] | Configures the gRPC server that receives requests when using the `push` mode. | no | | `push` > `gprc` > [`tls`][tls] | Configures TLS for the gRPC server. | no | @@ -62,6 +63,7 @@ The `http` and `grpc` block are just used when the `push` block is configured. [grpc]: #grpc [http]: #http +[limit]: #limit [pull]: #pull [push]: #push [tls]: #tls @@ -81,11 +83,27 @@ Any omitted fields take their default values. | `use_full_line` | `bool` | Send the full line from Cloud Logging even if `textPayload` is available. | `false` | no | | `use_incoming_timestamp` | `bool` | Whether to use the incoming log timestamp. | `false` | no | +The following blocks can be used to configure the `pull` block: + +| Name | Description | Required | +| ----------------- | ----------------------------------------------------------------------------- | -------- | +| [`limit`][limit] | Configures Pub/Sub flow-control limits for in-flight message processing. 
| no | + To make use of the `pull` strategy, the GCP project must have been [configured](/docs/loki/next/clients/promtail/gcplog-cloud/) to forward its cloud resource logs onto a Pub/Sub topic for `loki.source.gcplog` to consume. Typically, the host system also needs to have its GCP [credentials](https://cloud.google.com/docs/authentication/application-default-credentials) configured. One way to do it, is to point the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the location of a credential configuration JSON file or a service account key. +### `limit` + +The `limit` block configures flow-control budgets for unprocessed Pub/Sub messages. +When either budget is reached, Pub/Sub delivery is throttled, which limits concurrent in-flight message handling. + +| Name | Type | Description | Default | Required | +| -------------------------- | -------- | ----------------------------------------------------------------------------------------- | ------- | -------- | +| `max_outstanding_bytes` | `string` | Byte budget for unprocessed messages. Hitting this budget throttles delivery concurrency. | `"1GiB"` | no | +| `max_outstanding_messages` | `int` | Count budget for unprocessed messages. Hitting this budget caps in-flight concurrency. | `1000` | no | + ### `push` The `push` block defines the configuration of the server that receives push requests from the GCP Pub/Sub servers. 
diff --git a/internal/component/loki/source/gcplog/gcptypes/gcptypes.go b/internal/component/loki/source/gcplog/gcptypes/gcptypes.go index b4e0156c8e5..c871038132a 100644 --- a/internal/component/loki/source/gcplog/gcptypes/gcptypes.go +++ b/internal/component/loki/source/gcplog/gcptypes/gcptypes.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/alecthomas/units" + fnet "github.com/grafana/alloy/internal/component/common/net" ) @@ -14,6 +16,31 @@ type PullConfig struct { Labels map[string]string `alloy:"labels,attr,optional"` UseIncomingTimestamp bool `alloy:"use_incoming_timestamp,attr,optional"` UseFullLine bool `alloy:"use_full_line,attr,optional"` + Limit LimitConfig `alloy:"limit,block,optional"` +} + +func (p *PullConfig) SetToDefault() { + p.Limit.SetToDefault() +} + +type LimitConfig struct { + // MaxOutstandingBytes sets the byte budget for unprocessed messages. + // Hitting this budget throttles delivery and limits concurrent in-flight handling. + MaxOutstandingBytes units.Base2Bytes `alloy:"max_outstanding_bytes,attr,optional"` + // MaxOutstandingMessages sets the count budget for unprocessed messages. + // Hitting this budget stops further delivery and limits concurrent in-flight handling. + MaxOutstandingMessages int `alloy:"max_outstanding_messages,attr,optional"` +} + +var DefaultLimitConfig = LimitConfig{ + // Default from https://github.com/googleapis/google-cloud-go/blob/df64147605e961803c7ea839bc080ffd1b814ac9/pubsub/v2/subscriber.go#L172 + MaxOutstandingBytes: 1 * units.GiB, + // Default from https://github.com/googleapis/google-cloud-go/blob/df64147605e961803c7ea839bc080ffd1b814ac9/pubsub/v2/subscriber.go#L171 + MaxOutstandingMessages: 1000, +} + +func (l *LimitConfig) SetToDefault() { + *l = DefaultLimitConfig } // PushConfig configures a GCPLog target with the 'push' strategy. 
diff --git a/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target.go b/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target.go index 56739b2722b..1a6fc31b868 100644 --- a/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target.go +++ b/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target.go @@ -41,8 +41,8 @@ type PullTarget struct { backoff *backoff.Backoff // pubsub - ps io.Closer - sub pubsubSubscription + client io.Closer + subscriber subscriber } // TODO(@tpaschalis) Expose this as Alloy configuration in the future. @@ -52,8 +52,8 @@ var defaultBackoff = backoff.Config{ MaxRetries: 0, // Retry forever } -// pubsubSubscription allows us to mock pubsub for testing -type pubsubSubscription interface { +// subscriber allows us to mock pubsub for testing +type subscriber interface { Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error } @@ -68,12 +68,16 @@ func NewPullTarget( ) (*PullTarget, error) { ctx, cancel := context.WithCancel(context.Background()) - ps, err := pubsub.NewClient(ctx, config.ProjectID, clientOptions...) + client, err := pubsub.NewClient(ctx, config.ProjectID, clientOptions...) 
if err != nil { cancel() return nil, err } + subscriber := client.Subscriber(config.Subscription) + subscriber.ReceiveSettings.MaxOutstandingBytes = int(config.Limit.MaxOutstandingBytes) + subscriber.ReceiveSettings.MaxOutstandingMessages = config.Limit.MaxOutstandingMessages + return &PullTarget{ metrics: metrics, logger: logger, @@ -82,8 +86,8 @@ func NewPullTarget( config: config, ctx: ctx, cancel: cancel, - ps: ps, - sub: ps.Subscriber(config.Subscription), + client: client, + subscriber: subscriber, backoff: backoff.New(ctx, defaultBackoff), }, nil } @@ -93,7 +97,7 @@ func (t *PullTarget) Run() error { lbls := t.labels() for t.backoff.Ongoing() { - err := t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) { + err := t.subscriber.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) { entry, err := parseLogEntry(m.Data, labels.NewBuilder(labels.EmptyLabels()), t.relabelConfig, parseOptions{ fixedLabels: lbls, useFullLine: t.config.UseFullLine, @@ -132,7 +136,7 @@ func (t *PullTarget) Run() error { func (t *PullTarget) Stop() { t.cancel() t.wg.Wait() - t.ps.Close() + t.client.Close() } func (t *PullTarget) labels() model.LabelSet { diff --git a/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target_test.go b/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target_test.go index 9d1f3fe4336..b2a7a782b9f 100644 --- a/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target_test.go +++ b/internal/component/loki/source/gcplog/internal/gcplogtarget/pull_target_test.go @@ -93,15 +93,15 @@ func testPullTarget(t *testing.T, recv loki.LogsReceiver) *testContext { ctx, cancel := context.WithCancel(t.Context()) sub := newFakeSubscription() target := &PullTarget{ - metrics: NewMetrics(prometheus.NewRegistry()), - logger: log.NewNopLogger(), - recv: recv, - ctx: ctx, - cancel: cancel, - config: testConfig, - ps: io.NopCloser(nil), - sub: sub, - backoff: backoff.New(ctx, testBackoff), + metrics: 
NewMetrics(prometheus.NewRegistry()), + logger: log.NewNopLogger(), + recv: recv, + ctx: ctx, + cancel: cancel, + config: testConfig, + client: io.NopCloser(nil), + subscriber: sub, + backoff: backoff.New(ctx, testBackoff), } return &testContext{ diff --git a/internal/converter/internal/promtailconvert/internal/build/gcplog.go b/internal/converter/internal/promtailconvert/internal/build/gcplog.go index 07cf47b8eda..1f84986cbea 100644 --- a/internal/converter/internal/promtailconvert/internal/build/gcplog.go +++ b/internal/converter/internal/promtailconvert/internal/build/gcplog.go @@ -21,13 +21,13 @@ func (s *ScrapeConfigBuilder) AppendGCPLog() { cfg := s.cfg.GcplogConfig switch cfg.SubscriptionType { case "", "pull": - pullConfig = &gcptypes.PullConfig{ - ProjectID: cfg.ProjectID, - Subscription: cfg.Subscription, - Labels: convertPromLabels(cfg.Labels), - UseIncomingTimestamp: cfg.UseIncomingTimestamp, - UseFullLine: cfg.UseFullLine, - } + pullConfig = &gcptypes.PullConfig{} + pullConfig.SetToDefault() + pullConfig.ProjectID = cfg.ProjectID + pullConfig.Subscription = cfg.Subscription + pullConfig.Labels = convertPromLabels(cfg.Labels) + pullConfig.UseIncomingTimestamp = cfg.UseIncomingTimestamp + pullConfig.UseFullLine = cfg.UseFullLine case "push": s.diags.AddAll(common.ValidateWeaveWorksServerCfg(cfg.Server)) alloyServer := common.WeaveworksServerToAlloyServer(cfg.Server)