From 35ff9ed98ac6c9b5ed274cbe1a3e1f4681107527 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Sat, 2 Nov 2019 22:15:45 -0400 Subject: [PATCH 1/3] add experimental support for Service Type=LoadBalancer w/ NSX-T Signed-off-by: Andrew Sy Kim --- go.mod | 1 + go.sum | 2 + pkg/cloudprovider/vsphere/cloud.go | 29 +- pkg/cloudprovider/vsphere/loadbalancer.go | 623 ++++++++++++++++++++++ pkg/cloudprovider/vsphere/types.go | 1 + pkg/common/config/types.go | 19 + 6 files changed, 668 insertions(+), 7 deletions(-) create mode 100644 pkg/cloudprovider/vsphere/loadbalancer.go diff --git a/go.mod b/go.mod index 385f96368f..b1b0a8c274 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 + github.com/vmware/go-vmware-nsxt v0.0.0-20190201205556-16aa0443042d github.com/vmware/govmomi v0.21.0 golang.org/x/lint v0.0.0-20190409202823-959b441ac422 // indirect golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 diff --git a/go.sum b/go.sum index c715f51aa9..b2fa9a4222 100644 --- a/go.sum +++ b/go.sum @@ -314,6 +314,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netns v0.0.0-20171111001504-be1fbeda1936/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= +github.com/vmware/go-vmware-nsxt v0.0.0-20190201205556-16aa0443042d h1:95uMoAryJvAwfDSxlgQEbT9xkNyWTB5YIxCweItbjGc= +github.com/vmware/go-vmware-nsxt v0.0.0-20190201205556-16aa0443042d/go.mod h1:AzmozsuEImWQcg9Cfef9oMNoPQKcNaps6VkyJhzbSf0= github.com/vmware/govmomi v0.20.1/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= github.com/vmware/govmomi v0.21.0 h1:jc8uMuxpcV2xMAA/cnEDlnsIjvqcMra5Y8onh/U3VuY= github.com/vmware/govmomi v0.21.0/go.mod h1:zbnFoBQ9GIjs2RVETy8CNEpb+L+Lwkjs3XZUL0B3/m0= diff --git a/pkg/cloudprovider/vsphere/cloud.go b/pkg/cloudprovider/vsphere/cloud.go index d482eddb5e..2a4e9edc30 100644 --- a/pkg/cloudprovider/vsphere/cloud.go +++ b/pkg/cloudprovider/vsphere/cloud.go @@ -95,8 +95,12 @@ func (vs *VSphere) Initialize(clientBuilder cloudprovider.ControllerClientBuilde // LoadBalancer returns a balancer interface. Also returns true if the // interface is supported, false otherwise. func (vs *VSphere) LoadBalancer() (cloudprovider.LoadBalancer, bool) { - klog.Warning("The vSphere cloud provider does not support load balancers") - return nil, false + if vs.loadbalancer == nil { + klog.Warning("The vSphere cloud provider does not support load balancers") + return nil, false + } + + return vs.loadbalancer, true } // Instances returns an instances interface. Also returns true if the @@ -152,12 +156,23 @@ func buildVSphereFromConfig(cfg *vcfg.Config) (*VSphere, error) { vcList: make(map[string]*VCenterInfo), } + var err error + var loadbalancer cloudprovider.LoadBalancer + if cfg.LoadbalancerNSXT != nil { + // TODO: feature gate this config with an env var? + loadbalancer, err = newNSXTLoadBalancer(cfg.Global.ClusterID, cfg.LoadbalancerNSXT) + if err != nil { + return nil, err + } + } + vs := VSphere{ - cfg: cfg, - nodeManager: nm, - instances: newInstances(nm), - zones: newZones(nm, cfg.Labels.Zone, cfg.Labels.Region), - server: server.NewServer(cfg.Global.APIBinding, nm), + cfg: cfg, + nodeManager: nm, + instances: newInstances(nm), + loadbalancer: loadbalancer, + zones: newZones(nm, cfg.Labels.Zone, cfg.Labels.Region), + server: server.NewServer(cfg.Global.APIBinding, nm), } return &vs, nil } diff --git a/pkg/cloudprovider/vsphere/loadbalancer.go b/pkg/cloudprovider/vsphere/loadbalancer.go new file mode 100644 index 0000000000..24894461f6 --- /dev/null +++ b/pkg/cloudprovider/vsphere/loadbalancer.go @@ -0,0 +1,623 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + + v1 "k8s.io/api/core/v1" + cloudprovider "k8s.io/cloud-provider" + vcfg "k8s.io/cloud-provider-vsphere/pkg/common/config" + "k8s.io/klog" + + nsxt "github.com/vmware/go-vmware-nsxt" + "github.com/vmware/go-vmware-nsxt/common" + "github.com/vmware/go-vmware-nsxt/loadbalancer" + "github.com/vmware/go-vmware-nsxt/manager" +) + +// nsxtLB implements cloudprovider.LoadBalancer for vSphere clusters with NSX-T +type nsxtLB struct { + client *nsxt.APIClient + // clusterID is a unique identifier of the cluster + clusterID string + // routerID is the ID of the NSX-T tier1 router used for creating LoadBalancers + routerID string + // lbServiceID is the ID of the NSX-T LoadBalancer Service where virtual servers + // for Service Type=LoadBalancer are created + lbServiceID string + // vipPoolID is the ID of the IP Pool where VIPs will be allocated + vipPoolID string + + // required raw http requests for listing load balancer resources + // TODO: remove this once go-vmware-nsxt can support ListLoadBalancer* methods + server string + username string + password string + insecure bool +} + +func newNSXTLoadBalancer(clusterID string, cfg *vcfg.LoadbalancerNSXT) (cloudprovider.LoadBalancer, error) { + nsxtCfg := &nsxt.Configuration{ + BasePath: "/api/v1", + Host: cfg.Server, + Scheme: "https", + UserAgent: "kubernetes/cloud-provider-vsphere", + UserName: cfg.User, + Password: cfg.Password, + Insecure: cfg.Insecure, + } + + nsxClient, err := nsxt.NewAPIClient(nsxtCfg) + if err != nil { + return nil, err + } + + // TODO: validate config values + return &nsxtLB{ + client: nsxClient, + routerID: cfg.Tier1RouterID, + clusterID: clusterID, + vipPoolID: cfg.VIPPoolID, + + // only required for raw http requests + server: cfg.Server, + username: cfg.User, + password: cfg.Password, + insecure: cfg.Insecure, + }, nil +} + +func (n *nsxtLB) Initialize() error { + // first look for the Tier1 router by ID provided in config + router, resp, err := n.client.LogicalRoutingAndServicesApi.ReadLogicalRouter(n.client.Context, n.routerID) + if resp.StatusCode == http.StatusNotFound { + return fmt.Errorf("NSX-T logical router with router ID %q not found", n.routerID) + } + + if err != nil { + return err + } + + // then check for LB Service by cluster ID, if it already exists, we're good to go + // if it doesn't exist, create one now + lbServiceName := n.loadBalancerServiceName() + lbService, exists, err := n.getLBServiceByName(lbServiceName) + if err != nil { + return err + } + + if exists { + n.lbServiceID = lbService.Id + return nil + } + + lbService = loadbalancer.LbService{ + DisplayName: lbServiceName, + Description: fmt.Sprintf("LoadBalancer Service managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + Enabled: true, + Size: "SMALL", // TODO: this should be configurable in the config? + Attachment: &common.ResourceReference{ + TargetType: router.ResourceType, + TargetId: router.Id, + }, + } + + lbService, _, err = n.client.ServicesApi.CreateLoadBalancerService(n.client.Context, lbService) + if err != nil { + return err + } + + n.lbServiceID = lbService.Id + return nil +} + +func (n *nsxtLB) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) { + lbName := n.GetLoadBalancerName(ctx, clusterName, service) + + virtualServers, err := n.listLoadBalancerVirtualServers() + if err != nil { + return nil, false, err + } + + for _, virtualServer := range virtualServers.Results { + if virtualServer.DisplayName != lbName { + continue + } + + return &v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + { + IP: virtualServer.IpAddress, + }, + }, + }, true, nil + + } + + return nil, false, nil +} + +func (n *nsxtLB) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string { + // NSX-T LB name is in the format --. + // The UUID in the end is the ensure LB names are unique across clusters + return fmt.Sprintf("%s-%s-%s", service.Namespace, service.Name, service.UID[:5]) +} + +func (n *nsxtLB) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { + lbName := n.GetLoadBalancerName(ctx, clusterName, service) + + virtualServer, exists, err := n.getVirtualServerByName(lbName) + if err != nil { + return nil, err + } + + if exists { + return &v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + { + IP: virtualServer.IpAddress, + }, + }, + }, nil + + } + + // TODO: do actual IPAM allocation to get virtual server IP + // For now use cluster IP + + // we can re-use the same LB pool members since we can rely on DefaultPoolMemberPort to indicate the node port + // as the target port for each VirtualServer + var lbMembers []loadbalancer.PoolMember + for _, node := range nodes { + // TODO: don't always assume InternalIP from node addresses + ip := getInternalIP(node) + if ip == "" { + klog.Warningf("node %s has no addresses assigned", node.Name) + continue + } + + member := loadbalancer.PoolMember{ + DisplayName: node.Name, + Weight: 1, + IpAddress: ip, + } + + lbMembers = append(lbMembers, member) + } + + // allocate VIP from pool provided in config + allocation, _, err := n.client.PoolManagementApi.AllocateOrReleaseFromIpPool(n.client.Context, n.vipPoolID, manager.AllocationIpAddress{}, "ALLOCATE") + if err != nil { + return nil, err + } + + vip := allocation.AllocationId + + success := false + defer func() { + if success { + return + } + + // release VIP from pool if load balancer was not created successfully + _, _, err := n.client.PoolManagementApi.AllocateOrReleaseFromIpPool(n.client.Context, n.vipPoolID, + manager.AllocationIpAddress{AllocationId: vip}, "RELEASE") + if err != nil { + klog.Errorf("error releasing VIP %s after load balancer failed to provision", vip) + } + + }() + + var newVirtualServerIDs []string + + // Create a new virtual server per port in the Service since LB pools only support single ports + // and each Service Port has a dedicated node port + for _, port := range service.Spec.Ports { + // create a pool ID using this port's node port + lbPool := loadbalancer.LbPool{ + // TODO: make this configurable + Algorithm: "ROUND_ROBIN", + DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), + Description: fmt.Sprintf("LoadBalancer Pool managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + Members: lbMembers, + MinActiveMembers: 1, + } + + lbPool, _, err := n.client.ServicesApi.CreateLoadBalancerPool(n.client.Context, lbPool) + if err != nil { + return nil, err + } + + // virtual server doesn't exist.. update node pools? or should that only happen on update? + virtualServer := loadbalancer.LbVirtualServer{ + DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), + Description: fmt.Sprintf("LoadBalancer VirtualServer managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + IpProtocol: string(port.Protocol), + DefaultPoolMemberPort: strconv.Itoa(int(port.NodePort)), + IpAddress: vip, + Ports: []string{strconv.Itoa(int(port.Port))}, + Enabled: true, + PoolId: lbPool.Id, + } + + virtualServer, _, err = n.client.ServicesApi.CreateLoadBalancerVirtualServer(n.client.Context, virtualServer) + if err != nil { + return nil, err + } + + newVirtualServerIDs = append(newVirtualServerIDs, virtualServer.Id) + } + + success = true + + err = n.AddVirtualServersToLoadBalancer(newVirtualServerIDs) + if err != nil { + return nil, err + } + + return &v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + { + IP: vip, + }, + }, + }, nil +} + +func (n *nsxtLB) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error { + lbName := n.GetLoadBalancerName(ctx, clusterName, service) + + // we can re-use the same LB pool members since we can rely on DefaultPoolMemberPort to indicate the node port + // as the target port for each VirtualServer + var lbMembers []loadbalancer.PoolMember + for _, node := range nodes { + // TODO: don't always assume InternalIP from node addresses + ip := getInternalIP(node) + if ip == "" { + klog.Warningf("node %s has no addresses assigned", node.Name) + continue + } + + member := loadbalancer.PoolMember{ + DisplayName: node.Name, + Weight: 1, + IpAddress: ip, + } + + lbMembers = append(lbMembers, member) + } + + // Create a new virtual server per port in the Service since LB pools only support single ports + // and each Service Port has a dedicated node port + for _, port := range service.Spec.Ports { + poolName := generatePoolName(lbName, int(port.Port)) + lbPool, exists, err := n.getLBPoolByName(poolName) + if err != nil { + return err + } + + if !exists { + return fmt.Errorf("error updating LB pool %s because it doesn't exist", poolName) + } + + lbPoolID := lbPool.Id + + // create a pool ID using this port's node port + lbPool = loadbalancer.LbPool{ + // TODO: make this configurable + Algorithm: "ROUND_ROBIN", + DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), + Description: fmt.Sprintf("LoadBalancer Pool managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + Members: lbMembers, + MinActiveMembers: 1, + } + + lbPool, _, err = n.client.ServicesApi.UpdateLoadBalancerPool(n.client.Context, lbPoolID, lbPool) + if err != nil { + return err + } + + virtualServerName := generateVirtualServerName(lbName, int(port.Port)) + virtualServer, exists, err := n.getVirtualServerByName(virtualServerName) + if err != nil { + return err + } + + if !exists { + return fmt.Errorf("error updating Virtual Server %s because it doesn't exist", virtualServerName) + } + + virtualServerID := virtualServer.Id + + // virtual server doesn't exist.. update node pools? or should that only happen on update? + virtualServer = loadbalancer.LbVirtualServer{ + DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), + Description: fmt.Sprintf("LoadBalancer VirtualServer managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + IpProtocol: string(port.Protocol), + DefaultPoolMemberPort: strconv.Itoa(int(port.NodePort)), + IpAddress: virtualServer.IpAddress, + Ports: []string{strconv.Itoa(int(port.Port))}, + Enabled: true, + PoolId: lbPool.Id, + } + + virtualServer, _, err = n.client.ServicesApi.UpdateLoadBalancerVirtualServer(n.client.Context, virtualServerID, virtualServer) + if err != nil { + return err + } + } + + return nil +} + +func (n *nsxtLB) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error { + lbName := n.GetLoadBalancerName(ctx, clusterName, service) + + // Create a new virtual server per port in the Service since LB pools only support single ports + // and each Service Port has a dedicated node port + for _, port := range service.Spec.Ports { + poolName := generatePoolName(lbName, int(port.Port)) + lbPool, exists, err := n.getLBPoolByName(poolName) + if err != nil { + return err + } + + if exists { + _, err := n.client.ServicesApi.DeleteLoadBalancerPool(n.client.Context, lbPool.Id) + if err != nil { + return err + } + } + + virtualServerName := generateVirtualServerName(lbName, int(port.Port)) + virtualServer, exists, err := n.getVirtualServerByName(virtualServerName) + if err != nil { + return err + } + + if exists { + _, err := n.client.ServicesApi.DeleteLoadBalancerVirtualServer(n.client.Context, virtualServer.Id, nil) + if err != nil { + return err + } + } + } + + return nil +} + +func (n *nsxtLB) loadBalancerServiceName() string { + return fmt.Sprintf("kubernetes-cpi-vsphere-%s", n.clusterID) +} + +func generatePoolName(lbName string, port int) string { + return fmt.Sprintf("%s-port-%d", lbName, port) +} + +func generateVirtualServerName(lbName string, port int) string { + return fmt.Sprintf("%s-port-%d", lbName, port) +} + +func (n *nsxtLB) AddVirtualServersToLoadBalancer(virtualServerIDs []string) error { + // first read load balancer service + lbService, _, err := n.client.ServicesApi.ReadLoadBalancerService(n.client.Context, n.lbServiceID) + if err != nil { + return err + } + + newVirtualServerIDs := append(lbService.VirtualServerIds, virtualServerIDs...) + lbService.VirtualServerIds = newVirtualServerIDs + + _, _, err = n.client.ServicesApi.UpdateLoadBalancerService(n.client.Context, lbService.Id, lbService) + return err +} + +func (n *nsxtLB) getLBServiceByName(name string) (loadbalancer.LbService, bool, error) { + lbs, err := n.listLoadBalancerServices() + if err != nil { + return loadbalancer.LbService{}, false, err + } + + for _, lbSvc := range lbs.Results { + if lbSvc.DisplayName != name { + continue + } + + return lbSvc, true, nil + } + + return loadbalancer.LbService{}, false, nil +} + +func (n *nsxtLB) getVirtualServerByName(name string) (loadbalancer.LbVirtualServer, bool, error) { + virtualServers, err := n.listLoadBalancerVirtualServers() + if err != nil { + return loadbalancer.LbVirtualServer{}, false, err + } + + for _, virtualServer := range virtualServers.Results { + if virtualServer.DisplayName != name { + continue + } + + return virtualServer, true, nil + } + + return loadbalancer.LbVirtualServer{}, false, nil +} + +func (n *nsxtLB) getLBPoolByName(name string) (loadbalancer.LbPool, bool, error) { + lbPools, err := n.listLoadBalancerPool() + if err != nil { + return loadbalancer.LbPool{}, false, err + } + + for _, lbPool := range lbPools.Results { + if lbPool.DisplayName != name { + continue + } + + return lbPool, true, nil + } + + return loadbalancer.LbPool{}, false, nil +} + +func getInternalIP(node *v1.Node) string { + for _, address := range node.Status.Addresses { + if address.Type != v1.NodeInternalIP { + continue + } + + return address.Address + } + + return "" +} + +// ListLoadBalancerVirtualServers represents the http response from list load balancer virtual server request +// TODO: remove when NSX-T client adds ListLoadBalancer* methods +type ListLoadBalancerVirtualServers struct { + ResultCount int `json:"result_count"` + Results []loadbalancer.LbVirtualServer `json:"results"` +} + +// listLoadBalancerVirtualServers makes an http request for listing load balancer virtual servers +// TODO: remove this once the go-vmware-nsxt client supports this call +func (n *nsxtLB) listLoadBalancerVirtualServers() (ListLoadBalancerVirtualServers, error) { + // set default transport to skip verifiy + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + url := "https://" + n.server + "/api/v1/loadbalancer/virtual-servers" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + req.SetBasicAuth(n.username, n.password) + resp, err := client.Do(req) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + var results ListLoadBalancerVirtualServers + err = json.Unmarshal(body, &results) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + return results, nil +} + +// ListLoadBalancerService represents the http response from list load balancer service request +// TODO: remove when NSX-T client adds ListLoadBalancer* methods +type ListLoadBalancerService struct { + ResultCount int `json:"result_count"` + Results []loadbalancer.LbService `json:"results"` +} + +// listLoadBalancers makes an http request for listing load balancer services +// TODO: remove this once the go-vmware-nsxt client supports this call +func (n *nsxtLB) listLoadBalancerServices() (ListLoadBalancerService, error) { + // set default transport to skip verifiy + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + url := "https://" + n.server + "/api/v1/loadbalancer/services" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return ListLoadBalancerService{}, err + } + + req.SetBasicAuth(n.username, n.password) + resp, err := client.Do(req) + if err != nil { + return ListLoadBalancerService{}, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return ListLoadBalancerService{}, err + } + + var results ListLoadBalancerService + err = json.Unmarshal(body, &results) + if err != nil { + return ListLoadBalancerService{}, err + } + + return results, nil +} + +// ListLoadBalancerPool represents the http response from list load balancer pools request +// TODO: remove when NSX-T client adds ListLoadBalancer* methods +type ListLoadBalancerPool struct { + ResultCount int `json:"result_count"` + Results []loadbalancer.LbPool `json:"results"` +} + +// listLoadBalancerPool makes an http request for listing load balancer pools +// TODO: remove this once the go-vmware-nsxt client supports this call +func (n *nsxtLB) listLoadBalancerPool() (ListLoadBalancerPool, error) { + // set default transport to skip verifiy + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + url := "https://" + n.server + "/api/v1/loadbalancer/pools" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return ListLoadBalancerPool{}, err + } + + req.SetBasicAuth(n.username, n.password) + resp, err := client.Do(req) + if err != nil { + return ListLoadBalancerPool{}, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return ListLoadBalancerPool{}, err + } + + var results ListLoadBalancerPool + err = json.Unmarshal(body, &results) + if err != nil { + return ListLoadBalancerPool{}, err + } + + return results, nil +} diff --git a/pkg/cloudprovider/vsphere/types.go b/pkg/cloudprovider/vsphere/types.go index aaa3e4319e..0e27f604e7 100644 --- a/pkg/cloudprovider/vsphere/types.go +++ b/pkg/cloudprovider/vsphere/types.go @@ -41,6 +41,7 @@ type VSphere struct { nodeManager *NodeManager informMgr *k8s.InformerManager instances cloudprovider.Instances + loadbalancer cloudprovider.LoadBalancer zones cloudprovider.Zones server GRPCServer } diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index fca50dd808..c6c434fb35 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -19,6 +19,9 @@ package config // Config is used to read and store information from the cloud configuration file type Config struct { Global struct { + // ClusterID is the unique identifer used for a cluster + // Required if NSX-T load balancer support is enabled + ClusterID string `gcfg:"cluster-id"` // vCenter username. User string `gcfg:"user"` // vCenter password in clear text. @@ -69,6 +72,11 @@ type Config struct { Zone string `gcfg:"zone"` Region string `gcfg:"region"` } + + // Loadbalancer_NSX_T contains configuration for enabling NSX-T based loadbalancer support for + // Services Type=LoadBalancer resources + // TODO: mark this as experimental + LoadbalancerNSXT *LoadbalancerNSXT } // VirtualCenterConfig contains information used to access a remote vCenter @@ -114,3 +122,14 @@ type VirtualCenterConfig struct { // IPFamilyPriority (intentionally not exposed via the config) the list/priority of IP versions IPFamilyPriority []string } + +// LoadbalancerNSXT contains configuration for enabling NSX-T based loadbalancer support for +// Services Type=LoadBalancer resources +type LoadbalancerNSXT struct { + Server string `gcfg:"server"` + User string `gcfg:"user"` + Password string `gcfg:"password"` + Insecure bool `gcfg:"insecure-flag"` + Tier1RouterID string `gcfg:"tier1-router-id"` + VIPPoolID string `gcfg:"vip-pool-id"` +} From e67bd13ad53339827de49d1e78f114f61424e282 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Sun, 3 Nov 2019 14:35:32 -0500 Subject: [PATCH 2/3] add loadbalancer_utils.go file Signed-off-by: Andrew Sy Kim --- pkg/cloudprovider/vsphere/loadbalancer.go | 222 +--------------- .../vsphere/loadbalancer_utils.go | 244 ++++++++++++++++++ 2 files changed, 247 insertions(+), 219 deletions(-) create mode 100644 pkg/cloudprovider/vsphere/loadbalancer_utils.go diff --git a/pkg/cloudprovider/vsphere/loadbalancer.go b/pkg/cloudprovider/vsphere/loadbalancer.go index 24894461f6..f2184edab8 100644 --- a/pkg/cloudprovider/vsphere/loadbalancer.go +++ b/pkg/cloudprovider/vsphere/loadbalancer.go @@ -18,10 +18,7 @@ package vsphere import ( "context" - "crypto/tls" - "encoding/json" "fmt" - "io/ioutil" "net/http" "strconv" @@ -140,6 +137,7 @@ func (n *nsxtLB) GetLoadBalancer(ctx context.Context, clusterName string, servic return nil, false, err } + // this is wrong, the lb name doesn't actually match the virtual server name, we need to check by port name for _, virtualServer := range virtualServers.Results { if virtualServer.DisplayName != lbName { continue @@ -167,6 +165,7 @@ func (n *nsxtLB) GetLoadBalancerName(ctx context.Context, clusterName string, se func (n *nsxtLB) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { lbName := n.GetLoadBalancerName(ctx, clusterName, service) + // this is wrong, actually need to get VirtualServer per port virtualServer, exists, err := n.getVirtualServerByName(lbName) if err != nil { return nil, err @@ -271,7 +270,7 @@ func (n *nsxtLB) EnsureLoadBalancer(ctx context.Context, clusterName string, ser success = true - err = n.AddVirtualServersToLoadBalancer(newVirtualServerIDs) + err = n.addVirtualServersToLoadBalancer(newVirtualServerIDs) if err != nil { return nil, err } @@ -406,218 +405,3 @@ func (n *nsxtLB) EnsureLoadBalancerDeleted(ctx context.Context, clusterName stri return nil } - -func (n *nsxtLB) loadBalancerServiceName() string { - return fmt.Sprintf("kubernetes-cpi-vsphere-%s", n.clusterID) -} - -func generatePoolName(lbName string, port int) string { - return fmt.Sprintf("%s-port-%d", lbName, port) -} - -func generateVirtualServerName(lbName string, port int) string { - return fmt.Sprintf("%s-port-%d", lbName, port) -} - -func (n *nsxtLB) AddVirtualServersToLoadBalancer(virtualServerIDs []string) error { - // first read load balancer service - lbService, _, err := n.client.ServicesApi.ReadLoadBalancerService(n.client.Context, n.lbServiceID) - if err != nil { - return err - } - - newVirtualServerIDs := append(lbService.VirtualServerIds, virtualServerIDs...) - lbService.VirtualServerIds = newVirtualServerIDs - - _, _, err = n.client.ServicesApi.UpdateLoadBalancerService(n.client.Context, lbService.Id, lbService) - return err -} - -func (n *nsxtLB) getLBServiceByName(name string) (loadbalancer.LbService, bool, error) { - lbs, err := n.listLoadBalancerServices() - if err != nil { - return loadbalancer.LbService{}, false, err - } - - for _, lbSvc := range lbs.Results { - if lbSvc.DisplayName != name { - continue - } - - return lbSvc, true, nil - } - - return loadbalancer.LbService{}, false, nil -} - -func (n *nsxtLB) getVirtualServerByName(name string) (loadbalancer.LbVirtualServer, bool, error) { - virtualServers, err := n.listLoadBalancerVirtualServers() - if err != nil { - return loadbalancer.LbVirtualServer{}, false, err - } - - for _, virtualServer := range virtualServers.Results { - if virtualServer.DisplayName != name { - continue - } - - return virtualServer, true, nil - } - - return loadbalancer.LbVirtualServer{}, false, nil -} - -func (n *nsxtLB) getLBPoolByName(name string) (loadbalancer.LbPool, bool, error) { - lbPools, err := n.listLoadBalancerPool() - if err != nil { - return loadbalancer.LbPool{}, false, err - } - - for _, lbPool := range lbPools.Results { - if lbPool.DisplayName != name { - continue - } - - return lbPool, true, nil - } - - return loadbalancer.LbPool{}, false, nil -} - -func getInternalIP(node *v1.Node) string { - for _, address := range node.Status.Addresses { - if address.Type != v1.NodeInternalIP { - continue - } - - return address.Address - } - - return "" -} - -// ListLoadBalancerVirtualServers represents the http response from list load balancer virtual server request -// TODO: remove when NSX-T client adds ListLoadBalancer* methods -type ListLoadBalancerVirtualServers struct { - ResultCount int `json:"result_count"` - Results []loadbalancer.LbVirtualServer `json:"results"` -} - -// listLoadBalancerVirtualServers makes an http request for listing load balancer virtual servers -// TODO: remove this once the go-vmware-nsxt client supports this call -func (n *nsxtLB) listLoadBalancerVirtualServers() (ListLoadBalancerVirtualServers, error) { - // set default transport to skip verifiy - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: tr} - - url := "https://" + n.server + "/api/v1/loadbalancer/virtual-servers" - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return ListLoadBalancerVirtualServers{}, err - } - - req.SetBasicAuth(n.username, n.password) - resp, err := client.Do(req) - if err != nil { - return ListLoadBalancerVirtualServers{}, err - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return ListLoadBalancerVirtualServers{}, err - } - - var results ListLoadBalancerVirtualServers - err = json.Unmarshal(body, &results) - if err != nil { - return ListLoadBalancerVirtualServers{}, err - } - - return results, nil -} - -// ListLoadBalancerService represents the http response from list load balancer service request -// TODO: remove when NSX-T client adds ListLoadBalancer* methods -type ListLoadBalancerService struct { - ResultCount int `json:"result_count"` - Results []loadbalancer.LbService `json:"results"` -} - -// listLoadBalancers makes an http request for listing load balancer services -// TODO: remove this once the go-vmware-nsxt client supports this call -func (n *nsxtLB) listLoadBalancerServices() (ListLoadBalancerService, error) { - // set default transport to skip verifiy - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: tr} - - url := "https://" + n.server + "/api/v1/loadbalancer/services" - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return ListLoadBalancerService{}, err - } - - req.SetBasicAuth(n.username, n.password) - resp, err := client.Do(req) - if err != nil { - return ListLoadBalancerService{}, err - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return ListLoadBalancerService{}, err - } - - var results ListLoadBalancerService - err = json.Unmarshal(body, &results) - if err != nil { - return ListLoadBalancerService{}, err - } - - return results, nil -} - -// ListLoadBalancerPool represents the http response from list load balancer pools request -// TODO: remove when NSX-T client adds ListLoadBalancer* methods -type ListLoadBalancerPool struct { - ResultCount int `json:"result_count"` - Results []loadbalancer.LbPool `json:"results"` -} - -// listLoadBalancerPool makes an http request for listing load balancer pools -// TODO: remove this once the go-vmware-nsxt client supports this call -func (n *nsxtLB) listLoadBalancerPool() (ListLoadBalancerPool, error) { - // set default transport to skip verifiy - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: tr} - - url := "https://" + n.server + "/api/v1/loadbalancer/pools" - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return ListLoadBalancerPool{}, err - } - - req.SetBasicAuth(n.username, n.password) - resp, err := client.Do(req) - if err != nil { - return ListLoadBalancerPool{}, err - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return ListLoadBalancerPool{}, err - } - - var results ListLoadBalancerPool - err = json.Unmarshal(body, &results) - if err != nil { - return ListLoadBalancerPool{}, err - } - - return results, nil -} diff --git a/pkg/cloudprovider/vsphere/loadbalancer_utils.go b/pkg/cloudprovider/vsphere/loadbalancer_utils.go new file mode 100644 index 0000000000..31d432410f --- /dev/null +++ b/pkg/cloudprovider/vsphere/loadbalancer_utils.go @@ -0,0 +1,244 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + v1 "k8s.io/api/core/v1" + + "github.com/vmware/go-vmware-nsxt/loadbalancer" +) + +func generatePoolName(lbName string, port int) string { + return fmt.Sprintf("%s-port-%d", lbName, port) +} + +func generateVirtualServerName(lbName string, port int) string { + return fmt.Sprintf("%s-port-%d", lbName, port) +} + +func (n *nsxtLB) loadBalancerServiceName() string { + return fmt.Sprintf("kubernetes-cpi-vsphere-%s", n.clusterID) +} + +func (n *nsxtLB) addVirtualServersToLoadBalancer(virtualServerIDs []string) error { + // first read load balancer service + lbService, _, err := n.client.ServicesApi.ReadLoadBalancerService(n.client.Context, n.lbServiceID) + if err != nil { + return err + } + + newVirtualServerIDs := append(lbService.VirtualServerIds, virtualServerIDs...) + lbService.VirtualServerIds = newVirtualServerIDs + + _, _, err = n.client.ServicesApi.UpdateLoadBalancerService(n.client.Context, lbService.Id, lbService) + return err +} + +func (n *nsxtLB) getLBServiceByName(name string) (loadbalancer.LbService, bool, error) { + lbs, err := n.listLoadBalancerServices() + if err != nil { + return loadbalancer.LbService{}, false, err + } + + for _, lbSvc := range lbs.Results { + if lbSvc.DisplayName != name { + continue + } + + return lbSvc, true, nil + } + + return loadbalancer.LbService{}, false, nil +} + +func (n *nsxtLB) getVirtualServerByName(name string) (loadbalancer.LbVirtualServer, bool, error) { + virtualServers, err := n.listLoadBalancerVirtualServers() + if err != nil { + return loadbalancer.LbVirtualServer{}, false, err + } + + for _, virtualServer := range virtualServers.Results { + if virtualServer.DisplayName != name { + continue + } + + return virtualServer, true, nil + } + + return loadbalancer.LbVirtualServer{}, false, nil +} + +func (n *nsxtLB) getLBPoolByName(name string) (loadbalancer.LbPool, bool, error) { + lbPools, err := n.listLoadBalancerPool() + if err != nil { + return loadbalancer.LbPool{}, false, err + } + + for _, lbPool := range lbPools.Results { + if lbPool.DisplayName != name { + continue + } + + return lbPool, true, nil + } + + return loadbalancer.LbPool{}, false, nil +} + +func getInternalIP(node *v1.Node) string { + for _, address := range node.Status.Addresses { + if address.Type != v1.NodeInternalIP { + continue + } + + return address.Address + } + + return "" +} + +// ListLoadBalancerVirtualServers represents the http response from list load balancer virtual server request +// TODO: remove when NSX-T client adds ListLoadBalancer* methods +type ListLoadBalancerVirtualServers struct { + ResultCount int `json:"result_count"` + Results []loadbalancer.LbVirtualServer `json:"results"` +} + +// listLoadBalancerVirtualServers makes an http request for listing load balancer virtual servers +// TODO: remove this once the go-vmware-nsxt client supports this call +func (n *nsxtLB) listLoadBalancerVirtualServers() (ListLoadBalancerVirtualServers, error) { + // set default transport to skip verifiy + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + url := "https://" + n.server + "/api/v1/loadbalancer/virtual-servers" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + req.SetBasicAuth(n.username, n.password) + resp, err := client.Do(req) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + var results ListLoadBalancerVirtualServers + err = json.Unmarshal(body, &results) + if err != nil { + return ListLoadBalancerVirtualServers{}, err + } + + return results, nil +} + +// ListLoadBalancerService represents the http response from list load balancer service request +// TODO: remove when NSX-T client adds ListLoadBalancer* methods +type ListLoadBalancerService struct { + ResultCount int `json:"result_count"` + Results []loadbalancer.LbService `json:"results"` +} + +// listLoadBalancers makes an http request for listing load balancer services +// TODO: remove this once the go-vmware-nsxt client supports this call +func (n *nsxtLB) listLoadBalancerServices() (ListLoadBalancerService, error) { + // set default transport to skip verifiy + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + url := "https://" + n.server + "/api/v1/loadbalancer/services" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return ListLoadBalancerService{}, err + } + + req.SetBasicAuth(n.username, n.password) + resp, err := client.Do(req) + if err != nil { + return ListLoadBalancerService{}, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return ListLoadBalancerService{}, err + } + + var results ListLoadBalancerService + err = json.Unmarshal(body, &results) + if err != nil { + return ListLoadBalancerService{}, err + } + + return results, nil +} + +// ListLoadBalancerPool represents the http response from list load balancer pools request +// TODO: remove when NSX-T client adds ListLoadBalancer* methods +type ListLoadBalancerPool struct { + ResultCount int `json:"result_count"` + Results []loadbalancer.LbPool `json:"results"` +} + +// listLoadBalancerPool makes an http request for listing load balancer pools +// TODO: remove this once the go-vmware-nsxt client supports this call +func (n *nsxtLB) listLoadBalancerPool() (ListLoadBalancerPool, error) { + // set default transport to skip verifiy + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + url := "https://" + n.server + "/api/v1/loadbalancer/pools" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return ListLoadBalancerPool{}, err + } + + req.SetBasicAuth(n.username, n.password) + resp, err := client.Do(req) + if err != nil { + return ListLoadBalancerPool{}, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return ListLoadBalancerPool{}, err + } + + var results ListLoadBalancerPool + err = json.Unmarshal(body, &results) + if err != nil { + return ListLoadBalancerPool{}, err + } + + return results, nil +} From d88fa991aa4de1b341de391b98c8dfca0a6c74ec Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 6 Nov 2019 12:50:39 -0500 Subject: [PATCH 3/3] refactor so that each Service Port gets a virutal serer Signed-off-by: Andrew Sy Kim --- pkg/cloudprovider/vsphere/loadbalancer.go | 356 +++++++----------- .../vsphere/loadbalancer_utils.go | 150 ++++++-- pkg/common/config/types.go | 12 +- 3 files changed, 269 insertions(+), 249 deletions(-) diff --git a/pkg/cloudprovider/vsphere/loadbalancer.go b/pkg/cloudprovider/vsphere/loadbalancer.go index f2184edab8..e1f9667767 100644 --- a/pkg/cloudprovider/vsphere/loadbalancer.go +++ b/pkg/cloudprovider/vsphere/loadbalancer.go @@ -28,7 +28,6 @@ import ( "k8s.io/klog" nsxt "github.com/vmware/go-vmware-nsxt" - "github.com/vmware/go-vmware-nsxt/common" "github.com/vmware/go-vmware-nsxt/loadbalancer" "github.com/vmware/go-vmware-nsxt/manager" ) @@ -38,8 +37,6 @@ type nsxtLB struct { client *nsxt.APIClient // clusterID is a unique identifier of the cluster clusterID string - // routerID is the ID of the NSX-T tier1 router used for creating LoadBalancers - routerID string // lbServiceID is the ID of the NSX-T LoadBalancer Service where virtual servers // for Service Type=LoadBalancer are created lbServiceID string @@ -72,10 +69,10 @@ func newNSXTLoadBalancer(clusterID string, cfg *vcfg.LoadbalancerNSXT) (cloudpro // TODO: validate config values return &nsxtLB{ - client: nsxClient, - routerID: cfg.Tier1RouterID, - clusterID: clusterID, - vipPoolID: cfg.VIPPoolID, + client: nsxClient, + clusterID: clusterID, + lbServiceID: cfg.LBServiceID, + vipPoolID: cfg.VIPPoolID, // only required for raw http requests server: cfg.Server, @@ -86,74 +83,44 @@ func newNSXTLoadBalancer(clusterID string, cfg *vcfg.LoadbalancerNSXT) (cloudpro } func (n *nsxtLB) Initialize() error { - // first look for the Tier1 router by ID provided in config - router, resp, err := n.client.LogicalRoutingAndServicesApi.ReadLogicalRouter(n.client.Context, n.routerID) + _, resp, err := n.client.ServicesApi.ReadLoadBalancerService(n.client.Context, n.lbServiceID) if resp.StatusCode == http.StatusNotFound { - return fmt.Errorf("NSX-T logical router with router ID %q not found", n.routerID) + return fmt.Errorf("load balancer service with ID %s does not exist", n.lbServiceID) } if err != nil { - return err - } - - // then check for LB Service by cluster ID, if it already exists, we're good to go - // if it doesn't exist, create one now - lbServiceName := n.loadBalancerServiceName() - lbService, exists, err := n.getLBServiceByName(lbServiceName) - if err != nil { - return err - } - - if exists { - n.lbServiceID = lbService.Id - return nil - } - - lbService = loadbalancer.LbService{ - DisplayName: lbServiceName, - Description: fmt.Sprintf("LoadBalancer Service managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), - Enabled: true, - Size: "SMALL", // TODO: this should be configurable in the config? - Attachment: &common.ResourceReference{ - TargetType: router.ResourceType, - TargetId: router.Id, - }, - } - - lbService, _, err = n.client.ServicesApi.CreateLoadBalancerService(n.client.Context, lbService) - if err != nil { - return err + return fmt.Errorf("error looking for load balancer service with ID %s: %v", n.lbServiceID, err) } - n.lbServiceID = lbService.Id return nil } func (n *nsxtLB) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) { - lbName := n.GetLoadBalancerName(ctx, clusterName, service) - - virtualServers, err := n.listLoadBalancerVirtualServers() + virtualServers, err := n.getVirtualServers(service) if err != nil { return nil, false, err } - // this is wrong, the lb name doesn't actually match the virtual server name, we need to check by port name - for _, virtualServer := range virtualServers.Results { - if virtualServer.DisplayName != lbName { - continue - } - - return &v1.LoadBalancerStatus{ - Ingress: []v1.LoadBalancerIngress{ - { - IP: virtualServer.IpAddress, - }, - }, - }, true, nil + if len(virtualServers) == 0 { + return nil, false, nil + } + // get unique IPs + ips := n.getUniqueIPsFromVirtualServers(virtualServers) + if len(ips) == 0 { + return nil, false, fmt.Errorf("error getting unique IPs of virtual servers for service %s", service.Name) + } + if len(ips) > 1 { + return nil, false, fmt.Errorf("more than virtual IP was associated with service %s", service.Name) } - return nil, false, nil + return &v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + { + IP: ips[0], + }, + }, + }, true, nil } func (n *nsxtLB) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string { @@ -165,116 +132,97 @@ func (n *nsxtLB) GetLoadBalancerName(ctx context.Context, clusterName string, se func (n *nsxtLB) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { lbName := n.GetLoadBalancerName(ctx, clusterName, service) - // this is wrong, actually need to get VirtualServer per port - virtualServer, exists, err := n.getVirtualServerByName(lbName) + virtualServers, err := n.getVirtualServers(service) if err != nil { return nil, err } - if exists { - return &v1.LoadBalancerStatus{ - Ingress: []v1.LoadBalancerIngress{ - { - IP: virtualServer.IpAddress, - }, - }, - }, nil - - } + var vip string + releaseAllocatedVIP := true - // TODO: do actual IPAM allocation to get virtual server IP - // For now use cluster IP - - // we can re-use the same LB pool members since we can rely on DefaultPoolMemberPort to indicate the node port - // as the target port for each VirtualServer - var lbMembers []loadbalancer.PoolMember - for _, node := range nodes { - // TODO: don't always assume InternalIP from node addresses - ip := getInternalIP(node) - if ip == "" { - klog.Warningf("node %s has no addresses assigned", node.Name) - continue + ips := n.getUniqueIPsFromVirtualServers(virtualServers) + if len(ips) == 0 { + allocation, _, err := n.client.PoolManagementApi.AllocateOrReleaseFromIpPool(n.client.Context, n.vipPoolID, + manager.AllocationIpAddress{}, "ALLOCATE") + if err != nil { + return nil, err } - member := loadbalancer.PoolMember{ - DisplayName: node.Name, - Weight: 1, - IpAddress: ip, - } + vip := allocation.AllocationId + defer func() { + if !releaseAllocatedVIP { + return + } - lbMembers = append(lbMembers, member) + // release VIP from pool if load balancer was not created successfully + _, _, err := n.client.PoolManagementApi.AllocateOrReleaseFromIpPool(n.client.Context, n.vipPoolID, + manager.AllocationIpAddress{AllocationId: vip}, "RELEASE") + if err != nil { + klog.Errorf("error releasing VIP %s after load balancer failed to provision", vip) + } + + }() + } else if len(ips) == 1 { + vip = ips[0] + } else { + return nil, fmt.Errorf("got more than 1 VIP for service %s", service.Name) } - // allocate VIP from pool provided in config - allocation, _, err := n.client.PoolManagementApi.AllocateOrReleaseFromIpPool(n.client.Context, n.vipPoolID, manager.AllocationIpAddress{}, "ALLOCATE") + lbMembers := n.nodesToLBMembers(nodes) + lbPool, err := n.createOrUpdateLBPool(lbName, lbMembers) if err != nil { return nil, err } - vip := allocation.AllocationId - - success := false - defer func() { - if success { - return - } - - // release VIP from pool if load balancer was not created successfully - _, _, err := n.client.PoolManagementApi.AllocateOrReleaseFromIpPool(n.client.Context, n.vipPoolID, - manager.AllocationIpAddress{AllocationId: vip}, "RELEASE") - if err != nil { - klog.Errorf("error releasing VIP %s after load balancer failed to provision", vip) - } - - }() - var newVirtualServerIDs []string - // Create a new virtual server per port in the Service since LB pools only support single ports // and each Service Port has a dedicated node port for _, port := range service.Spec.Ports { - // create a pool ID using this port's node port - lbPool := loadbalancer.LbPool{ - // TODO: make this configurable - Algorithm: "ROUND_ROBIN", - DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), - Description: fmt.Sprintf("LoadBalancer Pool managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), - Members: lbMembers, - MinActiveMembers: 1, - } + virtualServerID := "" + virtualServerExists := false + for _, virtualServer := range virtualServers { + if virtualServer.DisplayName != generateVirtualServerName(lbName, port.Port) { + continue + } - lbPool, _, err := n.client.ServicesApi.CreateLoadBalancerPool(n.client.Context, lbPool) - if err != nil { - return nil, err + virtualServerExists = true + virtualServerID = virtualServer.Id + break } - // virtual server doesn't exist.. update node pools? or should that only happen on update? virtualServer := loadbalancer.LbVirtualServer{ - DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), - Description: fmt.Sprintf("LoadBalancer VirtualServer managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), - IpProtocol: string(port.Protocol), - DefaultPoolMemberPort: strconv.Itoa(int(port.NodePort)), - IpAddress: vip, - Ports: []string{strconv.Itoa(int(port.Port))}, - Enabled: true, - PoolId: lbPool.Id, + DisplayName: generateVirtualServerName(lbName, port.Port), + Description: fmt.Sprintf("LoadBalancer VirtualServer managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + IpProtocol: string(port.Protocol), + DefaultPoolMemberPorts: []string{strconv.Itoa(int(port.NodePort))}, + IpAddress: vip, + Ports: []string{strconv.Itoa(int(port.Port))}, + Enabled: true, + PoolId: lbPool.Id, } - virtualServer, _, err = n.client.ServicesApi.CreateLoadBalancerVirtualServer(n.client.Context, virtualServer) - if err != nil { - return nil, err + if !virtualServerExists { + virtualServer, _, err = n.client.ServicesApi.CreateLoadBalancerVirtualServer(n.client.Context, virtualServer) + if err != nil { + return nil, err + } + } else { + virtualServer, _, err = n.client.ServicesApi.UpdateLoadBalancerVirtualServer(n.client.Context, virtualServerID, virtualServer) + if err != nil { + return nil, err + } + } newVirtualServerIDs = append(newVirtualServerIDs, virtualServer.Id) } - success = true - err = n.addVirtualServersToLoadBalancer(newVirtualServerIDs) if err != nil { return nil, err } + releaseAllocatedVIP = false return &v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ { @@ -287,120 +235,102 @@ func (n *nsxtLB) EnsureLoadBalancer(ctx context.Context, clusterName string, ser func (n *nsxtLB) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error { lbName := n.GetLoadBalancerName(ctx, clusterName, service) - // we can re-use the same LB pool members since we can rely on DefaultPoolMemberPort to indicate the node port - // as the target port for each VirtualServer - var lbMembers []loadbalancer.PoolMember - for _, node := range nodes { - // TODO: don't always assume InternalIP from node addresses - ip := getInternalIP(node) - if ip == "" { - klog.Warningf("node %s has no addresses assigned", node.Name) - continue - } + virtualServers, err := n.getVirtualServers(service) + if err != nil { + return err + } - member := loadbalancer.PoolMember{ - DisplayName: node.Name, - Weight: 1, - IpAddress: ip, - } + ips := n.getUniqueIPsFromVirtualServers(virtualServers) + if len(ips) != 1 { + return fmt.Errorf("expected exactly 1 VIP for service %s, got %v", service.Name, ips) + } + + vip := ips[0] - lbMembers = append(lbMembers, member) + lbMembers := n.nodesToLBMembers(nodes) + lbPool, err := n.createOrUpdateLBPool(lbName, lbMembers) + if err != nil { + return err } + var newVirtualServerIDs []string // Create a new virtual server per port in the Service since LB pools only support single ports // and each Service Port has a dedicated node port for _, port := range service.Spec.Ports { - poolName := generatePoolName(lbName, int(port.Port)) - lbPool, exists, err := n.getLBPoolByName(poolName) - if err != nil { - return err + virtualServerID := "" + virtualServerExists := false + for _, virtualServer := range virtualServers { + if virtualServer.DisplayName != generateVirtualServerName(lbName, port.Port) { + continue + } + + virtualServerExists = true + virtualServerID = virtualServer.Id + break } - if !exists { - return fmt.Errorf("error updating LB pool %s because it doesn't exist", poolName) + virtualServer := loadbalancer.LbVirtualServer{ + DisplayName: generateVirtualServerName(lbName, port.Port), + Description: fmt.Sprintf("LoadBalancer VirtualServer managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + IpProtocol: string(port.Protocol), + DefaultPoolMemberPorts: []string{strconv.Itoa(int(port.NodePort))}, + IpAddress: vip, + Ports: []string{strconv.Itoa(int(port.Port))}, + Enabled: true, + PoolId: lbPool.Id, } - lbPoolID := lbPool.Id + if !virtualServerExists { + virtualServer, _, err = n.client.ServicesApi.CreateLoadBalancerVirtualServer(n.client.Context, virtualServer) + if err != nil { + return err + } + } else { + virtualServer, _, err = n.client.ServicesApi.UpdateLoadBalancerVirtualServer(n.client.Context, virtualServerID, virtualServer) + if err != nil { + return err + } - // create a pool ID using this port's node port - lbPool = loadbalancer.LbPool{ - // TODO: make this configurable - Algorithm: "ROUND_ROBIN", - DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), - Description: fmt.Sprintf("LoadBalancer Pool managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), - Members: lbMembers, - MinActiveMembers: 1, } - lbPool, _, err = n.client.ServicesApi.UpdateLoadBalancerPool(n.client.Context, lbPoolID, lbPool) - if err != nil { - return err - } + newVirtualServerIDs = append(newVirtualServerIDs, virtualServer.Id) + } - virtualServerName := generateVirtualServerName(lbName, int(port.Port)) - virtualServer, exists, err := n.getVirtualServerByName(virtualServerName) - if err != nil { - return err - } + return n.addVirtualServersToLoadBalancer(newVirtualServerIDs) +} - if !exists { - return fmt.Errorf("error updating Virtual Server %s because it doesn't exist", virtualServerName) - } +func (n *nsxtLB) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error { + lbName := n.GetLoadBalancerName(ctx, clusterName, service) - virtualServerID := virtualServer.Id - - // virtual server doesn't exist.. update node pools? or should that only happen on update? - virtualServer = loadbalancer.LbVirtualServer{ - DisplayName: fmt.Sprintf("%s-port-%d", lbName, port.Port), - Description: fmt.Sprintf("LoadBalancer VirtualServer managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), - IpProtocol: string(port.Protocol), - DefaultPoolMemberPort: strconv.Itoa(int(port.NodePort)), - IpAddress: virtualServer.IpAddress, - Ports: []string{strconv.Itoa(int(port.Port))}, - Enabled: true, - PoolId: lbPool.Id, - } + lbPool, exists, err := n.getLBPoolByName(lbName) + if err != nil { + return err + } - virtualServer, _, err = n.client.ServicesApi.UpdateLoadBalancerVirtualServer(n.client.Context, virtualServerID, virtualServer) + if exists { + _, err := n.client.ServicesApi.DeleteLoadBalancerPool(n.client.Context, lbPool.Id) if err != nil { return err } } - return nil -} - -func (n *nsxtLB) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error { - lbName := n.GetLoadBalancerName(ctx, clusterName, service) - // Create a new virtual server per port in the Service since LB pools only support single ports // and each Service Port has a dedicated node port for _, port := range service.Spec.Ports { - poolName := generatePoolName(lbName, int(port.Port)) - lbPool, exists, err := n.getLBPoolByName(poolName) + virtualServerName := generateVirtualServerName(lbName, port.Port) + virtualServer, exists, err := n.getVirtualServerByName(virtualServerName) if err != nil { return err } - if exists { - _, err := n.client.ServicesApi.DeleteLoadBalancerPool(n.client.Context, lbPool.Id) - if err != nil { - return err - } + if !exists { + continue } - virtualServerName := generateVirtualServerName(lbName, int(port.Port)) - virtualServer, exists, err := n.getVirtualServerByName(virtualServerName) + _, err = n.client.ServicesApi.DeleteLoadBalancerVirtualServer(n.client.Context, virtualServer.Id, nil) if err != nil { return err } - - if exists { - _, err := n.client.ServicesApi.DeleteLoadBalancerVirtualServer(n.client.Context, virtualServer.Id, nil) - if err != nil { - return err - } - } } return nil diff --git a/pkg/cloudprovider/vsphere/loadbalancer_utils.go b/pkg/cloudprovider/vsphere/loadbalancer_utils.go index 31d432410f..d45aa488c6 100644 --- a/pkg/cloudprovider/vsphere/loadbalancer_utils.go +++ b/pkg/cloudprovider/vsphere/loadbalancer_utils.go @@ -17,6 +17,7 @@ limitations under the License. package vsphere import ( + "context" "crypto/tls" "encoding/json" "fmt" @@ -24,23 +25,21 @@ import ( "net/http" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog" "github.com/vmware/go-vmware-nsxt/loadbalancer" ) -func generatePoolName(lbName string, port int) string { +func generateVirtualServerName(lbName string, port int32) string { return fmt.Sprintf("%s-port-%d", lbName, port) } -func generateVirtualServerName(lbName string, port int) string { - return fmt.Sprintf("%s-port-%d", lbName, port) -} - -func (n *nsxtLB) loadBalancerServiceName() string { - return fmt.Sprintf("kubernetes-cpi-vsphere-%s", n.clusterID) -} - func (n *nsxtLB) addVirtualServersToLoadBalancer(virtualServerIDs []string) error { + if len(virtualServerIDs) == 0 { + return nil + } + // first read load balancer service lbService, _, err := n.client.ServicesApi.ReadLoadBalancerService(n.client.Context, n.lbServiceID) if err != nil { @@ -54,13 +53,51 @@ func (n *nsxtLB) addVirtualServersToLoadBalancer(virtualServerIDs []string) erro return err } +func (n *nsxtLB) getVirtualServers(service *v1.Service) ([]loadbalancer.LbVirtualServer, error) { + lbName := n.GetLoadBalancerName(context.TODO(), "", service) + + allVirtualServers, err := n.listLoadBalancerVirtualServers() + if err != nil { + return nil, err + } + + virtualServerNames := sets.NewString() + for _, port := range service.Spec.Ports { + virtualServerNames.Insert(generateVirtualServerName(lbName, port.Port)) + } + + virtualServers := []loadbalancer.LbVirtualServer{} + for _, virtualServer := range allVirtualServers { + if !virtualServerNames.Has(virtualServer.DisplayName) { + continue + } + + virtualServers = append(virtualServers, virtualServer) + } + + return virtualServers, nil +} + +func (n *nsxtLB) getUniqueIPsFromVirtualServers(lbs []loadbalancer.LbVirtualServer) []string { + ipSet := sets.NewString() + for _, lb := range lbs { + if ipSet.Has(lb.IpAddress) { + continue + } + + ipSet.Insert(lb.IpAddress) + } + + return ipSet.List() +} + func (n *nsxtLB) getLBServiceByName(name string) (loadbalancer.LbService, bool, error) { lbs, err := n.listLoadBalancerServices() if err != nil { return loadbalancer.LbService{}, false, err } - for _, lbSvc := range lbs.Results { + for _, lbSvc := range lbs { if lbSvc.DisplayName != name { continue } @@ -77,7 +114,7 @@ func (n *nsxtLB) getVirtualServerByName(name string) (loadbalancer.LbVirtualServ return loadbalancer.LbVirtualServer{}, false, err } - for _, virtualServer := range virtualServers.Results { + for _, virtualServer := range virtualServers { if virtualServer.DisplayName != name { continue } @@ -88,13 +125,44 @@ func (n *nsxtLB) getVirtualServerByName(name string) (loadbalancer.LbVirtualServ return loadbalancer.LbVirtualServer{}, false, nil } +func (n *nsxtLB) createOrUpdateLBPool(lbName string, lbMembers []loadbalancer.PoolMember) (loadbalancer.LbPool, error) { + lbPool, exists, err := n.getLBPoolByName(lbName) + if err != nil { + return loadbalancer.LbPool{}, err + } + + lbPoolID := lbPool.Id + lbPool = loadbalancer.LbPool{ + // TODO: LB pool algorithm should be configurable via an annotation on the Service + Algorithm: "ROUND_ROBIN", + DisplayName: lbName, + Description: fmt.Sprintf("LoadBalancer Pool managed by Kubernetes vSphere Cloud Provider (%s)", n.clusterID), + Members: lbMembers, + MinActiveMembers: 1, + } + + if !exists { + lbPool, _, err = n.client.ServicesApi.CreateLoadBalancerPool(n.client.Context, lbPool) + if err != nil { + return loadbalancer.LbPool{}, err + } + } else { + lbPool, _, err = n.client.ServicesApi.UpdateLoadBalancerPool(n.client.Context, lbPoolID, lbPool) + if err != nil { + return loadbalancer.LbPool{}, err + } + } + + return lbPool, nil +} + func (n *nsxtLB) getLBPoolByName(name string) (loadbalancer.LbPool, bool, error) { lbPools, err := n.listLoadBalancerPool() if err != nil { return loadbalancer.LbPool{}, false, err } - for _, lbPool := range lbPools.Results { + for _, lbPool := range lbPools { if lbPool.DisplayName != name { continue } @@ -105,6 +173,28 @@ func (n *nsxtLB) getLBPoolByName(name string) (loadbalancer.LbPool, bool, error) return loadbalancer.LbPool{}, false, nil } +func (n *nsxtLB) nodesToLBMembers(nodes []*v1.Node) []loadbalancer.PoolMember { + var lbMembers []loadbalancer.PoolMember + for _, node := range nodes { + // TODO: don't always assume InternalIP from node addresses + nodeIP := getInternalIP(node) + if nodeIP == "" { + klog.Warningf("node %s has no addresses assigned", node.Name) + continue + } + + member := loadbalancer.PoolMember{ + DisplayName: node.Name, + Weight: 1, + IpAddress: nodeIP, + } + + lbMembers = append(lbMembers, member) + } + + return lbMembers +} + func getInternalIP(node *v1.Node) string { for _, address := range node.Status.Addresses { if address.Type != v1.NodeInternalIP { @@ -126,7 +216,7 @@ type ListLoadBalancerVirtualServers struct { // listLoadBalancerVirtualServers makes an http request for listing load balancer virtual servers // TODO: remove this once the go-vmware-nsxt client supports this call -func (n *nsxtLB) listLoadBalancerVirtualServers() (ListLoadBalancerVirtualServers, error) { +func (n *nsxtLB) listLoadBalancerVirtualServers() ([]loadbalancer.LbVirtualServer, error) { // set default transport to skip verifiy tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -136,27 +226,27 @@ func (n *nsxtLB) listLoadBalancerVirtualServers() (ListLoadBalancerVirtualServer url := "https://" + n.server + "/api/v1/loadbalancer/virtual-servers" req, err := http.NewRequest("GET", url, nil) if err != nil { - return ListLoadBalancerVirtualServers{}, err + return nil, err } req.SetBasicAuth(n.username, n.password) resp, err := client.Do(req) if err != nil { - return ListLoadBalancerVirtualServers{}, err + return nil, err } body, err := ioutil.ReadAll(resp.Body) if err != nil { - return ListLoadBalancerVirtualServers{}, err + return nil, err } var results ListLoadBalancerVirtualServers err = json.Unmarshal(body, &results) if err != nil { - return ListLoadBalancerVirtualServers{}, err + return nil, err } - return results, nil + return results.Results, nil } // ListLoadBalancerService represents the http response from list load balancer service request @@ -168,7 +258,7 @@ type ListLoadBalancerService struct { // listLoadBalancers makes an http request for listing load balancer services // TODO: remove this once the go-vmware-nsxt client supports this call -func (n *nsxtLB) listLoadBalancerServices() (ListLoadBalancerService, error) { +func (n *nsxtLB) listLoadBalancerServices() ([]loadbalancer.LbService, error) { // set default transport to skip verifiy tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -178,27 +268,27 @@ func (n *nsxtLB) listLoadBalancerServices() (ListLoadBalancerService, error) { url := "https://" + n.server + "/api/v1/loadbalancer/services" req, err := http.NewRequest("GET", url, nil) if err != nil { - return ListLoadBalancerService{}, err + return nil, err } req.SetBasicAuth(n.username, n.password) resp, err := client.Do(req) if err != nil { - return ListLoadBalancerService{}, err + return nil, err } body, err := ioutil.ReadAll(resp.Body) if err != nil { - return ListLoadBalancerService{}, err + return nil, err } var results ListLoadBalancerService err = json.Unmarshal(body, &results) if err != nil { - return ListLoadBalancerService{}, err + return nil, err } - return results, nil + return results.Results, nil } // ListLoadBalancerPool represents the http response from list load balancer pools request @@ -210,7 +300,7 @@ type ListLoadBalancerPool struct { // listLoadBalancerPool makes an http request for listing load balancer pools // TODO: remove this once the go-vmware-nsxt client supports this call -func (n *nsxtLB) listLoadBalancerPool() (ListLoadBalancerPool, error) { +func (n *nsxtLB) listLoadBalancerPool() ([]loadbalancer.LbPool, error) { // set default transport to skip verifiy tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -220,25 +310,25 @@ func (n *nsxtLB) listLoadBalancerPool() (ListLoadBalancerPool, error) { url := "https://" + n.server + "/api/v1/loadbalancer/pools" req, err := http.NewRequest("GET", url, nil) if err != nil { - return ListLoadBalancerPool{}, err + return nil, err } req.SetBasicAuth(n.username, n.password) resp, err := client.Do(req) if err != nil { - return ListLoadBalancerPool{}, err + return nil, err } body, err := ioutil.ReadAll(resp.Body) if err != nil { - return ListLoadBalancerPool{}, err + return nil, err } var results ListLoadBalancerPool err = json.Unmarshal(body, &results) if err != nil { - return ListLoadBalancerPool{}, err + return nil, err } - return results, nil + return results.Results, nil } diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index c6c434fb35..ee9c0c369e 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -126,10 +126,10 @@ type VirtualCenterConfig struct { // LoadbalancerNSXT contains configuration for enabling NSX-T based loadbalancer support for // Services Type=LoadBalancer resources type LoadbalancerNSXT struct { - Server string `gcfg:"server"` - User string `gcfg:"user"` - Password string `gcfg:"password"` - Insecure bool `gcfg:"insecure-flag"` - Tier1RouterID string `gcfg:"tier1-router-id"` - VIPPoolID string `gcfg:"vip-pool-id"` + Server string `gcfg:"server"` + User string `gcfg:"user"` + Password string `gcfg:"password"` + Insecure bool `gcfg:"insecure-flag"` + LBServiceID string `gcfg:"lb-service--id"` + VIPPoolID string `gcfg:"vip-pool-id"` }