Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/sources/reference/components/loki/loki.source.gcplog.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ You can use the following blocks with `loki.source.gcplog`:
| Name | Description | Required |
| ------------------------------ | ----------------------------------------------------------------------------- | -------- |
| [`pull`][pull] | Configures a target to pull logs from a GCP Pub/Sub subscription. | no |
| `pull` > [`limit`][limit] | Configures Pub/Sub flow-control limits for in-flight message processing. | no |
| [`push`][push] | Configures a server to receive logs as GCP Pub/Sub push requests. | no |
| `push` > [`grpc`][grpc] | Configures the gRPC server that receives requests when using the `push` mode. | no |
| `push` > `gprc` > [`tls`][tls] | Configures TLS for the gRPC server. | no |
Expand All @@ -62,6 +63,7 @@ The `http` and `grpc` block are just used when the `push` block is configured.

[grpc]: #grpc
[http]: #http
[limit]: #limit
[pull]: #pull
[push]: #push
[tls]: #tls
Expand All @@ -81,11 +83,27 @@ Any omitted fields take their default values.
| `use_full_line` | `bool` | Send the full line from Cloud Logging even if `textPayload` is available. | `false` | no |
| `use_incoming_timestamp` | `bool` | Whether to use the incoming log timestamp. | `false` | no |

The following blocks can be used to configure the `pull` block:

| Name | Description | Required |
| ----------------- | ----------------------------------------------------------------------------- | -------- |
| [`limit`][limit] | Configures Pub/Sub flow-control limits for in-flight message processing. | no |

To make use of the `pull` strategy, the GCP project must have been [configured](/docs/loki/next/clients/promtail/gcplog-cloud/) to forward its cloud resource logs onto a Pub/Sub topic for `loki.source.gcplog` to consume.

Typically, the host system also needs to have its GCP [credentials](https://cloud.google.com/docs/authentication/application-default-credentials) configured.
One way to do it, is to point the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the location of a credential configuration JSON file or a service account key.

### `limit`

The `limit` block configures flow-control budgets for unprocessed Pub/Sub messages.
When either budget is reached, Pub/Sub delivery is throttled, which limits concurrent in-flight message handling.

| Name | Type | Description | Default | Required |
| -------------------------- | -------- | ----------------------------------------------------------------------------------------- | ------- | -------- |
| `max_outstanding_bytes` | `string` | Byte budget for unprocessed messages, hitting this budget throttles delivery concurrency. | `"1GiB"`| no |
| `max_outstanding_messages` | `int` | Count budget for unprocessed messages, hitting this budget caps in-flight concurrency. | `1000` | no |

### `push`

The `push` block defines the configuration of the server that receives push requests from the GCP Pub/Sub servers.
Expand Down
27 changes: 27 additions & 0 deletions internal/component/loki/source/gcplog/gcptypes/gcptypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"github.com/alecthomas/units"

fnet "github.com/grafana/alloy/internal/component/common/net"
)

Expand All @@ -14,6 +16,31 @@ type PullConfig struct {
Labels map[string]string `alloy:"labels,attr,optional"`
UseIncomingTimestamp bool `alloy:"use_incoming_timestamp,attr,optional"`
UseFullLine bool `alloy:"use_full_line,attr,optional"`
Limit LimitConfig `alloy:"limit,block,optional"`
}

func (p *PullConfig) SetToDefault() {
p.Limit.SetToDefault()
}

type LimitConfig struct {
// MaxOutstandingBytes sets the byte budget for unprocessed messages.
// Hitting this budget throttles delivery and limits concurrent in-flight handling.
MaxOutstandingBytes units.Base2Bytes `alloy:"max_outstanding_bytes,attr,optional"`
// MaxOutstandingMessages sets the count budget for unprocessed messages.
// Hitting this budget stops further delivery and limits concurrent in-flight handling.
MaxOutstandingMessages int `alloy:"max_outstanding_messages,attr,optional"`
}

var DefaultLimitConfig = LimitConfig{
// Default from https://github.com/googleapis/google-cloud-go/blob/df64147605e961803c7ea839bc080ffd1b814ac9/pubsub/v2/subscriber.go#L172
MaxOutstandingBytes: 1 * units.GiB,
// Default from https://github.com/googleapis/google-cloud-go/blob/df64147605e961803c7ea839bc080ffd1b814ac9/pubsub/v2/subscriber.go#L171
MaxOutstandingMessages: 1000,
}

