diff --git a/daemonset/generic-kuberouter-all-features-advertise-routes.yaml b/daemonset/generic-kuberouter-all-features-advertise-routes.yaml index fa1024df2..6df0468dd 100644 --- a/daemonset/generic-kuberouter-all-features-advertise-routes.yaml +++ b/daemonset/generic-kuberouter-all-features-advertise-routes.yaml @@ -206,6 +206,20 @@ rules: - get - list - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding diff --git a/daemonset/generic-kuberouter-all-features.yaml b/daemonset/generic-kuberouter-all-features.yaml index 80c2e9714..777ddebec 100644 --- a/daemonset/generic-kuberouter-all-features.yaml +++ b/daemonset/generic-kuberouter-all-features.yaml @@ -202,6 +202,20 @@ rules: - get - list - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/daemonset/generic-kuberouter-only-advertise-routes.yaml b/daemonset/generic-kuberouter-only-advertise-routes.yaml index 601d177eb..417abe739 100644 --- a/daemonset/generic-kuberouter-only-advertise-routes.yaml +++ b/daemonset/generic-kuberouter-only-advertise-routes.yaml @@ -114,6 +114,20 @@ rules: - get - list - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding diff --git a/daemonset/generic-kuberouter.yaml b/daemonset/generic-kuberouter.yaml index 8d812848f..234ee34c6 100644 --- a/daemonset/generic-kuberouter.yaml +++ b/daemonset/generic-kuberouter.yaml @@ -169,6 +169,20 @@ rules: - get - list - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding diff --git a/daemonset/kubeadm-kuberouter-all-features-dsr.yaml b/daemonset/kubeadm-kuberouter-all-features-dsr.yaml index 49749e400..6f10ce5ae 100644 --- a/daemonset/kubeadm-kuberouter-all-features-dsr.yaml +++ b/daemonset/kubeadm-kuberouter-all-features-dsr.yaml @@ -185,6 +185,20 @@ rules: - get - list - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/daemonset/kubeadm-kuberouter-all-features-hostport.yaml b/daemonset/kubeadm-kuberouter-all-features-hostport.yaml index 6ec702973..20dc32185 100644 --- a/daemonset/kubeadm-kuberouter-all-features-hostport.yaml +++ b/daemonset/kubeadm-kuberouter-all-features-hostport.yaml @@ -184,6 +184,20 @@ rules: - get - list - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/daemonset/kubeadm-kuberouter-all-features.yaml b/daemonset/kubeadm-kuberouter-all-features.yaml index 6b4502663..0991c67f7 100644 --- a/daemonset/kubeadm-kuberouter-all-features.yaml +++ b/daemonset/kubeadm-kuberouter-all-features.yaml @@ -177,6 +177,20 @@ rules: - get - list - 
watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/daemonset/kubeadm-kuberouter.yaml b/daemonset/kubeadm-kuberouter.yaml index c4e1d52e8..c75c9fcc1 100644 --- a/daemonset/kubeadm-kuberouter.yaml +++ b/daemonset/kubeadm-kuberouter.yaml @@ -173,6 +173,20 @@ rules: - get - list - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - create + - update + - apiGroups: + - "" + resources: + - services/status + verbs: + - update --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/docs/user-guide.md b/docs/user-guide.md index a7d1beebc..8e1f10465 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -94,6 +94,9 @@ Usage of kube-router: --ipvs-permit-all Enables rule to accept all incoming traffic to service VIP's on the node. (default true) --ipvs-sync-period duration The delay between ipvs config synchronizations (e.g. '5s', '1m', '2h22m'). Must be greater than 0. (default 5m0s) --kubeconfig string Path to kubeconfig file with authorization information (the master location is set by the master flag). + --loadbalancer-default-class Handle loadbalancer services without a class (default true) + --loadbalancer-ip-range strings CIDR values from which loadbalancer services addresses are assigned (can be specified multiple times) + --loadbalancer-sync-period duration The delay between checking for missed services (e.g. '5s', '1m'). Must be greater than 0. (default 1m0s) --masquerade-all SNAT all traffic to cluster IP/node port. --master string The address of the Kubernetes API server (overrides any value in kubeconfig). --metrics-path string Prometheus metrics path (default "/metrics") @@ -113,6 +116,7 @@ Usage of kube-router: --router-id string BGP router-id. Must be specified in a ipv6 only cluster, "generate" can be specified to generate the router id. --routes-sync-period duration The delay between route updates and advertisements (e.g. '5s', '1m', '2h22m'). Must be greater than 0. (default 5m0s) --run-firewall Enables Network Policy -- sets up iptables to provide ingress firewall for pods. (default true) + --run-loadbalancer Enable loadbalancer address allocator (default true) --run-router Enables Pod Networking -- Advertises and learns the routes to Pods via iBGP. (default true) --run-service-proxy Enables Service Proxy -- sets up IPVS for Kubernetes Services. (default true) --runtime-endpoint string Path to CRI compatible container runtime socket (used for DSR mode). Currently known working with containerd. 
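For illustration only (not part of the patch itself): a minimal standalone sketch of how the values passed via --loadbalancer-ip-range are split into separate IPv4 and IPv6 pools and checked against the EnableIPv4/EnableIPv6 settings, mirroring the validation in NewLoadBalancerController further down. The CIDRs used here are made-up examples.

package main

import (
	"fmt"
	"net"
)

// splitRanges sorts each --loadbalancer-ip-range value into an IPv4 or IPv6 pool
// and rejects a CIDR whose address family is disabled, as NewLoadBalancerController does.
func splitRanges(cidrs []string, enableV4, enableV6 bool) ([]*net.IPNet, []*net.IPNet, error) {
	var v4, v6 []*net.IPNet
	for _, c := range cidrs {
		ip, ipnet, err := net.ParseCIDR(c)
		if err != nil {
			return nil, nil, err
		}
		switch {
		case ip.To4() != nil && !enableV4:
			return nil, nil, fmt.Errorf("IPv4 loadbalancer CIDR %s specified while IPv4 is disabled", c)
		case ip.To4() == nil && !enableV6:
			return nil, nil, fmt.Errorf("IPv6 loadbalancer CIDR %s specified while IPv6 is disabled", c)
		case ip.To4() != nil:
			v4 = append(v4, ipnet)
		default:
			v6 = append(v6, ipnet)
		}
	}
	return v4, v6, nil
}

func main() {
	// e.g. --loadbalancer-ip-range=10.1.1.0/28 --loadbalancer-ip-range=2001:db8::/120
	v4, v6, err := splitRanges([]string{"10.1.1.0/28", "2001:db8::/120"}, true, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(v4[0], v6[0]) // prints: 10.1.1.0/28 2001:db8::/120
}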
diff --git a/pkg/cmd/kube-router.go b/pkg/cmd/kube-router.go index 45d003b51..1cc50d285 100644 --- a/pkg/cmd/kube-router.go +++ b/pkg/cmd/kube-router.go @@ -8,6 +8,7 @@ import ( "syscall" "time" + "github.com/cloudnativelabs/kube-router/v2/pkg/controllers/lballoc" "github.com/cloudnativelabs/kube-router/v2/pkg/controllers/netpol" "github.com/cloudnativelabs/kube-router/v2/pkg/controllers/proxy" "github.com/cloudnativelabs/kube-router/v2/pkg/controllers/routing" @@ -204,6 +205,19 @@ func (kr *KubeRouter) Run() error { go npc.Run(healthChan, stopCh, &wg) } + if kr.Config.RunLoadBalancer { + klog.V(0).Info("running load balancer allocator controller") + lbc, err := lballoc.NewLoadBalancerController(kr.Client, kr.Config, svcInformer) + if err != nil { + return errors.New("Failed to create load balancer allocator: " + err.Error()) + } + + svcInformer.AddEventHandler(lbc) + + wg.Add(1) + go lbc.Run(healthChan, stopCh, &wg) + } + // Handle SIGINT and SIGTERM ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) diff --git a/pkg/controllers/lballoc/lballoc.go b/pkg/controllers/lballoc/lballoc.go new file mode 100644 index 000000000..1a3054299 --- /dev/null +++ b/pkg/controllers/lballoc/lballoc.go @@ -0,0 +1,517 @@ +package lballoc + +import ( + "context" + "errors" + "net" + "os" + "sync" + "time" + + "github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck" + "github.com/cloudnativelabs/kube-router/v2/pkg/options" + v1core "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" +) + +const loadBalancerClassName = "kube-router" + +type ipRanges struct { + ipRanges []net.IPNet + rangeIndex int + currentIP net.IP +} + +type LoadBalancerController struct { + ipv4Ranges *ipRanges + ipv6Ranges *ipRanges + svcLister cache.Indexer + lock *resourcelock.LeaseLock + addChan chan v1core.Service + allocateChan chan v1core.Service + clientset kubernetes.Interface + isDefault bool + syncPeriod time.Duration +} + +func getNamespace() (namespace string, err error) { + ns := os.Getenv("POD_NAMESPACE") + if ns != "" { + return ns, nil + } + + nb, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err == nil { + return string(nb), nil + } + + return "", errors.New("unable to get namespace from kubernetes environment or $POD_NAMESPACE") +} + +func getPodname() (podname string, err error) { + podname = os.Getenv("POD_NAME") + if podname != "" { + return podname, nil + } + return "", errors.New("unable to get pod name from $POD_NAME") +} + +func copyIP(ip net.IP) net.IP { + return append(net.IP{}, ip...) 
+} + +func newipRanges(ranges []net.IPNet) *ipRanges { + var cip net.IP + if len(ranges) == 0 { + cip = nil + } else { + cip = copyIP(ranges[0].IP) + } + ir := &ipRanges{ + ipRanges: ranges, + currentIP: cip, + } + + return ir +} + +func (ir *ipRanges) inc() { + cn := ir.ipRanges[ir.rangeIndex] + ci := copyIP(ir.currentIP) + + // Increment the current IP address + // 10.0.0.3 will increment to 10.0.0.4 + // 10.0.0.255 will increment to 10.0.1.0 + for i := len(ci) - 1; i >= 0; i-- { + ci[i]++ + if ci[i] > 0 { // if the byte didn't overflow to zero, don't increment the byte to the left + break + } + } + + // If the new address is not in the current IP range, move to the first IP in the next range + // If the current range is the last, move to the first IP in the first range + if !cn.Contains(ci) { + if ir.rangeIndex == len(ir.ipRanges)-1 { + ir.rangeIndex = 0 + } else { + ir.rangeIndex++ + } + ci = copyIP(ir.ipRanges[ir.rangeIndex].IP) + } + + ir.currentIP = ci +} + +func ipInAllocated(ip net.IP, allocated []net.IP) bool { + for _, cip := range allocated { + if cip.Equal(ip) { + return true + } + } + return false +} + +func (ir *ipRanges) getNextFreeIP(allocated []net.IP) (net.IP, error) { + startIP := copyIP(ir.currentIP) + if len(startIP) == 0 { + return nil, errors.New("no IPs left to allocate") + } + ip := startIP + for { + if !ipInAllocated(ip, allocated) { + return ip, nil + } + ir.inc() + ip = ir.currentIP + if ip.Equal(startIP) { + break + } + } + return nil, errors.New("no IPs left to allocate") +} + +func (ir *ipRanges) Len() int { + return len(ir.ipRanges) +} + +func (ir *ipRanges) Contains(ip net.IP) bool { + for _, in := range ir.ipRanges { + if in.Contains(ip) { + return true + } + } + return false +} + +func (lbc *LoadBalancerController) runLeaderElection(ctx context.Context, isLeaderChan chan<- bool) { + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lbc.lock, + ReleaseOnCancel: true, + LeaseDuration: 15 * time.Second, //nolint:gomnd // No reason for a 15 second constant + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(c context.Context) { + isLeaderChan <- true + }, + OnStoppedLeading: func() { + isLeaderChan <- false + }, + OnNewLeader: func(_ string) {}, + }, + }) +} + +func (lbc *LoadBalancerController) OnAdd(obj interface{}) { + if svc, ok := obj.(*v1core.Service); ok { + lbc.addChan <- *svc + } +} + +func (lbc *LoadBalancerController) OnDelete(obj interface{}) { +} + +func (lbc *LoadBalancerController) OnUpdate(oldObj interface{}, newObj interface{}) { + if svc, ok := newObj.(*v1core.Service); ok { + lbc.addChan <- *svc + } +} + +func getIPFamilies(families []v1core.IPFamily) (v4, v6 bool) { + for _, ipf := range families { + switch ipf { + case v1core.IPv4Protocol: + v4 = true + case v1core.IPv6Protocol: + v6 = true + } + } + return v4, v6 +} + +func getCurrentIngressFamilies(svc *v1core.Service) (v4, v6 bool) { + for _, lbi := range svc.Status.LoadBalancer.Ingress { + ip := net.ParseIP(lbi.IP) + switch { + case ip == nil: + continue + case ip.To4() != nil: + v4 = true + case ip.To4() == nil: + v6 = true + } + } + return v4, v6 +} + +func checkIngress(svc *v1core.Service) bool { + want4, want6 := getIPFamilies(svc.Spec.IPFamilies) + have4, have6 := getCurrentIngressFamilies(svc) + + if want4 != have4 { + return true + } + if want6 != have6 { + return true + } + return false +} + +func (lbc *LoadBalancerController) checkClass(svc *v1core.Service) bool { + 
cls := "" + if svc.Spec.LoadBalancerClass != nil { + cls = *svc.Spec.LoadBalancerClass + } + + switch { + case cls == loadBalancerClassName: + return true + case lbc.isDefault && cls == "default": + return true + case lbc.isDefault && cls == "": + return true + } + + return false +} + +func (lbc *LoadBalancerController) shouldAllocate(svc *v1core.Service) bool { + if svc.Spec.Type != v1core.ServiceTypeLoadBalancer { + return false + } + if !lbc.checkClass(svc) { + return false + } + if !checkIngress(svc) { + return false + } + + return true +} + +func (lbc *LoadBalancerController) walkServices() { + var svc *v1core.Service + var ok bool + for _, obj := range lbc.svcLister.List() { + if svc, ok = obj.(*v1core.Service); !ok { + continue + } + if lbc.shouldAllocate(svc) { + lbc.addChan <- *svc + } + } +} + +func (lbc *LoadBalancerController) canAllocate(svc v1core.Service) error { + canV4 := lbc.ipv4Ranges.Len() != 0 + canV6 := lbc.ipv6Ranges.Len() != 0 + requireDual := (svc.Spec.IPFamilyPolicy != nil && *svc.Spec.IPFamilyPolicy == v1core.IPFamilyPolicyRequireDualStack) + if requireDual && !canV4 { + return errors.New("IPv4 address required, but no IPv4 ranges available") + } + if requireDual && !canV6 { + return errors.New("IPv6 address required, but no IPv6 ranges available") + } + + ipv4, ipv6 := getIPFamilies(svc.Spec.IPFamilies) + if ipv4 && !canV4 && !ipv6 { + return errors.New("no IPv4 ranges specified") + } + if ipv6 && !canV6 && !ipv4 { + return errors.New("no IPv6 ranges specified") + } + + return nil +} + +func (lbc *LoadBalancerController) getIPsFromService(svc *v1core.Service) ([]net.IP, []net.IP) { + v4 := make([]net.IP, 0) + v6 := make([]net.IP, 0) + + allips := make([]string, 0) + allips = append(allips, svc.Spec.ExternalIPs...) + for _, lin := range svc.Status.LoadBalancer.Ingress { + if lin.IP == "" { + continue + } + allips = append(allips, lin.IP) + } + + for _, sip := range allips { + ip := net.ParseIP(sip) + if ip == nil { + continue + } + ip4 := ip.To4() + switch { + case ip4 != nil && lbc.ipv4Ranges.Contains(ip4): + v4 = append(v4, ip4) + case lbc.ipv6Ranges.Contains(ip): + v6 = append(v6, ip) + } + } + + return v4, v6 +} + +func (lbc *LoadBalancerController) getAllocatedIPs() ([]net.IP, []net.IP) { + allocated4 := make([]net.IP, 0) + allocated6 := make([]net.IP, 0) + var svc *v1core.Service + var ok bool + for _, obj := range lbc.svcLister.List() { + if svc, ok = obj.(*v1core.Service); !ok { + continue + } + ips4, ips6 := lbc.getIPsFromService(svc) + allocated4 = append(allocated4, ips4...) + allocated6 = append(allocated6, ips6...) 
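	// Note: getAllocatedIPs gathers, for every Service in the lister, both
	// spec.externalIPs and any addresses already written to
	// status.loadBalancer.ingress that fall inside the configured pools, so
	// that getNextFreeIP can skip addresses which are already in use.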
+ } + return allocated4, allocated6 +} + +func appendIngressIP(svc *v1core.Service, ip net.IP) { + lbi := v1core.LoadBalancerIngress{ + IP: ip.String(), + } + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, lbi) +} + +func (lbc *LoadBalancerController) updateService(svc *v1core.Service, ips ...net.IP) { + if lbc.clientset == nil { + panic("clientset") + } + if lbc.clientset.CoreV1() == nil { + panic("corev1") + } + svcClient := lbc.clientset.CoreV1().Services(svc.Namespace) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + curSvc, err := svcClient.Get(context.TODO(), svc.Name, metav1.GetOptions{}) + if err != nil { + return err + } + for _, ip := range ips { + if ip == nil { + continue + } + appendIngressIP(curSvc, ip) + } + _, err = svcClient.UpdateStatus(context.TODO(), curSvc, metav1.UpdateOptions{}) + return err + }) + if err != nil { + klog.Errorf("unable to update %s in %s: %s", svc.Name, svc.Namespace, err) + } +} + +func (lbc *LoadBalancerController) allocateService(svc *v1core.Service) error { + allocated4, allocated6 := lbc.getAllocatedIPs() + + requireDual := (svc.Spec.IPFamilyPolicy != nil && *svc.Spec.IPFamilyPolicy == v1core.IPFamilyPolicyRequireDualStack) + want4, want6 := getIPFamilies(svc.Spec.IPFamilies) + have4, have6 := getCurrentIngressFamilies(svc) + + var ipv4, ipv6 net.IP + var err4, err6 error + if want4 && !have4 { + ipv4, err4 = lbc.ipv4Ranges.getNextFreeIP(allocated4) + } + if want6 && !have6 { + ipv6, err6 = lbc.ipv6Ranges.getNextFreeIP(allocated6) + } + err := err6 + if err4 != nil { + err = err4 + } + + if ipv4 == nil && ipv6 == nil { + return errors.New("unable to allocate address: " + err.Error()) + } + if (ipv4 == nil || ipv6 == nil) && requireDual { + return errors.New("unable to allocate dual-stack addresses: " + err.Error()) + } + + go lbc.updateService(svc, ipv4, ipv6) + return nil +} + +func (lbc *LoadBalancerController) allocator() { + for svc := range lbc.allocateChan { + err := lbc.canAllocate(svc) + if err != nil { + klog.Errorf("can not allocate address for %s in %s: %s", + svc.Name, svc.Namespace, err) + continue + } + err = lbc.allocateService(&svc) + if err != nil { + klog.Errorf("failed to allocate address for %s in %s: %s", + svc.Name, svc.Namespace, err) + continue + } + } +} + +func (lbc *LoadBalancerController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, + stopCh <-chan struct{}, wg *sync.WaitGroup) { + isLeader := false + isLeaderChan := make(chan bool) + ctx, cancel := context.WithCancel(context.Background()) + timer := time.NewTimer(lbc.syncPeriod) + defer wg.Done() + defer cancel() + defer close(lbc.allocateChan) + + go lbc.runLeaderElection(ctx, isLeaderChan) + go lbc.allocator() + + for { + select { + case <-stopCh: + klog.Info("shutting down load balancer allocator controller") + return + case isLeader = <-isLeaderChan: + if isLeader { + klog.Info("became the load balancer controller leader, syncing...") + go lbc.walkServices() + } + case svc := <-lbc.addChan: + if isLeader && lbc.shouldAllocate(&svc) { + lbc.allocateChan <- svc + } + case <-timer.C: + timer.Reset(time.Minute) + healthcheck.SendHeartBeat(healthChan, "LBC") + if isLeader { + go lbc.walkServices() + } + } + } +} + +func NewLoadBalancerController(clientset kubernetes.Interface, + config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer, +) (*LoadBalancerController, error) { + ranges4 := make([]net.IPNet, 0) + ranges6 := make([]net.IPNet, 0) + + for _, ir := range config.LoadBalancerCIDRs { + ip, 
cidr, err := net.ParseCIDR(ir) + if err != nil { + return nil, err + } + if ip.To4() != nil && !config.EnableIPv4 { + return nil, errors.New("IPv4 loadbalancer CIDR specified while IPv4 is disabled") + } + if ip.To4() == nil && !config.EnableIPv6 { + return nil, errors.New("IPv6 loadbalancer CIDR specified while IPv6 is disabled") + } + if ip.To4() != nil { + ranges4 = append(ranges4, *cidr) + } else { + ranges6 = append(ranges6, *cidr) + } + } + + lbc := &LoadBalancerController{ + ipv4Ranges: newipRanges(ranges4), + ipv6Ranges: newipRanges(ranges6), + addChan: make(chan v1core.Service), + allocateChan: make(chan v1core.Service), + clientset: clientset, + isDefault: config.LoadBalancerDefaultClass, + syncPeriod: config.LoadBalancerSyncPeriod, + } + + lbc.svcLister = svcInformer.GetIndexer() + + namespace, err := getNamespace() + if err != nil { + return nil, err + } + + podname, err := getPodname() + if err != nil { + return nil, err + } + + lbc.lock = &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: "kube-router-lballoc", + Namespace: namespace, + }, + Client: clientset.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: podname, + }, + } + + return lbc, nil +} diff --git a/pkg/controllers/lballoc/lballoc_test.go b/pkg/controllers/lballoc/lballoc_test.go new file mode 100644 index 000000000..0167f477a --- /dev/null +++ b/pkg/controllers/lballoc/lballoc_test.go @@ -0,0 +1,692 @@ +package lballoc + +import ( + "errors" + "net" + "testing" + "time" + + "github.com/cloudnativelabs/kube-router/v2/pkg/options" + v1core "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +const ( + testName = "falafel" + testDefaultClass = "default" +) + +func TestGetNamespace(t *testing.T) { + errExp := error(nil) + t.Setenv("POD_NAMESPACE", testName) + ns, err := getNamespace() + if ns != testName { + t.Fatalf("expected %s, got %s", testName, ns) + } + if err != errExp { + t.Fatalf("expected %s, got %s", errExp, err) + } +} + +func TestGetNamespaceFail(t *testing.T) { + nsExp := "" + errExp := errors.New("unable to get namespace from kubernetes environment or $POD_NAMESPACE") + ns, err := getNamespace() + if ns != nsExp { + t.Fatalf("expected \"%s\", got %s", nsExp, ns) + } + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } +} + +func TestGetPodName(t *testing.T) { + errExp := error(nil) + t.Setenv("POD_NAME", testName) + name, err := getPodname() + if name != testName { + t.Fatalf("expected %s, got %s", testName, name) + } + if err != errExp { + t.Fatalf("expected %s, got %s", errExp, err) + } +} + +func TestGetPodNameFail(t *testing.T) { + nameExp := "" + errExp := errors.New("unable to get pod name from $POD_NAME") + name, err := getPodname() + if name != nameExp { + t.Fatalf("expected \"%s\", got %s", nameExp, name) + } + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } +} + +func TestIPRangesEmpty(t *testing.T) { + lenExp := 0 + ipExp := net.IP(nil) + errExp := errors.New("no IPs left to allocate") + allocated := make([]net.IP, 0) + ir := newipRanges(nil) + + l := ir.Len() + if l != lenExp { + t.Fatalf("expected %d, got %d", lenExp, l) + } + + ip, err := ir.getNextFreeIP(allocated) + if ip != nil { + t.Fatalf("expected %s, got %s", ipExp, ip) + } + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } +} + +func TestIPRange(t *testing.T) { + lenExp := 1 + ipExp := net.ParseIP("ffff::") + onesExp := 128 + bitsExp := 
128 + errExp := errors.New("no IPs left to allocate") + containsExp := true + allocated := make([]net.IP, 0) + + _, ipnet, err := net.ParseCIDR("ffff::/128") + if err != nil { + t.Fatalf("expected %s, got %s", error(nil), err) + } + ipnets := append([]net.IPNet(nil), *ipnet) + ir := newipRanges(ipnets) + + l := ir.Len() + if l != lenExp { + t.Fatalf("expected %d, got %d", lenExp, l) + } + + if !ir.ipRanges[0].IP.Equal(ipExp) { + t.Fatalf("expected %s, got %s", ipExp, ir.ipRanges[0].IP) + } + ones, bits := ir.ipRanges[0].Mask.Size() + if ones != onesExp { + t.Fatalf("expected %d, got %d", onesExp, ones) + } + if bits != bitsExp { + t.Fatalf("expected %d, got %d", bitsExp, bits) + } + + ip, err := ir.getNextFreeIP(allocated) + if !ip.Equal(ipExp) { + t.Fatalf("expected %s, got %s", ipExp, ip) + } + if err != nil { + t.Fatalf("expected %s, got %s", error(nil), err) + } + + allocated = append(allocated, ip) + + ip, err = ir.getNextFreeIP(allocated) + if ip != nil { + t.Fatalf("expected %s, got %s", net.IP(nil), ip) + } + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + + contains := ir.Contains(ipExp) + if contains != containsExp { + t.Fatalf("expected %t, got %t", containsExp, contains) + } +} + +func TestGetIPFamilies(t *testing.T) { + v4Exp := true + v6Exp := true + + families := append([]v1core.IPFamily{}, v1core.IPv4Protocol, v1core.IPv6Protocol) + + v4, v6 := getIPFamilies(families) + + if v4 != v4Exp { + t.Fatalf("expected %t, got %t", v4Exp, v4) + } + + if v6 != v6Exp { + t.Fatalf("expected %t, got %t", v6Exp, v6) + } + +} + +func makeTestService() v1core.Service { + svc := v1core.Service{ + Spec: v1core.ServiceSpec{ + Type: v1core.ServiceTypeLoadBalancer, + }, + } + svc.Name = testName + svc.Namespace = "tahini" + svc.Spec.LoadBalancerClass = nil + svc.Spec.IPFamilies = append([]v1core.IPFamily{}, v1core.IPv4Protocol, v1core.IPv6Protocol) + + return svc +} + +func TestGetCurrentIngressFamilies(t *testing.T) { + svc := makeTestService() + for _, tip := range []string{"ffff::", "127.127.127.127"} { + ing := v1core.LoadBalancerIngress{ + IP: tip, + } + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, ing) + } + + expV4 := true + expV6 := true + v4, v6 := getCurrentIngressFamilies(&svc) + if expV4 != v4 { + t.Fatalf("expected %t, got %t", expV4, v4) + } + if expV6 != v6 { + t.Fatalf("expected %t, got %t", expV6, v6) + } + +} + +func TestCheckIngress(t *testing.T) { + svc := makeTestService() + + check := checkIngress(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + v6Ingress := v1core.LoadBalancerIngress{ + IP: "ffff::", + } + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, v6Ingress) + + check = checkIngress(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + v4Ingress := v1core.LoadBalancerIngress{ + IP: "127.127.127.127", + } + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, v4Ingress) + + check = checkIngress(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } +} + +func TestCheckClass(t *testing.T) { + lbc := &LoadBalancerController{ + isDefault: true, + } + + svc := makeTestService() + svc.Spec.LoadBalancerClass = nil + + check := lbc.checkClass(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + lbc.isDefault = false + check = lbc.checkClass(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } + + cls := "" + svc.Spec.LoadBalancerClass = &cls + check = 
lbc.checkClass(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } + + cls = testDefaultClass + svc.Spec.LoadBalancerClass = &cls + check = lbc.checkClass(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } + + cls = loadBalancerClassName + svc.Spec.LoadBalancerClass = &cls + check = lbc.checkClass(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + lbc.isDefault = true + + cls = "" + svc.Spec.LoadBalancerClass = &cls + check = lbc.checkClass(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + cls = testDefaultClass + svc.Spec.LoadBalancerClass = &cls + check = lbc.checkClass(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + cls = loadBalancerClassName + svc.Spec.LoadBalancerClass = &cls + check = lbc.checkClass(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + cls = testName + svc.Spec.LoadBalancerClass = &cls + check = lbc.checkClass(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } + +} + +func TestShouldAllocate(t *testing.T) { + lbc := &LoadBalancerController{ + isDefault: true, + } + + svc := makeTestService() + + check := lbc.shouldAllocate(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + + svc.Spec.Type = v1core.ServiceTypeExternalName + check = lbc.shouldAllocate(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } + svc.Spec.Type = v1core.ServiceTypeLoadBalancer + + cls := testName + svc.Spec.LoadBalancerClass = &cls + check = lbc.shouldAllocate(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } + svc.Spec.LoadBalancerClass = nil + + svc.Spec.IPFamilies = append([]v1core.IPFamily{}, v1core.IPv4Protocol) + ingress := v1core.LoadBalancerIngress{ + IP: "127.127.127.127", + } + svc.Status.LoadBalancer.Ingress = append([]v1core.LoadBalancerIngress{}, ingress) + check = lbc.shouldAllocate(&svc) + if check { + t.Fatalf("expected %t, got %t", false, check) + } + + ingress = v1core.LoadBalancerIngress{ + IP: "ffff::", + } + svc.Status.LoadBalancer.Ingress = append([]v1core.LoadBalancerIngress{}, ingress) + check = lbc.shouldAllocate(&svc) + if !check { + t.Fatalf("expected %t, got %t", true, check) + } + +} + +type mockIndexer struct { + cache.FakeCustomStore + objects []interface{} +} + +func (mi *mockIndexer) Index(_ string, _ interface{}) ([]interface{}, error) { + return nil, errors.New("unsupported") +} + +func (mi *mockIndexer) IndexKeys(_, _ string) ([]string, error) { + return nil, errors.New("unsupported") +} + +func (mi *mockIndexer) ListIndexFuncValues(_ string) []string { + return nil +} + +func (mi *mockIndexer) ByIndex(_, _ string) ([]interface{}, error) { + return nil, errors.New("unsupported") +} + +func (mi *mockIndexer) GetIndexers() cache.Indexers { + return nil +} + +func (mi *mockIndexer) AddIndexers(_ cache.Indexers) error { + return errors.New("unsupported") +} + +func (mi *mockIndexer) List() []interface{} { + return mi.objects +} + +func newMockIndexer(objects ...interface{}) *mockIndexer { + mi := &mockIndexer{ + objects: make([]interface{}, 0), + } + mi.objects = append(mi.objects, objects...) 
+ return mi +} + +func TestWalkServices(t *testing.T) { + svc1 := makeTestService() + svc2 := true + mi := newMockIndexer(svc1, svc2) + addChan := make(chan v1core.Service, 2) + lbc := &LoadBalancerController{ + svcLister: mi, + addChan: addChan, + } + + lbc.walkServices() + close(lbc.addChan) + + out := make([]v1core.Service, 1) + for svc := range lbc.addChan { + out = append(out, svc) + } + + l := 1 + lenExp := 1 + if len(out) != lenExp { + t.Fatalf("expected %d, got %d", lenExp, l) + } +} + +func makeIPRanges(ips ...string) (ir4, ir6 *ipRanges) { + var v4, v6 []net.IPNet + for _, sip := range ips { + _, ipn, _ := net.ParseCIDR(sip) + if ipn == nil { + continue + } + if ipn.IP.To4() != nil { + v4 = append(v4, *ipn) + } else { + v6 = append(v6, *ipn) + } + } + ir4 = newipRanges(v4) + ir6 = newipRanges(v6) + return ir4, ir6 +} + +func TestCanAllocate(t *testing.T) { + ir4, ir6 := makeIPRanges("127.127.127.127/32", "ffff::/32") + lbc := &LoadBalancerController{ + ipv4Ranges: ir4, + ipv6Ranges: ir6, + } + ippol := v1core.IPFamilyPolicy("RequireDualStack") + svc := makeTestService() + svc.Spec.IPFamilyPolicy = &ippol + + err := lbc.canAllocate(svc) + if err != nil { + t.Fatalf("expected %v, got %s", nil, err) + } + + lbc.ipv4Ranges = newipRanges(nil) + errExp := errors.New("IPv4 address required, but no IPv4 ranges available") + err = lbc.canAllocate(svc) + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + + lbc.ipv4Ranges = ir4 + lbc.ipv6Ranges = newipRanges(nil) + errExp = errors.New("IPv6 address required, but no IPv6 ranges available") + err = lbc.canAllocate(svc) + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + + ippol = v1core.IPFamilyPolicy("PreferDualStack") + svc.Spec.IPFamilyPolicy = &ippol + svc.Spec.IPFamilies = append([]v1core.IPFamily{}, v1core.IPv4Protocol) + err = lbc.canAllocate(svc) + if err != nil { + t.Fatalf("expected %v, got %s", nil, err) + } + + svc.Spec.IPFamilies = append([]v1core.IPFamily{}, v1core.IPv6Protocol) + err = lbc.canAllocate(svc) + errExp = errors.New("no IPv6 ranges specified") + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + + lbc.ipv4Ranges = newipRanges(nil) + lbc.ipv6Ranges = ir6 + svc.Spec.IPFamilies = append([]v1core.IPFamily{}, v1core.IPv4Protocol) + err = lbc.canAllocate(svc) + errExp = errors.New("no IPv4 ranges specified") + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + + lbc.ipv6Ranges = newipRanges(nil) + err = lbc.canAllocate(svc) + errExp = errors.New("no IPv4 ranges specified") + if err.Error() != errExp.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } +} + +func TestGetIPsFromService(t *testing.T) { + svc := makeTestService() + ir4, ir6 := makeIPRanges("127.127.127.127/32", "ffff::/32") + lbc := &LoadBalancerController{ + ipv4Ranges: ir4, + ipv6Ranges: ir6, + } + + svc.Spec.ExternalIPs = append([]string{}, "falafel", "127.127.127.127") + for _, is := range []string{"ffff::", "aaaa::", "tahini"} { + ing := v1core.LoadBalancerIngress{ + IP: is, + } + svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, ing) + } + + addresses4, addresses6 := lbc.getIPsFromService(&svc) + l4Exp := 1 + l6Exp := 1 + l4 := len(addresses4) + l6 := len(addresses6) + if l4 != l4Exp { + t.Fatalf("expected %d, got %d", l4Exp, l4) + } + if l6 != l6Exp { + t.Fatalf("expected %d, got %d", l6Exp, l6) + } +} + +func TestGetAllocatedIPs(t *testing.T) { + svcExt := makeTestService() 
+ svcExt.Spec.ExternalIPs = append([]string{}, "ffff::", "kaka", "255.255.255.255") + svcLB := makeTestService() + for _, is := range []string{"aaaa::", "127.127.127.127"} { + ing := v1core.LoadBalancerIngress{ + IP: is, + } + svcLB.Status.LoadBalancer.Ingress = append(svcLB.Status.LoadBalancer.Ingress, ing) + } + + mi := newMockIndexer(&svcExt, &svcLB, 1234) + ir4, ir6 := makeIPRanges("127.127.127.127/32", "ffff::/32") + lbc := &LoadBalancerController{ + ipv4Ranges: ir4, + ipv6Ranges: ir6, + svcLister: mi, + } + + allocated4, allocated6 := lbc.getAllocatedIPs() + + l4Exp := 1 + l4 := len(allocated4) + if l4 != l4Exp { + t.Fatalf("expected %d, got %d", l4Exp, l4) + } + + l6Exp := 1 + l6 := len(allocated6) + if l6 != l6Exp { + t.Fatalf("expected %d, got %d", l6Exp, l6) + } +} + +func TestAppendIngressIP(t *testing.T) { + svc := makeTestService() + ip := net.ParseIP("127.127.127.127") + appendIngressIP(&svc, ip) + + ilExp := 1 + il := len(svc.Status.LoadBalancer.Ingress) + if ilExp != il { + t.Fatalf("expected %d, got %d", ilExp, il) + } + + ipExp := "127.127.127.127" + if ipExp != svc.Status.LoadBalancer.Ingress[0].IP { + t.Fatalf("expected %s, got %s", ipExp, svc.Status.LoadBalancer.Ingress[0].IP) + } +} + +func TestAllocateService(t *testing.T) { + mlbc := &LoadBalancerController{ + clientset: fake.NewSimpleClientset(), + } + ir4, ir6 := makeIPRanges("127.127.127.127/30", "ffff::/80") + mlbc.ipv4Ranges = ir4 + mlbc.ipv6Ranges = ir6 + mi := newMockIndexer() + mlbc.svcLister = mi + svc := makeTestService() + + err := mlbc.allocateService(&svc) + if err != nil { + t.Fatalf("expected %v, got %s", nil, err) + } + + svc = makeTestService() + mlbc.ipv4Ranges = newipRanges(nil) + fp := v1core.IPFamilyPolicyRequireDualStack + svc.Spec.IPFamilyPolicy = &fp + err = mlbc.allocateService(&svc) + errExp := "unable to allocate dual-stack addresses: no IPs left to allocate" + if errExp != err.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + + mlbc.ipv4Ranges = ir4 + mlbc.ipv6Ranges = newipRanges(nil) + err = mlbc.allocateService(&svc) + if errExp != err.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + + mlbc.ipv4Ranges = newipRanges(nil) + fp = v1core.IPFamilyPolicyPreferDualStack + svc.Spec.IPFamilyPolicy = &fp + err = mlbc.allocateService(&svc) + errExp = "unable to allocate address: no IPs left to allocate" + if errExp != err.Error() { + t.Fatalf("expected %s, got %s", errExp, err) + } + +} + +type mockInformer struct { +} + +func (mf *mockInformer) GetIndexer() cache.Indexer { + return newMockIndexer() +} + +func (mf *mockInformer) AddIndexers(_ cache.Indexers) error { + return nil +} + +func (mf *mockInformer) AddEventHandler(_ cache.ResourceEventHandler) { +} + +func (mf *mockInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) { +} + +func (mf *mockInformer) GetController() cache.Controller { + return nil +} + +func (mf *mockInformer) GetStore() cache.Store { + return nil +} + +func (mf *mockInformer) HasSynced() bool { + return false +} + +func (mf *mockInformer) LastSyncResourceVersion() string { + return "" +} + +func (mf *mockInformer) Run(_ <-chan struct{}) { +} + +func (mf *mockInformer) SetTransform(_ cache.TransformFunc) error { + return nil +} + +func (mf *mockInformer) SetWatchErrorHandler(_ cache.WatchErrorHandler) error { + return nil +} + +func TestNewLoadBalancerController(t *testing.T) { + t.Setenv("POD_NAMESPACE", testName) + t.Setenv("POD_NAME", testName) + + mf := &mockInformer{} + config := 
&options.KubeRouterConfig{ + LoadBalancerCIDRs: []string{"127.127.127.127/30", "ffff::/80"}, + EnableIPv4: true, + EnableIPv6: true, + } + fs := fake.NewSimpleClientset() + + _, err := NewLoadBalancerController(fs, config, mf) + if err != nil { + t.Fatalf("expected %v, got %s", nil, err) + } + + config.EnableIPv4 = false + _, err = NewLoadBalancerController(fs, config, mf) + errExp := "IPv4 loadbalancer CIDR specified while IPv4 is disabled" + if err.Error() != errExp { + t.Fatalf("expected %s, got %s", errExp, err) + } + + config.EnableIPv4 = true + config.EnableIPv6 = false + _, err = NewLoadBalancerController(fs, config, mf) + errExp = "IPv6 loadbalancer CIDR specified while IPv6 is disabled" + if err.Error() != errExp { + t.Fatalf("expected %s, got %s", errExp, err) + } +} diff --git a/pkg/healthcheck/health_controller.go b/pkg/healthcheck/health_controller.go index e64ff4c03..b135d64c8 100644 --- a/pkg/healthcheck/health_controller.go +++ b/pkg/healthcheck/health_controller.go @@ -34,6 +34,8 @@ type HealthController struct { type HealthStats struct { sync.Mutex Healthy bool + LoadBalancerControllerAlive time.Time + LoadBalancerControllerAliveTTL time.Duration MetricsControllerAlive time.Time NetworkPolicyControllerAlive time.Time NetworkPolicyControllerAliveTTL time.Duration @@ -90,6 +92,11 @@ func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) { switch { // The first heartbeat will set the initial gracetime the controller has to report in, A static time is added as // well when checking to allow for load variation in sync time + case beat.Component == "LBC": + if hc.Status.LoadBalancerControllerAliveTTL == 0 { + hc.Status.LoadBalancerControllerAliveTTL = time.Since(hc.Status.LoadBalancerControllerAlive) + } + hc.Status.LoadBalancerControllerAlive = beat.LastHeartBeat case beat.Component == "NSC": if hc.Status.NetworkServicesControllerAliveTTL == 0 { hc.Status.NetworkServicesControllerAliveTTL = time.Since(hc.Status.NetworkServicesControllerAlive) @@ -126,6 +133,14 @@ func (hc *HealthController) CheckHealth() bool { } } + if hc.Config.RunLoadBalancer { + if time.Since(hc.Status.LoadBalancerControllerAlive) > + hc.Config.LoadBalancerSyncPeriod+hc.Status.LoadBalancerControllerAliveTTL+graceTime { + klog.Error("Load Balancer Allocator Controller heartbeat missed") + health = false + } + } + if hc.Config.RunRouter { if time.Since(hc.Status.NetworkRoutingControllerAlive) > hc.Config.RoutesSyncPeriod+hc.Status.NetworkRoutingControllerAliveTTL+graceTime { @@ -205,6 +220,7 @@ func (hc *HealthController) SetAlive() { now := time.Now() + hc.Status.LoadBalancerControllerAlive = now hc.Status.MetricsControllerAlive = now hc.Status.NetworkPolicyControllerAlive = now hc.Status.NetworkRoutingControllerAlive = now diff --git a/pkg/options/options.go b/pkg/options/options.go index 30a433708..7ff8b7948 100644 --- a/pkg/options/options.go +++ b/pkg/options/options.go @@ -52,6 +52,9 @@ type KubeRouterConfig struct { IpvsPermitAll bool IpvsSyncPeriod time.Duration Kubeconfig string + LoadBalancerCIDRs []string + LoadBalancerDefaultClass bool + LoadBalancerSyncPeriod time.Duration MasqueradeAll bool Master string MetricsEnabled bool @@ -74,6 +77,7 @@ type KubeRouterConfig struct { RunFirewall bool RunRouter bool RunServiceProxy bool + RunLoadBalancer bool RuntimeEndpoint string Version bool VLevel string @@ -92,6 +96,7 @@ func NewKubeRouterConfig() *KubeRouterConfig { IPTablesSyncPeriod: 5 * time.Minute, IpvsGracefulPeriod: 30 * time.Second, IpvsSyncPeriod: 5 * time.Minute, + 
LoadBalancerSyncPeriod: time.Minute, NodePortRange: "30000-32767", OverlayType: "subnet", RoutesSyncPeriod: 5 * time.Minute, @@ -173,6 +178,12 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { "The delay between ipvs config synchronizations (e.g. '5s', '1m', '2h22m'). Must be greater than 0.") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization information (the master location is set by the master flag).") + fs.BoolVar(&s.LoadBalancerDefaultClass, "loadbalancer-default-class", true, + "Handle loadbalancer services without a class") + fs.StringSliceVar(&s.LoadBalancerCIDRs, "loadbalancer-ip-range", s.LoadBalancerCIDRs, + "CIDR values from which loadbalancer services addresses are assigned (can be specified multiple times)") + fs.DurationVar(&s.LoadBalancerSyncPeriod, "loadbalancer-sync-period", s.LoadBalancerSyncPeriod, + "The delay between checking for missed services (e.g. '5s', '1m'). Must be greater than 0.") fs.BoolVar(&s.MasqueradeAll, "masquerade-all", false, "SNAT all traffic to cluster IP/node port.") fs.StringVar(&s.Master, "master", s.Master, @@ -216,6 +227,8 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { "The delay between route updates and advertisements (e.g. '5s', '1m', '2h22m'). Must be greater than 0.") fs.BoolVar(&s.RunFirewall, "run-firewall", true, "Enables Network Policy -- sets up iptables to provide ingress firewall for pods.") + fs.BoolVar(&s.RunLoadBalancer, "run-loadbalancer", true, + "Enable loadbalancer address allocator") fs.BoolVar(&s.RunRouter, "run-router", true, "Enables Pod Networking -- Advertises and learns the routes to Pods via iBGP.") fs.BoolVar(&s.RunServiceProxy, "run-service-proxy", true,
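Two closing observations on how the pieces above fit together. The lease rules added to each of the daemonset manifests (get/create/update on coordination.k8s.io leases) are what the resourcelock.LeaseLock used by runLeaderElection needs, and the services/status update rule is what updateService relies on when it writes allocated addresses through UpdateStatus. The allocation itself walks the configured ranges with ipRanges.inc, carrying overflow byte by byte and wrapping to the next range when an increment leaves the current one. Below is a standalone sketch of that carry-and-wrap behaviour over a single assumed example range (10.0.0.252/30), not the controller's actual type.

package main

import (
	"fmt"
	"net"
)

// nextIP returns ip+1, wrapping back to the first address of the range when the
// incremented address falls outside it. This mirrors the byte-wise carry in
// ipRanges.inc (10.0.0.255 increments to 10.0.1.0), reduced to one range instead
// of the controller's list of ranges.
func nextIP(ip net.IP, ipnet *net.IPNet) net.IP {
	out := append(net.IP{}, ip...) // copy, so the caller's IP is not mutated
	for i := len(out) - 1; i >= 0; i-- {
		out[i]++
		if out[i] > 0 { // no overflow in this byte, stop carrying
			break
		}
	}
	if !ipnet.Contains(out) {
		return append(net.IP{}, ipnet.IP...) // wrap around to the start of the range
	}
	return out
}

func main() {
	_, ipnet, _ := net.ParseCIDR("10.0.0.252/30")
	ip := append(net.IP{}, ipnet.IP...)
	for i := 0; i < 5; i++ {
		fmt.Println(ip) // 10.0.0.252, .253, .254, .255, then wraps back to .252
		ip = nextIP(ip, ipnet)
	}
}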