diff --git a/internal/dag/dag.go b/internal/dag/dag.go index b504ca752b1..55014ed1e7c 100644 --- a/internal/dag/dag.go +++ b/internal/dag/dag.go @@ -228,3 +228,7 @@ func (g *Graph) Clone() *Graph { } return newGraph } + +func (g *Graph) Len() int { + return len(g.nodes) +} diff --git a/internal/dag/weak.go b/internal/dag/weak.go new file mode 100644 index 00000000000..cdd3b6aae1e --- /dev/null +++ b/internal/dag/weak.go @@ -0,0 +1,73 @@ +package dag + +type FilterFunc func(g *Graph, n Node) bool + +// FilterAllFunc includes every node. +func FilterAllFunc(_ *Graph, _ Node) bool { + return true +} + +// FilterLeavesFunc includes only leaves. +func FilterLeavesFunc(g *Graph, n Node) bool { + return len(g.outEdges[n]) == 0 +} + +// FilterRootsFunc includes only roots. +func FilterRootsFunc(g *Graph, n Node) bool { + return len(g.inEdges[n]) == 0 +} + +// WeaklyConnectedComponents returns the graph split into weakly connected +// components. Two nodes are in the same component if there is a path between them in either direction. +// Each node appears in exactly one component. The graph is unchanged. +func WeaklyConnectedComponents(g *Graph, f FilterFunc) [][]Node { + visited := make(nodeSet) + var components [][]Node + + for _, n := range g.Nodes() { + if visited.Has(n) { + continue + } + + var ( + queue = []Node{n} + component = make([]Node, 0) + ) + + visited.Add(n) + for len(queue) > 0 { + curr := queue[0] + queue = queue[1:] + + if f(g, curr) { + component = append(component, curr) + } + + for _, neighbor := range neighbors(g, curr) { + if visited.Has(neighbor) { + continue + } + visited.Add(neighbor) + queue = append(queue, neighbor) + } + } + + if len(component) > 0 { + components = append(components, component) + } + } + + return components +} + +// neighbors returns all nodes adjacent to n ignoring edge direction. +func neighbors(g *Graph, n Node) []Node { + out := make([]Node, 0, len(g.outEdges[n])+len(g.inEdges[n])) + for neighbor := range g.outEdges[n] { + out = append(out, neighbor) + } + for neighbor := range g.inEdges[n] { + out = append(out, neighbor) + } + return out +} diff --git a/internal/dag/weak_test.go b/internal/dag/weak_test.go new file mode 100644 index 00000000000..9873032c1fb --- /dev/null +++ b/internal/dag/weak_test.go @@ -0,0 +1,94 @@ +package dag + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWeaklyConnectedComponents(t *testing.T) { + t.Run("two disjoint graphs", func(t *testing.T) { + var g Graph + a, b, c, d := stringNode("a"), stringNode("b"), stringNode("c"), stringNode("d") + g.Add(a) + g.Add(b) + g.Add(c) + g.Add(d) + g.AddEdge(Edge{From: a, To: b}) + g.AddEdge(Edge{From: c, To: d}) + + got := WeaklyConnectedComponents(&g, FilterAllFunc) + got = sortComponents(got) + + require.Equal(t, [][]Node{{a, b}, {c, d}}, got) + }) + + t.Run("single graph", func(t *testing.T) { + var g Graph + a, b, c := stringNode("a"), stringNode("b"), stringNode("c") + g.Add(a) + g.Add(b) + g.Add(c) + g.AddEdge(Edge{From: a, To: b}) + g.AddEdge(Edge{From: b, To: c}) + + got := WeaklyConnectedComponents(&g, FilterAllFunc) + got = sortComponents(got) + require.Equal(t, [][]Node{{a, b, c}}, got) + }) + + t.Run("isolated graphs", func(t *testing.T) { + var g Graph + a, b, c := stringNode("a"), stringNode("b"), stringNode("c") + g.Add(a) + g.Add(b) + g.Add(c) + + got := WeaklyConnectedComponents(&g, FilterAllFunc) + got = sortComponents(got) + require.Equal(t, [][]Node{{a}, {b}, {c}}, got) + }) + + t.Run("only leaves", func(t *testing.T) { + var g Graph + a, b, c, d := stringNode("a"), stringNode("b"), stringNode("c"), stringNode("d") + g.Add(a) + g.Add(b) + g.Add(c) + g.Add(d) + g.AddEdge(Edge{From: a, To: b}) + g.AddEdge(Edge{From: c, To: d}) + + got := WeaklyConnectedComponents(&g, FilterLeavesFunc) + got = sortComponents(got) + require.Equal(t, [][]Node{{b}, {d}}, got) + }) + + t.Run("only roots", func(t *testing.T) { + var g Graph + a, b, c, d := stringNode("a"), stringNode("b"), stringNode("c"), stringNode("d") + g.Add(a) + g.Add(b) + g.Add(c) + g.Add(d) + g.AddEdge(Edge{From: a, To: b}) + g.AddEdge(Edge{From: c, To: d}) + + got := WeaklyConnectedComponents(&g, FilterRootsFunc) + got = sortComponents(got) + require.Equal(t, [][]Node{{a}, {c}}, got) + }) +} + +func sortComponents(components [][]Node) [][]Node { + for _, c := range components { + sort.Slice(c, func(i, j int) bool { + return c[i].NodeID() < c[j].NodeID() + }) + } + sort.Slice(components, func(i, j int) bool { + return components[i][0].NodeID() < components[j][0].NodeID() + }) + return components +} diff --git a/internal/runtime/alloy.go b/internal/runtime/alloy.go index bb64b15979e..2e443cd1523 100644 --- a/internal/runtime/alloy.go +++ b/internal/runtime/alloy.go @@ -265,9 +265,11 @@ func newController(o controllerOptions) (*Runtime, error) { // Run starts the Alloy controller, blocking until the provided context is // canceled. Run must only be called once. func (f *Runtime) Run(ctx context.Context) { - defer func() { _ = f.sched.Close() }() - defer f.loader.Cleanup(!f.opts.IsModule) - defer level.Debug(f.log).Log("msg", "Alloy controller exiting") + defer func() { + level.Debug(f.log).Log("msg", "Alloy controller exiting") + f.loader.Cleanup(!f.opts.IsModule) + f.sched.Stop() + }() level.Debug(f.log).Log("msg", "Running alloy controller") @@ -284,34 +286,9 @@ func (f *Runtime) Run(ctx context.Context) { f.loader.EvaluateDependants(ctx, all) case <-f.loadFinished: level.Info(f.log).Log("msg", "scheduling loaded components and services") - - var ( - components = f.loader.Components() - services = f.loader.Services() - imports = f.loader.Imports() - - runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports)) - ) - for _, c := range components { - runnables = append(runnables, c) - } - - for _, i := range imports { - runnables = append(runnables, i) - } - - // Only the root controller should run services, since modules share the - // same service instance as the root. - if !f.opts.IsModule { - for _, svc := range services { - runnables = append(runnables, svc) - } - } - - if err := f.sched.Synchronize(runnables); err != nil { + if err := f.sched.Synchronize(f.loader.Graph()); err != nil { level.Error(f.log).Log("msg", "failed to load components and services", "err", err) } - f.loadComplete.Store(true) } } diff --git a/internal/runtime/internal/controller/scheduler.go b/internal/runtime/internal/controller/scheduler.go index 6b429fa1101..431bbc04a02 100644 --- a/internal/runtime/internal/controller/scheduler.go +++ b/internal/runtime/internal/controller/scheduler.go @@ -3,11 +3,13 @@ package controller import ( "context" "fmt" + "slices" "sync" "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/dag" "github.com/grafana/alloy/internal/runtime/logging/level" ) @@ -25,8 +27,6 @@ type RunnableNode interface { // Scheduler runs components. type Scheduler struct { - ctx context.Context - cancel context.CancelFunc running sync.WaitGroup logger log.Logger taskShutdownDeadline time.Duration @@ -38,12 +38,9 @@ type Scheduler struct { // NewScheduler creates a new Scheduler. Call Synchronize to manage the set of // components which are running. // -// Call Close to stop the Scheduler and all running components. +// Call Stop to stop the Scheduler and all running components. func NewScheduler(logger log.Logger, taskShutdownDeadline time.Duration) *Scheduler { - ctx, cancel := context.WithCancel(context.Background()) return &Scheduler{ - ctx: ctx, - cancel: cancel, logger: logger, taskShutdownDeadline: taskShutdownDeadline, @@ -52,124 +49,122 @@ func NewScheduler(logger log.Logger, taskShutdownDeadline time.Duration) *Schedu } // Synchronize adjusts the set of running components based on the provided -// RunnableNodes in the following way, +// graph in the following way, // // 1. Nodes already managed by the scheduler will be unchanged. // 2. Nodes which are no longer present will be told to shutdown. -// 3. Nodes will be given 1 minute to shutdown before new nodes are created. -// 4. Wait for any remaining nodes to shutdown +// 3. Nodes that is not already managed by the scheduler will be started. // // Nodes are shutdown first to ensure any shared resources, such as ports, -// are allowed to be freed before new nodes are scheduled. As a means to avoid, -// long stretches of downtime we give this a 1 minute timeout. +// are allowed to be freed before new nodes are scheduled. +// +// Tasks are stopped from roots to leaves and started from leaves to roots. // // Existing components will be restarted if they stopped since the previous // call to Synchronize. // // Synchronize is not goroutine safe and should not be called concurrently. -func (s *Scheduler) Synchronize(rr []RunnableNode) error { - select { - case <-s.ctx.Done(): - return fmt.Errorf("scheduler is closed") - default: - } - - newRunnables := make(map[string]RunnableNode, len(rr)) - for _, r := range rr { - newRunnables[r.NodeID()] = r - } +func (s *Scheduler) Synchronize(g *dag.Graph) error { + desiredTasks := desiredTasksFromGraph(g) - // Stop tasks that are not defined in rr. + toStop := make(map[int][]*task) s.tasksMut.Lock() - var stopping sync.WaitGroup for id, t := range s.tasks { - if _, keep := newRunnables[id]; keep { + if dt, keep := desiredTasks[id]; keep { + t.groupID = dt.groupID + t.rank = dt.rank continue } - - level.Debug(s.logger).Log("msg", "Stopping task", "id", id) - stopping.Add(1) - go func(t *task) { - defer stopping.Done() - t.Stop() - }(t) + toStop[t.groupID] = append(toStop[t.groupID], t) } s.tasksMut.Unlock() - // Wrapping the waitgroup with a channel will allow us to wait with a timeout - doneStopping := make(chan struct{}) - go func() { - stopping.Wait() - close(doneStopping) - }() - - stoppingTimedOut := false - select { - case <-doneStopping: - // All tasks stopped successfully within timeout. - case <-time.After(TaskShutdownWarningTimeout): - level.Warn(s.logger).Log("msg", "Some tasks are taking longer than expected to shutdown, proceeding with new tasks") - stoppingTimedOut = true + var stopping sync.WaitGroup + for _, group := range toStop { + stopping.Go(func() { + for _, t := range stopOrder(group) { + t.Stop() + } + }) } - // Launch new runnables that have appeared. + stopping.Wait() + s.tasksMut.Lock() - for id, r := range newRunnables { + toStart := make(map[int][]*task) + + // Launch new runnables that have appeared. + for id, t := range desiredTasks { if _, exist := s.tasks[id]; exist { continue } - level.Debug(s.logger).Log("msg", "Starting task", "id", id) - var ( - nodeID = id - newRunnable = r - ) - - opts := taskOptions{ - context: s.ctx, - runnable: newRunnable, + s.running.Add(1) + task := newTask(t.groupID, t.rank, taskOptions{ + runnable: t.runnable, onDone: func(err error) { defer s.running.Done() - if err != nil { - level.Error(s.logger).Log("msg", "node exited with error", "node", nodeID, "err", err) + level.Error(s.logger).Log("msg", "node exited with error", "node", id, "err", err) } else { - level.Info(s.logger).Log("msg", "node exited without error", "node", nodeID) + level.Info(s.logger).Log("msg", "node exited without error", "node", id) } s.tasksMut.Lock() defer s.tasksMut.Unlock() - delete(s.tasks, nodeID) + delete(s.tasks, id) }, - logger: log.With(s.logger, "taskID", nodeID), + logger: log.With(s.logger, "taskID", id), taskShutdownDeadline: s.taskShutdownDeadline, - } + }) - s.running.Add(1) - s.tasks[nodeID] = newTask(opts) + s.tasks[id] = task + toStart[task.groupID] = append(toStart[task.groupID], task) } s.tasksMut.Unlock() - // If we timed out, wait for stopping tasks to fully exit before returning. - // Tasks shutting down cannot fully complete their shutdown until the taskMut - // lock is released. - if stoppingTimedOut { - <-doneStopping + var starting sync.WaitGroup + + for _, group := range toStart { + starting.Go(func() { + for _, t := range startOrder(group) { + t.Start() + } + }) } + starting.Wait() return nil } -// Close stops the Scheduler and returns after all running goroutines have +// Stop stops the Scheduler and returns after all running goroutines have // exited. -func (s *Scheduler) Close() error { - s.cancel() +func (s *Scheduler) Stop() { + s.tasksMut.Lock() + + toStop := make(map[int][]*task) + for _, t := range s.tasks { + toStop[t.groupID] = append(toStop[t.groupID], t) + } + s.tasksMut.Unlock() + + for _, group := range toStop { + go func() { + // NOTE: we cannot hold lock when calling Stop because onDone will mutate running tasks. + for _, t := range stopOrder(group) { + t.Stop() + } + }() + } + s.running.Wait() - return nil } // task is a scheduled runnable. type task struct { + groupID int + rank int + ctx context.Context cancel context.CancelFunc exited chan struct{} @@ -178,7 +173,6 @@ type task struct { } type taskOptions struct { - context context.Context runnable RunnableNode onDone func(error) logger log.Logger @@ -186,27 +180,36 @@ type taskOptions struct { } // newTask creates and starts a new task. -func newTask(opts taskOptions) *task { - ctx, cancel := context.WithCancel(opts.context) - +func newTask(groupID, rank int, opts taskOptions) *task { t := &task{ - ctx: ctx, - cancel: cancel, - exited: make(chan struct{}), - opts: opts, + groupID: groupID, + rank: rank, + opts: opts, } + t.ctx, t.cancel = context.WithCancel(context.Background()) + t.exited = make(chan struct{}) + + return t +} + +func (t *task) Start() { + level.Debug(t.opts.logger).Log("msg", "Starting task", "id", t.opts.runnable.NodeID()) + go func() { - err := opts.runnable.Run(t.ctx) + err := t.opts.runnable.Run(t.ctx) + // NOTE: make sure we call cancel here so if the runnable + // exit unexpectedly we clean up resources. + t.cancel() close(t.exited) t.doneOnce.Do(func() { t.opts.onDone(err) }) }() - return t } func (t *task) Stop() { + level.Debug(t.opts.logger).Log("msg", "Stopping task", "id", t.opts.runnable.NodeID()) t.cancel() deadlineDuration := t.opts.taskShutdownDeadline @@ -231,3 +234,50 @@ func (t *task) Stop() { } } } + +// desiredTask describes a runnable to be scheduled. +type desiredTask struct { + // rank defines order, start tasks in ascending rank order, stop in descending rank order + rank int + // groupID is ephemeral and can change between Synchronize calls + groupID int + runnable RunnableNode +} + +func desiredTasksFromGraph(g *dag.Graph) map[string]desiredTask { + var ( + desiredTasks = make(map[string]desiredTask, g.Len()) + components = dag.WeaklyConnectedComponents(g, dag.FilterLeavesFunc) + ) + + for groupID, leaves := range components { + var rank int + _ = dag.WalkTopological(g, leaves, func(n dag.Node) error { + if runnable, ok := n.(RunnableNode); ok { + desiredTasks[runnable.NodeID()] = desiredTask{ + rank: rank, + groupID: groupID, + runnable: runnable, + } + rank++ + } + return nil + }) + } + + return desiredTasks +} + +func startOrder(tasks []*task) []*task { + slices.SortFunc(tasks, func(a, b *task) int { + return a.rank - b.rank + }) + return tasks +} + +func stopOrder(tasks []*task) []*task { + slices.SortFunc(tasks, func(a, b *task) int { + return b.rank - a.rank + }) + return tasks +} diff --git a/internal/runtime/internal/controller/scheduler_test.go b/internal/runtime/internal/controller/scheduler_test.go index 8fa0b177483..4fd49ead2bb 100644 --- a/internal/runtime/internal/controller/scheduler_test.go +++ b/internal/runtime/internal/controller/scheduler_test.go @@ -3,7 +3,7 @@ package controller_test import ( "bytes" "context" - "os" + "slices" "sync" "testing" "testing/synctest" @@ -11,16 +11,16 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/dag" "github.com/grafana/alloy/internal/runtime/internal/controller" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" ) func TestScheduler_Synchronize(t *testing.T) { - logger := log.NewLogfmtLogger(os.Stdout) + logger := log.NewNopLogger() t.Run("Can start new jobs", func(t *testing.T) { var started, finished sync.WaitGroup started.Add(3) @@ -34,15 +34,17 @@ func TestScheduler_Synchronize(t *testing.T) { return nil } + g := &dag.Graph{} + + g.Add(&fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}}) + g.Add(&fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: runFunc}}) + g.Add(&fakeRunnable{ID: "component-c", Component: mockComponent{RunFunc: runFunc}}) + sched := controller.NewScheduler(logger, 1*time.Minute) - sched.Synchronize([]controller.RunnableNode{ - fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}}, - fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: runFunc}}, - fakeRunnable{ID: "component-c", Component: mockComponent{RunFunc: runFunc}}, - }) + sched.Synchronize(g) started.Wait() - require.NoError(t, sched.Close()) + sched.Stop() finished.Wait() }) @@ -57,17 +59,17 @@ func TestScheduler_Synchronize(t *testing.T) { } sched := controller.NewScheduler(logger, 1*time.Minute) + g := &dag.Graph{} + g.Add(&fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}}) for i := 0; i < 10; i++ { // If a new runnable is created, runFunc will panic since the WaitGroup // only supports 1 goroutine. - sched.Synchronize([]controller.RunnableNode{ - fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}}, - }) + sched.Synchronize(g) } started.Wait() - require.NoError(t, sched.Close()) + sched.Stop() }) t.Run("Runnables which no longer exist are shutdown before new ones are created", func(t *testing.T) { @@ -97,92 +99,27 @@ func TestScheduler_Synchronize(t *testing.T) { } sched := controller.NewScheduler(logger, 1*time.Minute) + g := &dag.Graph{} + g.Add(&fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: sharedResourceRun}}) + g.Add(&fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: basicRun}}) - comp1 := fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: sharedResourceRun}} - comp2 := fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: basicRun}} - comp3 := fakeRunnable{ID: "component-c", Component: mockComponent{RunFunc: sharedResourceRun}} - - sched.Synchronize([]controller.RunnableNode{comp1, comp2}) + sched.Synchronize(g) started.Wait() started.Add(1) finished.Add(1) - sched.Synchronize([]controller.RunnableNode{comp2, comp3}) + g = &dag.Graph{} + g.Add(&fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: basicRun}}) + g.Add(&fakeRunnable{ID: "component-c", Component: mockComponent{RunFunc: sharedResourceRun}}) + sched.Synchronize(g) started.Wait() finished.Wait() finished.Add(2) - require.NoError(t, sched.Close()) + sched.Stop() finished.Wait() }) - t.Run("Shutdown will stop waiting after TaskShutdownWarningTimeout to startup components and wait for shutdown after", func(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - var oldTaskExited, newTaskStarted atomic.Bool - - // Old task that takes a long time to stop - slowStop := func(ctx context.Context) error { - <-ctx.Done() - - // Simulate slow shutdown - time.Sleep(2 * controller.TaskShutdownWarningTimeout) - oldTaskExited.Store(true) - return nil - } - - // New task - basicRun := func(ctx context.Context) error { - newTaskStarted.Store(true) - <-ctx.Done() - return nil - } - - sched := controller.NewScheduler(logger, 5*time.Minute) - - // Start component-a with slow stop behavior - comp1 := fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: slowStop}} - err := sched.Synchronize([]controller.RunnableNode{comp1}) - require.NoError(t, err) - - // Replace with component-b - // This should timeout waiting for component-a, start component-b anyway, - // but not return until component-a fully exits - comp2 := fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: basicRun}} - - syncDone := make(chan struct{}) - go func() { - err := sched.Synchronize([]controller.RunnableNode{comp2}) - require.NoError(t, err) - close(syncDone) - }() - - // Wait past the timeout for new task to start - time.Sleep(controller.TaskShutdownWarningTimeout + 1*time.Second) - - require.True(t, newTaskStarted.Load(), "new task should have started after timeout") - require.False(t, oldTaskExited.Load(), "old task should still be running") - - select { - case <-syncDone: - t.Error("Synchronize returned before old task finished") - default: - } - - // Wait for old task to finish - time.Sleep(2 * time.Minute) - - select { - case <-syncDone: - default: - t.Error("Synchronize should have returned after old task finished") - } - - require.True(t, oldTaskExited.Load(), "old task should have exited") - require.True(t, newTaskStarted.Load(), "new task should still be running") - - require.NoError(t, sched.Close()) - }) - }) t.Run("Task shutdown deadline logs warnings and errors", func(t *testing.T) { synctest.Test(t, func(t *testing.T) { // Create a thread-safe buffer to capture log output @@ -197,16 +134,17 @@ func TestScheduler_Synchronize(t *testing.T) { } sched := controller.NewScheduler(logger, 2*time.Minute) + g := &dag.Graph{} + g.Add(&fakeRunnable{ID: "blocking-component", Component: mockComponent{RunFunc: runFunc}}) // Start a component - err := sched.Synchronize([]controller.RunnableNode{ - fakeRunnable{ID: "blocking-component", Component: mockComponent{RunFunc: runFunc}}, - }) + err := sched.Synchronize(g) require.NoError(t, err) syncDone := make(chan struct{}) go func() { - err := sched.Synchronize([]controller.RunnableNode{}) + g := &dag.Graph{} + err := sched.Synchronize(g) require.NoError(t, err) close(syncDone) }() @@ -234,12 +172,40 @@ func TestScheduler_Synchronize(t *testing.T) { t.Error("Synchronize should have returned after deadline") } - require.NoError(t, sched.Close()) + sched.Stop() // Sleep long enough to let the runFunc exit to preventing a synctest panic time.Sleep(time.Minute) }) }) + + t.Run("Tasks are shutdown from roots to leaves within each subgraph", func(t *testing.T) { + var stopOrder []string + + edges := []edge{ + {From: "g1_root1", To: "g1_mid1"}, {From: "g1_root1", To: "g1_leaf2"}, {From: "g1_root1", To: "g1_mid2"}, + {From: "g1_mid1", To: "g1_leaf1"}, {From: "g1_mid2", To: "g1_leaf1"}, + {From: "g2_root1", To: "g2_leaf1"}, + } + + g := buildGraphFromEdges(edges, &stopOrder) + sched := controller.NewScheduler(logger, 1*time.Minute) + err := sched.Synchronize(g) + require.NoError(t, err) + sched.Stop() + + indexOf := func(slice []string, id string) int { + return slices.IndexFunc(slice, func(n string) bool { + return n == id + }) + } + + for _, e := range g.Edges() { + from := indexOf(stopOrder, e.From.NodeID()) + to := indexOf(stopOrder, e.To.NodeID()) + require.Less(t, from, to) + } + }) } type fakeRunnable struct { @@ -282,3 +248,35 @@ func (sb *syncBuffer) String() string { defer sb.mu.Unlock() return sb.buf.String() } + +func buildGraphFromEdges(edges []edge, stopOrder *[]string) *dag.Graph { + set := make(map[string]struct{}) + for _, e := range edges { + set[e.From] = struct{}{} + set[e.To] = struct{}{} + } + + var lock sync.Mutex + + nodes := make(map[string]*fakeRunnable, len(set)) + for id := range set { + nodes[id] = &fakeRunnable{ + ID: id, + Component: mockComponent{RunFunc: func(ctx context.Context) error { + <-ctx.Done() + lock.Lock() + defer lock.Unlock() + *stopOrder = append(*stopOrder, id) + return nil + }}, + } + } + g := &dag.Graph{} + for _, n := range nodes { + g.Add(n) + } + for _, e := range edges { + g.AddEdge(dag.Edge{From: nodes[e.From], To: nodes[e.To]}) + } + return g +}