From 8af44e75df98a0bccc3aba61dc177b2cb1e2fa1b Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Fri, 16 Sep 2016 15:16:08 -0700 Subject: [PATCH 01/10] Limit the number of names per image reported in the node status --- pkg/kubelet/kubelet.go | 3 --- pkg/kubelet/kubelet_node_status.go | 16 +++++++++++++++- pkg/kubelet/kubelet_node_status_test.go | 10 ++++++++-- pkg/kubelet/kubelet_test.go | 2 -- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e8c3751fe8500..b91b7e21787b6 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -158,9 +158,6 @@ const ( // Period for performing image garbage collection. ImageGCPeriod = 5 * time.Minute - // maxImagesInStatus is the number of max images we store in image status. - maxImagesInNodeStatus = 50 - // Minimum number of dead containers to keep in a pod minDeadContainerInPod = 1 ) diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 2c9a850257eb6..b155de781d780 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -39,6 +39,15 @@ import ( "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) +const ( + // maxImagesInNodeStatus is the number of max images we store in image status. + maxImagesInNodeStatus = 50 + + // maxNamesPerImageInNodeStatus is max number of names per image stored in + // the node status. + maxNamesPerImageInNodeStatus = 5 +) + // registerWithApiServer registers the node with the cluster master. It is safe // to call multiple times, but not concurrently (kl.registrationCompleted is // not locked). @@ -501,8 +510,13 @@ func (kl *Kubelet) setNodeStatusImages(node *api.Node) { } for _, image := range containerImages { + names := append(image.RepoDigests, image.RepoTags...) + // Report up to maxNamesPerImageInNodeStatus names per image. + if len(names) > maxNamesPerImageInNodeStatus { + names = names[0:maxNamesPerImageInNodeStatus] + } imagesOnNode = append(imagesOnNode, api.ContainerImage{ - Names: append(image.RepoTags, image.RepoDigests...), + Names: names, SizeBytes: image.Size, }) } diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 3287cb6857a47..027e8644e6a2e 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -44,6 +44,10 @@ import ( "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) +const ( + maxImageTagsForTest = 20 +) + // generateTestingImageList generate randomly generated image list and corresponding expectedImageList. func generateTestingImageList(count int) ([]kubecontainer.Image, []api.ContainerImage) { // imageList is randomly generated image list @@ -64,7 +68,7 @@ func generateTestingImageList(count int) ([]kubecontainer.Image, []api.Container var expectedImageList []api.ContainerImage for _, kubeImage := range imageList { apiImage := api.ContainerImage{ - Names: kubeImage.RepoTags, + Names: kubeImage.RepoTags[0:maxNamesPerImageInNodeStatus], SizeBytes: kubeImage.Size, } @@ -76,7 +80,9 @@ func generateTestingImageList(count int) ([]kubecontainer.Image, []api.Container func generateImageTags() []string { var tagList []string - count := rand.IntnRange(1, maxImageTagsForTest+1) + // Generate > maxNamesPerImageInNodeStatus tags so that the test can verify + // that kubelet report up to maxNamesPerImageInNodeStatus tags. 
+ count := rand.IntnRange(maxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1) for ; count > 0; count-- { tagList = append(tagList, "gcr.io/google_containers:v"+strconv.Itoa(count)) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 7d57d09e29168..e006011cf0e6d 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -86,8 +86,6 @@ const ( testReservationCPU = "200m" testReservationMemory = "100M" - maxImageTagsForTest = 3 - // TODO(harry) any global place for these two? // Reasonable size range of all container images. 90%ile of images on dockerhub drops into this range. minImgSize int64 = 23 * 1024 * 1024 From 89ed9c6767959e5705459745550755d7dc9df751 Mon Sep 17 00:00:00 2001 From: Jingtian Peng Date: Wed, 21 Sep 2016 16:36:08 +0800 Subject: [PATCH 02/10] fix the appending bug --- cluster/ubuntu/reconfDocker.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster/ubuntu/reconfDocker.sh b/cluster/ubuntu/reconfDocker.sh index a05e3d2d1306d..7cb11e9336795 100755 --- a/cluster/ubuntu/reconfDocker.sh +++ b/cluster/ubuntu/reconfDocker.sh @@ -59,7 +59,7 @@ function restart_docker { source /run/flannel/subnet.env source /etc/default/docker - echo DOCKER_OPTS=\"${DOCKER_OPTS} -H tcp://127.0.0.1:4243 -H unix:///var/run/docker.sock \ + echo DOCKER_OPTS=\" -H tcp://127.0.0.1:4243 -H unix:///var/run/docker.sock \ --bip=${FLANNEL_SUBNET} --mtu=${FLANNEL_MTU}\" > /etc/default/docker sudo service docker restart } From 8899541fe001caaa123927e882b898880d7927f1 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Wed, 21 Sep 2016 19:11:00 +0530 Subject: [PATCH 03/10] Fix goroutine leak in federation service controller --- .../service/endpoint_helper.go | 46 +++++++++++++------ .../service/service_helper.go | 45 +++++++++++++----- .../service/servicecontroller.go | 17 +++++++ 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 94be1adbc144d..0b2a7d28db5ab 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -31,22 +31,42 @@ import ( // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. 
func (sc *ServiceController) clusterEndpointWorker() { - fedClient := sc.federationClient + // process all pending events in endpointWorkerDoneChan + eventPending := true + for eventPending { + select { + case clusterName := <-sc.endpointWorkerDoneChan: + sc.endpointWorkerMap[clusterName] = false + default: + // non-blocking, comes here if all existing events are processed + eventPending = false + break + } + } + for clusterName, cache := range sc.clusterCache.clientMap { + workerExist, keyFound := sc.endpointWorkerMap[clusterName] + if keyFound && workerExist { + continue + } + sc.endpointWorkerMap[clusterName] = true + + // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { + fedClient := sc.federationClient for { - func() { - key, quit := cache.endpointQueue.Get() - // update endpoint cache - if quit { - return - } - defer cache.endpointQueue.Done(key) - err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.V(2).Infof("Failed to sync endpoint: %+v", err) - } - }() + key, quit := cache.endpointQueue.Get() + // update endpoint cache + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.endpointWorkerDoneChan <- clusterName + return + } + defer cache.endpointQueue.Done(key) + err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.V(2).Infof("Failed to sync endpoint: %+v", err) + } } }(cache, clusterName) } diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 79fb955a2e388..372546dc7113c 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -35,21 +35,42 @@ import ( // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. 
func (sc *ServiceController) clusterServiceWorker() { - fedClient := sc.federationClient + // process all pending events in serviceWorkerDoneChan + eventPending := true + for eventPending { + select { + case clusterName := <-sc.serviceWorkerDoneChan: + sc.serviceWorkerMap[clusterName] = false + default: + // non-blocking, comes here if all existing events are processed + eventPending = false + break + } + } + for clusterName, cache := range sc.clusterCache.clientMap { + workerExist, keyFound := sc.serviceWorkerMap[clusterName] + if keyFound && workerExist { + continue + } + sc.serviceWorkerMap[clusterName] = true + + // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { + fedClient := sc.federationClient for { - func() { - key, quit := cache.serviceQueue.Get() - defer cache.serviceQueue.Done(key) - if quit { - return - } - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.Errorf("Failed to sync service: %+v", err) - } - }() + key, quit := cache.serviceQueue.Get() + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.serviceWorkerDoneChan <- clusterName + return + } + defer cache.serviceQueue.Done(key) + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.Errorf("Failed to sync service: %+v", err) + } + } }(cache, clusterName) } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index a7d929e613b15..a22ee4a5d83ac 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -68,6 +68,8 @@ const ( UserAgentName = "federation-service-controller" KubeAPIQPS = 20.0 KubeAPIBurst = 30 + + maxNoOfClusters = 256 ) type cachedService struct { @@ -119,6 +121,16 @@ type ServiceController struct { // services that need to be synced queue *workqueue.Type knownClusterSet sets.String + // endpoint worker map contains all the clusters registered with an indication that worker exist + // key clusterName + endpointWorkerMap map[string]bool + // channel for worker to signal that it is going out of existence + endpointWorkerDoneChan chan string + // service worker map contains all the clusters registered with an indication that worker exist + // key clusterName + serviceWorkerMap map[string]bool + // channel for worker to signal that it is going out of existence + serviceWorkerDoneChan chan string } // New returns a new service controller to keep DNS provider service resources @@ -205,6 +217,11 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte }, }, ) + + s.endpointWorkerMap = make(map[string]bool) + s.serviceWorkerMap = make(map[string]bool) + s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters) + s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters) return s } From ebb6ee0d9bb441cdd3b748ac953d15830134c74b Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Fri, 23 Sep 2016 14:27:51 +0530 Subject: [PATCH 04/10] Handle review comments for Fix goroutine leak in federation service controller --- .../service/endpoint_helper.go | 39 ++++++++++--------- .../service/service_helper.go | 38 +++++++++--------- .../service/servicecontroller.go | 2 +- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git 
a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 0b2a7d28db5ab..c18c883bd779f 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -32,43 +32,44 @@ import ( // It enforces that the syncHandler is never invoked concurrently with the same key. func (sc *ServiceController) clusterEndpointWorker() { // process all pending events in endpointWorkerDoneChan - eventPending := true - for eventPending { +ForLoop: + for { select { case clusterName := <-sc.endpointWorkerDoneChan: sc.endpointWorkerMap[clusterName] = false default: // non-blocking, comes here if all existing events are processed - eventPending = false - break + break ForLoop } } for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, keyFound := sc.endpointWorkerMap[clusterName] - if keyFound && workerExist { + workerExist, found := sc.endpointWorkerMap[clusterName] + if found && workerExist { continue } - sc.endpointWorkerMap[clusterName] = true // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { fedClient := sc.federationClient for { - key, quit := cache.endpointQueue.Get() - // update endpoint cache - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.endpointWorkerDoneChan <- clusterName - return - } - defer cache.endpointQueue.Done(key) - err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.V(2).Infof("Failed to sync endpoint: %+v", err) - } + func() { + key, quit := cache.endpointQueue.Get() + // update endpoint cache + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.endpointWorkerDoneChan <- clusterName + return + } + defer cache.endpointQueue.Done(key) + err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.V(2).Infof("Failed to sync endpoint: %+v", err) + } + }() } }(cache, clusterName) + sc.endpointWorkerMap[clusterName] = true } } diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 372546dc7113c..6f3d4d693a974 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -36,43 +36,43 @@ import ( // It enforces that the syncHandler is never invoked concurrently with the same key. 
func (sc *ServiceController) clusterServiceWorker() { // process all pending events in serviceWorkerDoneChan - eventPending := true - for eventPending { +ForLoop: + for { select { case clusterName := <-sc.serviceWorkerDoneChan: sc.serviceWorkerMap[clusterName] = false default: // non-blocking, comes here if all existing events are processed - eventPending = false - break + break ForLoop } } for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, keyFound := sc.serviceWorkerMap[clusterName] - if keyFound && workerExist { + workerExist, found := sc.serviceWorkerMap[clusterName] + if found && workerExist { continue } - sc.serviceWorkerMap[clusterName] = true // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { fedClient := sc.federationClient for { - key, quit := cache.serviceQueue.Get() - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.serviceWorkerDoneChan <- clusterName - return - } - defer cache.serviceQueue.Done(key) - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.Errorf("Failed to sync service: %+v", err) - } - + func() { + key, quit := cache.serviceQueue.Get() + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.serviceWorkerDoneChan <- clusterName + return + } + defer cache.serviceQueue.Done(key) + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.Errorf("Failed to sync service: %+v", err) + } + }() } }(cache, clusterName) + sc.serviceWorkerMap[clusterName] = true } } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index a22ee4a5d83ac..9a5791185e10a 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -69,7 +69,7 @@ const ( KubeAPIQPS = 20.0 KubeAPIBurst = 30 - maxNoOfClusters = 256 + maxNoOfClusters = 100 ) type cachedService struct { From a0e5821ea10c009cda59fe3bc9dd5ae4e96627c3 Mon Sep 17 00:00:00 2001 From: Quinton Hoole Date: Sun, 25 Sep 2016 19:06:12 -0700 Subject: [PATCH 05/10] Add periodic ingress reconciliations. --- .../pkg/federation-controller/ingress/ingress_controller.go | 6 ++++++ .../ingress/ingress_controller_test.go | 1 - 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index f01b3a312536c..82573f395fa27 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -712,10 +712,13 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { objMeta, err := conversion.NewCloner().DeepCopy(clusterIngress.ObjectMeta) if err != nil { glog.Errorf("Error deep copying ObjectMeta: %v", err) + ic.deliverIngress(ingress, ic.ingressReviewDelay, true) + } desiredIngress.ObjectMeta, ok = objMeta.(v1.ObjectMeta) if !ok { glog.Errorf("Internal error: Failed to cast to v1.ObjectMeta: %v", objMeta) + ic.deliverIngress(ingress, ic.ingressReviewDelay, true) } // Merge any annotations and labels on the federated ingress onto the underlying cluster ingress, // overwriting duplicates. 
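
The goroutine-leak fix introduced in patch 03 and refined in patch 04 follows the same pattern in both workers: the periodic poll first drains a buffered "done" channel to learn which per-cluster workers have exited, then starts a goroutine only for clusters that have no live worker, so repeated polls can no longer leak one goroutine per invocation. Below is a minimal, self-contained Go sketch of that idea; the names (workerPool, poll, doneChan, and the plain channel standing in for the work queue) are invented for the illustration and are not the real ServiceController API.

// workerpool_sketch.go
// Minimal sketch of the per-cluster worker pattern used by patches 03/04.
package main

import (
	"fmt"
	"time"
)

type workerPool struct {
	running  map[string]bool // clusterName -> a worker goroutine is alive
	doneChan chan string     // exiting workers report their cluster name here
}

func newWorkerPool(maxClusters int) *workerPool {
	return &workerPool{
		running: make(map[string]bool),
		// Buffered so an exiting worker never blocks waiting for the poller.
		doneChan: make(chan string, maxClusters),
	}
}

// poll is called periodically from a single goroutine, like the real
// clusterEndpointWorker/clusterServiceWorker under wait.Until. It drains all
// pending "worker finished" signals, then starts a worker only for clusters
// that do not already have one.
func (p *workerPool) poll(queues map[string]chan string) {
DrainLoop:
	for {
		select {
		case cluster := <-p.doneChan:
			p.running[cluster] = false
		default:
			break DrainLoop // nothing pending; non-blocking
		}
	}

	for cluster, queue := range queues {
		if p.running[cluster] {
			continue // previous worker for this cluster is still running
		}
		go func(cluster string, queue chan string) {
			for key := range queue {
				fmt.Printf("cluster %s: syncing %s\n", cluster, key)
			}
			// Queue closed ("quit" in the real code): signal the poller that
			// this worker is gone so a future poll may start a new one.
			p.doneChan <- cluster
		}(cluster, queue)
		p.running[cluster] = true
	}
}

func main() {
	queues := map[string]chan string{
		"cluster-a": make(chan string, 4),
		"cluster-b": make(chan string, 4),
	}
	pool := newWorkerPool(100)

	queues["cluster-a"] <- "default/my-service"
	queues["cluster-b"] <- "default/my-service"
	pool.poll(queues)
	pool.poll(queues) // a second poll does not start duplicate workers

	close(queues["cluster-a"]) // worker exits and reports on doneChan
	time.Sleep(100 * time.Millisecond)
	pool.poll(queues) // drains the done signal, then may restart cluster-a's worker
	time.Sleep(100 * time.Millisecond)
}

The buffer size passed to make(chan string, maxClusters) plays the same role as maxNoOfClusters in servicecontroller.go: it guarantees that an exiting worker can always post its done signal without blocking, even if the poller is not currently draining the channel.
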
@@ -748,6 +751,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 	if len(operations) == 0 {
 		// Everything is in order
 		glog.V(4).Infof("Ingress %q is up-to-date in all clusters - no propagation to clusters required.", ingress)
+		ic.deliverIngress(ingress, ic.ingressReviewDelay, false)
 		return
 	}
 	glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
@@ -760,4 +764,6 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 		ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 		return
 	}
+	// Schedule another periodic reconciliation, only to account for possible bugs in watch processing.
+	ic.deliverIngress(ingress, ic.ingressReviewDelay, false)
 }
diff --git a/federation/pkg/federation-controller/ingress/ingress_controller_test.go b/federation/pkg/federation-controller/ingress/ingress_controller_test.go
index bbaa90067e440..91f887f156c16 100644
--- a/federation/pkg/federation-controller/ingress/ingress_controller_test.go
+++ b/federation/pkg/federation-controller/ingress/ingress_controller_test.go
@@ -99,7 +99,6 @@ func TestIngressController(t *testing.T) {
 			Name:      "test-ingress",
 			Namespace: "mynamespace",
 			SelfLink:  "/api/v1/namespaces/mynamespace/ingress/test-ingress",
-			// TODO: Remove: Annotations: map[string]string{},
 		},
 		Status: extensions_v1beta1.IngressStatus{
 			LoadBalancer: api_v1.LoadBalancerStatus{

From c856dfe32e6760075f23700547ab254a766f8934 Mon Sep 17 00:00:00 2001
From: Quinton Hoole
Date: Sun, 25 Sep 2016 21:30:31 -0700
Subject: [PATCH 06/10] Add better logging when the IP address updates.

---
 .../pkg/federation-controller/ingress/ingress_controller.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go
index 82573f395fa27..126f8d8d292c1 100644
--- a/federation/pkg/federation-controller/ingress/ingress_controller.go
+++ b/federation/pkg/federation-controller/ingress/ingress_controller.go
@@ -691,12 +691,12 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 			baseIngress.Status.LoadBalancer = *lbstatus
 		}
 		glog.V(4).Infof("Attempting to update base federated ingress: %v", baseIngress)
-		if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil {
+		if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil {
 			glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err)
 			ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 			return
 		} else {
-			glog.V(4).Infof("Successfully added static IP annotation to federated ingress: %q", ingress)
+			glog.V(4).Infof("Successfully updated federated ingress %q (added IP), after update: %q", ingress, updatedFedIngress)
 			ic.deliverIngress(ingress, ic.smallDelay, false)
 			return
 		}

From 33b21ad06e3e0de98934be9c9df753090f4c385b Mon Sep 17 00:00:00 2001
From: Quinton Hoole
Date: Mon, 26 Sep 2016 10:06:05 -0700
Subject: [PATCH 07/10] Use UpdateStatus, not Update, to add LoadBalancerStatus to Federated Ingress.
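
The reasoning behind patch 07's subject line: for a resource with a status subresource, a plain Update is the path for metadata and spec changes, and status written through it is normally discarded by the registry's update strategy, whereas UpdateStatus persists only .Status and leaves spec and metadata untouched. The toy model below illustrates that split; it is a sketch of the semantics, not Kubernetes client or apiserver code, and every name in it is invented for the example.

// statussplit_sketch.go
// Toy model of why LoadBalancer status must go through UpdateStatus: a plain
// Update keeps the stored status, an UpdateStatus keeps the stored spec.
package main

import "fmt"

type Ingress struct {
	Annotations map[string]string
	Spec        string
	Status      string // stands in for Status.LoadBalancer
}

type fakeRegistry struct{ stored Ingress }

// Update persists metadata/spec and ignores any status the caller set,
// mirroring a typical PrepareForUpdate strategy for the main resource.
func (r *fakeRegistry) Update(in Ingress) Ingress {
	in.Status = r.stored.Status
	r.stored = in
	return r.stored
}

// UpdateStatus persists only the status, mirroring the /status subresource.
func (r *fakeRegistry) UpdateStatus(in Ingress) Ingress {
	in.Spec = r.stored.Spec
	in.Annotations = r.stored.Annotations
	r.stored = in
	return r.stored
}

func main() {
	r := &fakeRegistry{stored: Ingress{Annotations: map[string]string{}, Spec: "rules-v1"}}

	// Annotation transfer (static IP name): the regular Update is the right verb.
	ing := r.stored
	ing.Annotations = map[string]string{"static-ip": "shared-ip-1"}
	fmt.Printf("after Update:       %+v\n", r.Update(ing))

	// LoadBalancer status transfer: must use UpdateStatus, or it is dropped.
	ing = r.stored
	ing.Status = "lb-ip 1.2.3.4"
	fmt.Printf("after Update:       %+v\n", r.Update(ing))       // status unchanged
	fmt.Printf("after UpdateStatus: %+v\n", r.UpdateStatus(ing)) // status persisted
}
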
--- .../ingress/ingress_controller.go | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index 126f8d8d292c1..516a49581ce64 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -679,6 +679,16 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { glog.V(4).Infof(logStr, "Transferring") if !baseIPAnnotationExists && clusterIPNameExists { baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] = clusterIPName + glog.V(4).Infof("Attempting to update base federated ingress annotations: %v", baseIngress) + if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil { + glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err) + ic.deliverIngress(ingress, ic.ingressReviewDelay, true) + return + } else { + glog.V(4).Infof("Successfully updated federated ingress %q (added IP annotation), after update: %q", ingress, updatedFedIngress) + ic.deliverIngress(ingress, ic.smallDelay, false) + return + } } if !baseLBStatusExists && clusterLBStatusExists { lbstatusObj, lbErr := conversion.NewCloner().DeepCopy(&clusterIngress.Status.LoadBalancer) @@ -689,16 +699,16 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { return } baseIngress.Status.LoadBalancer = *lbstatus - } - glog.V(4).Infof("Attempting to update base federated ingress: %v", baseIngress) - if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil { - glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err) - ic.deliverIngress(ingress, ic.ingressReviewDelay, true) - return - } else { - glog.V(4).Infof("Successfully updated federated ingress %q (added IP), after update: %q", ingress, updatedFedIngress) - ic.deliverIngress(ingress, ic.smallDelay, false) - return + glog.V(4).Infof("Attempting to update base federated ingress status: %v", baseIngress) + if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).UpdateStatus(baseIngress); err != nil { + glog.Errorf("Failed to update federated ingress status of %q (loadbalancer status), will try again later: %v", ingress, err) + ic.deliverIngress(ingress, ic.ingressReviewDelay, true) + return + } else { + glog.V(4).Infof("Successfully updated federated ingress status of %q (added loadbalancer status), after update: %q", ingress, updatedFedIngress) + ic.deliverIngress(ingress, ic.smallDelay, false) + return + } } } else { glog.V(4).Infof(logStr, "Not transferring") From a88b2bb8ee8a7881892000440b7311ba5768a8ff Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 3 Oct 2016 18:45:45 -0700 Subject: [PATCH 08/10] scheduler: cache.delete deletes the pod from node specified in the cached state --- plugin/pkg/scheduler/schedulercache/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index 556aea3082680..985f9ed0f73d8 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -244,12 +244,12 @@ func (cache *schedulerCache) 
RemovePod(pod *api.Pod) error { cache.mu.Lock() defer cache.mu.Unlock() - _, ok := cache.podStates[key] + cachedstate, ok := cache.podStates[key] switch { // An assumed pod won't have Delete/Remove event. It needs to have Add event // before Remove event, in which case the state would change from Assumed to Added. case ok && !cache.assumedPods[key]: - err := cache.removePod(pod) + err := cache.removePod(cachedstate.pod) if err != nil { return err } From f240e4206253a9abc9fd89cb26ee81d5d94e7d00 Mon Sep 17 00:00:00 2001 From: Quinton Hoole Date: Mon, 3 Oct 2016 15:59:30 -0700 Subject: [PATCH 09/10] Heal the namespaceless ingresses in federation e2e. --- test/e2e/federated-ingress.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/federated-ingress.go b/test/e2e/federated-ingress.go index 03462c90558d5..3e69a9fa623f9 100644 --- a/test/e2e/federated-ingress.go +++ b/test/e2e/federated-ingress.go @@ -284,10 +284,10 @@ func createIngressOrFail(clientset *federation_release_1_4.Clientset, namespace }, } - _, err := clientset.Extensions().Ingresses(namespace).Create(ingress) + newIng, err := clientset.Extensions().Ingresses(namespace).Create(ingress) framework.ExpectNoError(err, "Creating ingress %q in namespace %q", ingress.Name, namespace) By(fmt.Sprintf("Successfully created federated ingress %q in namespace %q", FederatedIngressName, namespace)) - return ingress + return newIng } func updateIngressOrFail(clientset *federation_release_1_4.Clientset, namespace string) (newIng *v1beta1.Ingress) { From 64ab2334b7ad910b452d771013092c970465664d Mon Sep 17 00:00:00 2001 From: Quinton Hoole Date: Wed, 5 Oct 2016 18:20:12 -0700 Subject: [PATCH 10/10] Add missing argument to log message in federated ingress controller. --- .../pkg/federation-controller/ingress/ingress_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index 516a49581ce64..40e2b1cc58d7a 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -666,7 +666,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { ClusterName: cluster.Name, }) } else { - glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex) + glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex, ingress) } } else { clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress)
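
Patch 08's one-line summary carries the key insight: when a delete event arrives, the pod object attached to the event may name a different (or empty) node than the one the scheduler cache charged the pod against, so removal has to be driven by the cached state rather than the incoming object. A minimal sketch of that idea, assuming a drastically simplified cache (the real schedulerCache in plugin/pkg/scheduler/schedulercache tracks resource requests, assumed pods, and more):

// schedulercache_sketch.go
// Why removal should trust the cached state: the pod delivered with a delete
// event may name a different node than the one the cache accounted it against.
// Invented types; not the real scheduler cache implementation.
package main

import "fmt"

type pod struct {
	Name     string
	NodeName string
}

type cache struct {
	podStates map[string]pod             // pod name -> pod as last accounted
	nodePods  map[string]map[string]bool // node name -> set of pod names
}

func newCache() *cache {
	return &cache{podStates: map[string]pod{}, nodePods: map[string]map[string]bool{}}
}

func (c *cache) addPod(p pod) {
	c.podStates[p.Name] = p
	if c.nodePods[p.NodeName] == nil {
		c.nodePods[p.NodeName] = map[string]bool{}
	}
	c.nodePods[p.NodeName][p.Name] = true
}

// removePod uses the cached copy of the pod, so the accounting is undone on
// the node where the pod was originally added, even if the incoming object's
// NodeName is stale or empty.
func (c *cache) removePod(p pod) error {
	cached, ok := c.podStates[p.Name]
	if !ok {
		return fmt.Errorf("pod %s not in cache", p.Name)
	}
	delete(c.nodePods[cached.NodeName], cached.Name)
	delete(c.podStates, cached.Name)
	return nil
}

func main() {
	c := newCache()
	c.addPod(pod{Name: "web-0", NodeName: "node-a"})

	// The delete event arrives with a stale NodeName; the cached state still
	// points the removal at node-a, so node-a's accounting is cleaned up.
	stale := pod{Name: "web-0", NodeName: ""}
	if err := c.removePod(stale); err != nil {
		fmt.Println(err)
	}
	fmt.Printf("pods on node-a: %d\n", len(c.nodePods["node-a"])) // 0
}
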