diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index c5060abe425..428dfa951b7 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -15,11 +15,14 @@ package proxy import ( + "context" "net" "sync" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "github.com/projectcalico/calico/felix/bpf/bpfmap" @@ -43,6 +46,8 @@ type KubeProxy struct { k8s kubernetes.Interface hostname string + nodeLabels map[string]string + lastHostIPs []net.IP frontendMap maps.MapWithExistsCheck backendMap maps.MapWithExistsCheck affinityMap maps.Map @@ -107,6 +112,29 @@ func (kp *KubeProxy) Stop() { }) } +func (kp *KubeProxy) fetchNodeLabels() map[string]string { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + node, err := kp.k8s.CoreV1().Nodes().Get(ctx, kp.hostname, metav1.GetOptions{}) + if err != nil { + log.WithError(err).WithField("hostname", kp.hostname).Warn("Failed to fetch node labels, using empty labels") + return make(map[string]string) + } + + labels := make(map[string]string, len(node.Labels)) + for k, v := range node.Labels { + labels[k] = v + } + + log.WithFields(log.Fields{ + "hostname": kp.hostname, + "labels": labels, + }).Debug("Fetched node labels") + + return labels +} + func (kp *KubeProxy) run(hostIPs []net.IP) error { ips := make([]net.IP, 0, len(hostIPs)) @@ -123,6 +151,9 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error { kp.lock.Lock() defer kp.lock.Unlock() + kp.lastHostIPs = hostIPs + kp.nodeLabels = kp.fetchNodeLabels() + withLocalNP := make([]net.IP, len(hostIPs), len(hostIPs)+1) copy(withLocalNP, hostIPs) if kp.ipFamily == 4 { @@ -132,7 +163,7 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error { } syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, - kp.rt, kp.excludedCIDRs) + kp.rt, kp.excludedCIDRs, kp.nodeLabels) if err != nil { return errors.WithMessage(err, "new bpf syncer") } @@ -154,7 +185,8 @@ func (kp *KubeProxy) start() error { withLocalNP = append(withLocalNP, podNPIPV6) } - syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt, kp.excludedCIDRs) + // Node labels will be fetched in run() when we have the actual host IPs + syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt, kp.excludedCIDRs, nil) if err != nil { return errors.WithMessage(err, "new bpf syncer") } @@ -219,6 +251,45 @@ func (kp *KubeProxy) OnHostIPsUpdate(IPs []net.IP) { log.Debugf("kube-proxy OnHostIPsUpdate: %+v", IPs) } +// OnNodeLabelsUpdate should be used by an external user to update the node's labels. +// This will trigger a resync to update NodePort service programming based on the new labels. +func (kp *KubeProxy) OnNodeLabelsUpdate(labels map[string]string) { + kp.lock.Lock() + oldLabels := kp.nodeLabels + kp.nodeLabels = labels + hostIPs := kp.lastHostIPs + kp.lock.Unlock() + + // Check if labels actually changed + labelsChanged := len(oldLabels) != len(labels) + if !labelsChanged { + for k, v := range labels { + if oldLabels[k] != v { + labelsChanged = true + break + } + } + } + + if !labelsChanged { + log.Debug("Node labels unchanged, skipping resync") + return + } + + log.WithFields(log.Fields{ + "oldLabels": oldLabels, + "newLabels": labels, + }).Info("Node labels changed, triggering kube-proxy resync") + + // Trigger a resync with the current host IPs + // This will recreate the syncer with updated node labels + if len(hostIPs) > 0 { + kp.OnHostIPsUpdate(hostIPs) + } else { + log.Warn("No host IPs available for resync after label update") + } +} + // OnRouteUpdate should be used to update the internal state of routing tables func (kp *KubeProxy) OnRouteUpdate(k routes.KeyInterface, v routes.ValueInterface) { log.WithFields(log.Fields{"key": k, "value": v}).Debug("kube-proxy: OnRouteUpdate") diff --git a/felix/bpf/proxy/lb_src_range_test.go b/felix/bpf/proxy/lb_src_range_test.go index 48bb142eabc..a8b8ef0ff66 100644 --- a/felix/bpf/proxy/lb_src_range_test.go +++ b/felix/bpf/proxy/lb_src_range_test.go @@ -47,7 +47,7 @@ func testfn(makeIPs func(ips []net.IP) proxy.K8sServicePortOption) { externalIP := makeIPs([]net.IP{net.IPv4(35, 0, 0, 2)}) twoExternalIPs := makeIPs([]net.IP{net.IPv4(35, 0, 0, 2), net.IPv4(45, 0, 1, 2)}) - s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil) + s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil) svcKey := k8sp.ServicePortName{ NamespacedName: types.NamespacedName{ @@ -210,7 +210,7 @@ func testfn(makeIPs func(ips []net.IP) proxy.K8sServicePortOption) { externalIP, proxy.K8sSvcWithLBSourceRangeIPs([]*net.IPNet{&ipnet}), ) - s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil) err := s.Apply(state) Expect(err).NotTo(HaveOccurred()) Expect(svcs.m).To(HaveLen(3)) @@ -223,7 +223,7 @@ func testfn(makeIPs func(ips []net.IP) proxy.K8sServicePortOption) { v1.ProtocolTCP, externalIP, ) - s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil) err := s.Apply(state) Expect(err).NotTo(HaveOccurred()) Expect(svcs.m).To(HaveLen(2)) @@ -253,7 +253,7 @@ func test0000SourceRange() { externalIP := net.IPv4(35, 0, 0, 2) - s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil) + s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil) svcKey := k8sp.ServicePortName{ NamespacedName: types.NamespacedName{ diff --git a/felix/bpf/proxy/proxy.go b/felix/bpf/proxy/proxy.go index d4120854829..ffe1bc6a6d9 100644 --- a/felix/bpf/proxy/proxy.go +++ b/felix/bpf/proxy/proxy.go @@ -394,16 +394,19 @@ const ( ReapTerminatingUDPImmediatelly = "TerminatingImmediately" ExcludeServiceAnnotation = "projectcalico.org/natExcludeService" + NodeSelectorAnnotation = "projectcalico.org/nodeSelector" ) type ServiceAnnotations interface { ReapTerminatingUDP() bool ExcludeService() bool + NodeSelector() string } type servicePortAnnotations struct { reapTerminatingUDP bool excludeService bool + nodeSelector string } func (s *servicePortAnnotations) ReapTerminatingUDP() bool { @@ -414,6 +417,10 @@ func (s *servicePortAnnotations) ExcludeService() bool { return s.excludeService } +func (s *servicePortAnnotations) NodeSelector() string { + return s.nodeSelector +} + type servicePort struct { k8sp.ServicePort servicePortAnnotations @@ -435,6 +442,10 @@ func makeServiceInfo(_ *v1.ServicePort, s *v1.Service, baseSvc *k8sp.BaseService } } + if v, ok := s.Annotations[NodeSelectorAnnotation]; ok { + svc.nodeSelector = strings.TrimSpace(v) + } + out: return svc } diff --git a/felix/bpf/proxy/proxy_bench_test.go b/felix/bpf/proxy/proxy_bench_test.go index 2cad81f2bb7..60a6363ede3 100644 --- a/felix/bpf/proxy/proxy_bench_test.go +++ b/felix/bpf/proxy/proxy_bench_test.go @@ -49,6 +49,7 @@ func benchmarkProxyUpdates(b *testing.B, svcN, epsN int) { &mock.DummyMap{}, proxy.NewRTCache(), nil, + nil, ) Expect(err).ShouldNot(HaveOccurred()) diff --git a/felix/bpf/proxy/syncer.go b/felix/bpf/proxy/syncer.go index 33b58db28d2..d1a191acc64 100644 --- a/felix/bpf/proxy/syncer.go +++ b/felix/bpf/proxy/syncer.go @@ -34,6 +34,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/routes" "github.com/projectcalico/calico/felix/cachingmap" "github.com/projectcalico/calico/felix/ip" + "github.com/projectcalico/calico/libcalico-go/lib/selector" ) var podNPIPStr = "255.255.255.255" @@ -121,6 +122,7 @@ type Syncer struct { nextSvcID uint32 nodePortIPs []net.IP + nodeLabels map[string]string rt Routes // new maps are valid during the Apply()'s runtime to provide easy access @@ -209,11 +211,28 @@ func uniqueIPs(ips []net.IP) []net.IP { return ret } +// nodeMatchesSelector checks if the current node matches the given selector expression. +// Returns true if the selector is empty or if the node labels match the selector. +func (s *Syncer) nodeMatchesSelector(selectorStr string) bool { + if selectorStr == "" { + return true + } + + sel, err := selector.Parse(selectorStr) + if err != nil { + log.WithError(err).WithField("selector", selectorStr).Error("Failed to parse node selector, ignoring") + return true // Default to allowing on parse error to avoid breaking services + } + + return sel.Evaluate(s.nodeLabels) +} + // NewSyncer returns a new Syncer func NewSyncer(family int, nodePortIPs []net.IP, frontendMap maps.MapWithExistsCheck, backendMap maps.MapWithExistsCheck, affmap maps.Map, rt Routes, excludedCIDRs *ip.CIDRTrie, + nodeLabels map[string]string, ) (*Syncer, error) { s := &Syncer{ @@ -221,6 +240,7 @@ func NewSyncer(family int, nodePortIPs []net.IP, bpfAff: affmap, rt: rt, nodePortIPs: uniqueIPs(nodePortIPs), + nodeLabels: nodeLabels, prevSvcMap: make(map[svcKey]svcInfo), prevEpsMap: make(k8sp.EndpointsMap), stop: make(chan struct{}), @@ -649,25 +669,38 @@ func (s *Syncer) apply(state DPSyncerState) error { } if nport := svc.NodePort(); nport != 0 { - for _, npip := range s.nodePortIPs { - npInfo := serviceInfoFromK8sServicePort(svc) - npInfo.clusterIP = npip - npInfo.port = nport - if svc.InternalPolicyLocal() && - ((s.ipFamily == 4 && npip.Equal(podNPIP)) || (s.ipFamily == 6 && npip.Equal(podNPIPV6))) { - // do not program the meta entry, program each node - // separately - continue - } - err := s.applyDerived(sname, svcTypeNodePort, npInfo) - if err != nil { - log.Errorf("failed to apply NodePort %s for service %s : %s", npip, sname, err) - continue - } + // Check if this node matches the service's node selector (if any) + nodeSelector := "" + if svcAnnotated, ok := svc.(Service); ok { + nodeSelector = svcAnnotated.NodeSelector() } - if svc.InternalPolicyLocal() { - if miss := s.expandAndApplyNodePorts(sname, svc, eps, nport, s.rt.Lookup); miss != nil { - expNPMisses = append(expNPMisses, miss) + + if !s.nodeMatchesSelector(nodeSelector) { + log.WithFields(log.Fields{ + "service": sname, + "selector": nodeSelector, + }).Debug("Node does not match selector, skipping NodePort programming") + } else { + for _, npip := range s.nodePortIPs { + npInfo := serviceInfoFromK8sServicePort(svc) + npInfo.clusterIP = npip + npInfo.port = nport + if svc.InternalPolicyLocal() && + ((s.ipFamily == 4 && npip.Equal(podNPIP)) || (s.ipFamily == 6 && npip.Equal(podNPIPV6))) { + // do not program the meta entry, program each node + // separately + continue + } + err := s.applyDerived(sname, svcTypeNodePort, npInfo) + if err != nil { + log.Errorf("failed to apply NodePort %s for service %s : %s", npip, sname, err) + continue + } + } + if svc.InternalPolicyLocal() { + if miss := s.expandAndApplyNodePorts(sname, svc, eps, nport, s.rt.Lookup); miss != nil { + expNPMisses = append(expNPMisses, miss) + } } } } @@ -1558,3 +1591,23 @@ func K8sSvcWithReapTerminatingUDP() K8sServicePortOption { s.(*servicePort).reapTerminatingUDP = true } } + +// NewK8sServicePortWithSelector creates a new k8s ServicePort with a node selector +func NewK8sServicePortWithSelector(clusterIP net.IP, port int, proto v1.Protocol, + nodeSelector string, opts ...K8sServicePortOption) k8sp.ServicePort { + + x := &servicePort{ + ServicePort: &serviceInfo{ + clusterIP: clusterIP, + port: port, + protocol: proto, + }, + } + + x.nodeSelector = nodeSelector + + for _, o := range opts { + o(x) + } + return x +} diff --git a/felix/bpf/proxy/syncer_bench_test.go b/felix/bpf/proxy/syncer_bench_test.go index 039989f0d83..0ce8c1ae78c 100644 --- a/felix/bpf/proxy/syncer_bench_test.go +++ b/felix/bpf/proxy/syncer_bench_test.go @@ -173,6 +173,7 @@ func runBenchmarkServiceUpdate(b *testing.B, svcCnt, epCnt int, mockMaps bool, o &mock.DummyMap{}, NewRTCache(), nil, + nil, ) Expect(err).ShouldNot(HaveOccurred()) } else { @@ -190,6 +191,7 @@ func runBenchmarkServiceUpdate(b *testing.B, svcCnt, epCnt int, mockMaps bool, o &mock.DummyMap{}, NewRTCache(), nil, + nil, ) Expect(err).ShouldNot(HaveOccurred()) } diff --git a/felix/bpf/proxy/syncer_test.go b/felix/bpf/proxy/syncer_test.go index 212db8cd6b1..37b5dffc424 100644 --- a/felix/bpf/proxy/syncer_test.go +++ b/felix/bpf/proxy/syncer_test.go @@ -74,7 +74,7 @@ var _ = Describe("BPF Syncer", func() { rt = proxy.NewRTCache() - s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil) ep := proxy.NewEndpointInfo("10.1.0.1", 5555, proxy.EndpointInfoOptIsReady(true)) state = proxy.DPSyncerState{ @@ -498,7 +498,7 @@ var _ = Describe("BPF Syncer", func() { })) By("resyncing after creating a new syncer with the same result", makestep(func() { - s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil) checkAfterResync() })) @@ -506,7 +506,7 @@ var _ = Describe("BPF Syncer", func() { svcs.m[nat.NewNATKey(net.IPv4(5, 5, 5, 5), 1111, 6)] = nat.NewNATValue(0xdeadbeef, 2, 2, 0) eps.m[nat.NewNATBackendKey(0xdeadbeef, 0)] = nat.NewNATBackendValue(net.IPv4(6, 6, 6, 6), 666) eps.m[nat.NewNATBackendKey(0xdeadbeef, 1)] = nat.NewNATBackendValue(net.IPv4(7, 7, 7, 7), 777) - s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil) checkAfterResync() })) @@ -654,7 +654,7 @@ var _ = Describe("BPF Syncer", func() { By("inserting non-local eps for a NodePort - no route", makestep(func() { // use the meta node IP for nodeports as well - s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil, nil) state.SvcMap[svcKey2] = proxy.NewK8sServicePort( net.IPv4(10, 0, 0, 2), 2222, @@ -807,7 +807,7 @@ var _ = Describe("BPF Syncer", func() { By("inserting only non-local eps for a NodePort - multiple nodes & pods/node", makestep(func() { // use the meta node IP for nodeports as well - s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil, nil) state.SvcMap[svcKey2] = proxy.NewK8sServicePort( net.IPv4(10, 0, 0, 2), 2222, @@ -887,7 +887,7 @@ var _ = Describe("BPF Syncer", func() { By("restarting Syncer to check if NodePortRemotes are picked up correctly", makestep(func() { // use the meta node IP for nodeports as well - s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil) + s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil, nil) err := s.Apply(state) Expect(err).NotTo(HaveOccurred()) @@ -1539,3 +1539,223 @@ func ctEntriesForSvc(ct maps.Map, proto v1.Protocol, err = ct.Update(revKey.AsBytes(), val.AsBytes()) Expect(err).NotTo(HaveOccurred(), "Test failed to populate ct map with REV") } + +var _ = Describe("NodePort with node selector", func() { + var ( + s *proxy.Syncer + svcs *mock.NATMap + eps *mock.NATBackendMap + aff *mock.Map + rt *proxy.RTCache + state proxy.DPSyncerState + ) + + BeforeEach(func() { + svcs = newMockNATMap() + eps = newMockNATBackendMap() + aff = newMockAffinityMap() + rt = proxy.NewRTCache() + state = proxy.DPSyncerState{ + SvcMap: make(k8sp.ServicePortMap), + EpsMap: make(k8sp.EndpointsMap), + NodeZone: "zone-a", + } + }) + + It("should program NodePort when node matches selector", func() { + nodeLabels := map[string]string{ + "tenant": "foo", + "env": "production", + } + s, _ = proxy.NewSyncer(4, []net.IP{net.IPv4(1, 2, 3, 4)}, svcs, eps, aff, rt, nil, nodeLabels) + + svcKey := k8sp.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-svc"}, Port: "http"} + + // Create service port with node selector through NewK8sServicePortWithSelector + svcPort := proxy.NewK8sServicePortWithSelector(net.IPv4(10, 0, 0, 1), 80, v1.ProtocolTCP, + "tenant == 'foo'", + proxy.K8sSvcWithNodePort(30000)) + + ep := proxy.NewEndpointInfo("10.1.0.1", 8080, proxy.EndpointInfoOptIsReady(true), proxy.EndpointInfoOptIsLocal(true)) + + state.SvcMap[svcKey] = svcPort + state.EpsMap[svcKey] = []k8sp.Endpoint{ep} + + err := s.Apply(state) + Expect(err).NotTo(HaveOccurred()) + + // Verify NodePort was programmed + npKey := nat.NewNATKey(net.IPv4(1, 2, 3, 4), 30000, 6) + _, ok := svcs.m[npKey] + Expect(ok).To(BeTrue(), "NodePort should be programmed when selector matches") + }) + + It("should NOT program NodePort when node does not match selector", func() { + nodeLabels := map[string]string{ + "tenant": "bar", + "env": "production", + } + s, _ = proxy.NewSyncer(4, []net.IP{net.IPv4(1, 2, 3, 4)}, svcs, eps, aff, rt, nil, nodeLabels) + + svcKey := k8sp.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-svc"}, Port: "http"} + + // Create service port with node selector + svcPort := proxy.NewK8sServicePortWithSelector(net.IPv4(10, 0, 0, 1), 80, v1.ProtocolTCP, + "tenant == 'foo'", + proxy.K8sSvcWithNodePort(30000)) + + ep := proxy.NewEndpointInfo("10.1.0.1", 8080, proxy.EndpointInfoOptIsReady(true), proxy.EndpointInfoOptIsLocal(true)) + + state.SvcMap[svcKey] = svcPort + state.EpsMap[svcKey] = []k8sp.Endpoint{ep} + + err := s.Apply(state) + Expect(err).NotTo(HaveOccurred()) + + // Verify NodePort was NOT programmed + npKey := nat.NewNATKey(net.IPv4(1, 2, 3, 4), 30000, 6) + _, ok := svcs.m[npKey] + Expect(ok).To(BeFalse(), "NodePort should NOT be programmed when selector does not match") + }) + + It("should program NodePort when selector is empty", func() { + nodeLabels := map[string]string{ + "tenant": "bar", + } + s, _ = proxy.NewSyncer(4, []net.IP{net.IPv4(1, 2, 3, 4)}, svcs, eps, aff, rt, nil, nodeLabels) + + svcKey := k8sp.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-svc"}, Port: "http"} + + // Create service port without node selector annotation + svcPort := proxy.NewK8sServicePort(net.IPv4(10, 0, 0, 1), 80, v1.ProtocolTCP, + proxy.K8sSvcWithNodePort(30000)) + + ep := proxy.NewEndpointInfo("10.1.0.1", 8080, proxy.EndpointInfoOptIsReady(true), proxy.EndpointInfoOptIsLocal(true)) + + state.SvcMap[svcKey] = svcPort + state.EpsMap[svcKey] = []k8sp.Endpoint{ep} + + err := s.Apply(state) + Expect(err).NotTo(HaveOccurred()) + + // Verify NodePort was programmed (default behavior when no selector) + npKey := nat.NewNATKey(net.IPv4(1, 2, 3, 4), 30000, 6) + _, ok := svcs.m[npKey] + Expect(ok).To(BeTrue(), "NodePort should be programmed when no selector is specified") + }) + + It("should handle complex selectors", func() { + nodeLabels := map[string]string{ + "tenant": "acme", + "env": "production", + "region": "us-west", + } + s, _ = proxy.NewSyncer(4, []net.IP{net.IPv4(1, 2, 3, 4)}, svcs, eps, aff, rt, nil, nodeLabels) + + svcKey := k8sp.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-svc"}, Port: "http"} + + // Create service port with complex selector + svcPort := proxy.NewK8sServicePortWithSelector(net.IPv4(10, 0, 0, 1), 80, v1.ProtocolTCP, + "tenant == 'acme' && env == 'production'", + proxy.K8sSvcWithNodePort(30000)) + + ep := proxy.NewEndpointInfo("10.1.0.1", 8080, proxy.EndpointInfoOptIsReady(true), proxy.EndpointInfoOptIsLocal(true)) + + state.SvcMap[svcKey] = svcPort + state.EpsMap[svcKey] = []k8sp.Endpoint{ep} + + err := s.Apply(state) + Expect(err).NotTo(HaveOccurred()) + + // Verify NodePort was programmed (both conditions match) + npKey := nat.NewNATKey(net.IPv4(1, 2, 3, 4), 30000, 6) + _, ok := svcs.m[npKey] + Expect(ok).To(BeTrue(), "NodePort should be programmed when complex selector matches") + }) + + It("should handle 'in' operator selectors", func() { + nodeLabels := map[string]string{ + "zone": "us-east-1a", + } + s, _ = proxy.NewSyncer(4, []net.IP{net.IPv4(1, 2, 3, 4)}, svcs, eps, aff, rt, nil, nodeLabels) + + svcKey := k8sp.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-svc"}, Port: "http"} + + // Create service port with 'in' selector + svcPort := proxy.NewK8sServicePortWithSelector(net.IPv4(10, 0, 0, 1), 80, v1.ProtocolTCP, + "zone in { 'us-east-1a', 'us-east-1b' }", + proxy.K8sSvcWithNodePort(30000)) + + ep := proxy.NewEndpointInfo("10.1.0.1", 8080, proxy.EndpointInfoOptIsReady(true), proxy.EndpointInfoOptIsLocal(true)) + + state.SvcMap[svcKey] = svcPort + state.EpsMap[svcKey] = []k8sp.Endpoint{ep} + + err := s.Apply(state) + Expect(err).NotTo(HaveOccurred()) + + // Verify NodePort was programmed (zone is in the set) + npKey := nat.NewNATKey(net.IPv4(1, 2, 3, 4), 30000, 6) + _, ok := svcs.m[npKey] + Expect(ok).To(BeTrue(), "NodePort should be programmed when 'in' selector matches") + }) + + It("should handle invalid selectors gracefully", func() { + nodeLabels := map[string]string{ + "tenant": "acme", + } + s, _ = proxy.NewSyncer(4, []net.IP{net.IPv4(1, 2, 3, 4)}, svcs, eps, aff, rt, nil, nodeLabels) + + svcKey := k8sp.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-svc"}, Port: "http"} + + // Create service port with invalid selector syntax + svcPort := proxy.NewK8sServicePortWithSelector(net.IPv4(10, 0, 0, 1), 80, v1.ProtocolTCP, + "invalid selector syntax @#$", + proxy.K8sSvcWithNodePort(30000)) + + ep := proxy.NewEndpointInfo("10.1.0.1", 8080, proxy.EndpointInfoOptIsReady(true), proxy.EndpointInfoOptIsLocal(true)) + + state.SvcMap[svcKey] = svcPort + state.EpsMap[svcKey] = []k8sp.Endpoint{ep} + + err := s.Apply(state) + Expect(err).NotTo(HaveOccurred()) + + // Verify NodePort was still programmed (fail-open behavior on parse error) + npKey := nat.NewNATKey(net.IPv4(1, 2, 3, 4), 30000, 6) + _, ok := svcs.m[npKey] + Expect(ok).To(BeTrue(), "NodePort should be programmed when selector is invalid (fail-open)") + }) + + It("should work with multiple NodePort IPs", func() { + nodeLabels := map[string]string{ + "tenant": "acme", + } + nodeIPs := []net.IP{net.IPv4(1, 2, 3, 4), net.IPv4(5, 6, 7, 8)} + s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nodeLabels) + + svcKey := k8sp.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-svc"}, Port: "http"} + + // Create service port with node selector + svcPort := proxy.NewK8sServicePortWithSelector(net.IPv4(10, 0, 0, 1), 80, v1.ProtocolTCP, + "tenant == 'acme'", + proxy.K8sSvcWithNodePort(30000)) + + ep := proxy.NewEndpointInfo("10.1.0.1", 8080, proxy.EndpointInfoOptIsReady(true), proxy.EndpointInfoOptIsLocal(true)) + + state.SvcMap[svcKey] = svcPort + state.EpsMap[svcKey] = []k8sp.Endpoint{ep} + + err := s.Apply(state) + Expect(err).NotTo(HaveOccurred()) + + // Verify NodePort was programmed on all NodePort IPs + npKey1 := nat.NewNATKey(net.IPv4(1, 2, 3, 4), 30000, 6) + _, ok1 := svcs.m[npKey1] + Expect(ok1).To(BeTrue(), "NodePort should be programmed on first IP when selector matches") + + npKey2 := nat.NewNATKey(net.IPv4(5, 6, 7, 8), 30000, 6) + _, ok2 := svcs.m[npKey2] + Expect(ok2).To(BeTrue(), "NodePort should be programmed on second IP when selector matches") + }) +}) diff --git a/felix/dataplane/linux/bpf_route_mgr.go b/felix/dataplane/linux/bpf_route_mgr.go index d207851bcb9..1e54f2752c6 100644 --- a/felix/dataplane/linux/bpf_route_mgr.go +++ b/felix/dataplane/linux/bpf_route_mgr.go @@ -89,10 +89,11 @@ type bpfRouteManager struct { dirtyRoutes set.Set[routes.KeyInterface] // Callbacks used to tell kube-proxy about the relevant routes. - cbLck sync.RWMutex - hostIPsUpdateCB func([]net.IP) - routesUpdateCB func(routes.KeyInterface, routes.ValueInterface) - routesDeleteCB func(routes.KeyInterface) + cbLck sync.RWMutex + hostIPsUpdateCB func([]net.IP) + routesUpdateCB func(routes.KeyInterface, routes.ValueInterface) + routesDeleteCB func(routes.KeyInterface) + nodeLabelsUpdateCB func(map[string]string) opReporter logutils.OpRecorder @@ -211,6 +212,12 @@ func (m *bpfRouteManager) OnUpdate(msg interface{}) { m.onWorkloadEndpointRemove(msg) case *proto.GlobalBGPConfigUpdate: m.onBGPConfigUpdate(msg) + + // Updates for local node metadata including labels. + case *proto.HostMetadataV4V6Update: + if msg.Hostname == m.myNodename && msg.Labels != nil { + m.onNodeLabelsChange(msg.Labels) + } } } @@ -600,6 +607,15 @@ func (m *bpfRouteManager) onHostIPsChange(newIPs []net.IP) { log.Debugf("localHostIPs update %+v", newIPs) } +func (m *bpfRouteManager) onNodeLabelsChange(labels map[string]string) { + m.cbLck.RLock() + defer m.cbLck.RUnlock() + if m.nodeLabelsUpdateCB != nil { + m.nodeLabelsUpdateCB(labels) + } + log.WithField("labels", labels).Debug("Node labels update") +} + func (m *bpfRouteManager) onRouteUpdate(update *proto.RouteUpdate) { cidr := ip.MustParseCIDROrIP(update.Dst) if uint8(m.ipFamily) != cidr.Version() { @@ -748,6 +764,13 @@ func (m *bpfRouteManager) setHostIPUpdatesCallBack(cb func([]net.IP)) { m.hostIPsUpdateCB = cb } +func (m *bpfRouteManager) setNodeLabelsUpdateCallBack(cb func(map[string]string)) { + m.cbLck.Lock() + defer m.cbLck.Unlock() + + m.nodeLabelsUpdateCB = cb +} + func (m *bpfRouteManager) setRoutesCallBacks(update func(routes.KeyInterface, routes.ValueInterface), del func(routes.KeyInterface)) { m.cbLck.Lock() defer m.cbLck.Unlock() diff --git a/felix/dataplane/linux/int_dataplane.go b/felix/dataplane/linux/int_dataplane.go index 11569549bf7..ae6da2fe871 100644 --- a/felix/dataplane/linux/int_dataplane.go +++ b/felix/dataplane/linux/int_dataplane.go @@ -2935,6 +2935,7 @@ func startBPFDataplaneComponents( bpfRTMgr.setHostIPUpdatesCallBack(kp.OnHostIPsUpdate) bpfRTMgr.setRoutesCallBacks(kp.OnRouteUpdate, kp.OnRouteDelete) + bpfRTMgr.setNodeLabelsUpdateCallBack(kp.OnNodeLabelsUpdate) conntrackScanner.AddUnlocked(bpfconntrack.NewStaleNATScanner(kp)) } else { log.Info("BPF enabled but no Kubernetes client available, unable to run kube-proxy module.")