diff --git a/docs/sources/reference/components/loki/loki.write.md b/docs/sources/reference/components/loki/loki.write.md index 0ac70692258..be68b4aae17 100644 --- a/docs/sources/reference/components/loki/loki.write.md +++ b/docs/sources/reference/components/loki/loki.write.md @@ -46,7 +46,7 @@ You can use the following blocks with `loki.write`: | `endpoint` > [`basic_auth`][basic_auth] | Configure `basic_auth` for authenticating to the endpoint. | no | | `endpoint` > [`oauth2`][oauth2] | Configure OAuth 2.0 for authenticating to the endpoint. | no | | `endpoint` > `oauth2` > [`tls_config`][tls_config] | Configure TLS settings for connecting to the endpoint. | no | -| `endpoint` > [`queue_config`][queue_config] | When WAL is enabled, configures the queue client. | no | +| `endpoint` > [`queue_config`][queue_config] | Configure the queue used for the endpoint. | no | | `endpoint` > [`tls_config`][tls_config] | Configure TLS settings for connecting to the endpoint. | no | | [`wal`][wal] | Write-ahead log configuration. | no | @@ -104,8 +104,9 @@ The following arguments are supported: If no `tenant_id` is provided, the component assumes that the Loki instance at `endpoint` is running in single-tenant mode and no X-Scope-OrgID header is sent. When multiple `endpoint` blocks are provided, the `loki.write` component creates a client for each. -Received log entries are fanned-out to these clients in succession. -That means that if one client is bottlenecked, it may impact the rest. +Received log entries are fanned-out to these endpoints in succession. That means that if one endpoint is bottlenecked, it may impact the rest. + +Each endpoint has a _queue_ of batches to be sent. The `queue_config` block can be used to customize the behavior of this queue. Endpoints can be named for easier identification in debug metrics by using the `name` argument. If the `name` argument isn't provided, a name is generated based on a hash of the endpoint settings. 
@@ -129,8 +130,7 @@ When `retry_on_http_429` is enabled, the retry mechanism is governed by the back {{< docs/shared lookup="stability/experimental_feature.md" source="alloy" version="" >}} -The optional `queue_config` block configures, when WAL is enabled, how the underlying client queues batches of logs sent to Loki. -Refer to [Write-Ahead block](#wal) for more information. +The optional `queue_config` block configures how the endpoint queues batches of logs sent to Loki. The following arguments are supported: @@ -138,6 +138,13 @@ The following arguments are supported: | --------------- | ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -------- | | `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no | | `drain_timeout` | `duration` | Configures the maximum time the client can take to drain the send queue upon shutdown. During that time, it enqueues pending batches and drains the send queue sending each. | `"1m"` | no | +| `min_shards` | `number` | Minimum number of concurrent shards sending batches to the endpoint. | `1` | no | + +Each endpoint is divided into a number of concurrent _shards_ which are responsible for sending a fraction of batches. The number of shards is controlled with the `min_shards` argument. +Each shard has a queue of batches it keeps in memory, controlled with the `capacity` argument. + +The number of batches each shard can queue is calculated from `batch_size` and `capacity`. For example, if `batch_size` is 1MiB and `capacity` is 10MiB, each shard can queue up to 10 batches. +The maximum amount of memory required for all configured shards can be calculated using `capacity` * `min_shards`. 
### `tls_config` diff --git a/internal/component/common/loki/client/config.go b/internal/component/common/loki/client/config.go index 270e47c0139..c3c7d7f05ad 100644 --- a/internal/component/common/loki/client/config.go +++ b/internal/component/common/loki/client/config.go @@ -33,11 +33,11 @@ type Config struct { // prevent HOL blocking in multitenant deployments. DropRateLimitedBatches bool - // Queue controls configuration parameters specific to the queue client - Queue QueueConfig + // QueueConfig controls how shards and queues are configured for endpoint. + QueueConfig QueueConfig } -// QueueConfig holds configurations for the queue-based remote-write client. +// QueueConfig controls how shards and queues are configured for endpoints. type QueueConfig struct { // Capacity is the worst case size in bytes desired for the send queue. This value is used to calculate the size of // the buffered channel used underneath. The worst case scenario assumed is that every batch buffered in full, hence @@ -47,6 +47,9 @@ type QueueConfig struct { // is the 1 MiB default, and a capacity of 100 MiB, the underlying buffered channel would buffer up to 100 batches. Capacity int - // DrainTimeout controls the maximum time that draining the send queue can take. + // MinShards is the minimum number of concurrent shards sending batches to the endpoint. + MinShards int + + // DrainTimeout controls the maximum time that draining the queue can take. 
DrainTimeout time.Duration } diff --git a/internal/component/common/loki/client/consumer_fanout.go b/internal/component/common/loki/client/consumer_fanout.go index 6a7fadf3804..458eb97e298 100644 --- a/internal/component/common/loki/client/consumer_fanout.go +++ b/internal/component/common/loki/client/consumer_fanout.go @@ -1,58 +1,45 @@ package client import ( - "bufio" - "bytes" - "context" - "crypto/sha256" - "errors" "fmt" - "io" - "net/http" - "strconv" "sync" - "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/config" - "github.com/prometheus/common/model" "github.com/grafana/alloy/internal/component/common/loki" - "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/grafana/alloy/internal/useragent" - "github.com/grafana/dskit/backoff" + "github.com/grafana/alloy/internal/component/common/loki/client/internal" ) -func NewFanoutConsumer(logger log.Logger, reg prometheus.Registerer, clientCfgs ...Config) (*FanoutConsumer, error) { - if len(clientCfgs) == 0 { - return nil, fmt.Errorf("at least one client config must be provided") +func NewFanoutConsumer(logger log.Logger, reg prometheus.Registerer, cfgs ...Config) (*FanoutConsumer, error) { + if len(cfgs) == 0 { + return nil, fmt.Errorf("at least one endpoint config must be provided") } m := &FanoutConsumer{ - clients: make([]*client, 0, len(clientCfgs)), - recv: make(chan loki.Entry), + endpoints: make([]*endpoint, 0, len(cfgs)), + recv: make(chan loki.Entry), } var ( - metrics = NewMetrics(reg) - clientsCheck = make(map[string]struct{}) + metrics = newMetrics(reg) + endpointsCheck = make(map[string]struct{}) ) - for _, cfg := range clientCfgs { - // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). 
- clientName := getClientName(cfg) - if _, ok := clientsCheck[clientName]; ok { - return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for name: %s", cfg.Name) + for _, cfg := range cfgs { + // Don't allow duplicate endpoints, we have endpoint specific metrics that need at least one unique label value (name). + name := getEndpointName(cfg) + if _, ok := endpointsCheck[name]; ok { + return nil, fmt.Errorf("duplicate endpoint configs are not allowed, found duplicate for name: %s", cfg.Name) } - clientsCheck[clientName] = struct{}{} - client, err := newClient(metrics, cfg, logger) + endpointsCheck[name] = struct{}{} + endpoint, err := newEndpoint(metrics, cfg, logger, internal.NewNopMarkerHandler()) if err != nil { - return nil, fmt.Errorf("error starting client: %w", err) + return nil, fmt.Errorf("error starting endpoint: %w", err) } - m.clients = append(m.clients, client) + m.endpoints = append(m.endpoints, endpoint) } m.wg.Go(m.run) @@ -62,16 +49,16 @@ func NewFanoutConsumer(logger log.Logger, reg prometheus.Registerer, clientCfgs var _ Consumer = (*FanoutConsumer)(nil) type FanoutConsumer struct { - clients []*client - wg sync.WaitGroup - once sync.Once - recv chan loki.Entry + endpoints []*endpoint + wg sync.WaitGroup + once sync.Once + recv chan loki.Entry } func (c *FanoutConsumer) run() { for e := range c.recv { - for _, c := range c.clients { - c.Chan() <- e + for _, c := range c.endpoints { + c.enqueue(e, 0) } } } @@ -86,328 +73,13 @@ func (c *FanoutConsumer) Stop() { c.wg.Wait() var stopWG sync.WaitGroup - // Stop all clients. - for _, c := range c.clients { + // Stop all endpoints. + for _, c := range c.endpoints { stopWG.Go(func() { - c.Stop() + c.stop() }) } - // Wait for all clients to stop. + // Wait for all endpoints to stop. stopWG.Wait() } - -// getClientName computes the specific name for each client config. 
The name is either the configured Name setting in Config, -// or a hash of the config as whole, this allows us to detect repeated configs. -func getClientName(cfg Config) string { - if cfg.Name != "" { - return cfg.Name - } - return asSha256(cfg) -} - -func asSha256(o interface{}) string { - h := sha256.New() - _, _ = fmt.Fprintf(h, "%v", o) - - temp := fmt.Sprintf("%x", h.Sum(nil)) - return temp[:6] -} - -const ( - contentType = "application/x-protobuf" - maxErrMsgLen = 1024 - - // Label reserved to override the tenant ID while processing - // pipeline stages - ReservedLabelTenantID = "__tenant_id__" -) - -var userAgent = useragent.Get() - -// Client for pushing logs in snappy-compressed protos over HTTP. -type client struct { - metrics *Metrics - logger log.Logger - cfg Config - client *http.Client - entries chan loki.Entry - - once sync.Once - wg sync.WaitGroup - - // ctx is used in any upstream calls from the `client`. - ctx context.Context - cancel context.CancelFunc -} - -func newClient(metrics *Metrics, cfg Config, logger log.Logger) (*client, error) { - if cfg.URL.URL == nil { - return nil, errors.New("client needs target URL") - } - if metrics == nil { - return nil, errors.New("metrics must be instantiated") - } - - ctx, cancel := context.WithCancel(context.Background()) - - c := &client{ - logger: log.With(logger, "component", "client", "host", cfg.URL.Host), - cfg: cfg, - entries: make(chan loki.Entry), - metrics: metrics, - ctx: ctx, - cancel: cancel, - } - - err := cfg.Client.Validate() - if err != nil { - return nil, err - } - - c.client, err = config.NewClientFromConfig(cfg.Client, useragent.ProductName, config.WithHTTP2Disabled()) - if err != nil { - return nil, err - } - - c.client.Timeout = cfg.Timeout - - c.wg.Go(func() { c.run() }) - return c, nil -} - -func (c *client) initBatchMetrics(tenantID string) { - // Initialize counters to 0 so the metrics are exported before the first - // occurrence of incrementing to avoid missing metrics. 
- for _, counter := range c.metrics.countersWithHostTenantReason { - for _, reason := range Reasons { - counter.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(0) - } - } - - for _, counter := range c.metrics.countersWithHostTenant { - counter.WithLabelValues(c.cfg.URL.Host, tenantID).Add(0) - } -} - -func (c *client) run() { - batches := map[string]*batch{} - - // Given the client handles multiple batches (1 per tenant) and each batch - // can be created at a different point in time, we look for batches whose - // max wait time has been reached every 10 times per BatchWait, so that the - // maximum delay we have sending batches is 10% of the max waiting time. - // We apply a cap of 10ms to the ticker, to avoid too frequent checks in - // case the BatchWait is very low. - minWaitCheckFrequency := 10 * time.Millisecond - maxWaitCheckFrequency := max(c.cfg.BatchWait/10, minWaitCheckFrequency) - - maxWaitCheck := time.NewTicker(maxWaitCheckFrequency) - - defer func() { - maxWaitCheck.Stop() - // Send all pending batches - for tenantID, batch := range batches { - c.sendBatch(tenantID, batch) - } - }() - - for { - select { - case e, ok := <-c.entries: - if !ok { - return - } - - e, tenantID := c.processEntry(e) - batch, ok := batches[tenantID] - - // If the batch doesn't exist yet, we create a new one with the entry - if !ok { - batches[tenantID] = newBatch(c.cfg.MaxStreams, e) - c.initBatchMetrics(tenantID) - break - } - - // If adding the entry to the batch will increase the size over the max - // size allowed, we do send the current batch and then create a new one - if batch.sizeBytesAfter(e.Entry) > c.cfg.BatchSize { - c.sendBatch(tenantID, batch) - - batches[tenantID] = newBatch(c.cfg.MaxStreams, e) - break - } - - // The max size of the batch isn't reached, so we can add the entry - err := batch.add(e, 0) - if err != nil { - level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err) - reason := ReasonGeneric - if errors.Is(err, 
errMaxStreamsLimitExceeded) { - reason = ReasonStreamLimited - } - c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line))) - c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Inc() - return - } - case <-maxWaitCheck.C: - // Send all batches whose max wait time has been reached - for tenantID, batch := range batches { - if batch.age() < c.cfg.BatchWait { - continue - } - - c.sendBatch(tenantID, batch) - delete(batches, tenantID) - } - } - } -} - -func (c *client) Chan() chan<- loki.Entry { - return c.entries -} - -func batchIsRateLimited(status int) bool { - return status == 429 -} - -func (c *client) sendBatch(tenantID string, batch *batch) { - buf, entriesCount, err := batch.encode() - if err != nil { - level.Error(c.logger).Log("msg", "error encoding batch", "error", err) - return - } - bufBytes := float64(len(buf)) - c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes) - - backoff := backoff.New(c.ctx, c.cfg.BackoffConfig) - var status int - for { - start := time.Now() - // send uses `timeout` internally, so `context.Background` is good enough. 
- status, err = c.send(context.Background(), tenantID, buf) - - c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host, tenantID).Observe(time.Since(start).Seconds()) - - // Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling - if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) { - level.Warn(c.logger).Log("msg", "dropping batch due to rate limiting applied at ingester") - c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(bufBytes) - c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(float64(entriesCount)) - return - } - - if err == nil { - c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes) - c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount)) - return - } - - // Only retry 429s, 500s and connection-level errors. - if status > 0 && !batchIsRateLimited(status) && status/100 != 5 { - break - } - - level.Debug(c.logger).Log("msg", "error sending batch, will retry", "status", status, "tenant", tenantID, "error", err) - c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host, tenantID).Inc() - backoff.Wait() - - // Make sure it sends at least once before checking for retry. 
- if !backoff.Ongoing() { - break - } - } - - level.Error(c.logger).Log("msg", "final error sending batch, no retries left, dropping data", "status", status, "tenant", tenantID, "error", err) - // If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors - // were for a different reason - dropReason := ReasonGeneric - if batchIsRateLimited(status) { - dropReason = ReasonRateLimited - } - c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(bufBytes) - c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount)) -} - -func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) { - ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) - defer cancel() - req, err := http.NewRequestWithContext(ctx, "POST", c.cfg.URL.String(), bytes.NewReader(buf)) - if err != nil { - return -1, err - } - req.Header.Set("Content-Type", contentType) - req.Header.Set("User-Agent", userAgent) - - // If the tenant ID is not empty promtail is running in multi-tenant mode, so - // we should send it to Loki - if tenantID != "" { - req.Header.Set("X-Scope-OrgID", tenantID) - } - - // Add custom headers on request - if len(c.cfg.Headers) > 0 { - for k, v := range c.cfg.Headers { - if req.Header.Get(k) == "" { - req.Header.Add(k, v) - } else { - level.Warn(c.logger).Log("msg", "custom header key already exists, skipping", "key", k) - } - } - } - - resp, err := c.client.Do(req) - if err != nil { - return -1, err - } - - // NOTE: it is important in go to fully read the body and - // close it so that the connection can be reused. - // We only partially read the body if we encounter a non 2xx error - // so we should always consume whats left. 
- // https://github.com/golang/go/blob/32a9804c7ba3f4a0e0bd26cc24b9204860a49ec8/src/net/http/response.go#L59-L64 - // It is unclear that we always need to drain the body but - // https://github.com/golang/go/issues/60240#issuecomment-1551060433 seems to indicate that we should. - defer func() { - _, _ = io.Copy(io.Discard, resp.Body) - _ = resp.Body.Close() - }() - - if resp.StatusCode/100 != 2 { - scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)) - line := "" - if scanner.Scan() { - line = scanner.Text() - } - err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line) - } - return resp.StatusCode, err -} - -func (c *client) getTenantID(labels model.LabelSet) string { - // Check if it has been overridden while processing the pipeline stages - if value, ok := labels[ReservedLabelTenantID]; ok { - return string(value) - } - - // Check if has been specified in the config - if c.cfg.TenantID != "" { - return c.cfg.TenantID - } - - // Defaults to an empty string, which means the X-Scope-OrgID header - // will not be sent - return "" -} - -// Stop the client. 
-func (c *client) Stop() { - c.once.Do(func() { close(c.entries) }) - c.wg.Wait() -} - -func (c *client) processEntry(e loki.Entry) (loki.Entry, string) { - tenantID := c.getTenantID(e.Labels) - return e, tenantID -} diff --git a/internal/component/common/loki/client/consumer_fanout_test.go b/internal/component/common/loki/client/consumer_fanout_test.go index e3094149a21..db95abff999 100644 --- a/internal/component/common/loki/client/consumer_fanout_test.go +++ b/internal/component/common/loki/client/consumer_fanout_test.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "net/url" - "strings" "testing" "time" @@ -14,10 +13,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/loki/pkg/push" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/common/config" "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component/common/loki" @@ -25,9 +21,9 @@ import ( ) func TestFanoutConsumer(t *testing.T) { - testClientConfig, rwReceivedReqs, closeServer := newServerAndClientConfig(t) + testEndpointConfig, rwReceivedReqs, closeServer := newServerAndEndpointConfig(t) - consumer, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), testClientConfig) + consumer, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), testEndpointConfig) require.NoError(t, err) receivedRequests := util.NewSyncSlice[util.RemoteWriteRequest]() @@ -61,7 +57,7 @@ func TestFanoutConsumer(t *testing.T) { }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} - // assert over rw client received entries + // assert over rw received entries defer receivedRequests.DoneIterate() for _, req := range receivedRequests.StartIterate() { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") @@ -73,12 
+69,12 @@ func TestFanoutConsumer(t *testing.T) { } func TestFanoutConsumer_MultipleConfigs(t *testing.T) { - testClientConfig, rwReceivedReqs, closeServer := newServerAndClientConfig(t) - testClientConfig2, rwReceivedReqs2, closeServer2 := newServerAndClientConfig(t) - testClientConfig2.Name = "test-client-2" + testEndpointConfig, rwReceivedReqs, closeServer := newServerAndEndpointConfig(t) + testEndpointConfig2, rwReceivedReqs2, closeServer2 := newServerAndEndpointConfig(t) + testEndpointConfig2.Name = "test-client-2" // start writer and consumer - consumer, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), testClientConfig, testClientConfig2) + consumer, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), testEndpointConfig, testEndpointConfig2) require.NoError(t, err) receivedRequests := util.NewSyncSlice[util.RemoteWriteRequest]() @@ -117,14 +113,14 @@ func TestFanoutConsumer_MultipleConfigs(t *testing.T) { } } - // times 2 due to clients being run + // times 2 due to endpoints being run expectedTotalLines := totalLines * 2 require.Eventually(t, func() bool { return receivedRequests.Length() == expectedTotalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries int - // assert over rw client received entries + // assert over rw received entries defer receivedRequests.DoneIterate() for _, req := range receivedRequests.StartIterate() { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") @@ -135,12 +131,12 @@ func TestFanoutConsumer_MultipleConfigs(t *testing.T) { } func TestFanoutConsumer_InvalidConfig(t *testing.T) { - t.Run("no clients", func(t *testing.T) { + t.Run("no endpoints", func(t *testing.T) { _, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry()) require.Error(t, err) }) - t.Run("repeated client", func(t *testing.T) { + t.Run("repeated endpoint", func(t *testing.T) { host, _ := url.Parse("http://localhost:3100") 
config := Config{URL: flagext.URLValue{URL: host}} _, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), config, config) @@ -182,460 +178,28 @@ var logEntries = []loki.Entry{ }, } -func TestClient_Handle(t *testing.T) { - tests := map[string]struct { - clientBatchSize int - clientBatchWait time.Duration - clientMaxRetries int - clientTenantID string - clientDropRateLimited bool - serverResponseStatus int - inputEntries []loki.Entry - inputDelay time.Duration - expectedReqs []util.RemoteWriteRequest - expectedMetrics string - }{ - "batch log entries together until the batch size is reached": { - clientBatchSize: 10, - clientBatchWait: 100 * time.Millisecond, - clientMaxRetries: 3, - serverResponseStatus: 200, - inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[2]}, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry, logEntries[1].Entry}}}}, - }, - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[2].Entry}}}}, - }, - }, - expectedMetrics: ` - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. - # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0 - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. 
- # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. - # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. - # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - `, - }, - "batch log entries together until the batch wait time is reached": { - clientBatchSize: 10, - clientBatchWait: 100 * time.Millisecond, - clientMaxRetries: 3, - serverResponseStatus: 200, - inputEntries: []loki.Entry{logEntries[0], logEntries[1]}, - inputDelay: 110 * time.Millisecond, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[1].Entry}}}}, - }, - }, - 
expectedMetrics: ` - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. - # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant=""} 2.0 - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. - # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. - # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. 
- # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - `, - }, - "retry send a batch up to backoff's max retries in case the server responds with a 5xx": { - clientBatchSize: 10, - clientBatchWait: 10 * time.Millisecond, - clientMaxRetries: 3, - serverResponseStatus: 500, - inputEntries: []loki.Entry{logEntries[0]}, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - }, - expectedMetrics: ` - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. - # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 1 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. 
- # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. - # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. - # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 - `, - }, - "do not retry send a batch in case the server responds with a 4xx": { - clientBatchSize: 10, - clientBatchWait: 10 * time.Millisecond, - clientMaxRetries: 3, - serverResponseStatus: 400, - inputEntries: []loki.Entry{logEntries[0]}, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - }, - expectedMetrics: ` - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. 
- # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 1 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. - # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. - # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. 
- # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 - `, - }, - "do retry sending a batch in case the server responds with a 429": { - clientBatchSize: 10, - clientBatchWait: 10 * time.Millisecond, - clientMaxRetries: 3, - serverResponseStatus: 429, - inputEntries: []loki.Entry{logEntries[0]}, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - }, - expectedMetrics: ` - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. - # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 1 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. - # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. 
- # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. - # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 - `, - }, - "do not retry in case of 429 when client is configured to drop rate limited batches": { - clientBatchSize: 10, - clientBatchWait: 10 * time.Millisecond, - clientMaxRetries: 3, - clientDropRateLimited: true, - serverResponseStatus: 429, - inputEntries: []loki.Entry{logEntries[0]}, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - }, - expectedMetrics: ` - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. - # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 1 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. 
- # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. - # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. - # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 - `, - }, - "batch log entries together honoring the client tenant ID": { - clientBatchSize: 100, - clientBatchWait: 100 * time.Millisecond, - clientMaxRetries: 3, - clientTenantID: "tenant-default", - serverResponseStatus: 200, - inputEntries: []loki.Entry{logEntries[0], logEntries[1]}, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "tenant-default", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry, logEntries[1].Entry}}}}, - }, - }, - expectedMetrics: ` - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. - # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant="tenant-default"} 2.0 - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. 
- # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__", reason="ingester_error", tenant="tenant-default"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 - loki_write_dropped_entries_total{host="__HOST__", reason="rate_limited", tenant="tenant-default"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. - # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. 
- # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 - `, - }, - "batch log entries together honoring the tenant ID overridden while processing the pipeline stages": { - clientBatchSize: 100, - clientBatchWait: 100 * time.Millisecond, - clientMaxRetries: 3, - clientTenantID: "tenant-default", - serverResponseStatus: 200, - inputEntries: []loki.Entry{logEntries[0], logEntries[3], logEntries[4], logEntries[5]}, - expectedReqs: []util.RemoteWriteRequest{ - { - TenantID: "tenant-default", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, - }, - { - TenantID: "tenant-1", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[3].Entry, logEntries[4].Entry}}}}, - }, - { - TenantID: "tenant-2", - Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[5].Entry}}}}, - }, - }, - expectedMetrics: ` - # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. - # TYPE loki_write_sent_entries_total counter - loki_write_sent_entries_total{host="__HOST__",tenant="tenant-1"} 2.0 - loki_write_sent_entries_total{host="__HOST__",tenant="tenant-2"} 1.0 - loki_write_sent_entries_total{host="__HOST__",tenant="tenant-default"} 1.0 - # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. 
- # TYPE loki_write_dropped_entries_total counter - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0 - loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 - # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. 
- # TYPE loki_write_mutated_entries_total counter - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0 - loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 - # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. 
- # TYPE loki_write_mutated_bytes_total counter - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0 - loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 - `, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - reg := prometheus.NewRegistry() - - // Create a buffer channel where we do enqueue received requests - receivedReqsChan := make(chan util.RemoteWriteRequest, 10) - - // Start a local HTTP server - server := util.NewRemoteWriteServer(receivedReqsChan, testData.serverResponseStatus) - require.NotNil(t, server) - defer server.Close() - - // Get the URL at which the local test server is listening to - serverURL := flagext.URLValue{} - err := serverURL.Set(server.URL) - require.NoError(t, err) - - // Instance the client - cfg := Config{ - URL: serverURL, - BatchWait: testData.clientBatchWait, - BatchSize: testData.clientBatchSize, - DropRateLimitedBatches: testData.clientDropRateLimited, - Client: 
config.DefaultHTTPClientConfig, - BackoffConfig: backoff.Config{MinBackoff: 1 * time.Millisecond, MaxBackoff: 2 * time.Millisecond, MaxRetries: testData.clientMaxRetries}, - Timeout: 1 * time.Second, - TenantID: testData.clientTenantID, - } - - m := NewMetrics(reg) - c, err := newClient(m, cfg, log.NewNopLogger()) - require.NoError(t, err) - - // Send all the input log entries - for i, logEntry := range testData.inputEntries { - c.Chan() <- logEntry - - if testData.inputDelay > 0 && i < len(testData.inputEntries)-1 { - time.Sleep(testData.inputDelay) - } - } - - // Wait until the expected push requests are received (with a timeout) - deadline := time.Now().Add(1 * time.Second) - for len(receivedReqsChan) < len(testData.expectedReqs) && time.Now().Before(deadline) { - time.Sleep(5 * time.Millisecond) - } - - // Stop the client: it waits until the current batch is sent - c.Stop() - close(receivedReqsChan) - - // Get all push requests received on the server side - receivedReqs := make([]util.RemoteWriteRequest, 0) - for req := range receivedReqsChan { - receivedReqs = append(receivedReqs, req) - } - - assert.ElementsMatch(t, testData.expectedReqs, receivedReqs) - - expectedMetrics := strings.ReplaceAll(testData.expectedMetrics, "__HOST__", serverURL.Host) - err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "loki_write_sent_entries_total", "loki_write_dropped_entries_total", "loki_write_mutated_entries_total", "loki_write_mutated_bytes_total") - assert.NoError(t, err) - }) - } -} - -func newServerAndClientConfig(t *testing.T) (Config, chan util.RemoteWriteRequest, func()) { +func newServerAndEndpointConfig(t *testing.T) (Config, chan util.RemoteWriteRequest, func()) { receivedReqsChan := make(chan util.RemoteWriteRequest, 10) // Start a local HTTP server server := util.NewRemoteWriteServer(receivedReqsChan, http.StatusOK) require.NotNil(t, server) - testClientURL, _ := url.Parse(server.URL) - testClientConfig := Config{ + url, _ := 
url.Parse(server.URL) + endpointConfig := Config{ Name: "test-client", - URL: flagext.URLValue{URL: testClientURL}, + URL: flagext.URLValue{URL: url}, Timeout: time.Second * 2, BatchSize: 1, BackoffConfig: backoff.Config{ MaxRetries: 0, }, - Queue: QueueConfig{ + QueueConfig: QueueConfig{ Capacity: 10, // buffered channel of size 10 DrainTimeout: time.Second * 10, }, } - return testClientConfig, receivedReqsChan, func() { + return endpointConfig, receivedReqsChan, func() { server.Close() close(receivedReqsChan) } diff --git a/internal/component/common/loki/client/consumer_wal.go b/internal/component/common/loki/client/consumer_wal.go index ca2a6f4cf19..00ec74f0a96 100644 --- a/internal/component/common/loki/client/consumer_wal.go +++ b/internal/component/common/loki/client/consumer_wal.go @@ -1,23 +1,12 @@ package client import ( - "bufio" - "bytes" - "context" - "errors" "fmt" - "io" - "net/http" - "strconv" "sync" - "time" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/backoff" - "github.com/grafana/loki/pkg/push" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" @@ -25,12 +14,11 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/common/loki/client/internal" "github.com/grafana/alloy/internal/component/common/loki/wal" - "github.com/grafana/alloy/internal/useragent" ) -func NewWALConsumer(logger log.Logger, reg prometheus.Registerer, walCfg wal.Config, clientCfgs ...Config) (*WALConsumer, error) { - if len(clientCfgs) == 0 { - return nil, fmt.Errorf("at least one client config must be provided") +func NewWALConsumer(logger log.Logger, reg prometheus.Registerer, walCfg wal.Config, cfgs ...Config) (*WALConsumer, error) { + if len(cfgs) == 0 { + return nil, fmt.Errorf("at least one endpoint config must 
be provided") } writer, err := wal.NewWriter(walCfg, logger, reg) @@ -40,55 +28,54 @@ func NewWALConsumer(logger log.Logger, reg prometheus.Registerer, walCfg wal.Con m := &WALConsumer{ writer: writer, - pairs: make([]clientWatcherPair, 0, len(clientCfgs)), + pairs: make([]endpointWatcherPair, 0, len(cfgs)), } var ( - metrics = NewMetrics(reg) - clientsCheck = make(map[string]struct{}) + metrics = newMetrics(reg) + endpointsCheck = make(map[string]struct{}) - walWatcherMetrics = wal.NewWatcherMetrics(reg) - walMarkerMetrics = internal.NewMarkerMetrics(reg) - walClientMetrics = NewWALClientMetrics(reg) + walWatcherMetrics = wal.NewWatcherMetrics(reg) + walMarkerMetrics = internal.NewMarkerMetrics(reg) + walEndpointMetrics = newWALEndpointMetrics(reg) ) - for _, cfg := range clientCfgs { - // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). - clientName := getClientName(cfg) - if _, ok := clientsCheck[clientName]; ok { - return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for name: %s", cfg.Name) + for _, cfg := range cfgs { + // Don't allow duplicate endpoints, we have endpoint specific metrics that need at least one unique label value (name). 
+ name := getEndpointName(cfg) + if _, ok := endpointsCheck[name]; ok { + return nil, fmt.Errorf("duplicate endpoint configs are not allowed, found duplicate for name: %s", cfg.Name) } - clientsCheck[clientName] = struct{}{} - - // add some context information for the logger the watcher uses - wlog := log.With(logger, "client", clientName) + endpointsCheck[name] = struct{}{} markerFileHandler, err := internal.NewMarkerFileHandler(logger, walCfg.Dir) if err != nil { return nil, err } - markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(clientName)) + markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.CurryWithId(name)) - client, err := newWalClient(metrics, walClientMetrics.CurryWithId(clientName), cfg, logger, markerHandler) + endpoint, err := newEndpoint(metrics, cfg, logger, markerHandler) if err != nil { - return nil, fmt.Errorf("error starting wal client: %w", err) + return nil, fmt.Errorf("error starting endpoint: %w", err) } + adapter := newWalEndpointAdapter(endpoint, logger, walEndpointMetrics.CurryWithId(name), markerHandler) + // subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo // series cache whenever a segment is deleted. 
- writer.SubscribeCleanup(client) + writer.SubscribeCleanup(adapter) - watcher := wal.NewWatcher(walCfg.Dir, clientName, walWatcherMetrics, client, wlog, walCfg.WatchConfig, markerHandler) + watcher := wal.NewWatcher(walCfg.Dir, name, walWatcherMetrics, adapter, log.With(logger, "component", name), walCfg.WatchConfig, markerHandler) // subscribe watcher to wal write events writer.SubscribeWrite(watcher) - level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName) + level.Debug(logger).Log("msg", "starting WAL watcher for endpoint", "endpoint", name) watcher.Start() - m.pairs = append(m.pairs, clientWatcherPair{ - watcher: watcher, - client: client, + m.pairs = append(m.pairs, endpointWatcherPair{ + watcher: watcher, + endpoint: adapter, }) } @@ -97,28 +84,28 @@ func NewWALConsumer(logger log.Logger, reg prometheus.Registerer, walCfg wal.Con return m, nil } -type clientWatcherPair struct { - watcher *wal.Watcher - client *walClient +type endpointWatcherPair struct { + watcher *wal.Watcher + endpoint *walEndpointAdapter } -// Stop will proceed to stop, in order, watcher and the client. -func (p clientWatcherPair) Stop(drain bool) { +// Stop will proceed to stop, in order, watcher and the endpoint. +func (p endpointWatcherPair) Stop(drain bool) { // If drain enabled, drain the WAL. if drain { p.watcher.Drain() } p.watcher.Stop() - // Subsequently stop the client. - p.client.Stop() + // Subsequently stop the endpoint. + p.endpoint.Stop() } var _ DrainableConsumer = (*WALConsumer)(nil) type WALConsumer struct { writer *wal.Writer - pairs []clientWatcherPair + pairs []endpointWatcherPair } func (m *WALConsumer) Chan() chan<- loki.Entry { @@ -129,8 +116,8 @@ func (m *WALConsumer) Stop() { m.stop(false) } -// StopAndDrain will stop the manager, its WalWriter, Write-Ahead Log watchers, -// and queues accordingly. It attempt to drain the WAL completely. 
+// StopAndDrain will stop the consumer, its WalWriter, Write-Ahead Log watchers, +// and endpoints accordingly. It attempt to drain the WAL completely. func (m *WALConsumer) StopAndDrain() { m.stop(true) } @@ -142,7 +129,7 @@ func (m *WALConsumer) stop(drain bool) { // Depending on whether drain is enabled, the maximum time stopping a watcher and it's queue can take is // the drain time of the watcher + drain time queue. To minimize this, and since we keep a separate WAL for each - // client config, each (watcher, queue) pair is stopped concurrently. + // endpoint config, each (watcher, queue) pair is stopped concurrently. for _, pair := range m.pairs { stopWG.Go(func() { pair.Stop(drain) @@ -153,96 +140,39 @@ func (m *WALConsumer) stop(drain bool) { stopWG.Wait() } -// walClient is a WAL-specific remote write client implementation. This client attests to the wal.WriteTo interface, -// which allows it to be injected in the wal.Watcher as a destination where to write read series and entries. As the watcher -// reads from the WAL, batches are created and dispatched onto a send queue when ready to be sent. -type walClient struct { - metrics *Metrics - wcMetrics *WALClientMetrics - logger log.Logger - cfg Config - client *http.Client - - batches map[string]*batch - batchesMtx sync.Mutex - sendQueue *queue - drainTimeout time.Duration - - wg sync.WaitGroup - - // series cache - series map[chunks.HeadSeriesRef]model.LabelSet - seriesSegment map[chunks.HeadSeriesRef]int - seriesLock sync.RWMutex - - // ctx is used in any upstream calls from the `client`. 
- ctx context.Context - cancel context.CancelFunc - quit chan struct{} - markerHandler internal.MarkerHandler -} - -func newWalClient(metrics *Metrics, qcMetrics *WALClientMetrics, cfg Config, logger log.Logger, markerHandler internal.MarkerHandler) (*walClient, error) { - if cfg.URL.URL == nil { - return nil, errors.New("client needs target URL") - } - - ctx, cancel := context.WithCancel(context.Background()) - - c := &walClient{ - logger: log.With(logger, "component", "client", "host", cfg.URL.Host), - cfg: cfg, - metrics: metrics, - wcMetrics: qcMetrics, - drainTimeout: cfg.Queue.DrainTimeout, - quit: make(chan struct{}), - - batches: make(map[string]*batch), - markerHandler: markerHandler, +func newWalEndpointAdapter(endpoint *endpoint, logger log.Logger, metrics *walEndpointMetrics, markerHandler internal.MarkerHandler) *walEndpointAdapter { + c := &walEndpointAdapter{ + logger: log.With(logger, "component", "waladapter"), + metrics: metrics, + endpoint: endpoint, series: make(map[chunks.HeadSeriesRef]model.LabelSet), seriesSegment: make(map[chunks.HeadSeriesRef]int), - ctx: ctx, - cancel: cancel, + markerHandler: markerHandler, } - // The buffered channel size is calculated using the configured capacity, which is the worst case number of bytes - // the send queue can consume. - var queueBufferSize = cfg.Queue.Capacity / cfg.BatchSize - c.sendQueue = newQueue(c, queueBufferSize, logger) - - err := cfg.Client.Validate() - if err != nil { - return nil, err - } + return c +} - c.client, err = config.NewClientFromConfig(cfg.Client, useragent.ProductName) - if err != nil { - return nil, err - } +// walEndpointAdapter is an adapter between watcher and endpoint. This component attests to the wal.WriteTo interface, +// which allows it to be injected in the wal.Watcher as a destination where to write series and entries. As the watcher +// reads from the WAL, entires are forwarded here so it can be written to endpoint. 
+type walEndpointAdapter struct { + logger log.Logger + metrics *walEndpointMetrics - c.client.Timeout = cfg.Timeout + endpoint *endpoint - c.wg.Go(func() { c.runSendOldBatches() }) - return c, nil -} - -func (c *walClient) initBatchMetrics(tenantID string) { - // Initialize counters to 0 so the metrics are exported before the first - // occurrence of incrementing to avoid missing metrics. - for _, counter := range c.metrics.countersWithHostTenantReason { - for _, reason := range Reasons { - counter.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(0) - } - } + // series cache + series map[chunks.HeadSeriesRef]model.LabelSet + seriesSegment map[chunks.HeadSeriesRef]int + seriesLock sync.RWMutex - for _, counter := range c.metrics.countersWithHostTenant { - counter.WithLabelValues(c.cfg.URL.Host, tenantID).Add(0) - } + markerHandler internal.MarkerHandler } -func (c *walClient) SeriesReset(segmentNum int) { +func (c *walEndpointAdapter) SeriesReset(segmentNum int) { c.seriesLock.Lock() defer c.seriesLock.Unlock() for k, v := range c.seriesSegment { @@ -254,7 +184,7 @@ func (c *walClient) SeriesReset(segmentNum int) { } } -func (c *walClient) StoreSeries(series []record.RefSeries, segment int) { +func (c *walEndpointAdapter) StoreSeries(series []record.RefSeries, segment int) { c.seriesLock.Lock() defer c.seriesLock.Unlock() for _, seriesRec := range series { @@ -263,14 +193,17 @@ func (c *walClient) StoreSeries(series []record.RefSeries, segment int) { } } -func (c *walClient) AppendEntries(entries wal.RefEntries, segment int) error { +func (c *walEndpointAdapter) AppendEntries(entries wal.RefEntries, segment int) error { c.seriesLock.RLock() l, ok := c.series[entries.Ref] c.seriesLock.RUnlock() var maxSeenTimestamp int64 = -1 if ok { for _, e := range entries.Entries { - c.appendSingleEntry(segment, l, e) + ok := c.endpoint.enqueue(loki.Entry{Labels: l, Entry: e}, segment) + if !ok { + return nil + } if e.Timestamp.Unix() > maxSeenTimestamp { maxSeenTimestamp = 
e.Timestamp.Unix() } @@ -284,387 +217,13 @@ func (c *walClient) AppendEntries(entries wal.RefEntries, segment int) error { // It's safe to assume that upon an AppendEntries call, there will always be at least // one entry. - c.wcMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp)) - + c.metrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp)) return nil } -func (c *walClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e push.Entry) { - lbs, tenantID := c.processLabels(lbs) - - // TODO: can I make this locking more fine grained? - c.batchesMtx.Lock() - - batch, ok := c.batches[tenantID] - - // If the batch doesn't exist yet, we create a new one with the entry - if !ok { - nb := newBatch(c.cfg.MaxStreams) - // since the batch is new, adding a new entry, and hence a new stream, won't fail since there aren't any stream - // registered in the batch. - _ = nb.add(loki.Entry{Labels: lbs, Entry: e}, segmentNum) - - c.batches[tenantID] = nb - c.batchesMtx.Unlock() - - c.initBatchMetrics(tenantID) - return - } - - // If adding the entry to the batch will increase the size over the max - // size allowed, we do send the current batch and then create a new one - if batch.sizeBytesAfter(e) > c.cfg.BatchSize { - c.sendQueue.enqueue(queuedBatch{ - TenantID: tenantID, - Batch: batch, - }) - - nb := newBatch(c.cfg.MaxStreams) - _ = nb.add(loki.Entry{Labels: lbs, Entry: e}, segmentNum) - c.batches[tenantID] = nb - c.batchesMtx.Unlock() - - return - } - - // The max size of the batch isn't reached, so we can add the entry - err := batch.add(loki.Entry{Labels: lbs, Entry: e}, segmentNum) - c.batchesMtx.Unlock() - - if err != nil { - level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err) - reason := ReasonGeneric - if errors.Is(err, errMaxStreamsLimitExceeded) { - reason = ReasonStreamLimited - } - c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line))) - 
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Inc() - } -} - -func (c *walClient) runSendOldBatches() { - // Given the client handles multiple batches (1 per tenant) and each batch - // can be created at a different point in time, we look for batches whose - // max wait time has been reached every 10 times per BatchWait, so that the - // maximum delay we have sending batches is 10% of the max waiting time. - // We apply a cap of 10ms to the ticker, to avoid too frequent checks in - // case the BatchWait is very low. - minWaitCheckFrequency := 10 * time.Millisecond - maxWaitCheckFrequency := max(c.cfg.BatchWait/10, minWaitCheckFrequency) - - maxWaitCheck := time.NewTicker(maxWaitCheckFrequency) - - // pablo: maybe this should be moved out - defer func() { - maxWaitCheck.Stop() - }() - - var batchesToFlush []queuedBatch - - for { - select { - case <-c.quit: - return - - case <-maxWaitCheck.C: - c.batchesMtx.Lock() - // Send all batches whose max wait time has been reached - for tenantID, b := range c.batches { - if b.age() < c.cfg.BatchWait { - continue - } - - // add to batches to flush, so we can enqueue them later and release the batches lock - // as early as possible - batchesToFlush = append(batchesToFlush, queuedBatch{ - TenantID: tenantID, - Batch: b, - }) - - // deleting assuming that since the batch expired the wait time, it - // hasn't been written for some time - delete(c.batches, tenantID) - } - - c.batchesMtx.Unlock() - - // enqueue batches that were marked as too old - for _, qb := range batchesToFlush { - c.sendQueue.enqueue(qb) - } - - batchesToFlush = batchesToFlush[:0] // renew slice - } - } -} - -// enqueuePendingBatches will go over the pending batches, and enqueue them in the send queue. If the context's -// deadline is exceeded in any enqueue operation, this routine exits. 
-func (c *walClient) enqueuePendingBatches(ctx context.Context) { - c.batchesMtx.Lock() - defer c.batchesMtx.Unlock() - - for tenantID, batch := range c.batches { - if !c.sendQueue.enqueueWithCancel(ctx, queuedBatch{ - TenantID: tenantID, - Batch: batch, - }) { - // if enqueue times out due to the context timing out, cancel all - return - } - } -} - -func (c *walClient) sendBatch(ctx context.Context, tenantID string, batch *batch) { - buf, entriesCount, err := batch.encode() - if err != nil { - level.Error(c.logger).Log("msg", "error encoding batch", "error", err) - return - } - bufBytes := float64(len(buf)) - c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes) - - backoff := backoff.New(c.ctx, c.cfg.BackoffConfig) - var status int - for { - start := time.Now() - // send uses `timeout` internally, so `context.Background` is good enough. - status, err = c.send(ctx, tenantID, buf) - - c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host, tenantID).Observe(time.Since(start).Seconds()) - - // Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling - if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) { - level.Warn(c.logger).Log("msg", "dropping batch due to rate limiting applied at ingester") - c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(bufBytes) - c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(float64(entriesCount)) - return - } - - if err == nil { - c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes) - c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount)) - return - } - - // Only retry 429s, 500s and connection-level errors. 
- if status > 0 && !batchIsRateLimited(status) && status/100 != 5 { - break - } - - level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "tenant", tenantID, "error", err) - c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host, tenantID).Inc() - backoff.Wait() - - // Make sure it sends at least once before checking for retry. - if !backoff.Ongoing() { - break - } - } - - level.Error(c.logger).Log("msg", "final error sending batch, no retries left, dropping data", "status", status, "tenant", tenantID, "error", err) - // If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors - // were for a different reason - dropReason := ReasonGeneric - if batchIsRateLimited(status) { - dropReason = ReasonRateLimited - } - c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(bufBytes) - c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount)) -} - -func (c *walClient) send(ctx context.Context, tenantID string, buf []byte) (int, error) { - ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) - defer cancel() - req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf)) - if err != nil { - return -1, err - } - req = req.WithContext(ctx) - req.Header.Set("Content-Type", contentType) - req.Header.Set("User-Agent", userAgent) - - // If the tenant ID is not empty promtail is running in multi-tenant mode, so - // we should send it to Loki - if tenantID != "" { - req.Header.Set("X-Scope-OrgID", tenantID) - } - - // Add custom headers on request - if len(c.cfg.Headers) > 0 { - for k, v := range c.cfg.Headers { - if req.Header.Get(k) == "" { - req.Header.Add(k, v) - } else { - level.Warn(c.logger).Log("msg", "custom header key already exists, skipping", "key", k) - } - } - } - - resp, err := c.client.Do(req) - if err != nil { - return -1, err - } - - // NOTE: it is important in go to fully read the body and - 
// close it so that the connection can be reused. - // We only partially read the body if we encounter a non 2xx error - // so we should always consume whats left. - // https://github.com/golang/go/blob/32a9804c7ba3f4a0e0bd26cc24b9204860a49ec8/src/net/http/response.go#L59-L64 - // It is unclear that we always need to drain the body but - // https://github.com/golang/go/issues/60240#issuecomment-1551060433 seems to indicate that we should. - - defer func() { - _, _ = io.Copy(io.Discard, resp.Body) - _ = resp.Body.Close() - }() - - if resp.StatusCode/100 != 2 { - scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)) - line := "" - if scanner.Scan() { - line = scanner.Text() - } - err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line) - } - return resp.StatusCode, err -} - -func (c *walClient) getTenantID(labels model.LabelSet) string { - // Check if it has been overridden while processing the pipeline stages - if value, ok := labels[ReservedLabelTenantID]; ok { - return string(value) - } - - // Check if has been specified in the config - if c.cfg.TenantID != "" { - return c.cfg.TenantID - } - - // Defaults to an empty string, which means the X-Scope-OrgID header - // will not be sent - return "" -} - -// Stop the client, enqueueing pending batches and draining the send queue accordingly. Both closing operations are +// Stop the endpoint, enqueueing pending batches and draining the send queue accordingly. Both closing operations are // limited by a deadline, controlled by a configured drain timeout, which is global to the Stop call. 
-func (c *walClient) Stop() { - // first close main queue routine - close(c.quit) - c.wg.Wait() - - // fire timeout timer - ctx, cancel := context.WithTimeout(context.Background(), c.drainTimeout) - defer cancel() - - // enqueue batches that might be pending in the batches map - c.enqueuePendingBatches(ctx) - - // drain sendQueue with timeout in context - c.sendQueue.closeAndDrain(ctx) - - // stop request after drain times out or exits - c.cancel() - +func (c *walEndpointAdapter) Stop() { + c.endpoint.stop() c.markerHandler.Stop() } - -func (c *walClient) processLabels(lbs model.LabelSet) (model.LabelSet, string) { - tenantID := c.getTenantID(lbs) - return lbs, tenantID -} - -// queuedBatch is a batch specific to a tenant, that is considered ready to be sent. -type queuedBatch struct { - TenantID string - Batch *batch -} - -// queue wraps a buffered channel and a routine that reads from it, sending batches of entries. -type queue struct { - client *walClient - q chan queuedBatch - quit chan struct{} - wg sync.WaitGroup - logger log.Logger -} - -func newQueue(client *walClient, size int, logger log.Logger) *queue { - q := queue{ - client: client, - q: make(chan queuedBatch, size), - quit: make(chan struct{}), - logger: logger, - } - - q.wg.Go(func() { q.run() }) - - return &q -} - -// enqueue adds to the send queue a batch ready to be sent. Note that if the backing queue is has no -// remaining capacity to enqueue the batch, calling enqueue might block. -func (q *queue) enqueue(qb queuedBatch) { - q.q <- qb -} - -// enqueueWithCancel tries to enqueue a batch, giving up if the supplied context times deadlines -// times out. If the batch is successfully enqueued, it returns true. 
-func (q *queue) enqueueWithCancel(ctx context.Context, qb queuedBatch) bool { - select { - case <-ctx.Done(): - return false - case q.q <- qb: - } - return true -} - -func (q *queue) run() { - for { - select { - case <-q.quit: - return - case qb := <-q.q: - // Since inside the actual send operation a context with time out is used, we should exceed that timeout - // instead of cancelling this send operation, since that batch has been taken out of the queue. - q.sendAndReport(context.Background(), qb.TenantID, qb.Batch) - } - } -} - -// closeAndDrain stops gracefully the queue. The process first stops the main routine that reads batches to be sent, -// to instead drain the queue and send those batches from this thread, exiting if the supplied context deadline -// is exceeded. Also, if the underlying buffered channel is fully drain, this will exit promptly. -func (q *queue) closeAndDrain(ctx context.Context) { - // defer main channel closing - defer close(q.q) - - // first stop main routine, and wait for it to signal - close(q.quit) - q.wg.Wait() - - // keep reading messages from sendQueue until all have been consumed, or timeout is exceeded - for { - select { - case qb := <-q.q: - // drain uses the same timeout, so if a timeout was applied to the parent context, it can cancel the underlying - // send operation preemptively. - q.sendAndReport(ctx, qb.TenantID, qb.Batch) - case <-ctx.Done(): - level.Warn(q.logger).Log("msg", "timeout exceeded while draining send queue") - return - default: - level.Debug(q.logger).Log("msg", "drain queue exited because there were no batches left to send") - return - // if default clause is taken, it means there's nothing left in the send queue - } - } -} - -// sendAndReport attempts to send the batch for the given tenant, and either way that operation succeeds or fails, reports -// the data as sent. 
-func (q *queue) sendAndReport(ctx context.Context, tenantId string, b *batch) { - q.client.sendBatch(ctx, tenantId, b) - // mark segment data for that batch as sent, even if the send operation failed - b.reportAsSentData(q.client.markerHandler) -} diff --git a/internal/component/common/loki/client/consumer_wal_test.go b/internal/component/common/loki/client/consumer_wal_test.go index dd72f308e16..35128db57fe 100644 --- a/internal/component/common/loki/client/consumer_wal_test.go +++ b/internal/component/common/loki/client/consumer_wal_test.go @@ -36,9 +36,9 @@ func TestWALConsumer(t *testing.T) { WatchConfig: wal.DefaultWatchConfig, } // start all necessary resources - testClientConfig, rwReceivedReqs, closeServer := newServerAndClientConfig(t) + testEndpointConfig, rwReceivedReqs, closeServer := newServerAndEndpointConfig(t) - consumer, err := NewWALConsumer(log.NewNopLogger(), prometheus.NewRegistry(), walConfig, testClientConfig) + consumer, err := NewWALConsumer(log.NewNopLogger(), prometheus.NewRegistry(), walConfig, testEndpointConfig) require.NoError(t, err) receivedRequests := util.NewSyncSlice[util.RemoteWriteRequest]() @@ -72,7 +72,7 @@ func TestWALConsumer(t *testing.T) { }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} - // assert over rw client received entries + // assert over rw received entries defer receivedRequests.DoneIterate() for _, req := range receivedRequests.StartIterate() { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") @@ -84,9 +84,9 @@ func TestWALConsumer(t *testing.T) { } func TestWALConsumer_MultipleConfigs(t *testing.T) { - testClientConfig, rwReceivedReqs, closeServer := newServerAndClientConfig(t) - testClientConfig2, rwReceivedReqs2, closeServer2 := newServerAndClientConfig(t) - testClientConfig2.Name = "test-client-2" + testEndpointConfig, rwReceivedReqs, closeServer := newServerAndEndpointConfig(t) + testEndpointConfig2, 
rwReceivedReqs2, closeServer2 := newServerAndEndpointConfig(t) + testEndpointConfig2.Name = "test-client-2" walConfig := wal.Config{ Dir: t.TempDir(), @@ -95,7 +95,7 @@ func TestWALConsumer_MultipleConfigs(t *testing.T) { MaxSegmentAge: time.Second * 10, } - consumer, err := NewWALConsumer(log.NewNopLogger(), prometheus.NewRegistry(), walConfig, testClientConfig, testClientConfig2) + consumer, err := NewWALConsumer(log.NewNopLogger(), prometheus.NewRegistry(), walConfig, testEndpointConfig, testEndpointConfig2) require.NoError(t, err) receivedRequests := util.NewSyncSlice[util.RemoteWriteRequest]() @@ -134,14 +134,14 @@ func TestWALConsumer_MultipleConfigs(t *testing.T) { } } - // times 2 due to clients being run + // times 2 due to endpoint being run expectedTotalLines := totalLines * 2 require.Eventually(t, func() bool { return receivedRequests.Length() == expectedTotalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries int - // assert over rw client received entries + // assert over rw received entries defer receivedRequests.DoneIterate() for _, req := range receivedRequests.StartIterate() { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") @@ -152,12 +152,12 @@ func TestWALConsumer_MultipleConfigs(t *testing.T) { } func TestWALConsumer_InvalidConfig(t *testing.T) { - t.Run("no clients", func(t *testing.T) { + t.Run("no endpoints", func(t *testing.T) { _, err := NewWALConsumer(log.NewNopLogger(), prometheus.NewRegistry(), wal.Config{}) require.Error(t, err) }) - t.Run("repeated client", func(t *testing.T) { + t.Run("repeated endpoints", func(t *testing.T) { host, _ := url.Parse("http://localhost:3100") config := Config{URL: flagext.URLValue{URL: host}} _, err := NewWALConsumer(log.NewNopLogger(), prometheus.NewRegistry(), wal.Config{}, config, config) @@ -166,7 +166,7 @@ func TestWALConsumer_InvalidConfig(t *testing.T) { } type testCase struct { - // numLines is the total number 
of lines sent through the client in the benchmark. + // numLines is the total number of lines sent through the endpoint in the benchmark. numLines int // numSeries is the different number of series to use in entries. Series are dynamically generated for each entry, but @@ -182,7 +182,7 @@ type testCase struct { expectedRWReqsCount int64 } -func TestWALClient(t *testing.T) { +func TestWALEndpoint(t *testing.T) { for name, tc := range map[string]testCase{ "small test": { numLines: 3, @@ -247,7 +247,6 @@ func TestWALClient(t *testing.T) { err := serverURL.Set(server.URL) require.NoError(t, err) - // Instance the client cfg := Config{ URL: serverURL, BatchWait: tc.batchWait, @@ -256,13 +255,15 @@ func TestWALClient(t *testing.T) { BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: 1}, Timeout: 1 * time.Second, TenantID: "", - Queue: tc.queueConfig, + QueueConfig: tc.queueConfig, } logger := log.NewLogfmtLogger(os.Stdout) + marker := internal.NewNopMarkerHandler() - wc, err := newWalClient(NewMetrics(reg), NewWALClientMetrics(reg).CurryWithId("test"), cfg, logger, internal.NewNopMarkerHandler()) + endpoint, err := newEndpoint(newMetrics(reg), cfg, logger, marker) require.NoError(t, err) + adapter := newWalEndpointAdapter(endpoint, logger, newWALEndpointMetrics(reg).CurryWithId("test"), marker) //labels := model.LabelSet{"app": "test"} lines := make([]string, 0, tc.numLines) @@ -273,7 +274,7 @@ func TestWALClient(t *testing.T) { // Send all the input log entries for i, l := range lines { mod := i % tc.numSeries - wc.StoreSeries([]record.RefSeries{ + adapter.StoreSeries([]record.RefSeries{ { Labels: labels.New( labels.Label{Name: "app", Value: fmt.Sprintf("test-%d", mod)}, @@ -282,7 +283,7 @@ func TestWALClient(t *testing.T) { }, }, 0) - _ = wc.AppendEntries(wal.RefEntries{ + _ = adapter.AppendEntries(wal.RefEntries{ Ref: chunks.HeadSeriesRef(mod), Entries: []push.Entry{{ Timestamp: time.Now(), @@ -299,14 +300,14 @@ func 
TestWALClient(t *testing.T) { require.Equal(t, tc.expectedRWReqsCount, receivedRWsCount.Load(), "number for remote write request not expected") } - // Stop the client: it waits until the current batch is sent - wc.Stop() + // Stop the endpoint: it waits until the current batch is sent + adapter.Stop() close(receivedReqsChan) }) } } -func BenchmarkClientImplementations(b *testing.B) { +func BenchmarkEndpointImplementations(b *testing.B) { for name, bc := range map[string]testCase{ "100 entries, single series, no batching": { numLines: 100, @@ -331,33 +332,33 @@ func BenchmarkClientImplementations(b *testing.B) { } { b.Run(name, func(b *testing.B) { b.Run("implementation=wal_nil_marker_handler", func(b *testing.B) { - runWALClientBenchCase(b, bc, func(t *testing.B) internal.MarkerHandler { + runWALEndpointBenchCase(b, bc, func(t *testing.B) internal.MarkerHandler { return internal.NewNopMarkerHandler() }) }) b.Run("implementation=wal_marker_handler", func(b *testing.B) { - runWALClientBenchCase(b, bc, func(t *testing.B) internal.MarkerHandler { + runWALEndpointBenchCase(b, bc, func(t *testing.B) internal.MarkerHandler { dir := b.TempDir() nopLogger := log.NewNopLogger() markerFileHandler, err := internal.NewMarkerFileHandler(nopLogger, dir) require.NoError(b, err) - markerHandler := internal.NewMarkerHandler(markerFileHandler, time.Minute, nopLogger, internal.NewMarkerMetrics(nil).WithCurriedId("test")) + markerHandler := internal.NewMarkerHandler(markerFileHandler, time.Minute, nopLogger, internal.NewMarkerMetrics(nil).CurryWithId("test")) return markerHandler }) }) b.Run("implementation=regular", func(b *testing.B) { - runRegularClientBenchCase(b, bc) + runEndpointBenchCase(b, bc) }) }) } } -func runWALClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing.B) internal.MarkerHandler) { +func runWALEndpointBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing.B) internal.MarkerHandler) { reg := prometheus.NewRegistry() // Create a buffer 
channel where we do enqueue received requests @@ -387,7 +388,6 @@ func runWALClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing. err := serverURL.Set(server.URL) require.NoError(b, err) - // Instance the client cfg := Config{ URL: serverURL, BatchWait: time.Millisecond * 50, @@ -396,16 +396,18 @@ func runWALClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing. BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: 1}, Timeout: 1 * time.Second, TenantID: "", - Queue: QueueConfig{ + QueueConfig: QueueConfig{ Capacity: 1000, // queue size of 100 DrainTimeout: time.Second * 10, }, } logger := log.NewLogfmtLogger(os.Stdout) + marker := mhFactory(b) - qc, err := newWalClient(NewMetrics(reg), NewWALClientMetrics(reg).CurryWithId("test"), cfg, logger, mhFactory(b)) + endpoint, err := newEndpoint(newMetrics(reg), cfg, logger, marker) require.NoError(b, err) + adapter := newWalEndpointAdapter(endpoint, logger, newWALEndpointMetrics(reg).CurryWithId("test"), marker) //labels := model.LabelSet{"app": "test"} var lines []string @@ -417,7 +419,7 @@ func runWALClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing. // Send all the input log entries for j, l := range lines { seriesId := j % bc.numSeries - qc.StoreSeries([]record.RefSeries{ + adapter.StoreSeries([]record.RefSeries{ { Labels: labels.New( // take j module bc.numSeries to evenly distribute those numSeries across all sent entries @@ -427,7 +429,7 @@ func runWALClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing. }, }, 0) - _ = qc.AppendEntries(wal.RefEntries{ + _ = adapter.AppendEntries(wal.RefEntries{ Ref: chunks.HeadSeriesRef(seriesId), Entries: []push.Entry{{ Timestamp: time.Now(), @@ -444,12 +446,12 @@ func runWALClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing. 
reset() } - // Stop the client: it waits until the current batch is sent - qc.Stop() + // Stop the endpoint: it waits until the current batch is sent + adapter.Stop() close(receivedReqsChan) } -func runRegularClientBenchCase(b *testing.B, bc testCase) { +func runEndpointBenchCase(b *testing.B, bc testCase) { reg := prometheus.NewRegistry() // Create a buffer channel where we do enqueue received requests @@ -479,7 +481,6 @@ func runRegularClientBenchCase(b *testing.B, bc testCase) { err := serverURL.Set(server.URL) require.NoError(b, err) - // Instance the client cfg := Config{ URL: serverURL, BatchWait: time.Millisecond * 50, @@ -488,7 +489,7 @@ func runRegularClientBenchCase(b *testing.B, bc testCase) { BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: 1}, Timeout: 1 * time.Second, TenantID: "", - Queue: QueueConfig{ + QueueConfig: QueueConfig{ Capacity: 1000, // queue size of 100 DrainTimeout: time.Second * 10, }, @@ -496,8 +497,8 @@ func runRegularClientBenchCase(b *testing.B, bc testCase) { logger := log.NewLogfmtLogger(os.Stdout) - m := NewMetrics(reg) - qc, err := newClient(m, cfg, logger) + m := newMetrics(reg) + endpoint, err := newEndpoint(m, cfg, logger, internal.NewNopMarkerHandler()) require.NoError(b, err) //labels := model.LabelSet{"app": "test"} @@ -510,7 +511,7 @@ func runRegularClientBenchCase(b *testing.B, bc testCase) { // Send all the input log entries for j, l := range lines { seriesId := j % bc.numSeries - qc.Chan() <- loki.Entry{ + endpoint.enqueue(loki.Entry{ Labels: model.LabelSet{ // take j module bc.numSeries to evenly distribute those numSeries across all sent entries "app": model.LabelValue(fmt.Sprintf("series-%d", seriesId)), @@ -519,7 +520,7 @@ func runRegularClientBenchCase(b *testing.B, bc testCase) { Timestamp: time.Now(), Line: l, }, - } + }, 0) } require.Eventually(b, func() bool { @@ -530,7 +531,7 @@ func runRegularClientBenchCase(b *testing.B, bc testCase) { reset() } - // Stop 
the client: it waits until the current batch is sent - qc.Stop() + // Stop the endpoint: it waits until the current batch is sent + endpoint.stop() close(receivedReqsChan) } diff --git a/internal/component/common/loki/client/endpoint.go b/internal/component/common/loki/client/endpoint.go new file mode 100644 index 00000000000..d83fd032211 --- /dev/null +++ b/internal/component/common/loki/client/endpoint.go @@ -0,0 +1,80 @@ +package client + +import ( + "context" + "crypto/sha256" + "fmt" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/common/loki/client/internal" +) + +type endpoint struct { + cfg Config + entries chan loki.Entry + + ctx context.Context + cancel context.CancelFunc + + shards *shards +} + +func newEndpoint(metrics *metrics, cfg Config, logger log.Logger, markerHandler internal.MarkerHandler) (*endpoint, error) { + logger = log.With(logger, "component", "endpoint", "host", cfg.URL.Host) + + shards, err := newShards(metrics, logger, markerHandler, cfg) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + + c := &endpoint{ + cfg: cfg, + entries: make(chan loki.Entry), + shards: shards, + ctx: ctx, + cancel: cancel, + } + + c.shards.start(cfg.QueueConfig.MinShards) + return c, nil +} + +// enqueue will try to enqueue entry. If endpoint is stopped any active attempts will +// be stopped and false will be returned. +func (c *endpoint) enqueue(entry loki.Entry, segmentNum int) bool { + backoff := backoff.New(c.ctx, backoff.Config{ + MinBackoff: 5 * time.Millisecond, + MaxBackoff: 50 * time.Millisecond, + }) + + for !c.shards.enqueue(entry, segmentNum) { + backoff.Wait() + if !backoff.Ongoing() { + return false + } + } + return true +} + +func (c *endpoint) stop() { + c.cancel() + c.shards.stop() +} + +// getEndpointName computes the specific name for each endpoint config. 
The name is either the configured Name setting in Config, +// or a hash of the config as whole, this allows us to detect repeated configs. +func getEndpointName(cfg Config) string { + if cfg.Name != "" { + return cfg.Name + } + + h := sha256.New() + _, _ = fmt.Fprintf(h, "%v", cfg) + return fmt.Sprintf("%x", h.Sum(nil))[:6] +} diff --git a/internal/component/common/loki/client/endpoint_test.go b/internal/component/common/loki/client/endpoint_test.go new file mode 100644 index 00000000000..0e754fb7155 --- /dev/null +++ b/internal/component/common/loki/client/endpoint_test.go @@ -0,0 +1,450 @@ +package client + +import ( + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/flagext" + "github.com/grafana/loki/pkg/push" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/common/loki/client/internal" + "github.com/grafana/alloy/internal/loki/util" +) + +func TestEndpoint(t *testing.T) { + tests := map[string]struct { + endpointConfig Config + serverResponseStatus int + inputEntries []loki.Entry + inputDelay time.Duration + expectedReqs []util.RemoteWriteRequest + expectedMetrics string + }{ + "batch log entries together until the batch size is reached": { + endpointConfig: Config{ + BatchSize: 10, + BatchWait: 100 * time.Millisecond, + }, + serverResponseStatus: 200, + inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[2]}, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry, logEntries[1].Entry}}}}, + }, + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: 
"{}", Entries: []push.Entry{logEntries[2].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. + # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0 + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. + # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. + # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. 
+ # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + `, + }, + "batch log entries together until the batch wait time is reached": { + endpointConfig: Config{ + BatchSize: 10, + BatchWait: 100 * time.Millisecond, + }, + serverResponseStatus: 200, + inputEntries: []loki.Entry{logEntries[0], logEntries[1]}, + inputDelay: 110 * time.Millisecond, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[1].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. + # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant=""} 2.0 + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. + # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. 
+ # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. + # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + `, + }, + "retry send a batch up to backoff's max retries in case the server responds with a 5xx": { + endpointConfig: Config{ + BatchSize: 10, + BatchWait: 10 * time.Millisecond, + }, + serverResponseStatus: 500, + inputEntries: []loki.Entry{logEntries[0]}, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. 
+ # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 1 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. + # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. + # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. 
+ # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 + `, + }, + "do not retry send a batch in case the server responds with a 4xx": { + endpointConfig: Config{ + BatchSize: 10, + BatchWait: 10 * time.Millisecond, + }, + serverResponseStatus: 400, + inputEntries: []loki.Entry{logEntries[0]}, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. + # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 1 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. + # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. 
+ # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. + # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 + `, + }, + "do retry sending a batch in case the server responds with a 429": { + endpointConfig: Config{ + BatchSize: 10, + BatchWait: 10 * time.Millisecond, + }, + serverResponseStatus: 429, + inputEntries: []loki.Entry{logEntries[0]}, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. + # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 1 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. 
+ # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. + # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. + # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 + `, + }, + "do not retry in case of 429 when endpoint is configured to drop rate limited batches": { + endpointConfig: Config{ + BatchSize: 10, + BatchWait: 10 * time.Millisecond, + DropRateLimitedBatches: true, + }, + serverResponseStatus: 429, + inputEntries: []loki.Entry{logEntries[0]}, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. 
+ # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 1 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. + # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. + # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0 + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. 
+ # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant=""} 0 + `, + }, + "batch log entries together honoring the endpoint tenant ID": { + endpointConfig: Config{ + BatchSize: 100, + BatchWait: 100 * time.Millisecond, + TenantID: "tenant-default", + }, + serverResponseStatus: 200, + inputEntries: []loki.Entry{logEntries[0], logEntries[1]}, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "tenant-default", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry, logEntries[1].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. + # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant="tenant-default"} 2.0 + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. + # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__", reason="ingester_error", tenant="tenant-default"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 + loki_write_dropped_entries_total{host="__HOST__", reason="rate_limited", tenant="tenant-default"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. 
+ # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. + # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 + `, + }, + "batch log entries together honoring the tenant ID overridden while processing the pipeline stages": { + endpointConfig: Config{ + BatchSize: 100, + BatchWait: 100 * time.Millisecond, + TenantID: "tenant-default", + }, + serverResponseStatus: 200, + inputEntries: []loki.Entry{logEntries[0], logEntries[3], logEntries[4], logEntries[5]}, + expectedReqs: []util.RemoteWriteRequest{ + { + TenantID: "tenant-default", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}}, + }, + { + TenantID: "tenant-1", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[3].Entry, logEntries[4].Entry}}}}, + }, + { + TenantID: "tenant-2", + Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[5].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP loki_write_sent_entries_total Number of log entries sent to the ingester. 
+ # TYPE loki_write_sent_entries_total counter + loki_write_sent_entries_total{host="__HOST__",tenant="tenant-1"} 2.0 + loki_write_sent_entries_total{host="__HOST__",tenant="tenant-2"} 1.0 + loki_write_sent_entries_total{host="__HOST__",tenant="tenant-default"} 1.0 + # HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. + # TYPE loki_write_dropped_entries_total counter + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0 + loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 + # HELP loki_write_mutated_entries_total The total number of log entries that have been mutated. 
+ # TYPE loki_write_mutated_entries_total counter + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0 + loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 + # HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated. 
+ # TYPE loki_write_mutated_bytes_total counter + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0 + loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0 + `, + }, + } + + for testName, tt := range tests { + t.Run(testName, func(t *testing.T) { + reg := prometheus.NewRegistry() + + // Create a buffer channel where we do enqueue received requests + receivedReqsChan := make(chan util.RemoteWriteRequest, 10) + + // Start a local HTTP server + server := util.NewRemoteWriteServer(receivedReqsChan, tt.serverResponseStatus) + require.NotNil(t, server) + defer server.Close() + + // Get the URL at which the local test server is listening to + serverURL := flagext.URLValue{} + err := serverURL.Set(server.URL) + require.NoError(t, err) + + tt.endpointConfig.URL = serverURL + tt.endpointConfig.Client = config.DefaultHTTPClientConfig + tt.endpointConfig.BackoffConfig = backoff.Config{MinBackoff: 1 * time.Millisecond, MaxBackoff: 2 * time.Millisecond, 
MaxRetries: 3} + tt.endpointConfig.Timeout = 1 * time.Second + + m := newMetrics(reg) + c, err := newEndpoint(m, tt.endpointConfig, log.NewNopLogger(), internal.NewNopMarkerHandler()) + require.NoError(t, err) + + // Send all the input log entries + for i, logEntry := range tt.inputEntries { + c.enqueue(logEntry, 0) + + if tt.inputDelay > 0 && i < len(tt.inputEntries)-1 { + time.Sleep(tt.inputDelay) + } + } + + // Wait until the expected push requests are received (with a timeout) + deadline := time.Now().Add(1 * time.Second) + for len(receivedReqsChan) < len(tt.expectedReqs) && time.Now().Before(deadline) { + time.Sleep(5 * time.Millisecond) + } + + // Stop the endpoint: it waits until the current batch is sent + c.stop() + close(receivedReqsChan) + + // Get all push requests received on the server side + receivedReqs := make([]util.RemoteWriteRequest, 0) + for req := range receivedReqsChan { + receivedReqs = append(receivedReqs, req) + } + + assert.ElementsMatch(t, tt.expectedReqs, receivedReqs) + + expectedMetrics := strings.ReplaceAll(tt.expectedMetrics, "__HOST__", serverURL.Host) + err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "loki_write_sent_entries_total", "loki_write_dropped_entries_total", "loki_write_mutated_entries_total", "loki_write_mutated_bytes_total") + assert.NoError(t, err) + }) + } +} diff --git a/internal/component/common/loki/client/internal/marker_handler_test.go b/internal/component/common/loki/client/internal/marker_handler_test.go index 71610610a5d..1049e88d07f 100644 --- a/internal/component/common/loki/client/internal/marker_handler_test.go +++ b/internal/component/common/loki/client/internal/marker_handler_test.go @@ -31,7 +31,7 @@ func (m *mockMarkerFileHandler) MarkSegment(segment int) { func TestMarkerHandler(t *testing.T) { logger := log.NewLogfmtLogger(os.Stdout) // drive-by test: if metrics don't have the id curried, it panics when emitting them - metrics := NewMarkerMetrics(nil).WithCurriedId("test") + 
metrics := NewMarkerMetrics(nil).CurryWithId("test") t.Run("returns last marked segment from file handler on start", func(t *testing.T) { mockMFH := newMockMarkerFileHandler(10) mh := NewMarkerHandler(mockMFH, time.Minute, logger, metrics) diff --git a/internal/component/common/loki/client/internal/metrics.go b/internal/component/common/loki/client/internal/metrics.go index 85d83af5fb5..07c15c043c8 100644 --- a/internal/component/common/loki/client/internal/metrics.go +++ b/internal/component/common/loki/client/internal/metrics.go @@ -27,9 +27,9 @@ func NewMarkerMetrics(reg prometheus.Registerer) *MarkerMetrics { return m } -// WithCurriedId returns a curried version of MarkerMetrics, with the id label pre-filled. This is a helper that avoids +// CurryWithId returns a curried version of MarkerMetrics, with the id label pre-filled. This is a helper that avoids // having to move the id around where it's unnecessary, and won't change inside the consumer of the metrics. -func (m *MarkerMetrics) WithCurriedId(id string) *MarkerMetrics { +func (m *MarkerMetrics) CurryWithId(id string) *MarkerMetrics { return &MarkerMetrics{ lastMarkedSegment: m.lastMarkedSegment.MustCurryWith(map[string]string{ "id": id, diff --git a/internal/component/common/loki/client/metrics.go b/internal/component/common/loki/client/metrics.go index 8129829a03e..c74b0e339a7 100644 --- a/internal/component/common/loki/client/metrics.go +++ b/internal/component/common/loki/client/metrics.go @@ -6,19 +6,19 @@ import ( ) const ( - HostLabel = "host" - TenantLabel = "tenant" - ReasonLabel = "reason" - - ReasonGeneric = "ingester_error" - ReasonRateLimited = "rate_limited" - ReasonStreamLimited = "stream_limited" - ReasonLineTooLong = "line_too_long" + labelHost = "host" + labelTenant = "tenant" + labelReason = "reason" + + reasonGeneric = "ingester_error" + reasonRateLimited = "rate_limited" + reasonStreamLimited = "stream_limited" + reasonLineTooLong = "line_too_long" ) -var Reasons = 
[]string{ReasonGeneric, ReasonRateLimited, ReasonStreamLimited, ReasonLineTooLong} +var reasons = []string{reasonGeneric, reasonRateLimited, reasonStreamLimited, reasonLineTooLong} -type Metrics struct { +type metrics struct { encodedBytes *prometheus.CounterVec sentBytes *prometheus.CounterVec droppedBytes *prometheus.CounterVec @@ -32,45 +32,45 @@ type Metrics struct { countersWithHostTenantReason []*prometheus.CounterVec } -func NewMetrics(reg prometheus.Registerer) *Metrics { - var m Metrics +func newMetrics(reg prometheus.Registerer) *metrics { + var m metrics m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_encoded_bytes_total", Help: "Number of bytes encoded and ready to send.", - }, []string{HostLabel, TenantLabel}) + }, []string{labelHost, labelTenant}) m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_sent_bytes_total", Help: "Number of bytes sent.", - }, []string{HostLabel, TenantLabel}) + }, []string{labelHost, labelTenant}) m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_dropped_bytes_total", Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.", - }, []string{HostLabel, TenantLabel, ReasonLabel}) + }, []string{labelHost, labelTenant, labelReason}) m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_sent_entries_total", Help: "Number of log entries sent to the ingester.", - }, []string{HostLabel, TenantLabel}) + }, []string{labelHost, labelTenant}) m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_dropped_entries_total", Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.", - }, []string{HostLabel, TenantLabel, ReasonLabel}) + }, []string{labelHost, labelTenant, labelReason}) m.mutatedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_mutated_entries_total", Help: "The total 
number of log entries that have been mutated.", - }, []string{HostLabel, TenantLabel, ReasonLabel}) + }, []string{labelHost, labelTenant, labelReason}) m.mutatedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_mutated_bytes_total", Help: "The total number of bytes that have been mutated.", - }, []string{HostLabel, TenantLabel, ReasonLabel}) + }, []string{labelHost, labelTenant, labelReason}) m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "loki_write_request_duration_seconds", Help: "Duration of send requests.", - }, []string{"status_code", HostLabel, TenantLabel}) + }, []string{"status_code", labelHost, labelTenant}) m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "loki_write_batch_retries_total", Help: "Number of times batches has had to be retried.", - }, []string{HostLabel, TenantLabel}) + }, []string{labelHost, labelTenant}) m.countersWithHostTenant = []*prometheus.CounterVec{ m.batchRetries, m.encodedBytes, m.sentBytes, m.sentEntries, @@ -95,12 +95,12 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { return &m } -type WALClientMetrics struct { +type walEndpointMetrics struct { lastReadTimestamp *prometheus.GaugeVec } -func NewWALClientMetrics(reg prometheus.Registerer) *WALClientMetrics { - m := &WALClientMetrics{ +func newWALEndpointMetrics(reg prometheus.Registerer) *walEndpointMetrics { + m := &walEndpointMetrics{ lastReadTimestamp: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "loki_write", @@ -118,8 +118,8 @@ func NewWALClientMetrics(reg prometheus.Registerer) *WALClientMetrics { return m } -func (m *WALClientMetrics) CurryWithId(id string) *WALClientMetrics { - return &WALClientMetrics{ +func (m *walEndpointMetrics) CurryWithId(id string) *walEndpointMetrics { + return &walEndpointMetrics{ lastReadTimestamp: m.lastReadTimestamp.MustCurryWith(map[string]string{ "id": id, }), diff --git a/internal/component/common/loki/client/shards.go 
b/internal/component/common/loki/client/shards.go new file mode 100644 index 00000000000..ead98982a6f --- /dev/null +++ b/internal/component/common/loki/client/shards.go @@ -0,0 +1,528 @@ +package client + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" + "go.uber.org/atomic" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/useragent" +) + +const ( + // Label reserved to override the tenant ID while processing + // pipeline stages + ReservedLabelTenantID = "__tenant_id__" +) + +// queuedBatch is a batch specific to a tenant, that is considered ready to be sent. +type queuedBatch struct { + TenantID string + Batch *batch +} + +func newQueue(metrics *metrics, logger log.Logger, cfg Config) *queue { + // Capacity is the worst case size in bytes desired for the send queue. This value is used to calculate the size of + // the buffered channel. The worst case scenario assumed is that every batch buffered in full, hence + // the channel capacity would be calculated as: bufferChannelSize = Capacity / BatchSize. + // For example, assuming BatchSize is the 1 MiB default and Capacity is 100 MiB, + // the underlying buffered channel would buffer up to 100 batches. + capacity := max(cfg.QueueConfig.Capacity/max(cfg.BatchSize, 1), 1) + + return &queue{ + cfg: cfg, + metrics: metrics, + logger: logger, + + batches: make(map[string]*batch), + c: make(chan queuedBatch, capacity), + } +} + +// queue for batching and sending log entries to Loki. +// The queue maintains separate batches per tenant and enqueues batches when they +// reach the configured batch size limit. 
+type queue struct { + cfg Config + metrics *metrics + logger log.Logger + c chan queuedBatch + + mu sync.Mutex + // batches maintains one active batch per tenant. When a batch reaches + // the size limit, it's moved to the channel and a new batch is created + // for that tenant. + batches map[string]*batch +} + +// append adds a log entry to the queue for the given tenant. +// It returns true if the entry was successfully queued, false if the queue +// is full and backpressure should be applied. +func (q *queue) append(tenantID string, entry loki.Entry, segmentNum int) bool { + q.mu.Lock() + defer q.mu.Unlock() + + batch, ok := q.batches[tenantID] + if !ok { + // Create a new batch for this tenant. + batch := newBatch(q.cfg.MaxStreams) + _ = batch.add(entry, segmentNum) + q.batches[tenantID] = batch + return true + } + + // If adding this entry would exceed the batch size limit, enqueue the + // current batch and start a new one. + if batch.sizeBytesAfter(entry.Entry) > q.cfg.BatchSize { + select { + case q.c <- queuedBatch{Batch: batch, TenantID: tenantID}: + // Successfully enqueued the batch. + default: + // Channel is full, signal backpressure. + return false + } + + batch := newBatch(q.cfg.MaxStreams) + _ = batch.add(entry, segmentNum) + q.batches[tenantID] = batch + return true + } + + // Add entry to existing batch. If we cannot add entry to batch we will drop it. + if err := batch.add(entry, segmentNum); err != nil { + level.Error(q.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err) + reason := reasonGeneric + if errors.Is(err, errMaxStreamsLimitExceeded) { + reason = reasonStreamLimited + } + q.metrics.droppedBytes.WithLabelValues(q.cfg.URL.Host, tenantID, reason).Add(float64(len(entry.Line))) + q.metrics.droppedEntries.WithLabelValues(q.cfg.URL.Host, tenantID, reason).Inc() + } + + return true +} + +// channel returns the channel used to receive batches ready to be sent. 
+func (q *queue) channel() chan queuedBatch { + return q.c +} + +// drain retrieves all batches that are ready to be sent. +// It returns all batches currently in the channel and all batches +// from the batches map that have exceeded BatchWait. +func (q *queue) drain() []queuedBatch { + q.mu.Lock() + defer q.mu.Unlock() + + var batches []queuedBatch + + // First drain all batches in queue. +loop: + for { + select { + case b, ok := <-q.c: + if !ok { + break loop + } + batches = append(batches, b) + default: + break loop + } + } + + // Then check batches that are not queued but should be flushed anyway. + for tenantID, batch := range q.batches { + if batch.age() < q.cfg.BatchWait { + continue + } + + // Batch has exceeded wait time, remove from map and return it. + delete(q.batches, tenantID) + batches = append(batches, queuedBatch{ + TenantID: tenantID, + Batch: batch, + }) + } + + return batches +} + +// flushAndShutdown flushes all remaining batches and closes the channel. +// It will stop early if the done channel is closed. +func (q *queue) flushAndShutdown(done chan struct{}) { +loop: + for q.tryEnqueueingBatch(done) { + select { + case <-done: + break loop + case <-time.After(time.Second): + } + } + + q.mu.Lock() + defer q.mu.Unlock() + q.batches = nil + close(q.c) +} + +// tryEnqueueingBatch tries to send a batch if necessary. If sending needs to +// be retried it will return true. +func (q *queue) tryEnqueueingBatch(done <-chan struct{}) bool { + q.mu.Lock() + defer q.mu.Unlock() + + for tenantID, batch := range q.batches { + select { + case q.c <- queuedBatch{Batch: batch, TenantID: tenantID}: + // Successfully queued a batch. If we have more we should retry this. + delete(q.batches, tenantID) + return len(q.batches) > 0 + case <-done: + // Shutdown timeout reached, stop trying to flush. + return false + default: + // Queue is full so we should try again. 
+ return true + } + } + return false +} + +// newShards creates a new shards instance for parallel processing of log entries. +// It validates the configuration and creates an HTTP client for sending batches to Loki. +func newShards(metrics *metrics, logger log.Logger, markerHandler SentDataMarkerHandler, cfg Config) (*shards, error) { + if cfg.URL.URL == nil { + return nil, errors.New("endpoint needs target URL") + } + + err := cfg.Client.Validate() + if err != nil { + return nil, err + } + + client, err := config.NewClientFromConfig(cfg.Client, useragent.ProductName, config.WithHTTP2Disabled()) + if err != nil { + return nil, err + } + + client.Timeout = cfg.Timeout + + return &shards{ + cfg: cfg, + logger: logger, + metrics: metrics, + client: client, + markerHandler: markerHandler, + tenants: make(map[string]struct{}), + }, nil +} + +// shards manages multiple parallel queues for processing and sending log entries to Loki. +// It uses sharding to distribute entries across multiple worker goroutines based on label fingerprints, +// enabling parallel processing and improved throughput. Each shard has its own queue and worker goroutine. +// Entries are routed to shards using a hash of their label fingerprint. +type shards struct { + cfg Config + logger log.Logger + metrics *metrics + client *http.Client + markerHandler SentDataMarkerHandler + + mut sync.Mutex + tenants map[string]struct{} + queues []*queue + + // running is used to track the number of running shards. + running atomic.Int32 + onceDone sync.Once + // done is used to signal that all shards have finished. + done chan struct{} + + // softShutdown is used to signal that no new entries should be accepted. + softShutdown chan struct{} + ctx context.Context + // cancel is used to cancel the context when a hard shutdown is initiated. + cancel context.CancelFunc +} + +// start initializes n shards and starts worker goroutines for each one. 
+// Each shard gets its own queue and a dedicated worker that processes batches +// from that queue. The number of shards determines the parallelism level. +func (s *shards) start(n int) { + n = max(n, 1) + + s.mut.Lock() + defer s.mut.Unlock() + + queues := make([]*queue, n) + + for i := range n { + queues[i] = newQueue(s.metrics, s.logger, s.cfg) + } + + s.queues = queues + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.running.Store(int32(n)) + s.onceDone = sync.Once{} + s.done = make(chan struct{}) + s.softShutdown = make(chan struct{}) + + for i := range n { + go s.runShard(s.queues[i]) + } +} + +// stop tries to perform a graceful shutdown of all shards. +// It first attempts a soft shutdown by signaling that no new entries should be accepted +// and allowing all queues to flush their remaining batches within the drain timeout. +// If the drain timeout is exceeded, it performs a hard shutdown that will drop any remaining batches. +func (s *shards) stop() { + s.mut.Lock() + defer s.mut.Unlock() + + // Attempt a soft shutdown, meaning that all shards try to flush their remaining batches. + close(s.softShutdown) + + for _, q := range s.queues { + go q.flushAndShutdown(s.done) + } + + select { + case <-s.done: + return + case <-time.After(s.cfg.QueueConfig.DrainTimeout): + } + + level.Warn(s.logger).Log("msg", "failed to flush all queues during shutdown") + + // Perform hard shutdown + s.cancel() + <-s.done +} + +// runShard is the worker goroutine that processes batches from a single queue. +func (s *shards) runShard(q *queue) { + // Given that a shard handles multiple batches (1 per tenant) and each batch + // can be created at a different point in time, we look for batches whose + // max wait time has been reached every 10 times per BatchWait, so that the + // maximum delay we have sending batches is 10% of the max waiting time. + // We apply a cap of 10ms to the ticker, to avoid too frequent checks in + // case the BatchWait is very low. 
+ const minWaitCheckFrequency = 10 * time.Millisecond + maxWaitCheckFrequency := max(s.cfg.BatchWait/10, minWaitCheckFrequency) + + maxWaitCheck := time.NewTicker(maxWaitCheckFrequency) + defer func() { + maxWaitCheck.Stop() + + if s.running.Dec() == 0 { + s.onceDone.Do(func() { close(s.done) }) + } + }() + + for { + select { + case <-s.ctx.Done(): + // Context is closed when hard shutdown is initiated. + return + case b, ok := <-q.channel(): + if !ok { + // Channel is closed, when a graceful shutdown is successful. + return + } + s.sendBatch(b.TenantID, b.Batch) + case <-maxWaitCheck.C: + // Drain all batches that have exceeded the max wait time. + for _, b := range q.drain() { + s.sendBatch(b.TenantID, b.Batch) + } + } + } +} + +// enqueue routes a log entry to the appropriate shard based on its label fingerprint. +// Returns false if we could not enqueue the entry, either because the shard is shutting down or the queue is full. +// It is up to the caller to retry or drop the entry. +func (s *shards) enqueue(entry loki.Entry, segmentNum int) bool { + s.mut.Lock() + defer s.mut.Unlock() + + entry, tenantID := s.processEntry(entry) + if _, ok := s.tenants[tenantID]; !ok { + s.tenants[tenantID] = struct{}{} + s.initBatchMetrics(tenantID) + } + + fingerprint := entry.Labels.FastFingerprint() + shard := uint64(fingerprint) % uint64(len(s.queues)) + + select { + case <-s.softShutdown: + return false + default: + return s.queues[shard].append(tenantID, entry, segmentNum) + } +} + +func (s *shards) initBatchMetrics(tenantID string) { + // Initialize counters to 0 so the metrics are exported before the first + // occurrence of incrementing to avoid missing metrics. 
+ for _, counter := range s.metrics.countersWithHostTenantReason { + for _, reason := range reasons { + counter.WithLabelValues(s.cfg.URL.Host, tenantID, reason).Add(0) + } + } + + for _, counter := range s.metrics.countersWithHostTenant { + counter.WithLabelValues(s.cfg.URL.Host, tenantID).Add(0) + } +} + +func (s *shards) processEntry(e loki.Entry) (loki.Entry, string) { + // Check if it has been overridden while processing the pipeline stages + if value, ok := e.Labels[ReservedLabelTenantID]; ok { + return e, string(value) + } + + return e, s.cfg.TenantID +} + +// sendBatch encodes a batch and sends it to Loki with retry logic. +func (s *shards) sendBatch(tenantID string, batch *batch) { + defer batch.reportAsSentData(s.markerHandler) + buf, entriesCount, err := batch.encode() + + if err != nil { + level.Error(s.logger).Log("msg", "error encoding batch", "error", err) + return + } + + bufBytes := float64(len(buf)) + s.metrics.encodedBytes.WithLabelValues(s.cfg.URL.Host, tenantID).Add(bufBytes) + + backoff := backoff.New(s.ctx, s.cfg.BackoffConfig) + var status int + for { + start := time.Now() + // send uses `timeout` internally, so `context.Background` is good enough. 
+ status, err = s.send(context.Background(), tenantID, buf) + + s.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), s.cfg.URL.Host, tenantID).Observe(time.Since(start).Seconds()) + + // Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling + if s.cfg.DropRateLimitedBatches && batchIsRateLimited(status) { + level.Warn(s.logger).Log("msg", "dropping batch due to rate limiting applied at ingester") + s.metrics.droppedBytes.WithLabelValues(s.cfg.URL.Host, tenantID, reasonRateLimited).Add(bufBytes) + s.metrics.droppedEntries.WithLabelValues(s.cfg.URL.Host, tenantID, reasonRateLimited).Add(float64(entriesCount)) + return + } + + if err == nil { + s.metrics.sentBytes.WithLabelValues(s.cfg.URL.Host, tenantID).Add(bufBytes) + s.metrics.sentEntries.WithLabelValues(s.cfg.URL.Host, tenantID).Add(float64(entriesCount)) + return + } + + // Only retry 429s, 500s and connection-level errors. + if status > 0 && !batchIsRateLimited(status) && status/100 != 5 { + break + } + + level.Debug(s.logger).Log("msg", "error sending batch, will retry", "status", status, "tenant", tenantID, "error", err) + s.metrics.batchRetries.WithLabelValues(s.cfg.URL.Host, tenantID).Inc() + backoff.Wait() + + // Make sure it sends at least once before checking for retry. 
+ if !backoff.Ongoing() { + break + } + } + + level.Error(s.logger).Log("msg", "final error sending batch, no retries left, dropping data", "status", status, "tenant", tenantID, "error", err) + // If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors + // were for a different reason + dropReason := reasonGeneric + if batchIsRateLimited(status) { + dropReason = reasonRateLimited + } + s.metrics.droppedBytes.WithLabelValues(s.cfg.URL.Host, tenantID, dropReason).Add(bufBytes) + s.metrics.droppedEntries.WithLabelValues(s.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount)) +} + +var userAgent = useragent.Get() + +// send performs the HTTP POST request to send a batch to Loki. +func (s *shards) send(ctx context.Context, tenantID string, buf []byte) (int, error) { + ctx, cancel := context.WithTimeout(ctx, s.cfg.Timeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", s.cfg.URL.String(), bytes.NewReader(buf)) + if err != nil { + return -1, err + } + + const contentType = "application/x-protobuf" + req.Header.Set("Content-Type", contentType) + req.Header.Set("User-Agent", userAgent) + + // If the tenant ID is not empty alloy is running in multi-tenant mode, so + // we should send it to Loki + if tenantID != "" { + req.Header.Set("X-Scope-OrgID", tenantID) + } + + // Add custom headers on request + if len(s.cfg.Headers) > 0 { + for k, v := range s.cfg.Headers { + if req.Header.Get(k) == "" { + req.Header.Add(k, v) + } else { + level.Warn(s.logger).Log("msg", "custom header key already exists, skipping", "key", k) + } + } + } + + resp, err := s.client.Do(req) + if err != nil { + return -1, err + } + + // NOTE: it is important in go to fully read the body and + // close it so that the connection can be reused. + // We only partially read the body if we encounter a non 2xx error + // so we should always consume whats left. 
+ // https://github.com/golang/go/blob/32a9804c7ba3f4a0e0bd26cc24b9204860a49ec8/src/net/http/response.go#L59-L64 + // It is unclear that we always need to drain the body but + // https://github.com/golang/go/issues/60240#issuecomment-1551060433 seems to indicate that we should. + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + + if resp.StatusCode/100 != 2 { + const maxErrMsgLen = 1024 + scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line) + } + return resp.StatusCode, err +} + +func batchIsRateLimited(status int) bool { + return status == 429 +} diff --git a/internal/component/common/loki/client/shards_test.go b/internal/component/common/loki/client/shards_test.go new file mode 100644 index 00000000000..7aa915db5cd --- /dev/null +++ b/internal/component/common/loki/client/shards_test.go @@ -0,0 +1,176 @@ +package client + +import ( + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/loki/pkg/push" +) + +// each entry counts as 4 bytes. +var entry = loki.Entry{ + Labels: model.LabelSet{"foo": "bar"}, + Entry: push.Entry{Timestamp: time.Now(), Line: "test"}, +} + +func TestQueue_append(t *testing.T) { + // a queue with 8 bytes batches and only one batch can be queued.
+ q := newQueue(newMetrics(prometheus.NewRegistry()), log.NewNopLogger(), Config{ + BatchSize: 8, + QueueConfig: QueueConfig{ + Capacity: 8, + }, + }) + + // add 2 entries to the queue + for range 2 { + queued := q.append("tenant-1", entry, 0) + assert.True(t, queued) + } + assert.Equal(t, q.batches["tenant-1"].sizeBytes(), 8) + + // add two more entries, the current batch should be queued and a new batch should be created. + for range 2 { + queued := q.append("tenant-1", entry, 0) + assert.True(t, queued) + } + assert.Equal(t, q.batches["tenant-1"].sizeBytes(), 8) + + // adding one more should fail because both the queue and the current batch are full + queued := q.append("tenant-1", entry, 0) + assert.False(t, queued) + + // dequeue the queued batch so the queue has room again. + <-q.channel() + + // appending an entry succeeds again: the current batch now holds only this 4 byte entry. + queued = q.append("tenant-1", entry, 0) + assert.True(t, queued) + assert.Equal(t, q.batches["tenant-1"].sizeBytes(), 4) +} + +func TestQueue_drain(t *testing.T) { + t.Run("should drain queue and current batch", func(t *testing.T) { + // a queue with 8 bytes batches and only one batch can be queued at any given time. + q := newQueue(newMetrics(prometheus.NewRegistry()), log.NewNopLogger(), Config{ + BatchSize: 8, + QueueConfig: QueueConfig{ + Capacity: 8, + }, + }) + + // fill up queue and current batch + for range 4 { + queued := q.append("tenant-1", entry, 0) + assert.True(t, queued) + } + assert.Equal(t, q.batches["tenant-1"].sizeBytes(), 8) + + batches := q.drain() + // We should drain queued batch and batch stored in memory + assert.Len(t, batches, 2) + }) + + t.Run("should only drain queue", func(t *testing.T) { + // a queue with 8 bytes batches and only one batch can be queued at any given time.
+ q := newQueue(newMetrics(prometheus.NewRegistry()), log.NewNopLogger(), Config{ + BatchSize: 8, + BatchWait: 10 * time.Second, + QueueConfig: QueueConfig{ + Capacity: 8, + }, + }) + + // fill up queue and current batch + for range 4 { + queued := q.append("tenant-1", entry, 0) + assert.True(t, queued) + } + assert.Equal(t, q.batches["tenant-1"].sizeBytes(), 8) + + batches := q.drain() + // Only the queued batch should be drained; the batch still in memory is left out (presumably because BatchWait has not elapsed). + assert.Len(t, batches, 1) + }) +} + +func TestQueue_flushAndShutdown(t *testing.T) { + t.Run("should flush all batches to queue", func(t *testing.T) { + // a queue with 8 bytes batches and only one batch can be queued at any given time. + q := newQueue(newMetrics(prometheus.NewRegistry()), log.NewNopLogger(), Config{ + BatchSize: 8, + QueueConfig: QueueConfig{ + Capacity: 8, + }, + }) + + // fill current batch but don't queue it. + for range 2 { + queued := q.append("tenant-1", entry, 0) + assert.True(t, queued) + } + assert.Equal(t, q.batches["tenant-1"].sizeBytes(), 8) + + var wg sync.WaitGroup + + wg.Go(func() { + done := make(chan struct{}) + defer close(done) + q.flushAndShutdown(done) + }) + + wg.Go(func() { + var batches []queuedBatch + for { + b, ok := <-q.channel() + if !ok { + break + } + batches = append(batches, b) + } + assert.Len(t, batches, 1) + }) + wg.Wait() + }) + + t.Run("should stop early if done channel is closed", func(t *testing.T) { + // a queue with 8 bytes batches and only one batch can be queued at any given time. + q := newQueue(newMetrics(prometheus.NewRegistry()), log.NewNopLogger(), Config{ + BatchSize: 8, + QueueConfig: QueueConfig{ + Capacity: 8, + }, + }) + + // fill up queue and current batch + for range 4 { + queued := q.append("tenant-1", entry, 0) + assert.True(t, queued) + } + + // Create and immediately close the done channel. + done := make(chan struct{}) + close(done) + + // Flush and shutdown - should stop early when done channel is signaled.
+ q.flushAndShutdown(done) + + // Verify batches map is nil. + assert.Nil(t, q.batches) + + // First batch should already be in queue. + _, ok := <-q.channel() + assert.True(t, ok) + + // Second batch should not have been queued + _, ok = <-q.channel() + assert.False(t, ok) + }) +} diff --git a/internal/component/loki/write/types.go b/internal/component/loki/write/types.go index e83b715a560..b5e1b7f0d69 100644 --- a/internal/component/loki/write/types.go +++ b/internal/component/loki/write/types.go @@ -70,22 +70,25 @@ func (r *EndpointOptions) Validate() error { return nil } -// QueueConfig controls how the queue logs remote write client is configured. Note that this client is only used when the -// loki.write component has WAL support enabled. +// QueueConfig controls how shards and queue are configured for endpoint. type QueueConfig struct { Capacity units.Base2Bytes `alloy:"capacity,attr,optional"` + MinShards int `alloy:"min_shards,attr,optional"` DrainTimeout time.Duration `alloy:"drain_timeout,attr,optional"` } +var defaultQueueConfig = QueueConfig{ + Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10 + MinShards: 1, + DrainTimeout: 15 * time.Second, +} + // SetToDefault implements syntax.Defaulter. 
func (q *QueueConfig) SetToDefault() { - *q = QueueConfig{ - Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10 - DrainTimeout: 15 * time.Second, - } + *q = defaultQueueConfig } -func (args Arguments) convertClientConfigs() []client.Config { +func (args Arguments) convertEndpointConfigs() []client.Config { var res []client.Config for _, cfg := range args.Endpoints { url, _ := url.Parse(cfg.URL) @@ -105,8 +108,9 @@ func (args Arguments) convertClientConfigs() []client.Config { TenantID: cfg.TenantID, MaxStreams: args.MaxStreams, DropRateLimitedBatches: !cfg.RetryOnHTTP429, - Queue: client.QueueConfig{ + QueueConfig: client.QueueConfig{ Capacity: int(cfg.QueueConfig.Capacity), + MinShards: cfg.QueueConfig.MinShards, DrainTimeout: cfg.QueueConfig.DrainTimeout, }, } diff --git a/internal/component/loki/write/write.go b/internal/component/loki/write/write.go index 61d568ed4be..48e1e186be3 100644 --- a/internal/component/loki/write/write.go +++ b/internal/component/loki/write/write.go @@ -2,6 +2,7 @@ package write import ( "context" + "errors" "fmt" "path/filepath" "sync" @@ -151,6 +152,10 @@ func (c *Component) Run(ctx context.Context) error { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) + if err := validateConfigStabilityLevel(c.opts, newArgs); err != nil { + return err + } + c.mut.Lock() defer c.mut.Unlock() c.args = newArgs @@ -164,7 +169,7 @@ func (c *Component) Update(args component.Arguments) error { c.consumer.Stop() } - cfgs := newArgs.convertClientConfigs() + cfgs := newArgs.convertEndpointConfigs() uid := alloyseed.Get().UID for i := range cfgs { @@ -211,3 +216,13 @@ func newEntryHandler(handler loki.EntryHandler, externalLabels model.LabelSet) l return e }) } + +func validateConfigStabilityLevel(o component.Options, args Arguments) error { + canUseExperimentalConfig := o.MinStability.Permits(featuregate.StabilityExperimental) + for _, e := 
range args.Endpoints { + if e.QueueConfig != defaultQueueConfig && !canUseExperimentalConfig { + return errors.New("changing queue_config requires stability.level flag to be experimental") + } + } + return nil +} diff --git a/internal/component/loki/write/write_test.go b/internal/component/loki/write/write_test.go index 392cf813fb8..7dc64e204ae 100644 --- a/internal/component/loki/write/write_test.go +++ b/internal/component/loki/write/write_test.go @@ -14,10 +14,12 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/common/loki/wal" "github.com/grafana/alloy/internal/component/discovery" lsf "github.com/grafana/alloy/internal/component/loki/source/file" + "github.com/grafana/alloy/internal/featuregate" loki_util "github.com/grafana/alloy/internal/loki/util" "github.com/grafana/alloy/internal/runtime/componenttest" "github.com/grafana/alloy/internal/util" @@ -318,6 +320,48 @@ func testMultipleEndpoint(t *testing.T, alterArgs func(arguments *Arguments)) { } } +func TestComponentExperimentalConfig(t *testing.T) { + t.Run("should not be able to create component with experimental config without correct flag", func(t *testing.T) { + var args Arguments + err := syntax.Unmarshal([]byte(` + endpoint { + url = "test.com" + queue_config { + min_shards = 2 + } + } + `), &args) + require.NoError(t, err) + + _, err = New(component.Options{ + MinStability: featuregate.StabilityGenerallyAvailable, + OnStateChange: func(e component.Exports) {}, + }, args) + + require.Error(t, err) + }) + + t.Run("should be able to create component with experimental config correct flag", func(t *testing.T) { + var args Arguments + err := syntax.Unmarshal([]byte(` + endpoint { + url = "test.com" + queue_config { + min_shards = 2 + } + } + `), &args) + require.NoError(t, err) + + _, err = New(component.Options{ + MinStability: 
featuregate.StabilityExperimental, + OnStateChange: func(e component.Exports) {}, + }, args) + + require.NoError(t, err) + }) +} + type testCase struct { linesCount int seriesCount int