From 3539f95ff701acd29af8ce34db81e2ba673123b0 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Fri, 26 Jan 2018 16:40:29 -0800 Subject: [PATCH] compute/scheduler: fix node task cleanup restart edge case --- compute/scheduler/scheduler.go | 12 +---- tests/config_utils.go | 11 ----- tests/funnel_utils.go | 13 +++++ tests/scheduler/node_test.go | 90 ++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 21 deletions(-) diff --git a/compute/scheduler/scheduler.go b/compute/scheduler/scheduler.go index e779e7bd5..588d3d985 100644 --- a/compute/scheduler/scheduler.go +++ b/compute/scheduler/scheduler.go @@ -62,16 +62,8 @@ func (s *Scheduler) CheckNodes() error { if node.State == pbs.NodeState_GONE { for _, tid := range node.TaskIds { - serr := s.Event.WriteEvent(ctx, events.NewState(tid, tes.State_SYSTEM_ERROR)) - if serr != nil { - return fmt.Errorf( - "Error cleaning up task assigned to dead/gone node. taskID: %s nodeID: %s error: %v", - tid, - node.Id, - serr.Error(), - ) - } - s.Event.WriteEvent(ctx, events.NewSystemLog(tid, 0, 0, "error", + s.Event.WriteEvent(ctx, events.NewState(tid, tes.State_SYSTEM_ERROR)) + s.Event.WriteEvent(ctx, events.NewSystemLog(tid, 0, 0, "info", "Cleaning up Task assigned to dead/gone node", map[string]string{ "nodeID": node.Id, })) diff --git a/tests/config_utils.go b/tests/config_utils.go index 653a48567..766359729 100644 --- a/tests/config_utils.go +++ b/tests/config_utils.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/ohsu-comp-bio/funnel/config" "github.com/ohsu-comp-bio/funnel/logger" - "github.com/ohsu-comp-bio/funnel/proto/tes" "github.com/ohsu-comp-bio/funnel/util/fsutil" "io" "io/ioutil" @@ -139,13 +138,3 @@ func LogConfig() logger.Config { conf.TextFormat.Indent = " " return conf } - -// HelloWorld is a simple, valid task that is easy to reuse in tests. -var HelloWorld = &tes.Task{ - Executors: []*tes.Executor{ - { - Image: "alpine", - Command: []string{"echo", "hello world"}, - }, - }, -} diff --git a/tests/funnel_utils.go b/tests/funnel_utils.go index 0a673bcf3..6419cdf1a 100644 --- a/tests/funnel_utils.go +++ b/tests/funnel_utils.go @@ -468,3 +468,16 @@ func NewRPCConn(conf config.Config, opts ...grpc.DialOption) (*grpc.ClientConn, return conn, nil } + +// HelloWorld is a simple, valid task that is easy to reuse in tests. +func HelloWorld() *tes.Task { + return &tes.Task{ + Id: tes.GenerateID(), + Executors: []*tes.Executor{ + { + Image: "alpine", + Command: []string{"echo", "hello world"}, + }, + }, + } +} diff --git a/tests/scheduler/node_test.go b/tests/scheduler/node_test.go index db389d692..3b9a95863 100644 --- a/tests/scheduler/node_test.go +++ b/tests/scheduler/node_test.go @@ -4,6 +4,7 @@ import ( "context" workercmd "github.com/ohsu-comp-bio/funnel/cmd/worker" "github.com/ohsu-comp-bio/funnel/compute/scheduler" + "github.com/ohsu-comp-bio/funnel/events" "github.com/ohsu-comp-bio/funnel/logger" pbs "github.com/ohsu-comp-bio/funnel/proto/scheduler" "github.com/ohsu-comp-bio/funnel/proto/tes" @@ -158,3 +159,92 @@ func TestDeadNodeTaskCleanup(t *testing.T) { t.Fatal("unexpected task state") } } + +// Tests a bug where tasks and nodes were not being correctly cleaned up +// when the node crashed and was restarted. +func TestNodeCleanup(t *testing.T) { + log := logger.NewLogger("node", tests.LogConfig()) + ctx := context.Background() + + conf := tests.DefaultConfig() + conf.Compute = "manual" + srv := tests.NewFunnel(conf) + + e := srv.Server.Events + + t1 := tests.HelloWorld() + srv.Server.Tasks.CreateTask(ctx, t1) + e.WriteEvent(ctx, events.NewState(t1.Id, tes.Complete)) + + t2 := tests.HelloWorld() + srv.Server.Tasks.CreateTask(ctx, t2) + e.WriteEvent(ctx, events.NewState(t2.Id, tes.Running)) + + t3 := tests.HelloWorld() + srv.Server.Tasks.CreateTask(ctx, t3) + e.WriteEvent(ctx, events.NewState(t3.Id, tes.SystemError)) + + t4 := tests.HelloWorld() + srv.Server.Tasks.CreateTask(ctx, t4) + e.WriteEvent(ctx, events.NewState(t4.Id, tes.Running)) + + t5 := tests.HelloWorld() + srv.Server.Tasks.CreateTask(ctx, t5) + e.WriteEvent(ctx, events.NewState(t5.Id, tes.Running)) + + srv.Scheduler.Nodes.PutNode(ctx, &pbs.Node{ + Id: "test-gone-node-cleanup-restart-1", + State: pbs.NodeState_GONE, + TaskIds: []string{t1.Id, t2.Id, t3.Id}, + }) + + srv.Scheduler.Nodes.PutNode(ctx, &pbs.Node{ + Id: "test-gone-node-cleanup-restart-2", + State: pbs.NodeState_GONE, + TaskIds: []string{t4.Id}, + }) + + srv.Scheduler.Nodes.PutNode(ctx, &pbs.Node{ + Id: "test-gone-node-cleanup-restart-3", + State: pbs.NodeState_ALIVE, + TaskIds: []string{t5.Id}, + }) + + ns, _ := srv.Scheduler.Nodes.ListNodes(ctx, &pbs.ListNodesRequest{}) + log.Info("nodes before", ns) + + err := srv.Scheduler.CheckNodes() + if err != nil { + t.Error(err) + } + + ns, _ = srv.Scheduler.Nodes.ListNodes(ctx, &pbs.ListNodesRequest{}) + if len(ns.Nodes) != 1 { + t.Error("expected 1 node") + } + + if ns.Nodes[0].Id != "test-gone-node-cleanup-restart-3" { + t.Error("unexpected node") + } + + ts, _ := srv.Server.Tasks.ListTasks(ctx, &tes.ListTasksRequest{}) + if len(ts.Tasks) != 5 { + log.Info("tasks", ts) + t.Error("expected 5 tasks") + } + + expected := []tes.State{ + tes.Running, + tes.SystemError, + tes.SystemError, + tes.SystemError, + tes.Complete, + } + + for i, task := range ts.Tasks { + e := expected[i] + if task.State != e { + t.Error("expected state for task", i, task.State, e) + } + } +}