Commit a2bf45b

Merge pull request kubernetes#133213 from sanposhiho/second-trial-conor
fix: handle corner cases in the async preemption
2 parents 2b2ea27 + f3466f8 · commit a2bf45b

File tree

3 files changed: +127 -39 lines

pkg/scheduler/framework/preemption/preemption.go

Lines changed: 63 additions & 31 deletions
@@ -189,12 +189,12 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy
            }
        }
        if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
-           if !apierrors.IsNotFound(err) {
+           if apierrors.IsNotFound(err) {
+               logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
+           } else {
                logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
-               return err
            }
-           logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
-           return nil
+           return err
        }
        logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
    }
@@ -436,7 +436,14 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
    logger := klog.FromContext(ctx)
    errCh := parallelize.NewErrorChannel()
    fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
-       if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
+       victimPod := c.Victims().Pods[index]
+       if victimPod.DeletionTimestamp != nil {
+           // If the victim Pod is already being deleted, we don't have to make another deletion api call.
+           logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod))
+           return
+       }
+
+       if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil && !apierrors.IsNotFound(err) {
            errCh.SendErrorWithCancel(err, cancel)
        }
    }, ev.PluginName)
@@ -497,33 +504,59 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
    // Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx,
    // because this process could continue even after this scheduling cycle finishes.
    ctx, cancel := context.WithCancel(context.Background())
+   logger := klog.FromContext(ctx)
+   victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods))
+   for _, victim := range c.Victims().Pods {
+       if victim.DeletionTimestamp != nil {
+           // If the victim Pod is already being deleted, we don't have to make another deletion api call.
+           logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim))
+           continue
+       }
+       victimPods = append(victimPods, victim)
+   }
+   if len(victimPods) == 0 {
+       cancel()
+       return
+   }
+
    errCh := parallelize.NewErrorChannel()
+   // Whether all victim pods are already deleted before making API call.
+   var allPodsAlreadyDeleted atomic.Bool
+   allPodsAlreadyDeleted.Store(true)
    preemptPod := func(index int) {
-       victim := c.Victims().Pods[index]
-       if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
+       victim := victimPods[index]
+       err := ev.PreemptPod(ctx, c, pod, victim, pluginName)
+       switch {
+       case err != nil && !apierrors.IsNotFound(err):
+           // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
            errCh.SendErrorWithCancel(err, cancel)
+       case err == nil:
+           allPodsAlreadyDeleted.Store(false)
        }
    }

    ev.mu.Lock()
    ev.preempting.Insert(pod.UID)
    ev.mu.Unlock()

-   logger := klog.FromContext(ctx)
    go func() {
        startTime := time.Now()
        result := metrics.GoroutineResultSuccess
+
        defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
        defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
        defer func() {
-           if result == metrics.GoroutineResultError {
-               // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
-               // So, we should move the Pod to the activeQ.
+           // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
+           // So, we should move the Pod to the activeQ.
+           if result == metrics.GoroutineResultError ||
+               // When all pods are already deleted (which is very rare, but could happen in theory),
+               // it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod.
+               allPodsAlreadyDeleted.Load() {
                ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
            }
        }()
        defer cancel()
-       logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
+       logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods))

        // Lower priority pods nominated to run on this node, may no longer fit on
        // this node. So, we should remove their nomination. Removing their
@@ -536,33 +569,32 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
            // We do not return as this error is not critical.
        }

-       if len(c.Victims().Pods) == 0 {
-           ev.mu.Lock()
-           delete(ev.preempting, pod.UID)
-           ev.mu.Unlock()
-
-           return
-       }
-
-       // We can evict all victims in parallel, but the last one.
-       // We have to remove the pod from the preempting map before the last one is evicted
-       // because, otherwise, the pod removal might be notified to the scheduling queue before
-       // we remove this pod from the preempting map,
-       // and the pod could end up stucking at the unschedulable pod pool
-       // by all the pod removal events being ignored.
-       ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
-       if err := errCh.ReceiveError(); err != nil {
-           utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
-           result = metrics.GoroutineResultError
+       if len(victimPods) > 1 {
+           // We can evict all victims in parallel, but the last one.
+           // We have to remove the pod from the preempting map before the last one is evicted
+           // because, otherwise, the pod removal might be notified to the scheduling queue before
+           // we remove this pod from the preempting map,
+           // and the pod could end up stucking at the unschedulable pod pool
+           // by all the pod removal events being ignored.
+           ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName)
+           if err := errCh.ReceiveError(); err != nil {
+               utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
+               result = metrics.GoroutineResultError
+           }
        }

        ev.mu.Lock()
        delete(ev.preempting, pod.UID)
        ev.mu.Unlock()

-       if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
+       err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName)
+       switch {
+       case err != nil && !apierrors.IsNotFound(err):
+           // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
            utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
            result = metrics.GoroutineResultError
+       case err == nil:
+           allPodsAlreadyDeleted.Store(false)
        }

        logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)

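Taken together, the change to preemption.go comes down to two rules: a victim that already carries a DeletionTimestamp is skipped without another API call, and a NotFound answer from the delete call is tolerated because it only means someone else removed the victim first. A minimal standalone sketch of that behavior, using client-go directly (the helper names alreadyGone and deleteVictim and the bare clientset wiring are illustrative, not part of the patch):

package preemptionsketch

import (
    "context"

    v1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// alreadyGone reports whether no delete call is needed for this victim:
// a non-nil DeletionTimestamp means deletion is already in progress.
func alreadyGone(victim *v1.Pod) bool {
    return victim.DeletionTimestamp != nil
}

// deleteVictim mirrors the commit's tolerance for races with other deleters.
// It returns deleted=false with a nil error when the Pod was already gone,
// and an error only for failures other than NotFound.
func deleteVictim(ctx context.Context, cs kubernetes.Interface, victim *v1.Pod) (deleted bool, err error) {
    if alreadyGone(victim) {
        return false, nil
    }
    err = cs.CoreV1().Pods(victim.Namespace).Delete(ctx, victim.Name, metav1.DeleteOptions{})
    if err == nil {
        return true, nil
    }
    if apierrors.IsNotFound(err) {
        // Another actor deleted the victim first; preemption did not need to act.
        return false, nil
    }
    return false, err
}

In the real patch, the equivalent of this deleted signal feeds the new allPodsAlreadyDeleted atomic.Bool: if every victim turns out to be already gone, the goroutine activates the preemptor Pod itself, since no Pod/delete event will arrive to requeue it.
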
pkg/scheduler/framework/preemption/preemption_test.go

Lines changed: 61 additions & 6 deletions
@@ -30,6 +30,7 @@ import (

    v1 "k8s.io/api/core/v1"
    policy "k8s.io/api/policy/v1"
+   apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
@@ -44,12 +45,12 @@ import (
    "k8s.io/klog/v2/ktesting"
    extenderv1 "k8s.io/kube-scheduler/extender/v1"
    fwk "k8s.io/kube-scheduler/framework"
-   "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
-   "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
+   apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
+   apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
    internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
    internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
    "k8s.io/kubernetes/pkg/scheduler/framework"
-   "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
+   apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
    "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
@@ -420,6 +421,10 @@ func TestPrepareCandidate(t *testing.T) {
            Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
            Obj()

+       notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1").
+           Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
+           Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
+           Obj()
        failVictim = st.MakePod().Name("fail-victim").UID("victim1").
            Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
            Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
@@ -451,6 +456,12 @@ func TestPrepareCandidate(t *testing.T) {
        errPatchStatusFailed = errors.New("patch pod status failed")
    )

+   victimWithDeletionTimestamp := victim1.DeepCopy()
+   victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp"
+   victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp"
+   victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)}
+   victimWithDeletionTimestamp.Finalizers = []string{"test"}
+
    tests := []struct {
        name      string
        nodeNames []string
@@ -479,9 +490,8 @@ func TestPrepareCandidate(t *testing.T) {
            testPods: []*v1.Pod{
                victim1,
            },
-           nodeNames:             []string{node1Name},
-           expectedStatus:        nil,
-           expectedPreemptingMap: sets.New(types.UID("preemptor")),
+           nodeNames:      []string{node1Name},
+           expectedStatus: nil,
        },
        {
            name: "one victim without condition",
@@ -503,6 +513,42 @@ func TestPrepareCandidate(t *testing.T) {
            expectedStatus:        nil,
            expectedPreemptingMap: sets.New(types.UID("preemptor")),
        },
+       {
+           name: "one victim, but victim is already being deleted",
+
+           candidate: &fakeCandidate{
+               name: node1Name,
+               victims: &extenderv1.Victims{
+                   Pods: []*v1.Pod{
+                       victimWithDeletionTimestamp,
+                   },
+               },
+           },
+           preemptor: preemptor,
+           testPods: []*v1.Pod{
+               victimWithDeletionTimestamp,
+           },
+           nodeNames:      []string{node1Name},
+           expectedStatus: nil,
+       },
+       {
+           name: "one victim, but victim is already deleted",
+
+           candidate: &fakeCandidate{
+               name: node1Name,
+               victims: &extenderv1.Victims{
+                   Pods: []*v1.Pod{
+                       notFoundVictim1,
+                   },
+               },
+           },
+           preemptor:             preemptor,
+           testPods:              []*v1.Pod{},
+           nodeNames:             []string{node1Name},
+           expectedStatus:        nil,
+           expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
+           expectedPreemptingMap: sets.New(types.UID("preemptor")),
+       },
        {
            name: "one victim with same condition",

@@ -663,6 +709,11 @@ func TestPrepareCandidate(t *testing.T) {
                    deletionFailure = true
                    return true, nil, errDeletePodFailed
                }
+               // fake clientset does not return an error for not-found pods, so we simulate it here.
+               if name == "not-found-victim" {
+                   // Simulate a not-found error.
+                   return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
+               }

                deletedPods.Insert(name)
                return true, nil, nil
@@ -675,6 +726,10 @@ func TestPrepareCandidate(t *testing.T) {
                    patchFailure = true
                    return true, nil, errPatchStatusFailed
                }
+               // fake clientset does not return an error for not-found pods, so we simulate it here.
+               if action.(clienttesting.PatchAction).GetName() == "not-found-victim" {
+                   return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim")
+               }
                return true, nil, nil
            })


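The new unit-test cases rely on the fake clientset's reactor chain to make one victim look NotFound to the preemption code. A rough standalone sketch of that mechanism (the pod name and the main wrapper here are illustrative; the real test installs its reactors inside its own framework setup):

package main

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/kubernetes/fake"
    clienttesting "k8s.io/client-go/testing"
)

func main() {
    cs := fake.NewSimpleClientset()

    // Force every delete of "not-found-victim" to fail with NotFound, the
    // situation the new test cases create for the preemption code.
    cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
        name := action.(clienttesting.DeleteAction).GetName()
        if name == "not-found-victim" {
            return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
        }
        // Not handled: fall through to later reactors and the default tracker.
        return false, nil, nil
    })

    err := cs.CoreV1().Pods("default").Delete(context.Background(), "not-found-victim", metav1.DeleteOptions{})
    fmt.Println(apierrors.IsNotFound(err)) // true
}

Returning false from the reactor lets every other pod fall through to the fake object tracker, which is why only the injected name is affected.
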
test/integration/scheduler/preemption/preemption_test.go

Lines changed: 3 additions & 2 deletions
@@ -1573,17 +1573,18 @@ func TestNominatedNodeCleanUp(t *testing.T) {
                st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
            },
            {
-               st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
+               st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
            },
            {
-               st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
+               st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
            },
        },
        postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
            testutils.WaitForPodToSchedule,
            testutils.WaitForNominatedNodeName,
            testutils.WaitForNominatedNodeName,
        },
+       podNamesToDelete: []string{"low-1", "low-2", "low-3", "low-4"},
    },
    {
        name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod without additional preemption",

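The integration-test tweak also adds podNamesToDelete, which presumably names the low-priority pods the harness should clean up for this case. As a rough, hedged illustration of what such cleanup usually boils down to with a clientset (this is not the harness's actual helper; cleanupPods and the namespace parameter are hypothetical):

package preemptionsketch

import (
    "context"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// cleanupPods force-deletes the named pods so the next case starts from a
// clean node; a zero grace period skips graceful termination, and NotFound
// is ignored in the same spirit as the main change.
func cleanupPods(ctx context.Context, cs kubernetes.Interface, ns string, names []string) error {
    for _, name := range names {
        err := cs.CoreV1().Pods(ns).Delete(ctx, name, *metav1.NewDeleteOptions(0))
        if err != nil && !apierrors.IsNotFound(err) {
            return err
        }
    }
    return nil
}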