diff --git a/pkg/operator/onepodpernodeccontroller/onepodpernode_controller.go b/pkg/operator/onepodpernodeccontroller/onepodpernode_controller.go
new file mode 100644
index 0000000000..6ee1a7d746
--- /dev/null
+++ b/pkg/operator/onepodpernodeccontroller/onepodpernode_controller.go
@@ -0,0 +1,227 @@
+package onepodpernodeccontroller
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	opv1 "github.com/openshift/api/operator/v1"
+	"github.com/openshift/library-go/pkg/controller/factory"
+	"github.com/openshift/library-go/pkg/operator/events"
+	"github.com/openshift/library-go/pkg/operator/management"
+	"github.com/openshift/library-go/pkg/operator/v1helpers"
+	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/clock"
+	"k8s.io/apimachinery/pkg/util/sets"
+	coreinformersv1 "k8s.io/client-go/informers/core/v1"
+	"k8s.io/client-go/kubernetes"
+	corev1lister "k8s.io/client-go/listers/core/v1"
+	"k8s.io/klog/v2"
+)
+
+// OnePodPerNodeController is a generic controller that ensures that only one pod matching the selector is running per node.
+//
+// This is useful in cases where topology spread is desired. We have encountered cases where the scheduler is
+// mis-scheduling a pod. The scheduler still needs to be fixed, but this keeps the platform from failing.
+type OnePodPerNodeController struct {
+	name           string
+	operatorClient v1helpers.OperatorClientWithFinalizers
+	clock          clock.Clock
+
+	namespace       string
+	kubeClient      kubernetes.Interface
+	podLister       corev1lister.PodLister
+	podSelector     labels.Selector
+	minReadySeconds int32 // this comes from your deployment, daemonset, etc
+	recorder        events.Recorder
+}
+
+func NewOnePodPerNodeController(
+	name string,
+	namespace string,
+	podSelector *metav1.LabelSelector,
+	minReadySeconds int32, // this comes from your deployment, daemonset, etc
+	recorder events.Recorder,
+	operatorClient v1helpers.OperatorClientWithFinalizers,
+	kubeClient kubernetes.Interface,
+	podInformer coreinformersv1.PodInformer,
+) factory.Controller {
+	selector, err := metav1.LabelSelectorAsSelector(podSelector)
+	if err != nil {
+		panic(err)
+	}
+
+	c := &OnePodPerNodeController{
+		name:           name,
+		operatorClient: operatorClient,
+		clock:          clock.RealClock{},
+
+		namespace:       namespace,
+		podSelector:     selector,
+		minReadySeconds: minReadySeconds,
+		kubeClient:      kubeClient,
+		podLister:       podInformer.Lister(),
+		recorder:        recorder, // syncManaged uses this to emit eviction events
+	}
+
+	return factory.New().WithInformers(
+		podInformer.Informer(),
+	).WithSync(
+		c.sync,
+	).ResyncEvery(
+		time.Minute,
+	).WithSyncDegradedOnError(
+		operatorClient,
+	).ToController(
+		c.name,
+		recorder.WithComponentSuffix(strings.ToLower(name)+"-one-pod-per-node-"),
+	)
+}
+
+func (c *OnePodPerNodeController) Name() string {
+	return c.name
+}
+
+func (c *OnePodPerNodeController) sync(ctx context.Context, syncContext factory.SyncContext) error {
+	klog.V(4).Infof("sync")
+	opSpec, _, _, err := c.operatorClient.GetOperatorState()
+	if apierrors.IsNotFound(err) && management.IsOperatorRemovable() {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	if opSpec.ManagementState != opv1.Managed {
+		return nil
+	}
+
+	return c.syncManaged(ctx, syncContext)
+}
+
+func (c *OnePodPerNodeController) syncManaged(ctx context.Context, syncContext factory.SyncContext) error {
+	klog.V(4).Infof("syncManaged")
+
+	matchingPods, err := c.podLister.Pods(c.namespace).List(c.podSelector)
+	if err != nil {
+		
return err + } + + nodesToPods := map[string][]*corev1.Pod{} + for i := range matchingPods { + pod := matchingPods[i] + + // don't consider deleted pods, they are shutting down and need grace to come down. + if pod.DeletionTimestamp != nil { + continue + } + // don't consider unscheduled pods + if len(pod.Spec.NodeName) == 0 { + continue + } + // don't consider unavailable pods, they cannot reliably handle requests + if !isPodAvailable(pod, c.minReadySeconds, metav1.Time{Time: c.clock.Now()}) { + continue + } + nodesToPods[pod.Spec.NodeName] = append(nodesToPods[pod.Spec.NodeName], pod) + } + + for _, pods := range nodesToPods { + if len(pods) <= 1 { + continue + } + + // we choose to delete the oldest, because if a deployment, daemonset, or other controller is rolling out a newer + // level, the newer pod will be the desired pod and the older pod is the not-desired pod. + oldestPod := pods[0] + for i := 1; i < len(pods); i++ { + currPod := pods[i] + if currPod.CreationTimestamp.Before(&oldestPod.CreationTimestamp) { + oldestPod = currPod + } + } + + displayPodString := sets.String{} + for _, pod := range pods { + displayPodString.Insert("pod/" + pod.Name) + } + + // we use eviction, not deletion. Eviction honors PDBs. + c.recorder.Warningf("MalscheduledPod", + "%v should be one per node, but all were placed on node/%v; evicting pod/%v", + strings.Join(displayPodString.List(), " "), + oldestPod.Spec.NodeName, + oldestPod.Name, + ) + err := c.kubeClient.CoreV1().Pods(oldestPod.Namespace).EvictV1(ctx, + &policyv1.Eviction{ + ObjectMeta: metav1.ObjectMeta{Namespace: oldestPod.Namespace, Name: oldestPod.Name}, + DeleteOptions: nil, + }, + ) + if err != nil { + return err + } + } + + return nil +} + +// these are lifted from k/k: https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/api/v1/pod/util.go#L286 + +// IsPodAvailable returns true if a pod is available; false otherwise. +// Precondition for an available pod is that it must be ready. On top +// of that, there are two cases when a pod can be considered available: +// 1. minReadySeconds == 0, or +// 2. 
LastTransitionTime (is set) + minReadySeconds < current time +func isPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool { + if !isPodReady(pod) { + return false + } + + c := getPodReadyCondition(pod.Status) + minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second + if minReadySeconds == 0 || (!c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time)) { + return true + } + + return false +} + +func isPodReady(pod *v1.Pod) bool { + return isPodReadyConditionTrue(pod.Status) +} + +func isPodReadyConditionTrue(status v1.PodStatus) bool { + condition := getPodReadyCondition(status) + return condition != nil && condition.Status == v1.ConditionTrue +} + +func getPodReadyCondition(status v1.PodStatus) *v1.PodCondition { + _, condition := getPodCondition(&status, v1.PodReady) + return condition +} + +func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if status == nil { + return -1, nil + } + return getPodConditionFromList(status.Conditions, conditionType) +} + +func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if conditions == nil { + return -1, nil + } + for i := range conditions { + if conditions[i].Type == conditionType { + return i, &conditions[i] + } + } + return -1, nil +} diff --git a/pkg/operator/onepodpernodeccontroller/onepodpernode_controller_test.go b/pkg/operator/onepodpernodeccontroller/onepodpernode_controller_test.go new file mode 100644 index 0000000000..dc67f97941 --- /dev/null +++ b/pkg/operator/onepodpernodeccontroller/onepodpernode_controller_test.go @@ -0,0 +1,330 @@ +package onepodpernodeccontroller + +import ( + "context" + "fmt" + "testing" + "time" + + policyv1 "k8s.io/api/policy/v1" + kubetesting "k8s.io/client-go/testing" + + "k8s.io/apimachinery/pkg/runtime" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/v1helpers" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/kubernetes/fake" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +func mustTime(in string) time.Time { + out, err := time.Parse(time.RFC3339, in) + if err != nil { + panic(err) + } + return out +} + +type podMutator func(pod *corev1.Pod) *corev1.Pod + +func createPod(pod *corev1.Pod, mutators ...podMutator) *corev1.Pod { + for _, mutator := range mutators { + pod = mutator(pod) + } + return pod +} + +func newPod(namespace, name string, creationTime time.Time) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + } +} + +func setLabel(key, value string) podMutator { + return func(pod *corev1.Pod) *corev1.Pod { + if pod.Labels == nil { + pod.Labels = map[string]string{} + } + pod.Labels[key] = value + return pod + } +} + +func setNode(nodeName string) podMutator { + return func(pod *corev1.Pod) *corev1.Pod { + pod.Spec.NodeName = nodeName + return pod + } +} + +func makeReadyAt(time time.Time) podMutator { + return func(pod *corev1.Pod) *corev1.Pod { + pod.Status.Conditions = append(pod.Status.Conditions, + corev1.PodCondition{ + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + LastTransitionTime: 
metav1.Time{Time: time}, + }, + ) + return pod + } +} + +func setDeleted(time time.Time) podMutator { + return func(pod *corev1.Pod) *corev1.Pod { + pod.DeletionTimestamp = &metav1.Time{Time: time} + return pod + } +} + +func TestOnePodPerNodeController_syncManaged(t *testing.T) { + fakeClock := clock.NewFakeClock(mustTime("2022-03-07T12:00:00Z")) + twoHoursAgo := fakeClock.Now().Add(-2 * time.Hour) + oneHourAgo := fakeClock.Now().Add(-1 * time.Hour) + oneMinuteAgo := fakeClock.Now().Add(-1 * time.Minute) + oneSecondAgo := fakeClock.Now().Add(-1 * time.Second) + + type fields struct { + name string + operatorClient v1helpers.OperatorClientWithFinalizers + namespace string + pods []*corev1.Pod + podSelector labels.Selector + minReadySeconds int32 + recorder events.Recorder + } + type args struct { + ctx context.Context + syncContext factory.SyncContext + } + tests := []struct { + name string + fields fields + args args + wantErr bool + validateActions func(clientset *fake.Clientset) error + }{ + { + name: "no-evict-all-spread", + fields: fields{ + operatorClient: nil, + namespace: "test-ns", + pods: []*corev1.Pod{ + createPod( + newPod("test-ns", "first", twoHoursAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + createPod( + newPod("test-ns", "second", oneHourAgo), + setLabel("label-1", "match"), + setNode("node-2"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + }, + podSelector: labels.Set{"label-1": "match"}.AsSelector(), + minReadySeconds: 10, + recorder: events.NewInMemoryRecorder("testing"), + }, + args: args{}, + wantErr: false, + validateActions: func(clientset *fake.Clientset) error { + if len(clientset.Actions()) > 0 { + return fmt.Errorf("expected 0 actions, got: \n%v", clientset.Actions()) + } + return nil + }, + }, + { + name: "no-evict-one-deleted", + fields: fields{ + operatorClient: nil, + namespace: "test-ns", + pods: []*corev1.Pod{ + createPod( + newPod("test-ns", "first", twoHoursAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + createPod( + newPod("test-ns", "second", oneHourAgo), + setDeleted(oneHourAgo.Add(10*time.Minute)), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + }, + podSelector: labels.Set{"label-1": "match"}.AsSelector(), + minReadySeconds: 10, + recorder: events.NewInMemoryRecorder("testing"), + }, + args: args{}, + wantErr: false, + validateActions: func(clientset *fake.Clientset) error { + if len(clientset.Actions()) > 0 { + return fmt.Errorf("expected 0 actions, got: \n%v", clientset.Actions()) + } + return nil + }, + }, + { + name: "no-evict-one-ready-but-not-available", + fields: fields{ + operatorClient: nil, + namespace: "test-ns", + pods: []*corev1.Pod{ + createPod( + newPod("test-ns", "first", twoHoursAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + createPod( + newPod("test-ns", "second", oneHourAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneSecondAgo), + ), + }, + podSelector: labels.Set{"label-1": "match"}.AsSelector(), + minReadySeconds: 10, + recorder: events.NewInMemoryRecorder("testing"), + }, + args: args{}, + wantErr: false, + validateActions: func(clientset *fake.Clientset) error { + if len(clientset.Actions()) > 0 { + return fmt.Errorf("expected 0 actions, got: \n%v", clientset.Actions()) + } + return nil + }, + }, + { + name: "evict-two-pods-same-node", + fields: fields{ 
+ operatorClient: nil, + namespace: "test-ns", + pods: []*corev1.Pod{ + createPod( + newPod("test-ns", "first", twoHoursAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + createPod( + newPod("test-ns", "second", oneHourAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneMinuteAgo), + ), + }, + podSelector: labels.Set{"label-1": "match"}.AsSelector(), + minReadySeconds: 10, + recorder: events.NewInMemoryRecorder("testing"), + }, + args: args{}, + wantErr: false, + validateActions: func(clientset *fake.Clientset) error { + actions := clientset.Actions() + if len(actions) != 1 { + return fmt.Errorf("expected 1 actions, got: \n%v", actions) + } + if !actions[0].Matches("create", "pods/eviction") { + return fmt.Errorf("expected eviction, got %v", actions[0]) + } + if actions[0].(kubetesting.CreateAction).GetObject().(*policyv1.Eviction).Name != "first" { + return fmt.Errorf("expected eviction of first, got %v", actions[0]) + } + return nil + }, + }, { + name: "evict-oldestthree-pods-same-node", + fields: fields{ + operatorClient: nil, + namespace: "test-ns", + pods: []*corev1.Pod{ + createPod( + newPod("test-ns", "first", twoHoursAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + createPod( + newPod("test-ns", "second", oneHourAgo), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneMinuteAgo), + ), + createPod( + newPod("test-ns", "third", oneHourAgo.Add(30*time.Minute)), + setLabel("label-1", "match"), + setNode("node-1"), + makeReadyAt(oneHourAgo.Add(time.Minute)), + ), + }, + podSelector: labels.Set{"label-1": "match"}.AsSelector(), + minReadySeconds: 10, + recorder: events.NewInMemoryRecorder("testing"), + }, + args: args{}, + wantErr: false, + validateActions: func(clientset *fake.Clientset) error { + actions := clientset.Actions() + if len(actions) != 1 { + return fmt.Errorf("expected 1 actions, got: \n%v", actions) + } + if !actions[0].Matches("create", "pods/eviction") { + return fmt.Errorf("expected eviction, got %v", actions[0]) + } + if actions[0].(kubetesting.CreateAction).GetObject().(*policyv1.Eviction).Name != "first" { + return fmt.Errorf("expected eviction of first, got %v", actions[0]) + } + return nil + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + existingPods := []runtime.Object{} + podIndex := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for i := range tt.fields.pods { + podIndex.Add(tt.fields.pods[i]) + existingPods = append(existingPods, tt.fields.pods[i]) + } + podLister := corev1listers.NewPodLister(podIndex) + + fakeClient := fake.NewSimpleClientset(existingPods...) + + c := &OnePodPerNodeController{ + name: tt.fields.name, + operatorClient: tt.fields.operatorClient, + minReadySeconds: tt.fields.minReadySeconds, + clock: fakeClock, + namespace: tt.fields.namespace, + kubeClient: fakeClient, + podLister: podLister, + podSelector: tt.fields.podSelector, + recorder: tt.fields.recorder, + } + if err := c.syncManaged(tt.args.ctx, tt.args.syncContext); (err != nil) != tt.wantErr { + t.Errorf("syncManaged() error = %v, wantErr %v", err, tt.wantErr) + } + if err := tt.validateActions(fakeClient); err != nil { + t.Error(err) + } + }) + } +}
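Usage sketch (not part of the patch above): one way a consuming operator might wire this controller into its starter code. The runOnePodPerNodeGuard helper, the "example-operand-namespace" namespace, the app=example-operand label, the 30-second minReadySeconds value, and the import path for this package are all illustrative assumptions; only the NewOnePodPerNodeController signature and the factory.Controller Run method come from the change itself.

package starter // illustrative only; this would live in the consuming operator

import (
	"context"
	"time"

	"github.com/openshift/library-go/pkg/operator/events"
	"github.com/openshift/library-go/pkg/operator/v1helpers"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"

	// Import path assumed; adjust to wherever this package actually lands.
	"github.com/openshift/library-go/pkg/operator/onepodpernodeccontroller"
)

// runOnePodPerNodeGuard is a hypothetical helper showing how an operator could
// start the controller for one of its operands. All names and values here are
// examples, not part of this change.
func runOnePodPerNodeGuard(
	ctx context.Context,
	operatorClient v1helpers.OperatorClientWithFinalizers,
	kubeClient kubernetes.Interface,
	recorder events.Recorder,
) {
	const operandNamespace = "example-operand-namespace" // hypothetical operand namespace

	// Watch only the operand's pods in its own namespace; the controller's
	// pod lister is backed by this informer.
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		kubeClient,
		10*time.Minute,
		informers.WithNamespace(operandNamespace),
	)
	podInformer := informerFactory.Core().V1().Pods()

	controller := onepodpernodeccontroller.NewOnePodPerNodeController(
		"ExampleOperand", // controller name, used for events and degraded conditions
		operandNamespace,
		&metav1.LabelSelector{MatchLabels: map[string]string{"app": "example-operand"}},
		30, // minReadySeconds, copied from the operand's Deployment spec
		recorder,
		operatorClient,
		kubeClient,
		podInformer,
	)

	// Start the informer before the controller so the lister has data to serve.
	informerFactory.Start(ctx.Done())
	go controller.Run(ctx, 1) // typically started alongside the operator's other controllers
}

Because the controller evicts rather than deletes, a PodDisruptionBudget on the operand still applies to the pods it removes.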