func (l *LimitConfig) SetToDefault() {
*l = DefaultLimitConfig
}

// PushConfig configures a GCPLog target with the 'push' strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type PullTarget struct {
backoff *backoff.Backoff

// pubsub
ps io.Closer
sub pubsubSubscription
client io.Closer
subscriber subscriber
}

// TODO(@tpaschalis) Expose this as Alloy configuration in the future.
Expand All @@ -52,8 +52,8 @@ var defaultBackoff = backoff.Config{
MaxRetries: 0, // Retry forever
}

// pubsubSubscription allows us to mock pubsub for testing
type pubsubSubscription interface {
// subscriber allows us to mock pubsub for testing
type subscriber interface {
Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error
}

Expand All @@ -68,12 +68,16 @@ func NewPullTarget(
) (*PullTarget, error) {

ctx, cancel := context.WithCancel(context.Background())
ps, err := pubsub.NewClient(ctx, config.ProjectID, clientOptions...)
client, err := pubsub.NewClient(ctx, config.ProjectID, clientOptions...)
if err != nil {
cancel()
return nil, err
}

subscriber := client.Subscriber(config.Subscription)
subscriber.ReceiveSettings.MaxOutstandingBytes = int(config.Limit.MaxOutstandingBytes)
subscriber.ReceiveSettings.MaxOutstandingMessages = config.Limit.MaxOutstandingMessages

return &PullTarget{
metrics: metrics,
logger: logger,
Expand All @@ -82,8 +86,8 @@ func NewPullTarget(
config: config,
ctx: ctx,
cancel: cancel,
ps: ps,
sub: ps.Subscriber(config.Subscription),
client: client,
subscriber: subscriber,
backoff: backoff.New(ctx, defaultBackoff),
}, nil
}
Expand All @@ -93,7 +97,7 @@ func (t *PullTarget) Run() error {
lbls := t.labels()

for t.backoff.Ongoing() {
err := t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) {
err := t.subscriber.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) {
entry, err := parseLogEntry(m.Data, labels.NewBuilder(labels.EmptyLabels()), t.relabelConfig, parseOptions{
fixedLabels: lbls,
useFullLine: t.config.UseFullLine,
Expand Down Expand Up @@ -132,7 +136,7 @@ func (t *PullTarget) Run() error {
func (t *PullTarget) Stop() {
t.cancel()
t.wg.Wait()
t.ps.Close()
t.client.Close()
}

func (t *PullTarget) labels() model.LabelSet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ func testPullTarget(t *testing.T, recv loki.LogsReceiver) *testContext {
ctx, cancel := context.WithCancel(t.Context())
sub := newFakeSubscription()
target := &PullTarget{
metrics: NewMetrics(prometheus.NewRegistry()),
logger: log.NewNopLogger(),
recv: recv,
ctx: ctx,
cancel: cancel,
config: testConfig,
ps: io.NopCloser(nil),
sub: sub,
backoff: backoff.New(ctx, testBackoff),
metrics: NewMetrics(prometheus.NewRegistry()),
logger: log.NewNopLogger(),
recv: recv,
ctx: ctx,
cancel: cancel,
config: testConfig,
client: io.NopCloser(nil),
subscriber: sub,
backoff: backoff.New(ctx, testBackoff),
}

return &testContext{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ func (s *ScrapeConfigBuilder) AppendGCPLog() {
cfg := s.cfg.GcplogConfig
switch cfg.SubscriptionType {
case "", "pull":
pullConfig = &gcptypes.PullConfig{
ProjectID: cfg.ProjectID,
Subscription: cfg.Subscription,
Labels: convertPromLabels(cfg.Labels),
UseIncomingTimestamp: cfg.UseIncomingTimestamp,
UseFullLine: cfg.UseFullLine,
}
pullConfig = &gcptypes.PullConfig{}
pullConfig.SetToDefault()
pullConfig.ProjectID = cfg.ProjectID
pullConfig.Subscription = cfg.Subscription
pullConfig.Labels = convertPromLabels(cfg.Labels)
pullConfig.UseIncomingTimestamp = cfg.UseIncomingTimestamp
pullConfig.UseFullLine = cfg.UseFullLine
case "push":
s.diags.AddAll(common.ValidateWeaveWorksServerCfg(cfg.Server))
alloyServer := common.WeaveworksServerToAlloyServer(cfg.Server)
Expand Down
Loading