Commit

fix error
wusamzong committed Sep 14, 2023
1 parent a454e0a commit f0f0c07
Showing 3 changed files with 73 additions and 53 deletions.
1 change: 1 addition & 0 deletions test/e2e/framework/configmanager/constants.go
@@ -45,6 +45,7 @@ const (
AppPath = "ws/v1/partition/%s/queue/%s/application/%s"
ClustersPath = "ws/v1/clusters"
NodesPath = "ws/v1/partition/%s/nodes"
NodePath = "ws/v1/partition/%s/node/%s"
UserUsagePath = "ws/v1/partition/%s/usage/user/%s"
GroupUsagePath = "ws/v1/partition/%s/usage/group/%s"
HealthCheckPath = "ws/v1/scheduler/healthcheck"
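The new NodePath constant follows the same fmt.Sprintf placeholder convention as the surrounding endpoints. A minimal, standalone sketch of how it expands (the partition name and node ID below are illustrative, not taken from the change):

package main

import "fmt"

// NodePath mirrors the constant added above.
const NodePath = "ws/v1/partition/%s/node/%s"

func main() {
	// Hypothetical values; any partition name and node ID slot in the same way.
	url := fmt.Sprintf(NodePath, "default", "worker-node-1")
	fmt.Println(url) // ws/v1/partition/default/node/worker-node-1
}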
10 changes: 10 additions & 0 deletions test/e2e/framework/helpers/yunikorn/rest_api_utils.go
@@ -233,6 +233,16 @@ func (c *RClient) GetNodes(partition string) (*[]dao.NodeDAOInfo, error) {
return &nodes, err
}

func (c *RClient) GetNode(partition string, nodeId string) (*dao.NodeDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.NodePath, partition, nodeId), nil)
if err != nil {
return nil, err
}
var node dao.NodeDAOInfo
_, err = c.do(req, &node)
return &node, err
}

func (c *RClient) WaitForAppStateTransition(partition string, queue string, appID string, state string, timeout int) error {
return wait.PollImmediate(time.Millisecond*300, time.Duration(timeout)*time.Second, c.isAppInDesiredState(partition, queue, appID, state))
}
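A minimal usage sketch of the new GetNode helper, written as it might appear inside one of the e2e specs. It assumes the spec file's existing restClient and imports; the node ID and the variable names introduced here are illustrative only:

// Fetch a single node's DAO info and read the utilization the scheduler reports for it.
node, getErr := restClient.GetNode(constants.DefaultPartition, "worker-node-1")
Ω(getErr).NotTo(HaveOccurred())
Ω(node).NotTo(gomega.BeNil())
memUtil := node.Utilized["memory"]
vcoreUtil := node.Utilized["vcore"]
fmt.Printf("memory=%d vcore=%d\n", memUtil, vcoreUtil)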
115 changes: 62 additions & 53 deletions test/e2e/resource_fairness/resource_fairness_test.go
@@ -42,11 +42,10 @@ var restClient yunikorn.RClient
var nodes *v1.NodeList
var workerID1 string
var workerID2 string
var worker1 dao.NodeDAOInfo
var worker2 dao.NodeDAOInfo
var namespace *v1.Namespace
var err error
var queuePath string
var highestUtili int64

var maxCPU int64 = 500
var maxMem int64 = 500
@@ -77,6 +76,7 @@ var _ = BeforeSuite(func() {
Ω(err).NotTo(HaveOccurred())
Ω(len(nodes.Items)).NotTo(gomega.BeZero(), "Nodes can't be zero")

// Get the worker node IDs.
for _, node := range nodes.Items {
node := node
if k8s.IsMasterNode(&node) || !k8s.IsComputeNode(&node) {
@@ -97,17 +97,20 @@
err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
Ω(err).NotTo(HaveOccurred())

// Get the highest utilization across resource types (memory and vcore) on the worker nodes.
var nodesDAOInfo *[]dao.NodeDAOInfo
nodesDAOInfo, err = restClient.GetNodes(constants.DefaultPartition)
Ω(err).NotTo(HaveOccurred())
Ω(nodesDAOInfo).NotTo(gomega.BeNil())

for _, node := range *nodesDAOInfo {
if node.NodeID == workerID1 {
worker1 = node
}
if node.NodeID == workerID2 {
worker2 = node
if node.NodeID == workerID1 || node.NodeID == workerID2 {
if node.Utilized["memory"] > highestUtili {
highestUtili = node.Utilized["memory"]
}
if node.Utilized["vcore"] > highestUtili {
highestUtili = node.Utilized["vcore"]
}
}
}
})
@@ -244,10 +247,9 @@
})

// Validates the order of node allocation for requested pod resources, following the fairNodeScheduling approach.
// Step 1: Deploy 2 apps, which utilize 5% of the resources on worker1 and 10% on worker2.
// Step 2: Deploy 1 app, which, according to fair scheduling principles, will be allocated to worker1.
//         This allocation increases the resource allocation on worker1 to 15%.
// Step 3: Deploy 1 app, which, according to fair scheduling principles, will be allocated to worker2.
// Step 1: Deploy 2 apps, increasing resource utilization by 5% on worker1 and by 10% on worker2.
// Step 2: Deploy 1 app, which is expected to be allocated to worker1, bringing its resource utilization to 15%.
// Step 3: Deploy 1 app, which is expected to be allocated to worker2.
It("Verify_basic_node_sorting_with_fairness_policy", func() {
By(fmt.Sprintf("update test namespace %s", ns))
namespace, err = kClient.UpdateNamespace(ns, nil)
@@ -269,46 +271,50 @@
return nil
})

sleepPodConfs := []k8s.SleepPodConfig{}

// Select worker1, utilizing 5% of the resources on worker1
// Assign pod1 to worker1 and pod2 to worker2.
sleepPod1Conf := k8s.SleepPodConfig{
Name: "pod1", NS: ns, AppID: "app1",
CPU: fillNodeUtil(&worker1, "vcore", float64(0.05)),
Mem: fillNodeUtil(&worker1, "memory", float64(0.05)) / 1000 / 1000,
RequiredNode: workerID1,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

// Select worker2, utilizing 10% of the resources on worker2
sleepPod2Conf := k8s.SleepPodConfig{
Name: "pod2", NS: ns, AppID: "app2",
CPU: fillNodeUtil(&worker2, "vcore", float64(0.10)),
Mem: fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
RequiredNode: workerID2,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

// No node is selected explicitly, but the pod would utilize 10% of the resources on worker1.
// Verify that pod3 and pod4 are assigned based on node sorting fairness.
sleepPod3Conf := k8s.SleepPodConfig{
Name: "pod3", NS: ns, AppID: "app3",
CPU: fillNodeUtil(&worker1, "vcore", float64(0.10)),
Mem: fillNodeUtil(&worker1, "memory", float64(0.10)) / 1000 / 1000,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

// No node is selected explicitly, but the pod would utilize 10% of the resources on worker2.
sleepPod4Conf := k8s.SleepPodConfig{
Name: "pod4", NS: ns, AppID: "app4",
CPU: fillNodeUtil(&worker2, "vcore", float64(0.10)),
Mem: fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

sleepPodConfs := []k8s.SleepPodConfig{}
sleepPodConfs = append(sleepPodConfs, sleepPod1Conf)
sleepPodConfs = append(sleepPodConfs, sleepPod2Conf)
sleepPodConfs = append(sleepPodConfs, sleepPod3Conf)
sleepPodConfs = append(sleepPodConfs, sleepPod4Conf)

// The node where each pod is expected to land; that node's current utilization determines the size of the pod's resource request.
expectedNode := []string{
workerID1,
workerID2,
workerID1,
workerID2,
}
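// highestUtili comes from node.Utilized, which the REST API appears to report as a
// 0-100 percentage, so dividing by 100 yields a fraction that is added to each
// target below to account for utilization already present on the busier worker.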
basicUtiliz := float64(highestUtili) / 100
increaseUtiliz := []map[string]float64{
{"vcore": 0.05, "memory": 0.05},
{"vcore": 0.10, "memory": 0.10},
{"vcore": 0.15, "memory": 0.15},
{"vcore": 0.15, "memory": 0.15},
}

// Deploy the pods in order
for _, config := range sleepPodConfs {
for idx, config := range sleepPodConfs {
ginkgo.By("Deploy the sleep pod " + config.Name)
config.CPU = fillNodeUtil(expectedNode[idx], "vcore", increaseUtiliz[idx]["vcore"]+basicUtiliz)
config.Mem = fillNodeUtil(expectedNode[idx], "memory", increaseUtiliz[idx]["memory"]+basicUtiliz) / 1000 / 1000
initPod, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(HaveOccurred())
_, err = kClient.CreatePod(initPod, ns)
@@ -331,12 +337,10 @@
})

// Validates the order of node allocation for requested pod resources, considering fairNodeScheduling with resource weights.
// Step 1: Set the Resource weights to {"vcore": 2.0, "memory": 1.0}
// Step 2: Deploy 2 apps, utilizing 13% of the resources on worker1 and 17% on worker2.
// Step 3: Deploy 1 app, which, according to fair scheduling principles, will be allocated to worker1.
//         This allocation increases the resource allocation on worker1 to 20%.
// Step 4: Deploy 1 app, which, according to fair scheduling principles, will be allocated to worker2.
// Step 1: Set the Resource weights to {"vcore": 5.0, "memory": 1.0}
// Step 2: Deploy 2 apps, increasing resource utilization on both worker1 and worker2; worker2 is expected to end up with the higher weighted utilization.
// Step 3: Deploy 1 app, which is expected to be allocated to worker1, leading to higher resource utilization on worker1.
// Step 4: Deploy 1 app, which is expected to be allocated to worker2.
It("Verify_node_sorting_fairness_policy_with_resource_weight", func() {
By("Setting custom YuniKorn configuration")
annotation = "ann-" + common.RandSeq(10)
@@ -347,7 +351,7 @@
sc.Partitions[0].NodeSortPolicy = configs.NodeSortingPolicy{
Type: "fair",
ResourceWeights: map[string]float64{
"vcore": 2.0,
"vcore": 5.0,
"memory": 1.0,
},
}
@@ -360,46 +364,49 @@
return nil
})

sleepPodConfs := []k8s.SleepPodConfig{}

// Select worker1, utilizing 13% of the resources on worker1
// Assign pod1 to worker1 and pod2 to worker2.
sleepPod1Conf := k8s.SleepPodConfig{
Name: "pod1", NS: ns, AppID: "app1",
CPU: fillNodeUtil(&worker1, "vcore", float64(0.10)),
Mem: fillNodeUtil(&worker1, "memory", float64(0.20)) / 1000 / 1000,
RequiredNode: workerID1,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

// Select worker2, utilizing 17% of the resources on worker2
sleepPod2Conf := k8s.SleepPodConfig{
Name: "pod2", NS: ns, AppID: "app2",
CPU: fillNodeUtil(&worker2, "vcore", float64(0.20)),
Mem: fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
RequiredNode: workerID2,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

// No node is selected explicitly, but the pod would utilize 17% of the resources on worker1.
// Verify that pod3 and pod4 are assigned based on node sorting fairness.
sleepPod3Conf := k8s.SleepPodConfig{
Name: "pod3", NS: ns, AppID: "app3",
CPU: fillNodeUtil(&worker1, "vcore", float64(0.10)),
Mem: 0,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

// No node is selected explicitly, but the pod would utilize 13% of the resources on worker2.
sleepPod4Conf := k8s.SleepPodConfig{
Name: "pod4", NS: ns, AppID: "app4",
CPU: 0,
Mem: fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
Labels: map[string]string{constants.LabelQueueName: queuePath}}

sleepPodConfs := []k8s.SleepPodConfig{}
sleepPodConfs = append(sleepPodConfs, sleepPod1Conf)
sleepPodConfs = append(sleepPodConfs, sleepPod2Conf)
sleepPodConfs = append(sleepPodConfs, sleepPod3Conf)
sleepPodConfs = append(sleepPodConfs, sleepPod4Conf)

// The node where each pod is expected to land; that node's current utilization determines the size of the pod's resource request.
expectedNode := []string{
workerID1,
workerID2,
workerID1,
workerID2,
}
basicUtiliz := float64(highestUtili) / 100
increaseUtiliz := []map[string]float64{
{"vcore": 0.05, "memory": 0.10},
{"vcore": 0.10, "memory": 0.05},
{"vcore": 0.15, "memory": 0.15},
{"vcore": 0.15, "memory": 0.15},
}
// Deploy the pods in order
for _, config := range sleepPodConfs {
for idx, config := range sleepPodConfs {
ginkgo.By("Deploy the sleeppod " + config.Name)
config.CPU = fillNodeUtil(expectedNode[idx], "vcore", increaseUtiliz[idx]["vcore"]+basicUtiliz)
config.Mem = fillNodeUtil(expectedNode[idx], "memory", increaseUtiliz[idx]["memory"]+basicUtiliz) / 1000 / 1000
initPod, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(HaveOccurred())
_, err = kClient.CreatePod(initPod, ns)
@@ -436,7 +443,9 @@
})
})

func fillNodeUtil(node *dao.NodeDAOInfo, resourceType string, percent float64) int64 {
func fillNodeUtil(nodeId string, resourceType string, percent float64) int64 {
node, err := restClient.GetNode(constants.DefaultPartition, nodeId)
Ω(err).NotTo(HaveOccurred())
fillingResource := percent*float64(node.Capacity[resourceType]) - float64(node.Allocated[resourceType])
return int64(fillingResource)
}
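For reference, a standalone sketch (with made-up numbers, not YuniKorn's actual sorting code) of two pieces of arithmetic the tests above depend on: the fillNodeUtil request calculation, and a weighted-average reading of the "fair" node score that is consistent with the 13% / 17% figures in the comments:

package main

import "fmt"

// fill mirrors fillNodeUtil's calculation: request = percent*capacity - allocated.
func fill(capacity, allocated int64, percent float64) int64 {
	return int64(percent*float64(capacity) - float64(allocated))
}

// fairScore is an illustrative weighted average of per-resource utilization;
// under a fair policy, nodes with lower scores are preferred.
func fairScore(utilization, weights map[string]float64) float64 {
	var weighted, total float64
	for res, w := range weights {
		weighted += w * utilization[res]
		total += w
	}
	if total == 0 {
		return 0
	}
	return weighted / total
}

func main() {
	// Hypothetical worker: 4000 millicores capacity, 200 millicores already allocated.
	// Targeting 15% utilization means requesting 0.15*4000 - 200 = 400 millicores.
	fmt.Println(fill(4000, 200, 0.15)) // 400

	// With weights {vcore: 2, memory: 1}: worker1 at 10% vcore / 20% memory
	// scores (2*0.10+1*0.20)/3 ≈ 0.13 and worker2 at 20% vcore / 10% memory
	// scores ≈ 0.17, matching the 13% / 17% figures in the comments above.
	w := map[string]float64{"vcore": 2, "memory": 1}
	fmt.Println(fairScore(map[string]float64{"vcore": 0.10, "memory": 0.20}, w)) // ≈0.133
	fmt.Println(fairScore(map[string]float64{"vcore": 0.20, "memory": 0.10}, w)) // ≈0.167
}

In the tests above, the memory value returned by fillNodeUtil is further divided by 1000*1000, presumably converting bytes into the megabyte units SleepPodConfig expects. Raising the vcore weight from 2.0 to 5.0, as the updated config does, widens the gap between the two weighted scores in the sketch, so worker2 ends up with the higher weighted utilization.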
