Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,7 @@ func (g *Graph) Clone() *Graph {
}
return newGraph
}

func (g *Graph) Len() int {
return len(g.nodes)
}
73 changes: 73 additions & 0 deletions internal/dag/weak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package dag

type FilterFunc func(g *Graph, n Node) bool

// FilterAllFunc includes every node.
func FilterAllFunc(_ *Graph, _ Node) bool {
return true
}

// FilterLeavesFunc includes only leaves.
func FilterLeavesFunc(g *Graph, n Node) bool {
return len(g.outEdges[n]) == 0
}
Comment on lines +11 to +13
Copy link
Copy Markdown
Contributor

@kgeckhart kgeckhart Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only defined func we use, is it useful to keep the others or wait to add them when we need them?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so first I just implemented WeaklyConnectedComponents without filtering but for scheduling to work correctly I only need leaves, but felt bad about only adding a specialized function for it.

I am not sure we need the rest of them I just added it for consistency but I can remove unused ones, though I think FilterAllFunc is useful just to test that the algorithm is correct in general


// FilterRootsFunc includes only roots.
func FilterRootsFunc(g *Graph, n Node) bool {
Comment thread
kalleep marked this conversation as resolved.
return len(g.inEdges[n]) == 0
}

// WeaklyConnectedComponents returns the graph split into weakly connected
// components. Two nodes are in the same component if there is a path between them in either direction.
// Each node appears in exactly one component. The graph is unchanged.
func WeaklyConnectedComponents(g *Graph, f FilterFunc) [][]Node {
visited := make(nodeSet)
var components [][]Node

for _, n := range g.Nodes() {
if visited.Has(n) {
continue
}

var (
queue = []Node{n}
component = make([]Node, 0)
)

visited.Add(n)
for len(queue) > 0 {
curr := queue[0]
queue = queue[1:]

if f(g, curr) {
component = append(component, curr)
}

for _, neighbor := range neighbors(g, curr) {
if visited.Has(neighbor) {
continue
}
visited.Add(neighbor)
queue = append(queue, neighbor)
}
}

if len(component) > 0 {
components = append(components, component)
}
}

return components
}

// neighbors returns all nodes adjacent to n ignoring edge direction.
func neighbors(g *Graph, n Node) []Node {
out := make([]Node, 0, len(g.outEdges[n])+len(g.inEdges[n]))
for neighbor := range g.outEdges[n] {
out = append(out, neighbor)
}
for neighbor := range g.inEdges[n] {
out = append(out, neighbor)
}
return out
}
94 changes: 94 additions & 0 deletions internal/dag/weak_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package dag

import (
"sort"
"testing"

"github.com/stretchr/testify/require"
)

func TestWeaklyConnectedComponents(t *testing.T) {
t.Run("two disjoint graphs", func(t *testing.T) {
var g Graph
a, b, c, d := stringNode("a"), stringNode("b"), stringNode("c"), stringNode("d")
g.Add(a)
g.Add(b)
g.Add(c)
g.Add(d)
g.AddEdge(Edge{From: a, To: b})
g.AddEdge(Edge{From: c, To: d})

got := WeaklyConnectedComponents(&g, FilterAllFunc)
got = sortComponents(got)

require.Equal(t, [][]Node{{a, b}, {c, d}}, got)
})

t.Run("single graph", func(t *testing.T) {
var g Graph
a, b, c := stringNode("a"), stringNode("b"), stringNode("c")
g.Add(a)
g.Add(b)
g.Add(c)
g.AddEdge(Edge{From: a, To: b})
g.AddEdge(Edge{From: b, To: c})

got := WeaklyConnectedComponents(&g, FilterAllFunc)
got = sortComponents(got)
require.Equal(t, [][]Node{{a, b, c}}, got)
})

t.Run("isolated graphs", func(t *testing.T) {
var g Graph
a, b, c := stringNode("a"), stringNode("b"), stringNode("c")
g.Add(a)
g.Add(b)
g.Add(c)

got := WeaklyConnectedComponents(&g, FilterAllFunc)
got = sortComponents(got)
require.Equal(t, [][]Node{{a}, {b}, {c}}, got)
})

t.Run("only leaves", func(t *testing.T) {
var g Graph
a, b, c, d := stringNode("a"), stringNode("b"), stringNode("c"), stringNode("d")
g.Add(a)
g.Add(b)
g.Add(c)
g.Add(d)
g.AddEdge(Edge{From: a, To: b})
g.AddEdge(Edge{From: c, To: d})

got := WeaklyConnectedComponents(&g, FilterLeavesFunc)
got = sortComponents(got)
require.Equal(t, [][]Node{{b}, {d}}, got)
})

t.Run("only roots", func(t *testing.T) {
var g Graph
a, b, c, d := stringNode("a"), stringNode("b"), stringNode("c"), stringNode("d")
g.Add(a)
g.Add(b)
g.Add(c)
g.Add(d)
g.AddEdge(Edge{From: a, To: b})
g.AddEdge(Edge{From: c, To: d})

got := WeaklyConnectedComponents(&g, FilterRootsFunc)
got = sortComponents(got)
require.Equal(t, [][]Node{{a}, {c}}, got)
})
}

func sortComponents(components [][]Node) [][]Node {
for _, c := range components {
sort.Slice(c, func(i, j int) bool {
return c[i].NodeID() < c[j].NodeID()
})
}
sort.Slice(components, func(i, j int) bool {
return components[i][0].NodeID() < components[j][0].NodeID()
})
return components
}
35 changes: 6 additions & 29 deletions internal/runtime/alloy.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,11 @@ func newController(o controllerOptions) (*Runtime, error) {
// Run starts the Alloy controller, blocking until the provided context is
// canceled. Run must only be called once.
func (f *Runtime) Run(ctx context.Context) {
defer func() { _ = f.sched.Close() }()
defer f.loader.Cleanup(!f.opts.IsModule)
defer level.Debug(f.log).Log("msg", "Alloy controller exiting")
defer func() {
level.Debug(f.log).Log("msg", "Alloy controller exiting")
f.loader.Cleanup(!f.opts.IsModule)
f.sched.Stop()
Comment thread
kgeckhart marked this conversation as resolved.
}()

level.Debug(f.log).Log("msg", "Running alloy controller")

Expand All @@ -284,34 +286,9 @@ func (f *Runtime) Run(ctx context.Context) {
f.loader.EvaluateDependants(ctx, all)
case <-f.loadFinished:
level.Info(f.log).Log("msg", "scheduling loaded components and services")

var (
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports))
)
for _, c := range components {
runnables = append(runnables, c)
}

for _, i := range imports {
runnables = append(runnables, i)
}

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
for _, svc := range services {
runnables = append(runnables, svc)
}
}
Comment thread
kalleep marked this conversation as resolved.

if err := f.sched.Synchronize(runnables); err != nil {
if err := f.sched.Synchronize(f.loader.Graph()); err != nil {
level.Error(f.log).Log("msg", "failed to load components and services", "err", err)
}

f.loadComplete.Store(true)
}
}
Expand Down
Loading
Loading