Skip to content

Commit

Permalink
Scheduler: better diagnostics for unknown preemption reason (#4055)
Browse files Browse the repository at this point in the history
* Scheduler: better diagnostics for unknown preemption reason

Signed-off-by: Robert Smith <[email protected]>

* fix

Signed-off-by: Robert Smith <[email protected]>

---------

Signed-off-by: Robert Smith <[email protected]>
  • Loading branch information
robertdavidsmith authored Nov 22, 2024
1 parent b3b07b9 commit 6e9f87c
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 57 deletions.
82 changes: 59 additions & 23 deletions internal/scheduler/internaltypes/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internaltypes

import (
"fmt"
"math"

"github.com/pkg/errors"
Expand Down Expand Up @@ -44,6 +45,8 @@ type Node struct {
// Total space allocatable on this node
totalResources ResourceList

unallocatableResources map[int32]ResourceList

// This field is set when inserting the Node into a NodeDb.
Keys [][]byte

Expand Down Expand Up @@ -92,6 +95,11 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
}
allocatableByPriority[EvictedPriority] = allocatableByPriority[minimumPriority]

unallocatableResources := map[int32]ResourceList{}
for p, u := range node.UnallocatableResources {
unallocatableResources[p] = resourceListFactory.FromJobResourceListIgnoreUnknown(u.Resources)
}

return CreateNode(
node.Id,
nodeType,
Expand All @@ -102,6 +110,7 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
taints,
labels,
resourceListFactory.FromNodeProto(totalResources.Resources),
unallocatableResources,
allocatableByPriority,
map[string]ResourceList{},
map[string]ResourceList{},
Expand All @@ -119,27 +128,29 @@ func CreateNode(
taints []v1.Taint,
labels map[string]string,
totalResources ResourceList,
unallocatableResources map[int32]ResourceList,
allocatableByPriority map[int32]ResourceList,
allocatedByQueue map[string]ResourceList,
allocatedByJobId map[string]ResourceList,
evictedJobRunIds map[string]bool,
keys [][]byte,
) *Node {
return &Node{
id: id,
nodeType: nodeType,
index: index,
executor: executor,
name: name,
pool: pool,
taints: koTaint.DeepCopyTaints(taints),
labels: deepCopyLabels(labels),
totalResources: totalResources,
AllocatableByPriority: maps.Clone(allocatableByPriority),
AllocatedByQueue: maps.Clone(allocatedByQueue),
AllocatedByJobId: maps.Clone(allocatedByJobId),
EvictedJobRunIds: evictedJobRunIds,
Keys: keys,
id: id,
nodeType: nodeType,
index: index,
executor: executor,
name: name,
pool: pool,
taints: koTaint.DeepCopyTaints(taints),
labels: deepCopyLabels(labels),
totalResources: totalResources,
unallocatableResources: maps.Clone(unallocatableResources),
AllocatableByPriority: maps.Clone(allocatableByPriority),
AllocatedByQueue: maps.Clone(allocatedByQueue),
AllocatedByJobId: maps.Clone(allocatedByJobId),
EvictedJobRunIds: evictedJobRunIds,
Keys: keys,
}
}

Expand Down Expand Up @@ -204,18 +215,23 @@ func (node *Node) GetTotalResources() ResourceList {
return node.totalResources
}

func (node *Node) GetUnallocatableResources() map[int32]ResourceList {
return maps.Clone(node.unallocatableResources)
}

func (node *Node) DeepCopyNilKeys() *Node {
return &Node{
// private fields are immutable so a shallow copy is fine
id: node.id,
index: node.index,
executor: node.executor,
name: node.name,
pool: node.pool,
nodeType: node.nodeType,
taints: node.taints,
labels: node.labels,
totalResources: node.totalResources,
id: node.id,
index: node.index,
executor: node.executor,
name: node.name,
pool: node.pool,
nodeType: node.nodeType,
taints: node.taints,
labels: node.labels,
totalResources: node.totalResources,
unallocatableResources: node.unallocatableResources,

// keys set to nil
Keys: nil,
Expand All @@ -228,6 +244,26 @@ func (node *Node) DeepCopyNilKeys() *Node {
}
}

func (node *Node) SummaryString() string {
if node == nil {
return ""
}

result := fmt.Sprintf("Id: %s\n", node.id)
result += fmt.Sprintf("Index: %d\n", node.index)
result += fmt.Sprintf("Executor: %s\n", node.executor)
result += fmt.Sprintf("Name: %s\n", node.name)
result += fmt.Sprintf("Pool: %s\n", node.pool)
result += fmt.Sprintf("TotalResources: %s\n", node.totalResources.String())
result += fmt.Sprintf("Labels: %v\n", node.labels)
result += fmt.Sprintf("Taints: %v\n", node.taints)
for p, u := range node.unallocatableResources {
result += fmt.Sprintf("Unallocatable at %d: %s\n", p, u.String())
}

return result
}

func deepCopyLabels(labels map[string]string) map[string]string {
result := make(map[string]string, len(labels))
for k, v := range labels {
Expand Down
10 changes: 10 additions & 0 deletions internal/scheduler/internaltypes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ func TestNode(t *testing.T) {
"memory": resource.MustParse("32Gi"),
},
)
unallocatableResources := map[int32]ResourceList{
1: resourceListFactory.FromJobResourceListIgnoreUnknown(
map[string]resource.Quantity{
"cpu": resource.MustParse("8"),
"memory": resource.MustParse("16Gi"),
},
),
}
allocatableByPriority := map[int32]ResourceList{
1: resourceListFactory.FromNodeProto(
map[string]resource.Quantity{
Expand Down Expand Up @@ -103,6 +111,7 @@ func TestNode(t *testing.T) {
taints,
labels,
totalResources,
unallocatableResources,
allocatableByPriority,
allocatedByQueue,
allocatedByJobId,
Expand All @@ -119,6 +128,7 @@ func TestNode(t *testing.T) {
assert.Equal(t, taints, node.GetTaints())
assert.Equal(t, labels, node.GetLabels())
assert.Equal(t, totalResources, node.GetTotalResources())
assert.Equal(t, unallocatableResources, node.GetUnallocatableResources())
assert.Equal(t, allocatableByPriority, node.AllocatableByPriority)
assert.Equal(t, allocatedByQueue, node.AllocatedByQueue)
assert.Equal(t, allocatedByJobId, node.AllocatedByJobId)
Expand Down
24 changes: 24 additions & 0 deletions internal/scheduler/internaltypes/testfixtures/testfixtures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package testfixtures

import (
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
)

func TestSimpleNode(id string) *internaltypes.Node {
return internaltypes.CreateNode(
id,
nil,
0,
"",
"",
"",
nil,
nil,
internaltypes.ResourceList{},
nil,
nil,
nil,
nil,
nil,
nil)
}
5 changes: 3 additions & 2 deletions internal/scheduler/nodedb/nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
ittestfixtures "github.com/armadaproject/armada/internal/scheduler/internaltypes/testfixtures"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) {
jctxs := context.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId(nodeId)
jctx.SetAssignedNode(ittestfixtures.TestSimpleNode(nodeId))
node, err := db.SelectNodeForJobWithTxn(txn, jctx)
txn.Abort()
require.NoError(t, err)
Expand All @@ -106,7 +107,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
jctxs := context.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId("non-existent node")
jctx.SetAssignedNode(ittestfixtures.TestSimpleNode("non-existent node"))
node, err := db.SelectNodeForJobWithTxn(txn, jctx)
txn.Abort()
if !assert.NoError(t, err) {
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/nodedb/nodeidindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func makeTestNode(id string) *internaltypes.Node {
[]v1.Taint{},
map[string]string{},
internaltypes.ResourceList{},
nil,
map[int32]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/nodedb/nodematching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ func makeTestNodeTaintsLabels(taints []v1.Taint, labels map[string]string) *inte
taints,
labels,
internaltypes.ResourceList{},
nil,
map[int32]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
Expand All @@ -685,6 +686,7 @@ func makeTestNodeResources(t *testing.T, allocatableByPriority map[int32]interna
[]v1.Taint{},
map[string]string{},
totalResources,
nil,
allocatableByPriority,
map[string]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
Expand Down
19 changes: 13 additions & 6 deletions internal/scheduler/scheduling/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type JobSchedulingContext struct {
GangInfo
// This is the node the pod is assigned to.
// This is only set for evicted jobs and is set alongside adding an additionalNodeSelector for the node
AssignedNodeId string
AssignedNode *internaltypes.Node
// Id of job that preempted this pod
PreemptingJobId string
// Description of the cause of preemption
Expand Down Expand Up @@ -109,14 +109,21 @@ func (jctx *JobSchedulingContext) Fail(unschedulableReason string) {
}
}

func (jctx *JobSchedulingContext) GetAssignedNode() *internaltypes.Node {
return jctx.AssignedNode
}

func (jctx *JobSchedulingContext) GetAssignedNodeId() string {
return jctx.AssignedNodeId
if jctx.AssignedNode == nil {
return ""
}
return jctx.AssignedNode.GetId()
}

func (jctx *JobSchedulingContext) SetAssignedNodeId(assignedNodeId string) {
if assignedNodeId != "" {
jctx.AssignedNodeId = assignedNodeId
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, assignedNodeId)
func (jctx *JobSchedulingContext) SetAssignedNode(assignedNode *internaltypes.Node) {
if assignedNode != nil {
jctx.AssignedNode = assignedNode
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, assignedNode.GetId())
}
}

Expand Down
17 changes: 11 additions & 6 deletions internal/scheduler/scheduling/context/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@ import (
"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/scheduler/configuration"
ittestfixtures "github.com/armadaproject/armada/internal/scheduler/internaltypes/testfixtures"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

func TestJobSchedulingContext_SetAssignedNodeId(t *testing.T) {
func TestJobSchedulingContext_SetAssignedNode(t *testing.T) {
jctx := &JobSchedulingContext{}

assert.Equal(t, "", jctx.GetAssignedNodeId())
assert.Nil(t, jctx.GetAssignedNode())
assert.Empty(t, jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

// Will not add a node selector if input is empty
jctx.SetAssignedNodeId("")
assert.Equal(t, "", jctx.GetAssignedNodeId())
// Will not add a node selector if input is nil
jctx.SetAssignedNode(nil)
assert.Nil(t, jctx.GetAssignedNode())
assert.Empty(t, jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

jctx.SetAssignedNodeId("node1")
n := ittestfixtures.TestSimpleNode("node1")
jctx.SetAssignedNode(n)
assert.Equal(t, n, jctx.GetAssignedNode())
assert.Equal(t, "node1", jctx.GetAssignedNodeId())
assert.Len(t, jctx.AdditionalNodeSelectors, 1)
assert.Equal(t, map[string]string{configuration.NodeIdLabel: "node1"}, jctx.AdditionalNodeSelectors)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
// TODO(albin): We can remove the checkOnlyDynamicRequirements flag in the nodeDb now that we've added the tolerations.
jctx := schedulercontext.JobSchedulingContextFromJob(job)
jctx.IsEvicted = true
jctx.SetAssignedNodeId(node.GetId())
jctx.SetAssignedNode(node)
evictedJctxsByJobId[job.Id()] = jctx
jctx.AdditionalTolerations = append(jctx.AdditionalTolerations, node.GetTolerationsForTaints()...)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2507,6 +2507,7 @@ func testNodeWithTaints(node *internaltypes.Node, taints []v1.Taint) *internalty
taints,
node.GetLabels(),
node.GetTotalResources(),
nil,
node.AllocatableByPriority,
node.AllocatedByQueue,
node.AllocatedByJobId,
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/scheduling/preemption_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
unknownPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly node resource changed causing this job to be unschedulable"
unknownPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly node resource changed causing this job to be unschedulable\nNode Summary:\n%s"
unknownGangPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly another job in the gang was preempted or the node resource changed causing this job to be unschedulable"
fairSharePreemptionTemplate = "Preempted by scheduler using fair share preemption - preempting job %s"
urgencyPreemptionTemplate = "Preempted by scheduler using urgency preemption - preempting job %s"
Expand Down Expand Up @@ -45,7 +45,7 @@ func PopulatePreemptionDescriptions(preemptedJobs []*context.JobSchedulingContex
if isGang {
preemptedJctx.PreemptionDescription = fmt.Sprintf(unknownGangPreemptionCause)
} else {
preemptedJctx.PreemptionDescription = fmt.Sprintf(unknownPreemptionCause)
preemptedJctx.PreemptionDescription = fmt.Sprintf(unknownPreemptionCause, preemptedJctx.GetAssignedNode().SummaryString())
}
} else if len(potentialPreemptingJobs) == 1 {
preemptedJctx.PreemptionDescription = fmt.Sprintf(urgencyPreemptionTemplate, potentialPreemptingJobs[0].JobId)
Expand Down
Loading

0 comments on commit 6e9f87c

Please sign in to comment.