Skip to content

Commit f398043

Browse files
Merge pull request #491 from yuanchen8911/node_selection
Control the number of feasible nodes to find and score in scheduling
2 parents 87fe68f + 507833d commit f398043

File tree

5 files changed

+108
-22
lines changed

5 files changed

+108
-22
lines changed

cmd/scheduler/app/options/options.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ const (
3333

3434
defaultQPS = 50.0
3535
defaultBurst = 100
36+
37+
// Default parameters to control the number of feasible nodes to find and score
38+
defaultMinPercentageOfNodesToFind = 5
39+
defaultMinNodesToFind = 100
40+
defaultPercentageOfNodesToFind = 100
3641
)
3742

3843
// ServerOption is the main context object for the controller manager.
@@ -50,9 +55,14 @@ type ServerOption struct {
5055
EnablePriorityClass bool
5156
KubeAPIBurst int
5257
KubeAPIQPS float32
53-
// HealthzBindAddress is the IP address and port for the health check server to serve on,
58+
// HealthzBindAddress is the IP address and port for the health check server to serve on
5459
// defaulting to 127.0.0.1:11251
5560
HealthzBindAddress string
61+
62+
// Parameters for scheduling tuning: the number of feasible nodes to find and score
63+
MinNodesToFind int32
64+
MinPercentageOfNodesToFind int32
65+
PercentageOfNodesToFind int32
5666
}
5767

5868
// ServerOpts server options
@@ -84,6 +94,15 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
8494
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
8595
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
8696
fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.")
97+
98+
// Minimum number of feasible nodes to find and score
99+
fs.Int32Var(&s.MinNodesToFind, "minimum-feasible-nodes", defaultMinNodesToFind, "The minimum number of feasible nodes to find and score")
100+
101+
// Minimum percentage of nodes to find and score
102+
fs.Int32Var(&s.MinPercentageOfNodesToFind, "minimum-percentage-nodes-to-find", defaultMinPercentageOfNodesToFind, "The minimum percentage of nodes to find and score")
103+
104+
// The percentage of nodes that would be scored in each scheduling cycle; if <= 0, an adaptive percentage will be calculated
105+
fs.Int32Var(&s.PercentageOfNodesToFind, "percentage-nodes-to-find", defaultPercentageOfNodesToFind, "The percentage of nodes to find and score, if <=0 will be calcuated based on the cluster size")
87106
}
88107

89108
// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled

