diff --git a/pkg/scheduler/plugins/numaaware/numaaware.go b/pkg/scheduler/plugins/numaaware/numaaware.go
index a6decc6bf0..3e62acdd6e 100644
--- a/pkg/scheduler/plugins/numaaware/numaaware.go
+++ b/pkg/scheduler/plugins/numaaware/numaaware.go
@@ -233,14 +233,14 @@ func getNodeNumaNumForTask(nodeInfo []*api.NodeInfo, resAssignMap map[string]api
 		assignCpus := resAssignMap[node.Name][string(v1.ResourceCPU)]
 		nodeNumaCnts[index] = api.ScoredNode{
 			NodeName: node.Name,
-			Score:    int64(getNumaNodeCntForCpuID(assignCpus, node.NumaSchedulerInfo.CPUDetail)),
+			Score:    int64(getNumaNodeCntForCPUID(assignCpus, node.NumaSchedulerInfo.CPUDetail)),
 		}
 	})
 
 	return nodeNumaCnts
 }
 
-func getNumaNodeCntForCpuID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
+func getNumaNodeCntForCPUID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int {
 	mask, _ := bitmask.NewBitMask()
 	s := cpus.ToSlice()
 
diff --git a/pkg/scheduler/plugins/overcommit/overcommit.go b/pkg/scheduler/plugins/overcommit/overcommit.go
index 0086bb79bd..83198eca57 100644
--- a/pkg/scheduler/plugins/overcommit/overcommit.go
+++ b/pkg/scheduler/plugins/overcommit/overcommit.go
@@ -88,10 +88,21 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
 	}
 
 	op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used)
 
-	// calculate inqueue job resources
 	for _, job := range ssn.Jobs {
+		// calculate inqueue job resources
 		if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil {
 			op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
+			continue
+		}
+		// calculate inqueue resources reserved for running jobs whose pods are not all allocated.
+		// The judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' handles cases such as:
+		// a Spark driver pod has completed while the podgroup keeps running; without the judgement the allocated resource would be reserved again.
+		if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
+			job.PodGroup.Spec.MinResources != nil &&
+			job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
+			allocated := util.GetAllocatedResource(job)
+			inqueued := util.GetInqueueResource(job, allocated)
+			op.inqueueResource.Add(inqueued)
 		}
 	}
 
diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go
index d1569993e0..238f8ce6e7 100644
--- a/pkg/scheduler/plugins/proportion/proportion.go
+++ b/pkg/scheduler/plugins/proportion/proportion.go
@@ -140,6 +140,17 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
 		if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
 			attr.inqueue.Add(job.GetMinResources())
 		}
+
+		// calculate inqueue resources reserved for running jobs whose pods are not all allocated.
+		// The judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' handles cases such as:
+		// a Spark driver pod has completed while the podgroup keeps running; without the judgement the allocated resource would be reserved again.
+		if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
+			job.PodGroup.Spec.MinResources != nil &&
+			job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
+			allocated := util.GetAllocatedResource(job)
+			inqueued := util.GetInqueueResource(job, allocated)
+			attr.inqueue.Add(inqueued)
+		}
 	}
 
 	// Record metrics
diff --git a/pkg/scheduler/plugins/util/util.go b/pkg/scheduler/plugins/util/util.go
index 561c85da22..ee9f4f78fa 100644
--- a/pkg/scheduler/plugins/util/util.go
+++ b/pkg/scheduler/plugins/util/util.go
@@ -298,3 +298,48 @@ func NormalizeScore(maxPriority int64, reverse bool, scores []api.ScoredNode) {
 		scores[idx].Score = score
 	}
 }
+
+// GetAllocatedResource returns the sum of resource requests over all tasks of the job that are in an allocated status.
+func GetAllocatedResource(job *api.JobInfo) *api.Resource {
+	allocated := &api.Resource{}
+	for status, tasks := range job.TaskStatusIndex {
+		if api.AllocatedStatus(status) {
+			for _, t := range tasks {
+				allocated.Add(t.Resreq)
+			}
+		}
+	}
+	return allocated
+}
+
+// GetInqueueResource returns the resource still to be reserved for a running job some of whose pods have not yet been allocated resources.
+func GetInqueueResource(job *api.JobInfo, allocated *api.Resource) *api.Resource {
+	inqueue := &api.Resource{}
+	for rName, rQuantity := range *job.PodGroup.Spec.MinResources {
+		switch rName {
+		case v1.ResourceCPU:
+			// api.Resource tracks CPU in millicores, so compare against MilliValue, not Value.
+			reservedCPU := float64(rQuantity.MilliValue()) - allocated.MilliCPU
+			if reservedCPU > 0 {
+				inqueue.MilliCPU = reservedCPU
+			}
+		case v1.ResourceMemory:
+			reservedMemory := float64(rQuantity.Value()) - allocated.Memory
+			if reservedMemory > 0 {
+				inqueue.Memory = reservedMemory
+			}
+		default:
+			if inqueue.ScalarResources == nil {
+				inqueue.ScalarResources = make(map[v1.ResourceName]float64)
+			}
+			// Scalar resources are stored in milli units by api.NewResource, so use MilliValue here as well.
+			if allocatedMount, ok := allocated.ScalarResources[rName]; !ok {
+				inqueue.ScalarResources[rName] = float64(rQuantity.MilliValue())
+			} else {
+				reservedScalarRes := float64(rQuantity.MilliValue()) - allocatedMount
+				if reservedScalarRes > 0 {
+					inqueue.ScalarResources[rName] = reservedScalarRes
+				}
+			}
+		}
+	}
+	return inqueue
+}