Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/numaaware/numaaware.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ func getNodeNumaNumForTask(nodeInfo []*api.NodeInfo, resAssignMap map[string]api
assignCpus := resAssignMap[node.Name][string(v1.ResourceCPU)]
nodeNumaCnts[index] = api.ScoredNode{
NodeName: node.Name,
Score: int64(getNumaNodeCntForCpuID(assignCpus, node.NumaSchedulerInfo.CPUDetail)),
Score: int64(getNumaNodeCntForCPUID(assignCpus, node.NumaSchedulerInfo.CPUDetail)),
}
})

return nodeNumaCnts
}

func getNumaNodeCntForCpuID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
func getNumaNodeCntForCPUID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
mask, _ := bitmask.NewBitMask()
s := cpus.ToSlice()

Expand Down
13 changes: 12 additions & 1 deletion pkg/scheduler/plugins/overcommit/overcommit.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,21 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
}
op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used)

// calculate inqueue job resources
for _, job := range ssn.Jobs {
// calculate inqueue job resources
if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil {
op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to consider the following corner case while the PodGroup is Inqueue:
the driver pod is running while part or all of the executor pods are pending. The already allocated resource might need to be excluded from op.inqueueResource before comparing with the idle resource.

Copy link
Contributor Author

@Thor-wl Thor-wl Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to consider the following corner case while the PodGroup is Inqueue: the driver pod is running while part or all of the executor pods are pending. The already allocated resource might need to be excluded from op.inqueueResource before comparing with the idle resource.

A Spark application will not stay in the Inqueue phase, because it transitions to Running at the end of the first session. The default minMember is 1 and the number of allocated pods is at least 1.

continue
}
// calculate inqueue resource for running jobs
// the judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' will work on cases such as the following condition:
// Considering a Spark job is completed(driver pod is completed) while the podgroup keeps running, the allocated resource will be reserved again if without the judgement.
if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
job.PodGroup.Spec.MinResources != nil &&
job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
allocated := util.GetAllocatedResource(job)
inqueued := util.GetInqueueResource(job, allocated)
op.inqueueResource.Add(inqueued)
}
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
attr.inqueue.Add(job.GetMinResources())
}

// calculate inqueue resource for running jobs
// the judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' will work on cases such as the following condition:
// Considering a Spark job is completed(driver pod is completed) while the podgroup keeps running, the allocated resource will be reserved again if without the judgement.
if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
job.PodGroup.Spec.MinResources != nil &&
job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
allocated := util.GetAllocatedResource(job)
inqueued := util.GetInqueueResource(job, allocated)
attr.inqueue.Add(inqueued)
}
}

// Record metrics
Expand Down
45 changes: 45 additions & 0 deletions pkg/scheduler/plugins/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,48 @@ func NormalizeScore(maxPriority int64, reverse bool, scores []api.ScoredNode) {
scores[idx].Score = score
}
}

// GetAllocatedResource returns the total resource already allocated to the
// given job: the sum of Resreq over every task whose status counts as
// allocated (per api.AllocatedStatus).
func GetAllocatedResource(job *api.JobInfo) *api.Resource {
	sum := &api.Resource{}
	for status, tasks := range job.TaskStatusIndex {
		// Skip task buckets that are not in an allocated status.
		if !api.AllocatedStatus(status) {
			continue
		}
		for _, task := range tasks {
			sum.Add(task.Resreq)
		}
	}
	return sum
}

// GetInqueueResource returns the resource that should still be reserved for a
// running job whose pods have not all been allocated yet: for each resource
// named in the PodGroup's MinResources, the portion exceeding what is already
// allocated. The caller must ensure job.PodGroup.Spec.MinResources is non-nil.
func GetInqueueResource(job *api.JobInfo, allocated *api.Resource) *api.Resource {
	inqueue := &api.Resource{}
	for rName, rQuantity := range *job.PodGroup.Spec.MinResources {
		switch rName {
		case v1.ResourceCPU:
			// allocated.MilliCPU is in milli-cores, so compare with
			// MilliValue(). Value() rounds up to whole cores, which would
			// make the subtraction unit-mismatched and over-reserve CPU.
			reservedCPU := float64(rQuantity.MilliValue()) - allocated.MilliCPU
			if reservedCPU > 0 {
				inqueue.MilliCPU = reservedCPU
			}
		case v1.ResourceMemory:
			// Memory is tracked in bytes on both sides; Value() is correct here.
			reservedMemory := float64(rQuantity.Value()) - allocated.Memory
			if reservedMemory > 0 {
				inqueue.Memory = reservedMemory
			}
		default:
			if inqueue.ScalarResources == nil {
				inqueue.ScalarResources = make(map[v1.ResourceName]float64)
			}
			// Scalar resources are stored in milli-units in api.Resource
			// (api.NewResource adds them via MilliValue()), so use
			// MilliValue() here as well to keep the units consistent.
			if allocatedMount, ok := allocated.ScalarResources[rName]; !ok {
				// Nothing of this resource allocated yet: reserve the full amount.
				inqueue.ScalarResources[rName] = float64(rQuantity.MilliValue())
			} else {
				reservedScalarRes := float64(rQuantity.MilliValue()) - allocatedMount
				if reservedScalarRes > 0 {
					inqueue.ScalarResources[rName] = reservedScalarRes
				}
			}
		}
	}
	return inqueue
}