cmd/scheduler/app/options/options_test.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,16 @@ func TestAddFlags(t *testing.T) {
3737

3838
// This is a snapshot of expected options parsed by args.
3939
expected := &ServerOption{
40-
SchedulerName: defaultSchedulerName,
41-
SchedulePeriod: 5 * time.Minute,
42-
DefaultQueue: defaultQueue,
43-
ListenAddress: defaultListenAddress,
44-
KubeAPIBurst: defaultBurst,
45-
KubeAPIQPS: defaultQPS,
46-
HealthzBindAddress: "127.0.0.1:11251",
40+
SchedulerName: defaultSchedulerName,
41+
SchedulePeriod: 5 * time.Minute,
42+
DefaultQueue: defaultQueue,
43+
ListenAddress: defaultListenAddress,
44+
KubeAPIBurst: defaultBurst,
45+
KubeAPIQPS: defaultQPS,
46+
HealthzBindAddress: "127.0.0.1:11251",
47+
MinNodesToFind: defaultMinNodesToFind,
48+
MinPercentageOfNodesToFind: defaultMinPercentageOfNodesToFind,
49+
PercentageOfNodesToFind: defaultPercentageOfNodesToFind,
4750
}
4851

4952
if !reflect.DeepEqual(expected, s) {

pkg/scheduler/actions/allocate/allocate_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/client-go/tools/record"
2727

28+
"volcano.sh/volcano/cmd/scheduler/app/options"
2829
kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
2930
"volcano.sh/volcano/pkg/scheduler/api"
3031
"volcano.sh/volcano/pkg/scheduler/cache"
@@ -38,6 +39,13 @@ import (
3839
func TestAllocate(t *testing.T) {
3940
framework.RegisterPluginBuilder("drf", drf.New)
4041
framework.RegisterPluginBuilder("proportion", proportion.New)
42+
43+
options.ServerOpts = &options.ServerOption{
44+
MinNodesToFind: 100,
45+
MinPercentageOfNodesToFind: 5,
46+
PercentageOfNodesToFind: 100,
47+
}
48+
4149
defer framework.CleanupPluginBuilders()
4250

4351
tests := []struct {

pkg/scheduler/actions/preempt/preempt_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
"k8s.io/client-go/tools/record"
2626

27+
"volcano.sh/volcano/cmd/scheduler/app/options"
2728
kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
2829
"volcano.sh/volcano/pkg/scheduler/api"
2930
"volcano.sh/volcano/pkg/scheduler/cache"
@@ -37,6 +38,11 @@ import (
3738
func TestPreempt(t *testing.T) {
3839
framework.RegisterPluginBuilder("conformance", conformance.New)
3940
framework.RegisterPluginBuilder("gang", gang.New)
41+
options.ServerOpts = &options.ServerOption{
42+
MinNodesToFind: 100,
43+
MinPercentageOfNodesToFind: 5,
44+
PercentageOfNodesToFind: 100,
45+
}
4046
defer framework.CleanupPluginBuilders()
4147

4248
tests := []struct {

pkg/scheduler/util/scheduler_helper.go

+64-14
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,69 @@ package util
1818

1919
import (
2020
"context"
21+
"github.com/golang/glog"
22+
"k8s.io/client-go/util/workqueue"
23+
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
2124
"math"
2225
"math/rand"
2326
"sort"
2427
"sync"
25-
26-
"github.com/golang/glog"
27-
"k8s.io/client-go/util/workqueue"
28-
29-
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
28+
"sync/atomic"
29+
"volcano.sh/volcano/cmd/scheduler/app/options"
3030
"volcano.sh/volcano/pkg/scheduler/api"
3131
)
3232

33-
// PredicateNodes returns nodes that fit task
34-
func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
35-
var predicateNodes []*api.NodeInfo
33+
// baselinePercentageOfNodesToFind is the starting value of the adaptive percentage of
// nodes to find; CalculateNumOfFeasibleNodesToFind subtracts numAllNodes/125 from it.
const baselinePercentageOfNodesToFind = 50
3634

37-
var workerLock sync.Mutex
35+
// lastProcessedNodeIndex is the offset into the node list at which PredicateNodes starts
// examining nodes; PredicateNodes advances it by the number of nodes it processed.
// NOTE(review): no synchronization around this package-level variable is visible here —
// confirm PredicateNodes is never invoked concurrently.
var lastProcessedNodeIndex int
36+
37+
// CalculateNumOfFeasibleNodesToFind returns the number of feasible nodes that once found,
38+
// the scheduler stops its search for more feasible nodes.
39+
func CalculateNumOfFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
40+
opts := options.ServerOpts
41+
if numAllNodes <= opts.MinNodesToFind || opts.PercentageOfNodesToFind >= 100 {
42+
return numAllNodes
43+
}
44+
45+
adaptivePercentage := opts.PercentageOfNodesToFind
46+
if adaptivePercentage <= 0 {
47+
adaptivePercentage = baselinePercentageOfNodesToFind - numAllNodes/125
48+
if adaptivePercentage < opts.MinPercentageOfNodesToFind {
49+
adaptivePercentage = opts.MinPercentageOfNodesToFind
50+
}
51+
}
52+
53+
numNodes = numAllNodes * adaptivePercentage / 100
54+
if numNodes < opts.MinNodesToFind {
55+
numNodes = opts.MinNodesToFind
56+
}
57+
return numNodes
58+
}
59+
60+
// PredicateNodes returns the specified number of nodes that fit a task
61+
func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
62+
//var workerLock sync.Mutex
3863

3964
var errorLock sync.Mutex
4065
fe := api.NewFitErrors()
4166

67+
allNodes := len(nodes)
68+
numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes))
69+
70+
//allocate enough space to avoid growing it
71+
predicateNodes := make([]*api.NodeInfo, numNodesToFind)
72+
73+
numFoundNodes := int32(0)
74+
processedNodes := int32(0)
75+
76+
//create a context with cancellation
77+
ctx, cancel := context.WithCancel(context.Background())
78+
4279
checkNode := func(index int) {
43-
node := nodes[index]
80+
// Check the nodes starting from where it left off in the previous scheduling cycle,
81+
// to make sure all nodes have the same chance of being examined across pods.
82+
node := nodes[(lastProcessedNodeIndex+index)%allNodes]
83+
atomic.AddInt32(&processedNodes, 1)
4484
glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
4585
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
4686

@@ -54,12 +94,22 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF
5494
return
5595
}
5696

57-
workerLock.Lock()
58-
predicateNodes = append(predicateNodes, node)
59-
workerLock.Unlock()
97+
//check if the number of found nodes is more than numNodesToFind
98+
length := atomic.AddInt32(&numFoundNodes, 1)
99+
if length > numNodesToFind {
100+
cancel()
101+
atomic.AddInt32(&numFoundNodes, -1)
102+
} else {
103+
predicateNodes[length-1] = node
104+
}
60105
}
61106

62-
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
107+
//workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
108+
workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
109+
110+
//processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap)
111+
lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes
112+
predicateNodes = predicateNodes[:numFoundNodes]
63113
return predicateNodes, fe
64114
}
65115

0 commit comments

Comments
 (0)