Skip to content

Commit

Permalink
compute/scheduler: fix node task cleanup restart edge case
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Buchanan committed Jan 27, 2018
1 parent 227734b commit 3539f95
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 21 deletions.
12 changes: 2 additions & 10 deletions compute/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,8 @@ func (s *Scheduler) CheckNodes() error {

if node.State == pbs.NodeState_GONE {
for _, tid := range node.TaskIds {
serr := s.Event.WriteEvent(ctx, events.NewState(tid, tes.State_SYSTEM_ERROR))
if serr != nil {
return fmt.Errorf(
"Error cleaning up task assigned to dead/gone node. taskID: %s nodeID: %s error: %v",
tid,
node.Id,
serr.Error(),
)
}
s.Event.WriteEvent(ctx, events.NewSystemLog(tid, 0, 0, "error",
s.Event.WriteEvent(ctx, events.NewState(tid, tes.State_SYSTEM_ERROR))
s.Event.WriteEvent(ctx, events.NewSystemLog(tid, 0, 0, "info",
"Cleaning up Task assigned to dead/gone node", map[string]string{
"nodeID": node.Id,
}))
Expand Down
11 changes: 0 additions & 11 deletions tests/config_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/logger"
"github.com/ohsu-comp-bio/funnel/proto/tes"
"github.com/ohsu-comp-bio/funnel/util/fsutil"
"io"
"io/ioutil"
Expand Down Expand Up @@ -139,13 +138,3 @@ func LogConfig() logger.Config {
conf.TextFormat.Indent = " "
return conf
}

// HelloWorld is a simple, valid task that is easy to reuse in tests.
//
// It has a single executor that runs `echo hello world` in the "alpine"
// image. This is one package-level pointer, so every test that uses it
// refers to the same *tes.Task value. No Id is set on the task here.
var HelloWorld = &tes.Task{
Executors: []*tes.Executor{
{
Image: "alpine",
Command: []string{"echo", "hello world"},
},
},
}
13 changes: 13 additions & 0 deletions tests/funnel_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,3 +468,16 @@ func NewRPCConn(conf config.Config, opts ...grpc.DialOption) (*grpc.ClientConn,

return conn, nil
}

// HelloWorld builds a minimal, valid task that is convenient to reuse in
// tests: a single executor that runs `echo hello world` in the "alpine"
// image. Every call returns a newly allocated task whose Id comes from
// tes.GenerateID.
func HelloWorld() *tes.Task {
	echo := &tes.Executor{
		Image:   "alpine",
		Command: []string{"echo", "hello world"},
	}

	task := &tes.Task{Id: tes.GenerateID()}
	task.Executors = []*tes.Executor{echo}
	return task
}
90 changes: 90 additions & 0 deletions tests/scheduler/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
workercmd "github.com/ohsu-comp-bio/funnel/cmd/worker"
"github.com/ohsu-comp-bio/funnel/compute/scheduler"
"github.com/ohsu-comp-bio/funnel/events"
"github.com/ohsu-comp-bio/funnel/logger"
pbs "github.com/ohsu-comp-bio/funnel/proto/scheduler"
"github.com/ohsu-comp-bio/funnel/proto/tes"
Expand Down Expand Up @@ -158,3 +159,92 @@ func TestDeadNodeTaskCleanup(t *testing.T) {
t.Fatal("unexpected task state")
}
}

// TestNodeCleanup tests a bug where tasks and nodes were not being correctly
// cleaned up when the node crashed and was restarted.
//
// It creates five tasks in various states, assigns them to two GONE nodes and
// one ALIVE node, runs Scheduler.CheckNodes, and then verifies that only the
// ALIVE node is still listed and that the listed task states match the
// expected sequence.
func TestNodeCleanup(t *testing.T) {
	log := logger.NewLogger("node", tests.LogConfig())
	ctx := context.Background()

	conf := tests.DefaultConfig()
	conf.Compute = "manual"
	srv := tests.NewFunnel(conf)

	ev := srv.Server.Events

	// newTask creates a fresh hello-world task and writes a state event that
	// moves it into the given state. A failed event write aborts the test,
	// since every later assertion depends on this setup.
	newTask := func(state tes.State) *tes.Task {
		task := tests.HelloWorld()
		srv.Server.Tasks.CreateTask(ctx, task)
		if err := ev.WriteEvent(ctx, events.NewState(task.Id, state)); err != nil {
			t.Fatal("writing initial task state:", err)
		}
		return task
	}

	t1 := newTask(tes.Complete)
	t2 := newTask(tes.Running)
	t3 := newTask(tes.SystemError)
	t4 := newTask(tes.Running)
	t5 := newTask(tes.Running)

	// A GONE node holding a mix of terminal and non-terminal tasks.
	srv.Scheduler.Nodes.PutNode(ctx, &pbs.Node{
		Id:      "test-gone-node-cleanup-restart-1",
		State:   pbs.NodeState_GONE,
		TaskIds: []string{t1.Id, t2.Id, t3.Id},
	})

	// A GONE node holding a single running task.
	srv.Scheduler.Nodes.PutNode(ctx, &pbs.Node{
		Id:      "test-gone-node-cleanup-restart-2",
		State:   pbs.NodeState_GONE,
		TaskIds: []string{t4.Id},
	})

	// An ALIVE node; the test expects it and its task to be left alone.
	srv.Scheduler.Nodes.PutNode(ctx, &pbs.Node{
		Id:      "test-gone-node-cleanup-restart-3",
		State:   pbs.NodeState_ALIVE,
		TaskIds: []string{t5.Id},
	})

	ns, err := srv.Scheduler.Nodes.ListNodes(ctx, &pbs.ListNodesRequest{})
	if err != nil {
		t.Fatal("listing nodes before cleanup:", err)
	}
	log.Info("nodes before", ns)

	if err := srv.Scheduler.CheckNodes(); err != nil {
		t.Error(err)
	}

	ns, err = srv.Scheduler.Nodes.ListNodes(ctx, &pbs.ListNodesRequest{})
	if err != nil {
		t.Fatal("listing nodes after cleanup:", err)
	}
	// Fatal rather than Error: ns.Nodes[0] below would panic on an empty list.
	if len(ns.Nodes) != 1 {
		t.Fatal("expected 1 node")
	}

	if ns.Nodes[0].Id != "test-gone-node-cleanup-restart-3" {
		t.Error("unexpected node")
	}

	ts, err := srv.Server.Tasks.ListTasks(ctx, &tes.ListTasksRequest{})
	if err != nil {
		t.Fatal("listing tasks:", err)
	}
	// Fatal rather than Error: the loop below indexes expected[i] for every
	// listed task and would panic if more than five tasks came back.
	if len(ts.Tasks) != 5 {
		log.Info("tasks", ts)
		t.Fatal("expected 5 tasks")
	}

	// The expected states line up with t5, t4, t3, t2, t1 in that order:
	// presumably ListTasks returns the most recently created task first
	// (TODO confirm against the task database's ordering).
	expected := []tes.State{
		tes.Running,
		tes.SystemError,
		tes.SystemError,
		tes.SystemError,
		tes.Complete,
	}

	for i, task := range ts.Tasks {
		want := expected[i]
		if task.State != want {
			t.Error("expected state for task", i, task.State, want)
		}
	}
}

0 comments on commit 3539f95

Please sign in to comment.