From 33d14b043294313e39b37a1033bb30f599326731 Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Wed, 26 Nov 2025 14:46:31 +0100
Subject: [PATCH 01/12] Add new proposal
---
docs/design/xxx-reliable-loki-pipelines.md | 109 +++++++++++++++++++++
1 file changed, 109 insertions(+)
create mode 100644 docs/design/xxx-reliable-loki-pipelines.md
diff --git a/docs/design/xxx-reliable-loki-pipelines.md b/docs/design/xxx-reliable-loki-pipelines.md
new file mode 100644
index 00000000000..47ffaa45eee
--- /dev/null
+++ b/docs/design/xxx-reliable-loki-pipelines.md
@@ -0,0 +1,109 @@
+# Proposal: Reliable Loki pipelines
+
+* Author: Karl Persson (@kalleep)
+* Last updated: 2025-11-26
+* Discussion link: TODO
+
+## Abstract
+
+Alloy's Loki pipelines currently use channels, which limits throughput due to head-of-line blocking and can causes silent log drops during config reloads or shutdowns.
+
+This proposal introduces a function-based pipeline using a `Consumer` interface, replacing the channel-based design. Source components will call `Consume()` directly on downstream components, enabling parallel processing and returning errors that sources can use for retry logic or proper HTTP error responses.
+
+## Problem
+
+Loki pipelines in Alloy are built using (unbuffered) channels, a design inherited from promtail.
+
+This comes with two big limitations:
+1. Throughput of each component is limited due to head-of-line blocking, where pushing to the next channel may not be possible in the presence of a slow component. An example of this is usage of [secretfilter](https://github.com/grafana/alloy/issues/3694).
+2. Because there is no way to signal back to the source, components can silently drop logs during config reload or shutdown and there is no to detect that.
+
+Consider the following simplified config:
+```
+loki.source.file "logs" {
+ targets = targets
+ forward_to = [loki.process.logs.receiver]
+}
+
+loki.process "logs" {
+ forward_to = [loki.write.loki.receiver]
+}
+
+loki.write "loki" {}
+```
+
+`loki.source.file` will tail all files from targets and compete to send on the channel exposed by `loki.process`. Only one entry will be processed by each stage configured in `loki.process`. If a reload happens or if alloy is shutting down logs could be silently dropped.
+
+## Proposal 0: Do nothing
+This architecture works in most cases, it will be hard to use slow components such as `secretfilter` because a lot of the time it's too slow.
+It's also hard to use Alloy as a gateway for loki pipelines with e.g. `loki.source.api` due to the limitations listed above.
+
+## Proposal 1: Chain function calls
+
+Loki pipelines are the only one using channels for passing data between components. Prometheus, Pyroscope and otelcol are all using this pattern where each component just calls functions on the next.
+
+They all have slightly different interfaces but basically work the same. Each component exports its own interface like Appender for Prometheus or Consumer for Otel.
+
+We could adopt the same pattern for loki pipelines as well with the following interface:
+
+```go
+type Consumer interface {
+ Consume(ctx context.Context, entries []Entry) error
+}
+```
+
+Adopting this pattern for loki pipelines would change it from a channel based pipeline to a function based pipeline. This would give us two things:
+1. Increased throughput because several sources such as many files or http requests can now call the next component in the pipeline at the same time.
+2. A way to return signals back to the source so we can handle things like giving a proper error response or determine if the position file should be updated.
+
+Solving the issues listed above.
+
+A batch of entries should be considered successfully consumed when they are queued up for sending. We could try to extend this to when it was successfully sent over the wire, but that could be considered an improvement at a later stage.
+
+### Handling fan-out failures
+
+Because a pipeline can "fan-out" to multiple paths, it can also partially succeed. We need to determine how to handle this.
+
+Two options to handle this:
+* Always retry if one or more failed - This could lead to duplicated logs but is easy and safe to implement. This is also how otelcol works.
+ * When using `loki.source.api`, we would return a 5xx error so the caller can retry.
+ * When using `loki.source.file`, we would retry the same batch again.
+* Configuration option `min_success` - Only retry if we don't succeed on at least the configured number of destinations.
+
+### Affected components
+
+The following components need to be updated with this new interface and we need to make sure they are concurrency safe:
+
+**Source components** (need to call `Consume()` and handle errors):
+- `loki.source.file`
+- `loki.source.api`
+- `loki.source.kafka`
+- `loki.source.journal`
+- `loki.source.docker`
+- `loki.source.kubernetes`
+- `loki.source.kubernetes_events`
+- `loki.source.podlogs`
+- `loki.source.syslog`
+- `loki.source.gelf`
+- `loki.source.cloudflare`
+- `loki.source.gcplog`
+- `loki.source.heroku`
+- `loki.source.azure_event_hubs`
+- `loki.source.aws_firehose`
+- `loki.source.windowsevent`
+
+**Processing components** (need to implement `Consumer` and forward to next):
+- `loki.process`
+- `loki.relabel`
+- `loki.secretfilter`
+- `loki.enrich`
+
+**Sink components** (need to implement `Consumer`):
+- `loki.write`
+- `loki.echo`
+
+Pros:
+* Increase throughput of log pipelines.
+* A way to signal back to the source
+Cons:
+* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
From 21542a9c4d608a157265016314b7981d9da9e65e Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Wed, 26 Nov 2025 15:20:25 +0100
Subject: [PATCH 02/12] update link
---
...liable-loki-pipelines.md => 4940-reliable-loki-pipelines.md} | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
rename docs/design/{xxx-reliable-loki-pipelines.md => 4940-reliable-loki-pipelines.md} (98%)
diff --git a/docs/design/xxx-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
similarity index 98%
rename from docs/design/xxx-reliable-loki-pipelines.md
rename to docs/design/4940-reliable-loki-pipelines.md
index 47ffaa45eee..db19bebe7bf 100644
--- a/docs/design/xxx-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -2,7 +2,7 @@
* Author: Karl Persson (@kalleep)
* Last updated: 2025-11-26
-* Discussion link: TODO
+* Discussion link: https://github.com/grafana/alloy/pull/4940
## Abstract
From a3f92e2fcab9220891297954cfd8c9213aae2dbe Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Fri, 28 Nov 2025 11:25:24 +0100
Subject: [PATCH 03/12] Add additional proposal
---
docs/design/4940-reliable-loki-pipelines.md | 62 ++++++++++++++++++---
1 file changed, 55 insertions(+), 7 deletions(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index db19bebe7bf..4c7796050e9 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -6,7 +6,7 @@
## Abstract
-Alloy's Loki pipelines currently use channels, which limits throughput due to head-of-line blocking and can causes silent log drops during config reloads or shutdowns.
+Alloy's Loki pipelines currently use channels, which limits throughput due to head-of-line blocking and can cause silent log drops during config reloads or shutdowns.
This proposal introduces a function-based pipeline using a `Consumer` interface, replacing the channel-based design. Source components will call `Consume()` directly on downstream components, enabling parallel processing and returning errors that sources can use for retry logic or proper HTTP error responses.
@@ -16,7 +16,7 @@ Loki pipelines in Alloy are built using (unbuffered) channels, a design inherite
This comes with two big limitations:
1. Throughput of each component is limited due to head-of-line blocking, where pushing to the next channel may not be possible in the presence of a slow component. An example of this is usage of [secretfilter](https://github.com/grafana/alloy/issues/3694).
-2. Because there is no way to signal back to the source, components can silently drop logs during config reload or shutdown and there is no to detect that.
+2. Because there is no way to signal back to the source, components can silently drop logs during config reload or shutdown and there is no way to detect that.
Consider the following simplified config:
```
@@ -60,6 +60,58 @@ Solving the issues listed above.
A batch of entries should be considered successfully consumed when they are queued up for sending. We could try to extend this to when it was successfully sent over the wire, but that could be considered an improvement at a later stage.
+Pros:
+* Increase throughput of log pipelines.
+* A way to signal back to the source
+Cons:
+* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
+* We go from an iterator like pipeline to passing slices around. Every component would have to iterate over this slice and we need to make sure it's safe to mutate because of fanout.
+
+## Proposal 2: Appendable
+
+The prometheus pipeline uses [Appendable](https://github.com/prometheus/prometheus/blob/main/storage/interface.go#L62).
+Appendable only has one method `Appender` that will return an implementation of [Appender](https://github.com/prometheus/prometheus/blob/main/storage/interface.go#L270).
+
+We could adopt this pattern for loki pipelines by having:
+```go
+type Appendable interface {
+ Appender(ctx context.Context) Appender
+}
+
+type Appender interface {
+ Append(entry Entry) error
+ Commit() error
+ Rollback() error
+}
+```
+
+This approach would, like Proposal 1, solve the issues listed above with a function-based pipeline, but the pipeline would still be iterator-like (one entry at a time).
+
+### How it works
+
+Sources would obtain an `Appender` from the next component in the pipeline, then:
+1. Call `Append(entry)` for each entry in a batch
+2. Call `Commit()` to finalize the batch, or `Rollback()` to discard it
+3. Handle errors at any step
+
+Processing components would:
+Implement `Appendable` to return an `Appender` that runs processing for each entry and forwards it to next component.
+
+Sink components would:
+Implement `Appendable` to return an `Appender` that buffers entries until either `Commit` or `Rollback` is called.
+
+Pros:
+* Increase throughput of log pipelines.
+* A way to signal back to the source
+* Iterator-like pipeline - one entry at a time
+* Transaction semantics provide better error handling
+* Aligns with Prometheus patterns already in use
+Cons:
+* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
+* More complex API
+
+## Considerations for implementation
+
### Handling fan-out failures
Because a pipeline can "fan-out" to multiple paths, it can also partially succeed. We need to determine how to handle this.
@@ -102,8 +154,4 @@ The following components need to be updated with this new interface and we need
- `loki.write`
- `loki.echo`
-Pros:
-* Increase throughput of log pipelines.
-* A way to signal back to the source
-Cons:
-* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
+
From 2880e03cdba1f429fe3ea8ce9e0a566a0e087fe5 Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Fri, 28 Nov 2025 11:27:15 +0100
Subject: [PATCH 04/12] spelling
---
docs/design/4940-reliable-loki-pipelines.md | 20 ++++++++------------
1 file changed, 8 insertions(+), 12 deletions(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index 4c7796050e9..c0cd53a04f2 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -32,7 +32,7 @@ loki.process "logs" {
loki.write "loki" {}
```
-`loki.source.file` will tail all files from targets and compete to send on the channel exposed by `loki.process`. Only one entry will be processed by each stage configured in `loki.process`. If a reload happens or if alloy is shutting down logs could be silently dropped.
+`loki.source.file` will tail all files from targets and compete to send on the channel exposed by `loki.process`. Only one entry will be processed by each stage configured in `loki.process`. If a reload happens or if Alloy is shutting down, logs could be silently dropped.
## Proposal 0: Do nothing
This architecture works in most cases, it will be hard to use slow components such as `secretfilter` because a lot of the time it's too slow.
@@ -40,7 +40,7 @@ It's also hard to use Alloy as a gateway for loki pipelines with e.g. `loki.sour
## Proposal 1: Chain function calls
-Loki pipelines are the only one using channels for passing data between components. Prometheus, Pyroscope and otelcol are all using this pattern where each component just calls functions on the next.
+Loki pipelines are the only ones using channels for passing data between components. Prometheus, Pyroscope and otelcol are all using this pattern where each component just calls functions on the next.
They all have slightly different interfaces but basically work the same. Each component exports its own interface like Appender for Prometheus or Consumer for Otel.
@@ -52,7 +52,7 @@ type Consumer interface {
}
```
-Adopting this pattern for loki pipelines would change it from a channel based pipeline to a function based pipeline. This would give us two things:
+Adopting this pattern for loki pipelines would change it from a channel-based pipeline to a function-based pipeline. This would give us two things:
1. Increased throughput because several sources such as many files or http requests can now call the next component in the pipeline at the same time.
2. A way to return signals back to the source so we can handle things like giving a proper error response or determine if the position file should be updated.
@@ -65,7 +65,7 @@ Pros:
* A way to signal back to the source
Cons:
* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
-* We go from an iterator like pipeline to passing slices around. Every component would have to iterate over this slice and we need to make sure it's safe to mutate because of fanout.
+* We go from an iterator-like pipeline to passing slices around. Every component would have to iterate over this slice and we need to make sure it's safe to mutate because of fan-out.
## Proposal 2: Appendable
@@ -88,14 +88,12 @@ type Appender interface {
This approach would, like Proposal 1, solve the issues listed above with a function-based pipeline, but the pipeline would still be iterator-like (one entry at a time).
### How it works
-
-Sources would obtain an `Appender` from the next component in the pipeline, then:
-1. Call `Append(entry)` for each entry in a batch
-2. Call `Commit()` to finalize the batch, or `Rollback()` to discard it
-3. Handle errors at any step
+Source components would:
+Obtain an `Appender` that can fan-out to all downstream components, then call `Append` for each entry.
+If every call to `Append` is successful, `Commit` should be called; otherwise `Rollback`.
Processing components would:
-Implement `Appendable` to return an `Appender` that runs processing for each entry and forwards it to next component.
+Implement `Appendable` to return an `Appender` that runs processing for each entry and fan-out to all downstream components.
Sink components would:
Implement `Appendable` to return an `Appender` that buffers entries until either `Commit` or `Rollback` is called.
@@ -153,5 +151,3 @@ The following components need to be updated with this new interface and we need
**Sink components** (need to implement `Consumer`):
- `loki.write`
- `loki.echo`
-
-
From c16eed75b99438685dc69b03b111e4087f9ef09f Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Fri, 28 Nov 2025 11:30:30 +0100
Subject: [PATCH 05/12] Update date
---
docs/design/4940-reliable-loki-pipelines.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index c0cd53a04f2..b93422fa774 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -1,7 +1,7 @@
# Proposal: Reliable Loki pipelines
* Author: Karl Persson (@kalleep)
-* Last updated: 2025-11-26
+* Last updated: 2025-11-30
* Discussion link: https://github.com/grafana/alloy/pull/4940
## Abstract
From 35093641ed8b4cc9da8a34b2f8fabb281426840b Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Fri, 28 Nov 2025 13:46:08 +0100
Subject: [PATCH 06/12] remove pro
---
docs/design/4940-reliable-loki-pipelines.md | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index b93422fa774..d175d5d2229 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -102,8 +102,7 @@ Pros:
* Increase throughput of log pipelines.
* A way to signal back to the source
* Iterator-like pipeline - one entry at a time
-* Transaction semantics provide better error handling
-* Aligns with Prometheus patterns already in use
+* Transaction semantics, sources have better control on when a batch should be aborted.
Cons:
* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
* More complex API
From 443884f99c0508ace7bd09a654707ca753c8de4f Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Fri, 28 Nov 2025 15:34:35 +0100
Subject: [PATCH 07/12] Add section about transition to new pipeline code
---
docs/design/4940-reliable-loki-pipelines.md | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index d175d5d2229..9c53caf9a78 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -119,6 +119,17 @@ Two options to handle this:
* When using `loki.source.file`, we would retry the same batch again.
* Configuration option `min_success` - Only retry if we don't succeed on at least the configured number of destinations.
+### Transition from current pipeline to either Proposal 1 or Proposal 2
+Changing the way loki pipeline works is a big effort and will affect all loki components.
+
+We have a couple of options how to do this:
+1. Build tag
+ * We build out the new pipeline under a build tag. This way we could build custom Alloy image using this new pipeline and test it out internally before we commit it to an official release.
+2. New argument
+ * We could add additional argument to components in addition to `forward_to`. This new argument would be using the new pipeline code. This argument would be protected by experimental flag and we would remove it once we are confident in the new code and remove the current pipeline.
+3. Replace pipeline directly
+ * We could replace the pipeline directly without any fallback mechanism. This should be doable over several PRs where we first only replace the communication between components, e.g. in loki.source.file we would still have the [main loop](https://github.com/grafana/alloy/blob/main/internal/component/loki/source/file/file.go#L229-L247) reading from channel and send one entry at a time with this new pipeline between components. Then we could work component by component and remove most of channel usage.
+
### Affected components
The following components need to be updated with this new interface and we need to make sure they are concurrency safe:
@@ -140,6 +151,9 @@ The following components need to be updated with this new interface and we need
- `loki.source.azure_event_hubs`
- `loki.source.aws_firehose`
- `loki.source.windowsevent`
+- `database_observability.mysql`
+- `database_observability.postgres`
+- `faro.receiver`
**Processing components** (need to implement `Consumer` and forward to next):
- `loki.process`
From 3f4c7eff3df569f085c4a442f35f4c59fe3291c3 Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Mon, 8 Dec 2025 14:47:28 +0100
Subject: [PATCH 08/12] update text
---
docs/design/4940-reliable-loki-pipelines.md | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index 9c53caf9a78..53e2f0e9e59 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -8,7 +8,9 @@
Alloy's Loki pipelines currently use channels, which limits throughput due to head-of-line blocking and can cause silent log drops during config reloads or shutdowns.
-This proposal introduces a function-based pipeline using a `Consumer` interface, replacing the channel-based design. Source components will call `Consume()` directly on downstream components, enabling parallel processing and returning errors that sources can use for retry logic or proper HTTP error responses.
+This proposal introduces a function-based pipeline using a `Consumer` or `Appender` interface, replacing the channel-based design.
+
+Source components will call functions directly on downstream components, enabling parallel processing and returning errors that sources can use for retry logic or proper HTTP error responses.
## Problem
@@ -34,6 +36,8 @@ loki.write "loki" {}
`loki.source.file` will tail all files from targets and compete to send on the channel exposed by `loki.process`. Only one entry will be processed by each stage configured in `loki.process`. If a reload happens or if Alloy is shutting down, logs could be silently dropped.
+There is also no way to abort entries in the pipeline. This is problematic when using components such as `loki.source.api` where caller could cancel request due to e.g. timeouts.
+
## Proposal 0: Do nothing
This architecture works in most cases, it will be hard to use slow components such as `secretfilter` because a lot of the time it's too slow.
It's also hard to use Alloy as a gateway for loki pipelines with e.g. `loki.source.api` due to the limitations listed above.
From d9d20d45e1952a6bae47d234f517f32d851c0e8b Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Mon, 9 Mar 2026 14:02:03 +0100
Subject: [PATCH 09/12] Update design doc
---
docs/design/4940-reliable-loki-pipelines.md | 230 ++++++++++++--------
1 file changed, 141 insertions(+), 89 deletions(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index 53e2f0e9e59..d174e0ab512 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -1,144 +1,194 @@
# Proposal: Reliable Loki pipelines
-* Author: Karl Persson (@kalleep)
-* Last updated: 2025-11-30
-* Discussion link: https://github.com/grafana/alloy/pull/4940
+- Author: Karl Persson (@kalleep), Piotr Gwizdala (@thampiotr)
+- Last updated: 2026-03-09
+- Discussion link: https://github.com/grafana/alloy/pull/4940
## Abstract
-Alloy's Loki pipelines currently use channels, which limits throughput due to head-of-line blocking and can cause silent log drops during config reloads or shutdowns.
+Alloy's Loki pipelines have correctness, error handling, and performance issues. We cover them together as they are related, and some solutions will address multiple problems. After an overview, we list our assumptions and propose solutions.
-This proposal introduces a function-based pipeline using a `Consumer` or `Appender` interface, replacing the channel-based design.
+## Problems
-Source components will call functions directly on downstream components, enabling parallel processing and returning errors that sources can use for retry logic or proper HTTP error responses.
+Loki pipelines in Alloy are built using unbuffered Go channels, a design inherited from promtail.
-## Problem
+This implementation comes with limitations that we aim to solve:
-Loki pipelines in Alloy are built using (unbuffered) channels, a design inherited from promtail.
+1. **Propagating errors:** There is no way for a downstream component to signal error or success. If we do receive errors in source components, we wouldn’t know how to handle them - the correct handling needs to be designed:
+ - In some cases, an error can be propagated, for example in case of loki.source.api, we can respond to a client with a retriable error.
+ - In other cases, an error may need to be retried, for example in case of loki.source.file.
+ - Success handling: ideally we should only advance the positions in the file after we got success. Similarly, we should send response ‘200’ only when successful.
+ - Logs can be dropped in the pipeline: the dropped logs should be considered successfully processed
+ - Logs can be joined from multiple entries into one entry via multiline: in this case the success or failure of the aggregated line needs to be propagated as success/failure of all the entries that it was made with.
-This comes with two big limitations:
-1. Throughput of each component is limited due to head-of-line blocking, where pushing to the next channel may not be possible in the presence of a slow component. An example of this is usage of [secretfilter](https://github.com/grafana/alloy/issues/3694).
-2. Because there is no way to signal back to the source, components can silently drop logs during config reload or shutdown and there is no way to detect that.
+2. **Fan-out errors:** Properly handle errors when fan-out to 2+ subpipelines does not succeed. We need to define the desired behavior and what the ‘success’ means in the context of fan-out. See additional questions below.
-Consider the following simplified config:
-```
-loki.source.file "logs" {
- targets = targets
- forward_to = [loki.process.logs.receiver]
-}
+3. **Loss of logs during shutdown:** because the component shutdown sequence in Alloy is currently not deterministic. To cleanly shutdown and drain Loki pipelines without losing logs, we want to stop accepting new logs and then make sure the pipeline is drained.
-loki.process "logs" {
- forward_to = [loki.write.loki.receiver]
-}
+4. **Performance:** Throughput does not scale with CPU/memory given to the process. Pipelines don’t scale because of head-of-line blocking, where pushing to the next channel may not be possible in the presence of a slow component. An example of this is usage of [secretfilter](https://github.com/grafana/alloy/issues/3694).
-loki.write "loki" {}
-```
+5. **Traffic shedding:** When traffic volume is higher than Alloy’s max throughput, there is no mechanism to shed traffic and reject new requests without attempting to process them.
+ Ideally we would shed traffic immediately on arrival if Alloy detects congestion, so that we can allow requests already in the pipeline to be processed. Perhaps it’s a temporary spike or scaling needs to kick-in.
+
+6. **Congestion observability:** There is no way to track pipeline latency, for example, to understand if Alloy is able to keep up with the volume of logs. There is this GH issue.
+
+7. **Cancelling writes:** There is no way to signal from the source to downstream components that processing of some entries is no longer needed (e.g. request cancelled). For example:
+ loki.source.api receives a request which is subsequently cancelled by the client. If the logs are already sent, there’s nothing we can do about it, but if the logs are still in the pipeline, perhaps we could cancel their processing.
+
+## Assumptions
+
+**Error Handling for Fan-Out to Multiple Subpipelines**
+
+When fanning out to two or more subpipelines, we considered several approaches for handling errors:
+
+1. Partial success model – Treat the operation as successful if at least one downstream component succeeds.
+
+2. All-or-nothing model – Require all downstream components to succeed for the overall operation to succeed.
-`loki.source.file` will tail all files from targets and compete to send on the channel exposed by `loki.process`. Only one entry will be processed by each stage configured in `loki.process`. If a reload happens or if Alloy is shutting down, logs could be silently dropped.
+3. Per-source configurable threshold – Allow each source component to define a minimum success, for example min_success, that determines how many downstream components must succeed.
-There is also no way to abort entries in the pipeline. This is problematic when using components such as `loki.source.api` where caller could cancel request due to e.g. timeouts.
+4. Configurable per downstream edge – Allow downstream edge components to define whether their failures should impact the overall result, with configuration possible either at the edge component level or per endpoint, for example within loki.write.
-## Proposal 0: Do nothing
-This architecture works in most cases, it will be hard to use slow components such as `secretfilter` because a lot of the time it's too slow.
-It's also hard to use Alloy as a gateway for loki pipelines with e.g. `loki.source.api` due to the limitations listed above.
+**Our default behavior will follow the all-or-nothing model.** The overall operation succeeds only if all downstream components succeed. This provides clear and predictably safe semantics by default.
-## Proposal 1: Chain function calls
+**Option 4** can be supported with already existing `block_on_overflow`. If this is configured for endpoint logs would be dropped and not reported as error and the specific endpoint would not be able to bottleneck the pipeline.
-Loki pipelines are the only ones using channels for passing data between components. Prometheus, Pyroscope and otelcol are all using this pattern where each component just calls functions on the next.
+**Success Semantics for loki.write**
-They all have slightly different interfaces but basically work the same. Each component exports its own interface like Appender for Prometheus or Consumer for Otel.
+For loki.write we need to define when a batch of logs are considered successfully processed and when a success result can be returned and propagated to the source. We considered the following options:
-We could adopt the same pattern for loki pipelines as well with the following interface:
+1. When sent over the wire or written to disk
+ - If WAL is disabled, consider the write successful only after the data has been successfully sent over the network.
+ - If WAL is enabled, consider the write successful once the data is persisted to disk, since it is durable and survives crashes.
+
+2. When written to the queue or written to disk.
+ - If WAL is disabled, consider write to be successful once it is added to the in-memory queue. With a clean shutdown and no downstream issues, no logs are lost.
+ - If WAL is enabled, similar to option 1, consider the write successful once the data is persisted to disk. This will be documented as the recommended configuration for users that want the additional durability.
+
+**We have decided to adopt option 2.** While this approach is not as reliable as option 1 when using in-memory queue, the WAL option is equally reliable and would be recommended for users that want extra reliability. We can still revisit this later if we find that stronger guarantees are required.
+
+## Goals
+
+| Problem | Impact | Priority |
+| ----------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
+| Traffic shedding & Congestion observability | There is always a performance limit. When we hit it, it should be well handled to give users a good experience and clarity of what’s going on. | Very high |
+| Performance | Crashing, not keeping up, loss of logs | Very high |
+| Loss of logs during shutdown | HPA can be quite active, we may end up losing logs. Some customers have alerts on logs. | High |
+| Propagation of errors & fan-out errors handling | **So what errors do we need to handle?**
Seems like the only errors we could get here are: 1. I/O errors when writing to WAL. 2. Congestion when queues are full.
**For gateway setup**, requests that can be retried due to these errors may not be retried and we can lose logs.
**For position files** we only want to advance the position when the entries are successfully pushed into the pipeline. | Medium / Low |
+| Cancelling writes | Potential for more duplicated logs, but duplicated logs are always possible. | Medium / Low |
+
+## Proposal: Function-based pipeline with synchronous error propagation
+
+We replace the current unbuffered channel-based Loki pipeline with synchronous function calls. Components implement a shared Consumer interface. Source components wait for the full pipeline to complete before committing (responding to API clients or advancing file positions). Errors and context cancellation propagate back to the source naturally.
+
+### Interface
+
+This is for illustration purposes only. The actual naming may be different.
```go
type Consumer interface {
- Consume(ctx context.Context, entries []Entry) error
+ Consume(ctx context.Context, entries []loki.Entry) error
}
```
-Adopting this pattern for loki pipelines would change it from a channel-based pipeline to a function-based pipeline. This would give us two things:
-1. Increased throughput because several sources such as many files or http requests can now call the next component in the pipeline at the same time.
-2. A way to return signals back to the source so we can handle things like giving a proper error response or determine if the position file should be updated.
+This replaces the current Chan() chan<- loki.Entry. The call is synchronous — the caller's goroutine does the work through the pipeline. context.Context scopes the processing lifetime (HTTP request context, shutdown signal). error propagates failures back to the source.
+
+Not every Loki backend (or other downstream endpoint) has out-of-order ingestion enabled, so entries within a single stream must be processed in order. To achieve parallelism without breaking ordering, we assign entire streams to specific goroutines — all entries for a given stream always go to the same goroutine (see ShardingConsumer below).
-Solving the issues listed above.
+Pipeline components like loki.process, loki.relabel, loki.write, and fan-out all implement Consumer. These components are stream-agnostic — they process whatever entries they receive and do not perform sharding. The entries they receive will already be grouped by stream.
-A batch of entries should be considered successfully consumed when they are queued up for sending. We could try to extend this to when it was successfully sent over the wire, but that could be considered an improvement at a later stage.
+**Note on label mutation**: loki.process and loki.relabel can change an entry's labels, which changes its stream identity. This is safe because sharding happens before these components run, based on the original stream labels. All entries from the same original stream are on the same worker and processed in order — this ordering is preserved through mutations. When entries reach loki.write, it reshards internally based on final labels. If entries from different original streams end up in the same target stream after relabeling, their interleaving is fine — they had no ordering relationship before relabeling.
-Pros:
-* Increase throughput of log pipelines.
-* A way to signal back to the source
-Cons:
-* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
-* We go from an iterator-like pipeline to passing slices around. Every component would have to iterate over this slice and we need to make sure it's safe to mutate because of fan-out.
+The existing loki.Entry already carries `model.LabelSet` which identifies the stream, so no changes to the entry type are needed.
-## Proposal 2: Appendable
+### ShardingConsumer
-The prometheus pipeline uses [Appendable](https://github.com/prometheus/prometheus/blob/main/storage/interface.go#L62).
-Appendable only has one method `Appender` that will return an implementation of [Appender](https://github.com/prometheus/prometheus/blob/main/storage/interface.go#L270).
+Source components (loki.source.api, loki.source.file) receive entries that may belong to multiple streams. A ShardingConsumer sits at the boundary between source and pipeline. It groups entries by stream, dispatches each group to a worker goroutine (by stream hash), and waits for all workers to complete. Each worker calls a plain Consumer chain (e.g. loki.process → loki.write) with entries from a single stream.
-We could adopt this pattern for loki pipelines by having:
```go
-type Appendable interface {
- Appender(ctx context.Context) Appender
-}
+type ShardingConsumer struct { ... }
-type Appender interface {
- Append(entry Entry) error
- Commit() error
- Rollback() error
-}
+// Consume groups entries by stream hash, dispatches to workers, and waits
+// for all to complete. Returns error if any stream's processing failed.
+func (s *ShardingConsumer) Consume(ctx context.Context, entries []loki.Entry) error
```
-This approach would, like Proposal 1, solve the issues listed above with a function-based pipeline, but the pipeline would still be iterator-like (one entry at a time).
+ShardingConsumer runs N workers which process entries from consistently hashed streams. So entries from the same stream go to the same worker and are processed in order. Different streams are processed concurrently across workers.
-### How it works
-Source components would:
-Obtain an `Appender` that can fan-out to all downstream components, then call `Append` for each entry.
-If every call to `Append` is successful, `Commit` should be called; otherwise `Rollback`.
+Each worker handles the full processing path inline: Consume() called by the worker passes through loki.process (mutations, filtering) and into loki.write. No goroutine hand-offs. This keeps goroutine stacks under control and avoids context switching between pools.
-Processing components would:
-Implement `Appendable` to return an `Appender` that runs processing for each entry and fan-out to all downstream components.
+loki.source.api. Calls shardingConsumer.Consume(ctx, entries) with all entries from the HTTP request. If success → respond 200. If error → respond with a retryable status code (429/503). If the client disconnects, the HTTP request context is cancelled and workers abort — no 200 is sent, so no commitment is made. To bound goroutine creation, loki.source.api will limit the number of concurrently accepted connections at the HTTP server level.
-Sink components would:
-Implement `Appendable` to return an `Appender` that buffers entries until either `Commit` or `Rollback` is called.
+loki.source.file. Calls shardingConsumer.Consume(ctx, entries) for each batch read from a file target (already single-stream). If success → advance position. If error → do not advance, will retry. If context is cancelled (shutdown) → do not advance, clean exit.
-Pros:
-* Increase throughput of log pipelines.
-* A way to signal back to the source
-* Iterator-like pipeline - one entry at a time
-* Transaction semantics, sources have better control on when a batch should be aborted.
-Cons:
-* We need to rewrite all loki components with this new pattern and make them safe to call in parallel.
-* More complex API
+loki.write. Implements Consumer by appending to a WAL or in-memory queue. Blocks if queue is full; returns error on WAL I/O failure or context cancellation. WAL I/O errors are retryable — another Alloy instance may have a healthy WAL, or the error may be transient.
-## Considerations for implementation
+### FanoutConsumer
-### Handling fan-out failures
+Every component that can fan-out should use `FanoutConsumer`. This one is responsible to call all `Consume` on all consumers a component should forward to. From sources we would pass this one to `ShardingConsumer`.
-Because a pipeline can "fan-out" to multiple paths, it can also partially succeed. We need to determine how to handle this.
+```go
+type FanoutConsumer struct { ... }
+
+// Consume calls consume on all internal consumers pass to it and aggregate errors.
+func (f *FanoutConsumer) Consume(ctx context.Context, entries []loki.Entry) error
+```
+
+### Error handling and backpressure
+
+Errors propagate synchronously back to the source through Consume() return values. The source has not committed yet (no 200 sent, no positions advanced), so it can always handle errors safely.
+
+| Error | What happens | Source behavior |
+| ----------------------- | ------------------------------------------------ | ------------------------------------------------------------------------------------------------ |
+| WAL I/O failure | loki.write.Consume() returns retryable error | API: respond 429/503. File: don't advance position. Both retry. |
+| Queue full (congestion) | loki.write.Consume() blocks | API: request eventually times out → retryable error. File: reader blocks, positions not updated. |
+| Context cancelled | loki.write.Consume() returns ctx.Err() | API: no response needed (client gone). File: don't advance, clean exit. |
+| Fan-out partial failure | Overall Consume() returns error (all-or-nothing) | Same as WAL error — retryable response or no position advance. |
+
+### Backpressure flow under sustained congestion:
+
+1. HTTP send slows down or WAL disk I/O fails
+2. loki.write's queue fills up → Consume() blocks
+3. Pool workers block → pool input queues fill up
+4. Sources can no longer submit:
+ * loki.source.api: requests time out → retryable error responses → clients retry, possibly to another instance
+ * loki.source.file: file reader blocks → positions not updated
+
+The HTTP request timeout acts as the natural load shedding mechanism. No explicit congestion detection is needed.
+
+### Shutdown
+
+1. Sources stop accepting new requests / reads.
+2. In-flight processing is cancelled via context. Since sources have not committed (no 200 response sent, no positions advanced), cancellation is safe — clients will retry, and file positions will be re-read on restart.
+3. loki.write drains its internal queue (entries already accepted into the queue are flushed to the network / WAL). This is the only component that needs full draining, and it already supports this today.
-Two options to handle this:
-* Always retry if one or more failed - This could lead to duplicated logs but is easy and safe to implement. This is also how otelcol works.
- * When using `loki.source.api`, we would return a 5xx error so the caller can retry.
- * When using `loki.source.file`, we would retry the same batch again.
-* Configuration option `min_success` - Only retry if we don't succeed on at least the configured number of destinations.
+### Queue sizing
-### Transition from current pipeline to either Proposal 1 or Proposal 2
-Changing the way loki pipeline works is a big effort and will affect all loki components.
+The main tunable queue is in loki.write (the worker input queues are small and fixed). We track a metric of the average log entry size observed at runtime. Combined with the available memory on the instance, we can compute a queue capacity with a safety margin to avoid OOM. In first iterations this will be documented as a manual tuning step with some sound defaults.
-We have a couple of options how to do this:
-1. Build tag
- * We build out the new pipeline under a build tag. This way we could build custom Alloy image using this new pipeline and test it out internally before we commit it to an official release.
-2. New argument
- * We could add additional argument to components in addition to `forward_to`. This new argument would be using the new pipeline code. This argument would be protected by experimental flag and we would remove it once we are confident in the new code and remove the current pipeline.
-3. Replace pipeline directly
- * We could replace the pipeline directly without any fallback mechanism. This should be doable over several PRs where we first only replace the communication between components, e.g. in loki.source.file we would still have the [main loop](https://github.com/grafana/alloy/blob/main/internal/component/loki/source/file/file.go#L229-L247) reading from channel and send one entry at a time with this new pipeline between components. Then we could work component by component and remove most of channel usage.
+### Observability
+
+Every batch created timestamp when it enters the pipeline. When loki.write sends the entries over the network, it records the total propagation latency in a histogram. This enables alerting on pipeline congestion (a single alert covers both "not reading files fast enough" and general backpressure) and gives users visibility into whether Alloy is keeping up with log volume.
+
+### Position file update lag (optional)
+
+Even with synchronous error propagation via ShardingConsumer, there is a window where entries have been accepted into loki.write's in-memory queue (Consume() returned success) but haven't been sent over the network yet. If Alloy has an unclean shutdown in that time window, those entries are lost but positions were already advanced.
+
+We have an option to mitigate this in the future:
+
+* We introduce a configurable lag for position file updates, initially set to ~30 seconds. We delay committing the read position, so if Alloy crashes, we re-read and re-send entries from the last committed position.
+* Loki handles duplicates correctly as long as timestamps come from the log entry itself (not the wall clock), which is the standard behavior.
+
+### Future improvement:
+
+Automatically tune the lag based on a total pipeline latency estimate (e.g. from loki.write metrics).
### Affected components
The following components need to be updated with this new interface and we need to make sure they are concurrency safe:
**Source components** (need to call `Consume()` and handle errors):
+
- `loki.source.file`
- `loki.source.api`
- `loki.source.kafka`
@@ -160,11 +210,13 @@ The following components need to be updated with this new interface and we need
- `faro.receiver`
**Processing components** (need to implement `Consumer` and forward to next):
+
- `loki.process`
- `loki.relabel`
- `loki.secretfilter`
- `loki.enrich`
**Sink components** (need to implement `Consumer`):
+
- `loki.write`
- `loki.echo`
From fad497bc08565bc6c689bdb69c8f19b7a3a0dcba Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Tue, 10 Mar 2026 13:23:50 +0100
Subject: [PATCH 10/12] update
---
docs/design/4940-reliable-loki-pipelines.md | 97 +++++++++++----------
1 file changed, 49 insertions(+), 48 deletions(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index d174e0ab512..9fee445537a 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -113,16 +113,6 @@ type ShardingConsumer struct { ... }
func (s *ShardingConsumer) Consume(ctx context.Context, entries []loki.Entry) error
```
-ShardingConsumer runs N workers which process entries from consistently hashed streams. So entries from the same stream go to the same worker and are processed in order. Different streams are processed concurrently across workers.
-
-Each worker handles the full processing path inline: Consume() called by the worker passes through loki.process (mutations, filtering) and into loki.write. No goroutine hand-offs. This keeps goroutine stacks under control and avoids context switching between pools.
-
-loki.source.api. Calls shardingConsumer.Consume(ctx, entries) with all entries from the HTTP request. If success → respond 200. If error → respond with a retryable status code (429/503). If the client disconnects, the HTTP request context is cancelled and workers abort — no 200 is sent, so no commitment is made. To bound goroutine creation, loki.source.api will limit the number of concurrently accepted connections at the HTTP server level.
-
-loki.source.file. Calls shardingConsumer.Consume(ctx, entries) for each batch read from a file target (already single-stream). If success → advance position. If error → do not advance, will retry. If context is cancelled (shutdown) → do not advance, clean exit.
-
-loki.write. Implements Consumer by appending to a WAL or in-memory queue. Blocks if queue is full; returns error on WAL I/O failure or context cancellation. WAL I/O errors are retryable — another Alloy instance may have a healthy WAL, or the error may be transient.
-
### FanoutConsumer
Every component that can fan-out should use `FanoutConsumer`. This one is responsible to call all `Consume` on all consumers a component should forward to. From sources we would pass this one to `ShardingConsumer`.
@@ -134,6 +124,55 @@ type FanoutConsumer struct { ... }
func (f *FanoutConsumer) Consume(ctx context.Context, entries []loki.Entry) error
```
+### Architecture
+
+```
+loki.source.api loki.source.file
+ | |
+ | HTTP handler receives request | File reader reads entries
+ | | (one stream per target)
+ v v
++------------------------------------------------------------+
+| ShardingConsumer |
+| |
+| Groups entries by stream hash, dispatches to N workers. |
+| WAITS for all workers to complete, returns result. |
+| |
+| worker 0 --+ |
+| worker 1 --+-- Consume(logs belonging to this worker) |
+| worker 2 --+ | |
+| ... | | |
+| worker N --+ | |
++-------------------------+----------------------------------+
+ |
+ | Consume() - synchronous, may block
+ v
+ +-- loki.process etc. --+
+ | (mutations, |
+ | filtering) |
+ +-----------------------+
+ |
+ | Consume() - synchronous, may block
+ v
++------------------------------------------------------------+
+| loki.write |
+| |
+| Consume() -> append to in-memory queue or WAL |
+| returns error on WAL I/O failure or |
+| blocks if in-memory queue is full for backpressure |
++------------------------------------------------------------+
+```
+
+ShardingConsumer runs N workers which process entries from consistently hashed streams. So entries from the same stream go to the same worker and are processed in order. Different streams are processed concurrently across workers.
+
+Each worker handles the full processing path inline: Consume() called by the worker passes through loki.process (mutations, filtering) and into loki.write. No goroutine hand-offs. This keeps goroutine stacks under control and avoids context switching between pools.
+
+loki.source.api. Calls shardingConsumer.Consume(ctx, entries) with all entries from the HTTP request. If success → respond 200. If error → respond with a retryable status code (429/503). If the client disconnects, the HTTP request context is cancelled and workers abort — no 200 is sent, so no commitment is made. To bound goroutine creation, loki.source.api will limit the number of concurrently accepted connections at the HTTP server level.
+
+loki.source.file. Calls shardingConsumer.Consume(ctx, entries) for each batch read from a file target (already single-stream). If success → advance position. If error → do not advance, will retry. If context is cancelled (shutdown) → do not advance, clean exit.
+
+loki.write. Implements Consumer by appending to a WAL or in-memory queue. Blocks if queue is full; returns error on WAL I/O failure or context cancellation. WAL I/O errors are retryable — another Alloy instance may have a healthy WAL, or the error may be transient.
+
### Error handling and backpressure
Errors propagate synchronously back to the source through Consume() return values. The source has not committed yet (no 200 sent, no positions advanced), so it can always handle errors safely.
@@ -182,41 +221,3 @@ We have an option to mitigate this in the future:
### Future improvement:
Automatically tune the lag based on a total pipeline latency estimate (e.g. from loki.write metrics).
-
-### Affected components
-
-The following components need to be updated with this new interface and we need to make sure they are concurrency safe:
-
-**Source components** (need to call `Consume()` and handle errors):
-
-- `loki.source.file`
-- `loki.source.api`
-- `loki.source.kafka`
-- `loki.source.journal`
-- `loki.source.docker`
-- `loki.source.kubernetes`
-- `loki.source.kubernetes_events`
-- `loki.source.podlogs`
-- `loki.source.syslog`
-- `loki.source.gelf`
-- `loki.source.cloudflare`
-- `loki.source.gcplog`
-- `loki.source.heroku`
-- `loki.source.azure_event_hubs`
-- `loki.source.aws_firehose`
-- `loki.source.windowsevent`
-- `database_observability.mysql`
-- `database_observability.postgres`
-- `faro.receiver`
-
-**Processing components** (need to implement `Consumer` and forward to next):
-
-- `loki.process`
-- `loki.relabel`
-- `loki.secretfilter`
-- `loki.enrich`
-
-**Sink components** (need to implement `Consumer`):
-
-- `loki.write`
-- `loki.echo`
From ef2a3f956f20594d05eee839661728bf9f236a97 Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Wed, 25 Mar 2026 14:10:43 +0100
Subject: [PATCH 11/12] Move paragraph
---
docs/design/4940-reliable-loki-pipelines.md | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index 9fee445537a..2e86253db27 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -97,10 +97,6 @@ Not every Loki backend (or other downstream endpoint) has out-of-order ingestion
Pipeline components like loki.process, loki.relabel, loki.write, and fan-out all implement Consumer. These components are stream-agnostic — they process whatever entries they receive and do not perform sharding. The entries they receive will already be grouped by stream.
-**Note on label mutation**: loki.process and loki.relabel can change an entry's labels, which changes its stream identity. This is safe because sharding happens before these components run, based on the original stream labels. All entries from the same original stream are on the same worker and processed in order — this ordering is preserved through mutations. When entries reach loki.write, it reshards internally based on final labels. If entries from different original streams end up in the same target stream after relabeling, their interleaving is fine — they had no ordering relationship before relabeling.
-
-The existing loki.Entry already carries `model.LabelSet` which identifies the stream, so no changes to the entry type are needed.
-
### ShardingConsumer
Source components (loki.source.api, loki.source.file) receive entries that may belong to multiple streams. A ShardingConsumer sits at the boundary between source and pipeline. It groups entries by stream, dispatches each group to a worker goroutine (by stream hash), and waits for all workers to complete. Each worker calls a plain Consumer chain (e.g. loki.process → loki.write) with entries from a single stream.
@@ -113,6 +109,8 @@ type ShardingConsumer struct { ... }
func (s *ShardingConsumer) Consume(ctx context.Context, entries []loki.Entry) error
```
+**Note on label mutation**: loki.process and loki.relabel can change an entry's labels, which changes its stream identity. This is safe because sharding happens before these components run, based on the original stream labels. All entries from the same original stream are on the same worker and processed in order — this ordering is preserved through mutations. When entries reach loki.write, it reshards internally based on final labels. If entries from different original streams end up in the same target stream after relabeling, their interleaving is fine — they had no ordering relationship before relabeling.
+
### FanoutConsumer
Every component that can fan-out should use `FanoutConsumer`. This one is responsible to call all `Consume` on all consumers a component should forward to. From sources we would pass this one to `ShardingConsumer`.
From 717badebba2250f6baab11914d237e934c7d1601 Mon Sep 17 00:00:00 2001
From: Kalle <23356117+kalleep@users.noreply.github.com>
Date: Mon, 30 Mar 2026 16:26:22 +0200
Subject: [PATCH 12/12] Update
---
docs/design/4940-reliable-loki-pipelines.md | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/docs/design/4940-reliable-loki-pipelines.md b/docs/design/4940-reliable-loki-pipelines.md
index 2e86253db27..cbda475f9a8 100644
--- a/docs/design/4940-reliable-loki-pipelines.md
+++ b/docs/design/4940-reliable-loki-pipelines.md
@@ -93,8 +93,6 @@ type Consumer interface {
This replaces the current Chan() chan<- loki.Entry. The call is synchronous — the caller's goroutine does the work through the pipeline. context.Context scopes the processing lifetime (HTTP request context, shutdown signal). error propagates failures back to the source.
-Not every Loki backend (or other downstream endpoint) has out-of-order ingestion enabled, so entries within a single stream must be processed in order. To achieve parallelism without breaking ordering, we assign entire streams to specific goroutines — all entries for a given stream always go to the same goroutine (see ShardingConsumer below).
-
Pipeline components like loki.process, loki.relabel, loki.write, and fan-out all implement Consumer. These components are stream-agnostic — they process whatever entries they receive and do not perform sharding. The entries they receive will already be grouped by stream.
### ShardingConsumer
@@ -109,7 +107,6 @@ type ShardingConsumer struct { ... }
func (s *ShardingConsumer) Consume(ctx context.Context, entries []loki.Entry) error
```
-**Note on label mutation**: loki.process and loki.relabel can change an entry's labels, which changes its stream identity. This is safe because sharding happens before these components run, based on the original stream labels. All entries from the same original stream are on the same worker and processed in order — this ordering is preserved through mutations. When entries reach loki.write, it reshards internally based on final labels. If entries from different original streams end up in the same target stream after relabeling, their interleaving is fine — they had no ordering relationship before relabeling.
### FanoutConsumer
@@ -171,6 +168,14 @@ loki.source.file. Calls shardingConsumer.Consume(ctx, entries) for each batch re
loki.write. Implements Consumer by appending to a WAL or in-memory queue. Blocks if queue is full; returns error on WAL I/O failure or context cancellation. WAL I/O errors are retryable — another Alloy instance may have a healthy WAL, or the error may be transient.
+### Ordering
+
+Some Loki backends, and other possible downstream endpoints, do not accept out-of-order ingestion within a stream. That means Alloy must preserve per-stream ordering even when the pipeline processes multiple streams concurrently.
+
+We preserve that invariant by sharding on the entry's original stream labels at the boundary between the source and the pipeline. All entries from the same stream are assigned to the same worker goroutine and are processed serially on that worker. Different streams can still be processed in parallel.
+
+`loki.process` and `loki.relabel` may change an entry's labels, which can change its final stream identity. That can break ordering for the final stream if entries from different original streams are relabeled into the same output stream. This proposal does not address that behavior. It already exists in the current pipeline design, and the ordering guarantee described here applies only to entries within the original stream seen at the source boundary.
+
### Error handling and backpressure
Errors propagate synchronously back to the source through Consume() return values. The source has not committed yet (no 200 sent, no positions advanced), so it can always handle errors safely.