[YUNIKORN-1877] Add additional cases of preemption to the e2e test (#643)

Closes: #643

Signed-off-by: Craig Condit <[email protected]>
wusamzong authored and craigcondit committed Aug 23, 2023
1 parent 95c55a9 commit a2b772e
Showing 1 changed file with 224 additions and 0 deletions.
test/e2e/preemption/preemption_test.go
@@ -26,6 +26,8 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
@@ -46,6 +48,7 @@ var annotation = "ann-" + common.RandSeq(10)
var Worker = ""
var WorkerMemRes int64
var sleepPodMemLimit int64
var sleepPodMemLimit2 int64
var taintKey = "e2e_test_preemption"
var nodesToTaint []string

@@ -107,6 +110,10 @@ var _ = ginkgo.BeforeSuite(func() {
sleepPodMemLimit = int64(float64(WorkerMemRes) / 3)
Ω(sleepPodMemLimit).NotTo(gomega.BeZero(), "Sleep pod memory limit cannot be zero")
fmt.Fprintf(ginkgo.GinkgoWriter, "Sleep pod limit memory %dM\n", sleepPodMemLimit)

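// A second, smaller pod size: a quarter of the worker's allocatable memory, used by the
// allow-preemption test below so that four sleep pods together fill the worker.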
sleepPodMemLimit2 = int64(float64(WorkerMemRes) / 4)
Ω(sleepPodMemLimit2).NotTo(gomega.BeZero(), "Sleep pod memory limit cannot be zero")
fmt.Fprintf(ginkgo.GinkgoWriter, "Sleep pod limit memory %dM\n", sleepPodMemLimit2)
})

var _ = ginkgo.AfterSuite(func() {
@@ -328,6 +335,223 @@ var _ = ginkgo.Describe("Preemption", func() {
}
})

ginkgo.It("Verify_preemption_on_priority_queue", func() {
ginkgo.By("A task can only preempt a task with lower or equal priority")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox1, root.low-priority, root.high-priority with guaranteed memory %dM", sleepPodMemLimit))
annotation = "ann-" + common.RandSeq(10)
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil
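// With placement rules removed, each pod should land in the queue named by its
// "queue" label (see the SleepPodConfig definitions below).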

var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "high-priority",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "100"},
}); err != nil {
return err
}

if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox1",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "0"},
}); err != nil {
return err
}

if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "low-priority",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "-100"},
}); err != nil {
return err
}
return nil
})

// Define sleepPod
sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30)
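// createSandbox1SleepPodCofigs is a helper defined elsewhere in this file; it
// presumably yields three root.sandbox1 pod configs, each requesting sleepPodMemLimit
// (a third of the worker's memory) and sleeping 30s, so together they fill the worker.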
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: true, Labels: map[string]string{"queue": "root.low-priority"}}
sleepPod5Config := k8s.SleepPodConfig{Name: "sleepjob5", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: true, Labels: map[string]string{"queue": "root.high-priority"}}

for _, config := range sandbox1SleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())

// Wait for pod to move to running state
podErr = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
60)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}

// Deploy sleepjob4 pod in root.low-priority
ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// Deploy sleepjob5 pod in root.high-priority
ginkgo.By("Deploy the sleep pod " + sleepPod5Config.Name + " to the development namespace")
sleepObj, podErr = k8s.InitSleepPod(sleepPod5Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod5, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// sleepjob4 pod can't be scheduled until the pods in root.sandbox1 have completed
ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
err = kClient.WaitForPodUnschedulable(sleepRespPod4, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// sleepjob5 pod can be scheduled even before the pods in root.sandbox1 have completed
ginkgo.By("The sleep pod " + sleepPod5Config.Name + " can be scheduled")
err = kClient.WaitForPodScheduled(ns.Name, sleepRespPod5.Name, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// assert one of the pods in root.sandbox1 is preempted
ginkgo.By("One of the pods in root.sanbox1 is preempted")
sandbox1RunningPodsCnt := 0
pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox1")
gomega.Ω(err).NotTo(gomega.HaveOccurred())
for _, pod := range pods.Items {
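// A pod that is being preempted is terminating and carries a DeletionTimestamp;
// skip it so that only pods still actually running are counted.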
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodRunning {
sandbox1RunningPodsCnt++
}
}
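// Only a third of the worker had to be freed for sleepjob5, so exactly one of the
// three sandbox1 pods should have been evicted; sleepjob4 stays pending instead.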
Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.sandbox1 should be preempted")
})

ginkgo.It("Verify_allow_preemption_tag", func() {
ginkgo.By("The value of 'false' for the allow preemption annotation on the PriorityClass moves the Pod to the back of the preemption list")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox1, root.sandbox2 and root.sandbox3 with guaranteed memory %dM", sleepPodMemLimit2))
annotation = "ann-" + common.RandSeq(10)
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil

var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox1",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit2)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}

if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox2",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit2)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}

if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox3",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit2)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
return nil
})

// Define PriorityClass
var preemptAllowPriorityClass = schedulingv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
Name: "allow-preemption",
Annotations: map[string]string{constants.AnnotationAllowPreemption: constants.True},
},
}
var preemptNotAllowPriorityClass = schedulingv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
Name: "preemption-not-allow",
Annotations: map[string]string{constants.AnnotationAllowPreemption: constants.False},
},
}
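
// Note: per the test description above, allow-preemption=false does not make a pod
// immune to preemption; it only moves the pod to the back of the victim list.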

// Create PriorityClass
ginkgo.By(fmt.Sprintf("Creating priority class %s", preemptAllowPriorityClass.Name))
_, err := kClient.CreatePriorityClass(&preemptAllowPriorityClass)
gomega.Ω(err).ShouldNot(HaveOccurred())
ginkgo.By(fmt.Sprintf("Creating priority class %s", preemptNotAllowPriorityClass.Name))
_, err = kClient.CreatePriorityClass(&preemptNotAllowPriorityClass)
gomega.Ω(err).ShouldNot(HaveOccurred())

// Define sleepPod
sleepPod1Config := k8s.SleepPodConfig{Name: "sleepjob1", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: true, Labels: map[string]string{"queue": "root.sandbox1"}}
sleepPod2Config := k8s.SleepPodConfig{Name: "sleepjob2", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: true, Labels: map[string]string{"queue": "root.sandbox1"}}
sleepPod3Config := k8s.SleepPodConfig{Name: "sleepjob3", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: false, Labels: map[string]string{"queue": "root.sandbox2"}}
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: false, Labels: map[string]string{"queue": "root.sandbox2"}}
sleepPod5Config := k8s.SleepPodConfig{Name: "sleepjob5", NS: dev, Mem: sleepPodMemLimit2, Time: 600, Optedout: true, Labels: map[string]string{"queue": "root.sandbox3"}}
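// Four pods, each a quarter of the worker's memory, fill the node (sleepjob1/2 in
// root.sandbox1, sleepjob3/4 in root.sandbox2); sleepjob5 then needs a quarter of its
// own and can only get it through preemption.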

for _, config := range []k8s.SleepPodConfig{sleepPod1Config, sleepPod2Config, sleepPod3Config, sleepPod4Config} {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(config)

// Setting PriorityClasses for Pods in a specific queue
if config.Name == "sleepjob3" || config.Name == "sleepjob4" {
sleepObj.Spec.PriorityClassName = "preemption-not-allow"
} else {
sleepObj.Spec.PriorityClassName = "allow-preemption"
}

Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())

// Wait for pod to move to running state
podErr = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
60)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}

// Deploy sleepjob5 pod in root.sandbox3
ginkgo.By("Deploy the sleep pod " + sleepPod5Config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(sleepPod5Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod5, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// sleepjob5 pod can be scheduled even before the pods in root.sandbox1 have completed
ginkgo.By("The sleep pod " + sleepPod5Config.Name + " can be scheduled")
err = kClient.WaitForPodScheduled(ns.Name, sleepRespPod5.Name, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

// assert one of the pods in root.sandbox1 is preempted
ginkgo.By("One of the pods in root.sanbox1 is preempted")
sandbox1RunningPodsCnt := 0
pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox1")
gomega.Ω(err).NotTo(gomega.HaveOccurred())
for _, pod := range pods.Items {
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodRunning {
sandbox1RunningPodsCnt++
}
}
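// Only a quarter of the worker had to be freed, so exactly one of the two sandbox1
// pods should have been evicted, and neither of the opted-out sandbox2 pods.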
Ω(sandbox1RunningPodsCnt).To(gomega.Equal(1), "One of the pods in root.sandbox1 should be preempted")

ginkgo.By(fmt.Sprintf("Removing priority class %s", preemptAllowPriorityClass.ObjectMeta.Name))
err = kClient.DeletePriorityClass(preemptAllowPriorityClass.ObjectMeta.Name)
gomega.Ω(err).ShouldNot(HaveOccurred())
ginkgo.By(fmt.Sprintf("Removing priority class %s", preemptNotAllowPriorityClass.ObjectMeta.Name))
err = kClient.DeletePriorityClass(preemptNotAllowPriorityClass.ObjectMeta.Name)
gomega.Ω(err).ShouldNot(HaveOccurred())
})

ginkgo.AfterEach(func() {

// Delete all sleep pods
