fix: Stop components in a deterministic order #5613
Conversation
Pull request overview
This pull request addresses the issue of non-deterministic component shutdown order in the Alloy controller. The changes ensure that components are stopped synchronously in the reverse order of their scheduling, which is critical for proper pipeline teardown (e.g., stopping loki.source.file before loki.process before loki.write).
Changes:
- Modified Synchronize to stop tasks synchronously in reverse scheduling order instead of asynchronously
- Introduced stoppingOrder slice to track the deterministic stopping sequence
- Renamed Close() to Stop() and simplified the shutdown logic
- Removed async goroutine-based stopping with timeout warnings during synchronization
- Changed task context creation from using scheduler's context to using background context
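The core of the change can be sketched as follows. This is an illustrative stand-in for the scheduler internals, not the actual Alloy code: components are created sink-first, so walking the recorded scheduling order backwards stops sources before processors before writers.

```go
package main

import "fmt"

// stopInReverse stops each task synchronously in the reverse of its
// scheduling order, mirroring the PR's stoppingOrder approach. The
// names and callback signature here are hypothetical.
func stopInReverse(schedulingOrder []string, stop func(string)) {
	for i := len(schedulingOrder) - 1; i >= 0; i-- {
		stop(schedulingOrder[i])
	}
}

func main() {
	// Components are created sink-first, so the scheduling order for a
	// loki pipeline is write, process, source; stopping reverses it.
	order := []string{"loki.write", "loki.process", "loki.source.file"}
	stopInReverse(order, func(id string) { fmt.Println("stopping", id) })
	// prints: loki.source.file, then loki.process, then loki.write
}
```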
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| internal/runtime/internal/controller/scheduler.go | Implements deterministic stopping order by tracking stoppingOrder slice, removes async stopping logic, renames Close to Stop, changes task context creation |
| internal/runtime/alloy.go | Updates defer order in Run() to call sched.Stop() instead of sched.Close(), ensures proper shutdown sequence |
```go
doneStopping := make(chan struct{})
go func() {
	stopping.Wait()
	close(doneStopping)
}()

stoppingTimedOut := false
select {
case <-doneStopping:
	// All tasks stopped successfully within timeout.
case <-time.After(TaskShutdownWarningTimeout):
	level.Warn(s.logger).Log("msg", "Some tasks are taking longer than expected to shutdown, proceeding with new tasks")
	stoppingTimedOut = true
```
I removed this for now. I don't see the point of keeping it: we need to wait for stopped components anyway, and without it we can at least guarantee that we don't start any new tasks before we have stopped the tasks that were removed.
Since startup/shutdown is still all done together, I'm not a huge fan of letting a single task bottleneck scheduling for 10 mins. Outside of the port collision issue, I'm not sure how much of a problem it is that in exceptional cases we can start up a new component before the other has shut down.
I would feel more comfortable about dropping this if we were doing it at the pipeline level rather than globally.
Pipeline level would be tricky. Yes, one slow task would potentially bottleneck startup of all new ones, and sure, this is mostly related to shared resources such as ports; there may be others, but I cannot think of an example right now.
We can add it back if you think it's that important but future scheduling will be blocked anyway until we have stopped them.
Yeah, no matter what we have to wait, and it's probably better to wait out the stop than enter into more undefined behavior with potentially overlapping components running at the same time.
Final thing: in the event we did get stalled on shutdown, is our logging good enough to make it obvious? I think so, based on us logging the warning after 1 minute and the failure at 10 mins, but WDYT?
I would say logging is fine, but shutdown could take much longer now because we stop synchronously and the deadline is per task. Are we fine with that?
I think at worst we are waiting a little over 10 mins, because we still launch a goroutine to stop every background task. Did I miss something that might have changed that?
So before, we stopped every task asynchronously with a default timeout of 10 min. Now that we stop synchronously within each sub pipeline, the max time would be e.g. 10 min * the number of components.
Ahhhh. I still think it's okay, but maybe we could add more diagnostic info when shutdown is taking a long time? How hard would it be to provide the number of components left to stop for us to log?
Hm, not that hard, but we do have a warning log for when things are slow for each task we stop. Adding one where we log how many are left during stopping would be a bit noisy, I think.
I don't think we need another log; I was thinking we could include the number of components left in the warning / error when it is taking too long.
```go
		defer stopping.Done()
		t.Stop()
	}(t)
	toStop[t.groupID] = append(toStop[t.groupID], t)
```
Tasks being stopped are added to toStop using t.groupID, but this value has already been updated at lines 76-77 for tasks that are being kept. For tasks being stopped, you should capture the old groupID before updating the kept tasks, or use the groupID before any updates. This could result in tasks being grouped incorrectly during shutdown.
This is fine; we only update the groupID for tasks that are kept, so every task we want to stop still belongs to its original groupID.
```go
})

t.Run("single graph", func(t *testing.T) {
	// One component: a->b->c
```
What does "component" mean in this case? Aren't there three components?
It's just the wording used by this kind of algorithm. I removed the comment, but essentially one component is an isolated "sub" graph: https://www.geeksforgeeks.org/dsa/find-weakly-connected-components-in-a-directed-graph/
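The idea can be sketched in a few lines of Go. This is an illustrative version of the algorithm, not the Alloy implementation: treat every directed edge as undirected and flood-fill to group nodes into sub graphs.

```go
package main

import "fmt"

// weaklyConnectedComponents groups the nodes of a directed graph by
// treating every edge as undirected and flood-filling with an iterative
// DFS; each returned slice is one isolated "sub" graph.
func weaklyConnectedComponents(nodes []string, edges map[string][]string) [][]string {
	// Build an undirected adjacency list from the directed edges.
	adj := map[string][]string{}
	for from, tos := range edges {
		for _, to := range tos {
			adj[from] = append(adj[from], to)
			adj[to] = append(adj[to], from)
		}
	}
	seen := map[string]bool{}
	var comps [][]string
	for _, n := range nodes {
		if seen[n] {
			continue
		}
		var comp []string
		stack := []string{n}
		seen[n] = true
		for len(stack) > 0 {
			cur := stack[len(stack)-1]
			stack = stack[:len(stack)-1]
			comp = append(comp, cur)
			for _, next := range adj[cur] {
				if !seen[next] {
					seen[next] = true
					stack = append(stack, next)
				}
			}
		}
		comps = append(comps, comp)
	}
	return comps
}

func main() {
	// a->b->c and x->y are two isolated sub graphs.
	nodes := []string{"a", "b", "c", "x", "y"}
	edges := map[string][]string{"a": {"b"}, "b": {"c"}, "x": {"y"}}
	fmt.Println(len(weaklyConnectedComponents(nodes, edges))) // 2
}
```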
kgeckhart left a comment
Mostly questions / small things; overall I think it looks good on paper. The part I'm most concerned about is that we don't have any tests on alloy.go that we could use to prove we are going to schedule the same things at the end of this change.
```go
func FilterLeavesFunc(g *Graph, n Node) bool {
	return len(g.outEdges[n]) == 0
}
```
This is the only defined func we use, is it useful to keep the others or wait to add them when we need them?
Yeah, so first I just implemented WeaklyConnectedComponents without filtering, but for scheduling to work correctly I only need leaves, and it felt bad to only add a specialized function for it.
I am not sure we need the rest of them; I just added them for consistency. I can remove the unused ones, though I think FilterAllFunc is useful just to test that the algorithm is correct in general.
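A standalone sketch of what the leaf filter does, using a plain map instead of the actual `Graph` type: a node with no outgoing edges is a leaf of its sub graph, which is exactly the `len(g.outEdges[n]) == 0` check above.

```go
package main

import "fmt"

// leaves returns the nodes that have no outgoing edges, mirroring what
// FilterLeavesFunc selects when passed to the component search. The
// map-based graph here is an illustrative stand-in.
func leaves(nodes []string, outEdges map[string][]string) []string {
	var out []string
	for _, n := range nodes {
		if len(outEdges[n]) == 0 {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	// a->b->c: only c has no outgoing edges.
	out := map[string][]string{"a": {"b"}, "b": {"c"}}
	fmt.Println(leaves([]string{"a", "b", "c"}, out)) // [c]
}
```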
We do have some, like specific tests for modules, services, foreach and a couple of others. But yes, I should probably add some tests to ensure that it all works correctly. I can do that in another PR, and once that is merged I will rebase this branch to make sure they are still passing.
Great idea!
Added a test in #5679. Let me know if you want me to expand it @kgeckhart
…ponents (#5679) Add test that verifies different component statuses for the alloy runtime; this is to increase confidence with upcoming scheduling changes #5613 (comment)
task to start before returning
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>
### Pull Request Details

We have added some shared functionality for working with loki pipelines and should use it in our `database_observability`. This will make it easier in the future to start migrating to the [new architecture for loki pipelines](#4940) and avoid common pitfalls. I did not review all `collectors`, just a sample; e.g. [explain_plan](https://github.com/grafana/alloy/blob/main/internal/component/database_observability/mysql/collector/explain_plans.go#L469) can cause a deadlock when the component is stopping, in turn preventing alloy from stopping because nothing is consuming from the handler.

Changes:
1. Use [loki.Fanout](https://github.com/grafana/alloy/blob/main/internal/component/common/loki/fanout.go#L19) - This does internal locking of log receivers so components do not have to care about it. It will also stop forwarding when the context is canceled.
2. Use [loki.Consume](https://github.com/grafana/alloy/blob/0ac6d7ca7c744268668189c0f11460a84c9ef458/internal/component/common/loki/consume.go#L13) - Runs the consume loop from a `LogsReceiver` and exits when the context is canceled.
3. Use [loki.Drain](https://github.com/grafana/alloy/blob/0ac6d7ca7c744268668189c0f11460a84c9ef458/internal/component/common/loki/drain.go#L18) - When a component is stopping we still need to forward / drain to make sure collectors can stop. Since #5613 we have a deterministic order for stopping components, so it is safe to forward to downstream components during stops, but we still have a timeout after which we drain into nothing.
4. Use [loki.NewEntry](https://github.com/grafana/alloy/blob/0ac6d7ca7c744268668189c0f11460a84c9ef458/internal/component/common/loki/entry.go#L10) - This makes sure we set the `created` timestamp of entries so they can be properly tracked by `loki_write_entry_propagation_latency_seconds`.


### Issue(s) fixed by this Pull Request

Part of: #5826

### Notes to the Reviewer

<!-- Add any relevant notes for the reviewers and testers of this PR. -->

### PR Checklist

<!-- Remove items that do not apply. For completed items, change [ ] to [x]. -->
- [ ] Documentation added
- [ ] Tests updated
- [ ] Config converters updated
🤖 I have created a release *beep* *boop* --- Release Please PR for [1.15.0](v1.14.0...v1.15.0) (2026-03-26), which includes this change among its bug fixes: Stop components in a deterministic order ([#5613](#5613)) ([00cd371](00cd371)). This PR was generated with [Release Please](https://github.com/googleapis/release-please). Co-authored-by: grafana-alloybot[bot] <167359181+grafana-alloybot[bot]@users.noreply.github.com>
Pull Request Details
The order we use to stop components, either on config update or when alloy stops, is not deterministic and is done asynchronously.
This is not optimal due to the nature of how pipelines work. Take a loki pipeline as an example:
In this pipeline the optimal order is to stop the source first, then process, and write last. When we create our components it's done in the opposite order, and that is correct.
To do this I added
`WeaklyConnectedComponents` with filter support and updated `Synchronize` to accept the graph instead. We use `WeaklyConnectedComponents` and filter out only the leaf nodes of each "sub" graph. From these leaves we can then do a topological walk over each sub graph. The order we get is from leaves to roots, and for every node we increase its rank. For starting we sort all tasks in ascending order, and for stopping we sort them in descending order. Each sub graph also gets an ephemeral groupID that we can use to run start/stop for different graphs in parallel. The rank and groupID are only valid until the next `Synchronize` call, so we need to update the stored tasks.
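The ranking scheme above can be sketched as follows. This is an illustrative version under the stated scheme (leaves rank 0, every node one above its highest-ranked dependency), not the Alloy implementation:

```go
package main

import (
	"fmt"
	"sort"
)

// rank returns a node's distance from the leaves of its sub graph:
// leaves get 0 and every node ranks one above its highest-ranked
// outgoing neighbor.
func rank(outEdges map[string][]string, n string) int {
	r := 0
	for _, dep := range outEdges[n] {
		if d := rank(outEdges, dep) + 1; d > r {
			r = d
		}
	}
	return r
}

func main() {
	// loki.source.file -> loki.process -> loki.write
	out := map[string][]string{
		"loki.source.file": {"loki.process"},
		"loki.process":     {"loki.write"},
		"loki.write":       {},
	}
	nodes := []string{"loki.process", "loki.write", "loki.source.file"}

	// Starting: ascending rank, so sinks come up before their producers.
	start := append([]string(nil), nodes...)
	sort.Slice(start, func(i, j int) bool { return rank(out, start[i]) < rank(out, start[j]) })

	// Stopping: descending rank, so sources stop before their consumers.
	stop := append([]string(nil), nodes...)
	sort.Slice(stop, func(i, j int) bool { return rank(out, stop[i]) > rank(out, stop[j]) })

	fmt.Println("start:", start)
	fmt.Println("stop: ", stop)
}
```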
Issue(s) fixed by this Pull Request
Fixes: #330
Notes to the Reviewer
PR Checklist