Commit 1f53707

Merge pull request kubernetes#1400 from JoelSpeed/upstream-112807
OCPBUGS-2774: UPSTREAM: 112807 Fix Load balancer services with xTP local
2 parents (4bd0702 + be31e8d), commit 1f53707

2 files changed (+103, −1582 lines)

staging/src/k8s.io/cloud-provider/controllers/service/controller.go

Lines changed: 71 additions & 118 deletions
@@ -98,9 +98,6 @@ type Controller struct {
 	nodeSyncCh chan interface{}
 	// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
 	needFullSync bool
-	// lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
-	// nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
-	lastSyncedNodes []*v1.Node
 }

 // New returns a new service controller to keep cloud provider service resources
@@ -135,8 +132,7 @@ func New(
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
-		nodeSyncCh:      make(chan interface{}, 1),
-		lastSyncedNodes: []*v1.Node{},
+		nodeSyncCh: make(chan interface{}, 1),
 	}

 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -180,7 +176,7 @@ func New(
 				return
 			}

-			if !shouldSyncUpdatedNode(oldNode, curNode) {
+			if !shouldSyncNode(oldNode, curNode) {
 				return
 			}

@@ -261,7 +257,7 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr
 func (c *Controller) triggerNodeSync() {
 	c.nodeSyncLock.Lock()
 	defer c.nodeSyncLock.Unlock()
-	newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
+	newHosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
 	if err != nil {
 		runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
 		// if node list cannot be retrieve, trigger full node sync to be safe.
@@ -457,7 +453,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 }

 func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
-	nodes, err := listWithPredicates(c.nodeLister, getNodePredicatesForService(service)...)
+	nodes, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
 	if err != nil {
 		return nil, err
 	}
@@ -671,15 +667,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }

-func serviceKeys(services []*v1.Service) sets.String {
-	ret := sets.NewString()
-	for _, service := range services {
-		key, _ := cache.MetaNamespaceKeyFunc(service)
-		ret.Insert(key)
-	}
-	return ret
-}
-
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
@@ -695,21 +682,58 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
 	return nodeNames(x).Equal(nodeNames(y))
 }

-func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
-	// Evaluate the individual node exclusion predicate before evaluating the
-	// compounded result of all predicates. We don't sync ETP=local services
-	// for changes on the readiness condition, hence if a node remains NotReady
-	// and a user adds the exclusion label we will need to sync as to make sure
-	// this change is reflected correctly on ETP=local services. The sync
-	// function compares lastSyncedNodes with the new (existing) set of nodes
-	// for each service, so services which are synced with the same set of nodes
-	// should be skipped internally in the sync function. This is needed as to
-	// trigger a global sync for all services and make sure no service gets
-	// skipped due to a changing node predicate.
-	if respectsPredicates(oldNode, nodeIncludedPredicate) != respectsPredicates(newNode, nodeIncludedPredicate) {
+func (c *Controller) getNodeConditionPredicate() NodeConditionPredicate {
+	return func(node *v1.Node) bool {
+		if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel {
+			return false
+		}
+
+		// Remove nodes that are about to be deleted by the cluster autoscaler.
+		for _, taint := range node.Spec.Taints {
+			if taint.Key == ToBeDeletedTaint {
+				klog.V(4).Infof("Ignoring node %v with autoscaler taint %+v", node.Name, taint)
+				return false
+			}
+		}
+
+		// If we have no info, don't accept
+		if len(node.Status.Conditions) == 0 {
+			return false
+		}
+		for _, cond := range node.Status.Conditions {
+			// We consider the node for load balancing only when its NodeReady condition status
+			// is ConditionTrue
+			if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
+				klog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
+				return false
+			}
+		}
 		return true
 	}
-	return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
+}
+
+func shouldSyncNode(oldNode, newNode *v1.Node) bool {
+	if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
+		return true
+	}
+
+	if !reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
+		return true
+	}
+
+	return nodeReadyConditionStatus(oldNode) != nodeReadyConditionStatus(newNode)
+}
+
+func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
+	for _, condition := range node.Status.Conditions {
+		if condition.Type != v1.NodeReady {
+			continue
+		}
+
+		return condition.Status
+	}
+
+	return ""
 }

 // nodeSyncInternal handles updating the hosts pointed to by all load
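Aside (not part of the commit): the restored shouldSyncNode triggers a re-sync whenever a node's Unschedulable bit, labels, or Ready condition change. Below is a minimal, self-contained sketch of that behavior; the two helpers are copied in shape from the hunk above, while main and the synthetic nodes are invented for the example.

package main

import (
	"fmt"
	"reflect"

	v1 "k8s.io/api/core/v1"
)

// Copied in shape from the restored controller code above.
func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
	for _, condition := range node.Status.Conditions {
		if condition.Type != v1.NodeReady {
			continue
		}
		return condition.Status
	}
	return ""
}

func shouldSyncNode(oldNode, newNode *v1.Node) bool {
	if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
		return true
	}
	if !reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
		return true
	}
	return nodeReadyConditionStatus(oldNode) != nodeReadyConditionStatus(newNode)
}

func main() {
	oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
		{Type: v1.NodeReady, Status: v1.ConditionTrue},
	}}}
	newNode := oldNode.DeepCopy()
	newNode.Status.Conditions[0].Status = v1.ConditionFalse

	// A readiness flip alone is enough to schedule a sync again.
	fmt.Println(shouldSyncNode(oldNode, newNode)) // true
}

As the removed comment above notes, the predicate-based code deliberately did not re-sync ETP=local services on readiness-only changes; after this change, readiness flips re-sync all load balancer services again.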
@@ -751,29 +775,24 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
 			numServices-len(c.servicesToUpdate), numServices)
 }

-// nodeSyncService syncs the nodes for one load balancer type service. The return value
-// indicates if we should retry. Hence, this functions returns false if we've updated
-// load balancers and finished doing it successfully, or didn't try to at all because
-// there's no need. This function returns true if we tried to update load balancers and
-// failed, indicating to the caller that we should try again.
-func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
-	retSuccess := false
-	retNeedRetry := true
+// nodeSyncService syncs the nodes for one load balancer type service
+func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 	if svc == nil || !wantsLoadBalancer(svc) {
-		return retSuccess
-	}
-	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
-	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
-	if nodeNames(newNodes).Equal(nodeNames(oldNodes)) {
-		return retSuccess
+		return false
 	}
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
-	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
+	hosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		return true
+	}
+
+	if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
-		return retNeedRetry
+		return true
 	}
 	klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name)
-	return retSuccess
+	return false
 }

 // updateLoadBalancerHosts updates all existing load balancers so that
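Aside (not part of the commit): after this change, updateLoadBalancerHosts fans nodeSyncService out over a worker pool and collects the services that need a retry in a mutex-guarded set, as the next hunk shows. A sketch of that pattern under stated assumptions — syncOne is a hypothetical stand-in for nodeSyncService, and the service keys are invented:

package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	services := []string{"default/a", "default/b", "default/c"}

	// Hypothetical stand-in for nodeSyncService: report "retry" for one key.
	syncOne := func(key string) (shouldRetry bool) {
		return key == "default/b"
	}

	// lock guards servicesToRetry, mirroring the controller.
	servicesToRetry := sets.NewString()
	lock := sync.Mutex{}
	doWork := func(piece int) {
		if shouldRetry := syncOne(services[piece]); !shouldRetry {
			return
		}
		lock.Lock()
		defer lock.Unlock()
		servicesToRetry.Insert(services[piece])
	}

	// Fan out over two workers.
	workqueue.ParallelizeUntil(context.Background(), 2, len(services), doWork)
	fmt.Println(servicesToRetry.List()) // [default/b]
}

workqueue.ParallelizeUntil blocks until every piece has run, so reading servicesToRetry afterwards is race-free.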
@@ -782,20 +801,11 @@ func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.N
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)

-	// Include all nodes and let nodeSyncService filter and figure out if
-	// the update is relevant for the service in question.
-	nodes, err := listWithPredicates(c.nodeLister)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return serviceKeys(services)
-	}
-
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
-
 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -805,7 +815,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 	}

 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
@@ -981,75 +990,19 @@ func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus
 // some set of criteria defined by the function.
 type NodeConditionPredicate func(node *v1.Node) bool

-var (
-	allNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
-		nodeIncludedPredicate,
-		nodeUnTaintedPredicate,
-		nodeReadyPredicate,
-	}
-	etpLocalNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
-		nodeIncludedPredicate,
-		nodeUnTaintedPredicate,
-	}
-)
-
-func getNodePredicatesForService(service *v1.Service) []NodeConditionPredicate {
-	if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
-		return etpLocalNodePredicates
-	}
-	return allNodePredicates
-}
-
-// We consider the node for load balancing only when the node is not labelled for exclusion.
-func nodeIncludedPredicate(node *v1.Node) bool {
-	_, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]
-	return !hasExcludeBalancerLabel
-}
-
-// We consider the node for load balancing only when its not tainted for deletion by the cluster autoscaler.
-func nodeUnTaintedPredicate(node *v1.Node) bool {
-	for _, taint := range node.Spec.Taints {
-		if taint.Key == ToBeDeletedTaint {
-			return false
-		}
-	}
-	return true
-}
-
-// We consider the node for load balancing only when its NodeReady condition status is ConditionTrue
-func nodeReadyPredicate(node *v1.Node) bool {
-	for _, cond := range node.Status.Conditions {
-		if cond.Type == v1.NodeReady {
-			return cond.Status == v1.ConditionTrue
-		}
-	}
-	return false
-}
-
-// listWithPredicate gets nodes that matches all predicate functions.
-func listWithPredicates(nodeLister corelisters.NodeLister, predicates ...NodeConditionPredicate) ([]*v1.Node, error) {
+// listWithPredicate gets nodes that matches predicate function.
+func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditionPredicate) ([]*v1.Node, error) {
 	nodes, err := nodeLister.List(labels.Everything())
 	if err != nil {
 		return nil, err
 	}
-	return filterWithPredicates(nodes, predicates...), nil
-}

-func filterWithPredicates(nodes []*v1.Node, predicates ...NodeConditionPredicate) []*v1.Node {
 	var filtered []*v1.Node
 	for i := range nodes {
-		if respectsPredicates(nodes[i], predicates...) {
+		if predicate(nodes[i]) {
 			filtered = append(filtered, nodes[i])
 		}
 	}
-	return filtered
-}

-func respectsPredicates(node *v1.Node, predicates ...NodeConditionPredicate) bool {
-	for _, p := range predicates {
-		if !p(node) {
-			return false
-		}
-	}
-	return true
+	return filtered, nil
 }
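Aside (not part of the commit): the restored listWithPredicate folds the old filterWithPredicates/respectsPredicates pair into one loop over a single predicate. A real caller goes through a corelisters.NodeLister, which needs informer plumbing, so here is a hedged sketch of the same filtering applied to a plain node slice; filterNodes and the sample nodes are invented for the example, and the predicate mirrors the exclusion-label branch of getNodeConditionPredicate above.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// NodeConditionPredicate matches the controller's type above.
type NodeConditionPredicate func(node *v1.Node) bool

// filterNodes is an invented helper with the same shape as the restored
// listWithPredicate, minus the lister: keep only nodes the predicate accepts.
func filterNodes(nodes []*v1.Node, predicate NodeConditionPredicate) []*v1.Node {
	var filtered []*v1.Node
	for i := range nodes {
		if predicate(nodes[i]) {
			filtered = append(filtered, nodes[i])
		}
	}
	return filtered
}

func main() {
	nodes := []*v1.Node{
		{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}},
		{ObjectMeta: metav1.ObjectMeta{
			Name:   "node-b",
			Labels: map[string]string{v1.LabelNodeExcludeBalancers: ""},
		}},
	}

	// Mirrors the exclusion-label check of getNodeConditionPredicate.
	notExcluded := func(node *v1.Node) bool {
		_, excluded := node.Labels[v1.LabelNodeExcludeBalancers]
		return !excluded
	}

	for _, n := range filterNodes(nodes, notExcluded) {
		fmt.Println(n.Name) // node-a
	}
}

v1.LabelNodeExcludeBalancers is the same exclusion label the controller checks first when deciding whether a node may back a load balancer.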
