diff --git a/nginx-controller/controller/controller.go b/nginx-controller/controller/controller.go index 738f841d6d..c8f2b420af 100644 --- a/nginx-controller/controller/controller.go +++ b/nginx-controller/controller/controller.go @@ -26,11 +26,14 @@ import ( "github.com/nginxinc/kubernetes-ingress/nginx-controller/nginx" "k8s.io/kubernetes/pkg/api" + podutil "k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/watch" ) @@ -384,13 +387,13 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx. ingEx.Secrets[secretName] = secret } - ingEx.Endpoints = make(map[string]*api.Endpoints) + ingEx.Endpoints = make(map[string][]string) if ing.Spec.Backend != nil { endps, err := lbc.getEndpointsForIngressBackend(ing.Spec.Backend, ing.Namespace) if err != nil { - glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", ing.Spec.Backend.ServiceName, err) + glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.Backend.ServiceName, err) } else { - ingEx.Endpoints[ing.Spec.Backend.ServiceName] = endps + ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = endps } } @@ -402,35 +405,113 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx. for _, path := range rule.HTTP.Paths { endps, err := lbc.getEndpointsForIngressBackend(&path.Backend, ing.Namespace) if err != nil { - glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", path.Backend.ServiceName, err) + glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.ServiceName, err) } else { - ingEx.Endpoints[path.Backend.ServiceName] = endps + ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = endps } - } } return ingEx } -func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Endpoints, error) { +func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) ([]string, error) { + svc, err := lbc.getServiceForIngressBackend(backend, namespace) + if err != nil { + glog.V(3).Infof("Error getting service %v: %v", backend.ServiceName, err) + return nil, err + } + + endps, err := lbc.endpLister.GetServiceEndpoints(svc) + if err != nil { + glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err) + return nil, err + } + + result, err := lbc.getEndpointsForPort(endps, backend.ServicePort, svc) + if err != nil { + glog.V(3).Infof("Error getting endpoints for service %s port %v: %v", svc.Name, backend.ServicePort, err) + return nil, err + } + return result, nil +} + +func (lbc *LoadBalancerController) getEndpointsForPort(endps api.Endpoints, ingSvcPort intstr.IntOrString, svc *api.Service) ([]string, error) { + var targetPort int + var err error + found := false + + for _, port := range svc.Spec.Ports { + if (ingSvcPort.Type == intstr.Int && port.Port == ingSvcPort.IntValue()) || (ingSvcPort.Type == intstr.String && port.Name == ingSvcPort.String()) { + targetPort, err = lbc.getTargetPort(&port, svc) + if err != nil { + return nil, fmt.Errorf("Error determining target port for port %v in Ingress: %v", ingSvcPort, err) + } + found = true + break + } + } + + if !found { + return nil, fmt.Errorf("No port %v in service %s", ingSvcPort, svc.Name) + } + + for _, subset := range endps.Subsets { + for _, port := range subset.Ports { + if port.Port == targetPort { + var endpoints []string + for _, address := range subset.Addresses { + endpoint := fmt.Sprintf("%v:%v", address.IP, port.Port) + endpoints = append(endpoints, endpoint) + } + return endpoints, nil + } + } + } + + return nil, fmt.Errorf("No endpoints for target port %v in service %s", targetPort, svc.Name) +} + +func (lbc *LoadBalancerController) getTargetPort(svcPort *api.ServicePort, svc *api.Service) (int, error) { + if (svcPort.TargetPort == intstr.IntOrString{}) { + return svcPort.Port, nil + } + + if svcPort.TargetPort.Type == intstr.Int { + return svcPort.TargetPort.IntValue(), nil + } + + pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelector()}) + if err != nil { + return 0, fmt.Errorf("Error getting pod information: %v", err) + } + + if len(pods.Items) == 0 { + return 0, fmt.Errorf("No pods of service %s", svc.Name) + } + + pod := &pods.Items[0] + + portNum, err := podutil.FindPort(pod, svcPort) + if err != nil { + return 0, fmt.Errorf("Error finding named port %v in pod %s: %v", svcPort, pod.Name, err) + } + + return portNum, nil +} + +func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Service, error) { svcKey := namespace + "/" + backend.ServiceName svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey) if err != nil { - glog.V(3).Infof("error getting service %v from the cache: %v", svcKey, err) return nil, err } + if svcExists { - svc := svcObj.(*api.Service) - endps, err := lbc.endpLister.GetServiceEndpoints(svc) - if err != nil { - glog.V(3).Infof("error getting endpoints for service %v from the cache: %v", svc, err) - return nil, err - } - return &endps, nil + return svcObj.(*api.Service), nil } - return nil, fmt.Errorf("service %s doesn't exists", svcKey) + return nil, fmt.Errorf("service %s doesn't exists", svcKey) } func parseNginxConfigMaps(nginxConfigMaps string) (string, string, error) { diff --git a/nginx-controller/nginx/configurator.go b/nginx-controller/nginx/configurator.go index fdbab6a726..3e32c6c8ce 100644 --- a/nginx-controller/nginx/configurator.go +++ b/nginx-controller/nginx/configurator.go @@ -203,9 +203,13 @@ func createLocation(path string, upstream Upstream, cfg *Config, websocket bool) func (cnf *Configurator) createUpstream(ingEx *IngressEx, name string, backend *extensions.IngressBackend, namespace string) Upstream { ups := NewUpstreamWithDefaultServer(name) - endps, exists := ingEx.Endpoints[backend.ServiceName] + endps, exists := ingEx.Endpoints[backend.ServiceName+backend.ServicePort.String()] if exists { - upsServers := endpointsToUpstreamServers(*endps, backend.ServicePort.IntValue()) + var upsServers []UpstreamServer + for _, endp := range endps { + addressport := strings.Split(endp, ":") + upsServers = append(upsServers, UpstreamServer{addressport[0], addressport[1]}) + } if len(upsServers) > 0 { ups.UpstreamServers = upsServers } @@ -221,23 +225,6 @@ func pathOrDefault(path string) string { return path } -func endpointsToUpstreamServers(endps api.Endpoints, servicePort int) []UpstreamServer { - var upsServers []UpstreamServer - for _, subset := range endps.Subsets { - for _, port := range subset.Ports { - if port.Port == servicePort { - for _, address := range subset.Addresses { - ups := UpstreamServer{Address: address.IP, Port: fmt.Sprintf("%v", servicePort)} - upsServers = append(upsServers, ups) - } - break - } - } - } - - return upsServers -} - func getNameForUpstream(ing *extensions.Ingress, host string, service string) string { return fmt.Sprintf("%v-%v-%v-%v", ing.Namespace, ing.Name, host, service) } diff --git a/nginx-controller/nginx/ingress.go b/nginx-controller/nginx/ingress.go index a82675c826..a4c641e299 100644 --- a/nginx-controller/nginx/ingress.go +++ b/nginx-controller/nginx/ingress.go @@ -8,5 +8,5 @@ import "k8s.io/kubernetes/pkg/apis/extensions" type IngressEx struct { Ingress *extensions.Ingress Secrets map[string]*api.Secret - Endpoints map[string]*api.Endpoints + Endpoints map[string][]string } diff --git a/nginx-plus-controller/controller/controller.go b/nginx-plus-controller/controller/controller.go index cda7992e2c..ede077bcf4 100644 --- a/nginx-plus-controller/controller/controller.go +++ b/nginx-plus-controller/controller/controller.go @@ -26,11 +26,14 @@ import ( "github.com/nginxinc/kubernetes-ingress/nginx-plus-controller/nginx" "k8s.io/kubernetes/pkg/api" + podutil "k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/watch" ) @@ -384,13 +387,13 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx. ingEx.Secrets[secretName] = secret } - ingEx.Endpoints = make(map[string]*api.Endpoints) + ingEx.Endpoints = make(map[string][]string) if ing.Spec.Backend != nil { endps, err := lbc.getEndpointsForIngressBackend(ing.Spec.Backend, ing.Namespace) if err != nil { - glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", ing.Spec.Backend.ServiceName, err) + glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.Backend.ServiceName, err) } else { - ingEx.Endpoints[ing.Spec.Backend.ServiceName] = endps + ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = endps } } @@ -402,32 +405,110 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx. for _, path := range rule.HTTP.Paths { endps, err := lbc.getEndpointsForIngressBackend(&path.Backend, ing.Namespace) if err != nil { - glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", path.Backend.ServiceName, err) + glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.ServiceName, err) } else { - ingEx.Endpoints[path.Backend.ServiceName] = endps + ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = endps } - } } return ingEx } -func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Endpoints, error) { +func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) ([]string, error) { + svc, err := lbc.getServiceForIngressBackend(backend, namespace) + if err != nil { + glog.V(3).Infof("Error getting service %v: %v", backend.ServiceName, err) + return nil, err + } + + endps, err := lbc.endpLister.GetServiceEndpoints(svc) + if err != nil { + glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err) + return nil, err + } + + result, err := lbc.getEndpointsForPort(endps, backend.ServicePort, svc) + if err != nil { + glog.V(3).Infof("Error getting endpoints for service %s port %v: %v", svc.Name, backend.ServicePort, err) + return nil, err + } + return result, nil +} + +func (lbc *LoadBalancerController) getEndpointsForPort(endps api.Endpoints, ingSvcPort intstr.IntOrString, svc *api.Service) ([]string, error) { + var targetPort int + var err error + found := false + + for _, port := range svc.Spec.Ports { + if (ingSvcPort.Type == intstr.Int && port.Port == ingSvcPort.IntValue()) || (ingSvcPort.Type == intstr.String && port.Name == ingSvcPort.String()) { + targetPort, err = lbc.getTargetPort(&port, svc) + if err != nil { + return nil, fmt.Errorf("Error determining target port for port %v in Ingress: %v", ingSvcPort, err) + } + found = true + break + } + } + + if !found { + return nil, fmt.Errorf("No port %v in service %s", ingSvcPort, svc.Name) + } + + for _, subset := range endps.Subsets { + for _, port := range subset.Ports { + if port.Port == targetPort { + var endpoints []string + for _, address := range subset.Addresses { + endpoint := fmt.Sprintf("%v:%v", address.IP, port.Port) + endpoints = append(endpoints, endpoint) + } + return endpoints, nil + } + } + } + + return nil, fmt.Errorf("No endpoints for target port %v in service %s", targetPort, svc.Name) +} + +func (lbc *LoadBalancerController) getTargetPort(svcPort *api.ServicePort, svc *api.Service) (int, error) { + if (svcPort.TargetPort == intstr.IntOrString{}) { + return svcPort.Port, nil + } + + if svcPort.TargetPort.Type == intstr.Int { + return svcPort.TargetPort.IntValue(), nil + } + + pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelector()}) + if err != nil { + return 0, fmt.Errorf("Error getting pod information: %v", err) + } + + if len(pods.Items) == 0 { + return 0, fmt.Errorf("No pods of service %s", svc.Name) + } + + pod := &pods.Items[0] + + portNum, err := podutil.FindPort(pod, svcPort) + if err != nil { + return 0, fmt.Errorf("Error finding named port %v in pod %s: %v", svcPort, pod.Name, err) + } + + return portNum, nil +} + +func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Service, error) { svcKey := namespace + "/" + backend.ServiceName svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey) if err != nil { - glog.V(3).Infof("error getting service %v from the cache: %v", svcKey, err) return nil, err } + if svcExists { - svc := svcObj.(*api.Service) - endps, err := lbc.endpLister.GetServiceEndpoints(svc) - if err != nil { - glog.V(3).Infof("error getting endpoints for service %v from the cache: %v", svc, err) - return nil, err - } - return &endps, nil + return svcObj.(*api.Service), nil } return nil, fmt.Errorf("service %s doesn't exists", svcKey) diff --git a/nginx-plus-controller/nginx/configurator.go b/nginx-plus-controller/nginx/configurator.go index 766bba5c1a..7768241348 100644 --- a/nginx-plus-controller/nginx/configurator.go +++ b/nginx-plus-controller/nginx/configurator.go @@ -255,23 +255,6 @@ func pathOrDefault(path string) string { return path } -func endpointsToUpstreamServers(endps api.Endpoints, servicePort int) []UpstreamServer { - var upsServers []UpstreamServer - for _, subset := range endps.Subsets { - for _, port := range subset.Ports { - if port.Port == servicePort { - for _, address := range subset.Addresses { - ups := UpstreamServer{Address: address.IP, Port: fmt.Sprintf("%v", servicePort)} - upsServers = append(upsServers, ups) - } - break - } - } - } - - return upsServers -} - func getNameForUpstream(ing *extensions.Ingress, host string, service string) string { return fmt.Sprintf("%v-%v-%v-%v", ing.Namespace, ing.Name, host, service) } @@ -308,10 +291,9 @@ func (cnf *Configurator) UpdateEndpoints(name string, ingEx *IngressEx) { func (cnf *Configurator) updateEndpoints(name string, ingEx *IngressEx) { if ingEx.Ingress.Spec.Backend != nil { name := getNameForUpstream(ingEx.Ingress, emptyHost, ingEx.Ingress.Spec.Backend.ServiceName) - endps, exists := ingEx.Endpoints[ingEx.Ingress.Spec.Backend.ServiceName] + endps, exists := ingEx.Endpoints[ingEx.Ingress.Spec.Backend.ServiceName+ingEx.Ingress.Spec.Backend.ServicePort.String()] if exists { - endpoints := getEndpointsList(endps, ingEx.Ingress.Spec.Backend.ServicePort.IntValue()) - err := cnf.nginxAPI.UpdateServers(name, endpoints) + err := cnf.nginxAPI.UpdateServers(name, endps) if err != nil { glog.Warningf("Couldn't update the endponts for %v: %v", name, err) } @@ -323,10 +305,9 @@ func (cnf *Configurator) updateEndpoints(name string, ingEx *IngressEx) { } for _, path := range rule.HTTP.Paths { name := getNameForUpstream(ingEx.Ingress, rule.Host, path.Backend.ServiceName) - endps, exists := ingEx.Endpoints[path.Backend.ServiceName] + endps, exists := ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] if exists { - endpoints := getEndpointsList(endps, path.Backend.ServicePort.IntValue()) - err := cnf.nginxAPI.UpdateServers(name, endpoints) + err := cnf.nginxAPI.UpdateServers(name, endps) if err != nil { glog.Warningf("Couldn't update the endponts for %v: %v", name, err) } @@ -335,23 +316,6 @@ func (cnf *Configurator) updateEndpoints(name string, ingEx *IngressEx) { } } -func getEndpointsList(endps *api.Endpoints, servicePort int) []string { - var result []string - - for _, subset := range endps.Subsets { - for _, port := range subset.Ports { - if port.Port == servicePort { - for _, address := range subset.Addresses { - result = append(result, fmt.Sprintf("%v:%v", address.IP, servicePort)) - } - break - } - } - } - - return result -} - // UpdateConfig updates NGINX Configuration parameters func (cnf *Configurator) UpdateConfig(config *Config) { cnf.lock.Lock() diff --git a/nginx-plus-controller/nginx/ingress.go b/nginx-plus-controller/nginx/ingress.go index a82675c826..a4c641e299 100644 --- a/nginx-plus-controller/nginx/ingress.go +++ b/nginx-plus-controller/nginx/ingress.go @@ -8,5 +8,5 @@ import "k8s.io/kubernetes/pkg/apis/extensions" type IngressEx struct { Ingress *extensions.Ingress Secrets map[string]*api.Secret - Endpoints map[string]*api.Endpoints + Endpoints map[string][]string }