Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions felix/bpf/proxy/kube-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package proxy

import (
"context"
"net"
"sync"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/projectcalico/calico/felix/bpf/bpfmap"
Expand All @@ -43,6 +46,8 @@ type KubeProxy struct {

k8s kubernetes.Interface
hostname string
nodeLabels map[string]string
lastHostIPs []net.IP
frontendMap maps.MapWithExistsCheck
backendMap maps.MapWithExistsCheck
affinityMap maps.Map
Expand Down Expand Up @@ -107,6 +112,29 @@ func (kp *KubeProxy) Stop() {
})
}

func (kp *KubeProxy) fetchNodeLabels() map[string]string {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

node, err := kp.k8s.CoreV1().Nodes().Get(ctx, kp.hostname, metav1.GetOptions{})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't be performing any Get requests from within the Felix code. Felix runs as a DaemonSet on every node in the cluster, and for very large clusters this can result in problems with scale.

We will need to feed this via the Syncer which is a Watch API fed by Typha for scale purposes. I believe Felix is already receiving node labels over that API (or at least has the ability to without much fuss)

if err != nil {
log.WithError(err).WithField("hostname", kp.hostname).Warn("Failed to fetch node labels, using empty labels")
return make(map[string]string)
}

labels := make(map[string]string, len(node.Labels))
for k, v := range node.Labels {
labels[k] = v
}

log.WithFields(log.Fields{
"hostname": kp.hostname,
"labels": labels,
}).Debug("Fetched node labels")

return labels
}

func (kp *KubeProxy) run(hostIPs []net.IP) error {

ips := make([]net.IP, 0, len(hostIPs))
Expand All @@ -123,6 +151,9 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error {
kp.lock.Lock()
defer kp.lock.Unlock()

kp.lastHostIPs = hostIPs
kp.nodeLabels = kp.fetchNodeLabels()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do not need to fetch the labels here; you already receive them in OnNodeLabelsUpdate().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, agreed. I am updating it. I got a bit confused there. Thanks.


withLocalNP := make([]net.IP, len(hostIPs), len(hostIPs)+1)
copy(withLocalNP, hostIPs)
if kp.ipFamily == 4 {
Expand All @@ -132,7 +163,7 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error {
}

syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap,
kp.rt, kp.excludedCIDRs)
kp.rt, kp.excludedCIDRs, kp.nodeLabels)
if err != nil {
return errors.WithMessage(err, "new bpf syncer")
}
Expand All @@ -154,7 +185,8 @@ func (kp *KubeProxy) start() error {
withLocalNP = append(withLocalNP, podNPIPV6)
}

syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt, kp.excludedCIDRs)
// Node labels will be fetched in run() when we have the actual host IPs
syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt, kp.excludedCIDRs, nil)
if err != nil {
return errors.WithMessage(err, "new bpf syncer")
}
Expand Down Expand Up @@ -219,6 +251,45 @@ func (kp *KubeProxy) OnHostIPsUpdate(IPs []net.IP) {
log.Debugf("kube-proxy OnHostIPsUpdate: %+v", IPs)
}

// OnNodeLabelsUpdate should be used by an external user to update the node's labels.
// This will trigger a resync to update NodePort service programming based on the new labels.
func (kp *KubeProxy) OnNodeLabelsUpdate(labels map[string]string) {
kp.lock.Lock()
oldLabels := kp.nodeLabels
kp.nodeLabels = labels
hostIPs := kp.lastHostIPs
kp.lock.Unlock()

// Check if labels actually changed
labelsChanged := len(oldLabels) != len(labels)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

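You should be able to use maps.Equal for this to simplify a bit. It would also close a gap in the hand-rolled check: for two maps of the same size, a key that exists only in the new map with an empty value compares equal to the missing key in the old map (a missing key reads as ""), so the change goes undetected.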

if !labelsChanged {
for k, v := range labels {
if oldLabels[k] != v {
labelsChanged = true
break
}
}
}

if !labelsChanged {
log.Debug("Node labels unchanged, skipping resync")
return
}

log.WithFields(log.Fields{
"oldLabels": oldLabels,
"newLabels": labels,
}).Info("Node labels changed, triggering kube-proxy resync")

// Trigger a resync with the current host IPs
// This will recreate the syncer with updated node labels
if len(hostIPs) > 0 {
kp.OnHostIPsUpdate(hostIPs)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are already getting node label updates streamed to us, so I think instead of calling OnHostIPsUpdate here we could introduce a new method for updating the labels that triggers a re-evaluation of Service programming, getting rid of the need for the Get() call mentioned above, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was going to say that we should perhaps use the same (or a similar) approach as when updating the host IPs. Both are expected to be rather infrequent events.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow the OnHostIPsUpdate example. To avoid possibly restarting kube-proxy twice, it might be best to move https://github.com/projectcalico/calico/blob/master/felix/dataplane/linux/bpf_route_mgr.go#L590 to https://github.com/projectcalico/calico/blob/master/felix/dataplane/linux/bpf_route_mgr.go#L242, so that it runs only if there was any change, and to send the labels and host IPs as a single update.

} else {
log.Warn("No host IPs available for resync after label update")
}
}

// OnRouteUpdate should be used to update the internal state of routing tables
func (kp *KubeProxy) OnRouteUpdate(k routes.KeyInterface, v routes.ValueInterface) {
log.WithFields(log.Fields{"key": k, "value": v}).Debug("kube-proxy: OnRouteUpdate")
Expand Down
8 changes: 4 additions & 4 deletions felix/bpf/proxy/lb_src_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func testfn(makeIPs func(ips []net.IP) proxy.K8sServicePortOption) {
externalIP := makeIPs([]net.IP{net.IPv4(35, 0, 0, 2)})
twoExternalIPs := makeIPs([]net.IP{net.IPv4(35, 0, 0, 2), net.IPv4(45, 0, 1, 2)})

s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil)

svcKey := k8sp.ServicePortName{
NamespacedName: types.NamespacedName{
Expand Down Expand Up @@ -210,7 +210,7 @@ func testfn(makeIPs func(ips []net.IP) proxy.K8sServicePortOption) {
externalIP,
proxy.K8sSvcWithLBSourceRangeIPs([]*net.IPNet{&ipnet}),
)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil)
err := s.Apply(state)
Expect(err).NotTo(HaveOccurred())
Expect(svcs.m).To(HaveLen(3))
Expand All @@ -223,7 +223,7 @@ func testfn(makeIPs func(ips []net.IP) proxy.K8sServicePortOption) {
v1.ProtocolTCP,
externalIP,
)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil)
err := s.Apply(state)
Expect(err).NotTo(HaveOccurred())
Expect(svcs.m).To(HaveLen(2))
Expand Down Expand Up @@ -253,7 +253,7 @@ func test0000SourceRange() {

externalIP := net.IPv4(35, 0, 0, 2)

s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil, nil)

svcKey := k8sp.ServicePortName{
NamespacedName: types.NamespacedName{
Expand Down
11 changes: 11 additions & 0 deletions felix/bpf/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,16 +394,19 @@ const (
ReapTerminatingUDPImmediatelly = "TerminatingImmediately"

ExcludeServiceAnnotation = "projectcalico.org/natExcludeService"
NodeSelectorAnnotation = "projectcalico.org/nodeSelector"
)

// ServiceAnnotations exposes the per-service settings that accompany a
// service port.
type ServiceAnnotations interface {
// ReapTerminatingUDP reports the service's reap-terminating-UDP setting.
ReapTerminatingUDP() bool
// ExcludeService reports the service's exclude-service setting.
ExcludeService() bool
// NodeSelector returns the node selector expression attached to the
// service. The empty string means no selector was set.
NodeSelector() string
}

// servicePortAnnotations stores the values returned through the
// ServiceAnnotations accessors.
type servicePortAnnotations struct {
reapTerminatingUDP bool
excludeService bool
// nodeSelector is the node selector expression for the service; empty when
// none is set. makeServiceInfo fills it from the NodeSelectorAnnotation
// value with surrounding whitespace trimmed.
nodeSelector string
}

func (s *servicePortAnnotations) ReapTerminatingUDP() bool {
Expand All @@ -414,6 +417,10 @@ func (s *servicePortAnnotations) ExcludeService() bool {
return s.excludeService
}

// NodeSelector returns the node selector expression stored for this service
// port. The empty string means no selector was set.
func (s *servicePortAnnotations) NodeSelector() string {
return s.nodeSelector
}

type servicePort struct {
k8sp.ServicePort
servicePortAnnotations
Expand All @@ -435,6 +442,10 @@ func makeServiceInfo(_ *v1.ServicePort, s *v1.Service, baseSvc *k8sp.BaseService
}
}

if v, ok := s.Annotations[NodeSelectorAnnotation]; ok {
svc.nodeSelector = strings.TrimSpace(v)
}

out:
return svc
}
Expand Down
1 change: 1 addition & 0 deletions felix/bpf/proxy/proxy_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func benchmarkProxyUpdates(b *testing.B, svcN, epsN int) {
&mock.DummyMap{},
proxy.NewRTCache(),
nil,
nil,
)
Expect(err).ShouldNot(HaveOccurred())

Expand Down
89 changes: 71 additions & 18 deletions felix/bpf/proxy/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/projectcalico/calico/felix/bpf/routes"
"github.com/projectcalico/calico/felix/cachingmap"
"github.com/projectcalico/calico/felix/ip"
"github.com/projectcalico/calico/libcalico-go/lib/selector"
)

var podNPIPStr = "255.255.255.255"
Expand Down Expand Up @@ -121,6 +122,7 @@ type Syncer struct {
nextSvcID uint32

nodePortIPs []net.IP
nodeLabels map[string]string
rt Routes

// new maps are valid during the Apply()'s runtime to provide easy access
Expand Down Expand Up @@ -209,18 +211,36 @@ func uniqueIPs(ips []net.IP) []net.IP {
return ret
}

// nodeMatchesSelector reports whether this node's labels satisfy selectorStr.
//
// An empty selector matches every node. A selector that cannot be parsed is
// logged and also treated as a match, so that a malformed annotation does not
// stop the service from being programmed on this node.
func (s *Syncer) nodeMatchesSelector(selectorStr string) bool {
	if selectorStr != "" {
		parsed, parseErr := selector.Parse(selectorStr)
		if parseErr == nil {
			return parsed.Evaluate(s.nodeLabels)
		}
		// Fail open: fall through to the match-everything result below.
		log.WithError(parseErr).WithField("selector", selectorStr).Error("Failed to parse node selector, ignoring")
	}

	return true
}

// NewSyncer returns a new Syncer
func NewSyncer(family int, nodePortIPs []net.IP,
frontendMap maps.MapWithExistsCheck, backendMap maps.MapWithExistsCheck,
affmap maps.Map, rt Routes,
excludedCIDRs *ip.CIDRTrie,
nodeLabels map[string]string,
) (*Syncer, error) {

s := &Syncer{
ipFamily: family,
bpfAff: affmap,
rt: rt,
nodePortIPs: uniqueIPs(nodePortIPs),
nodeLabels: nodeLabels,
prevSvcMap: make(map[svcKey]svcInfo),
prevEpsMap: make(k8sp.EndpointsMap),
stop: make(chan struct{}),
Expand Down Expand Up @@ -649,25 +669,38 @@ func (s *Syncer) apply(state DPSyncerState) error {
}

if nport := svc.NodePort(); nport != 0 {
for _, npip := range s.nodePortIPs {
npInfo := serviceInfoFromK8sServicePort(svc)
npInfo.clusterIP = npip
npInfo.port = nport
if svc.InternalPolicyLocal() &&
((s.ipFamily == 4 && npip.Equal(podNPIP)) || (s.ipFamily == 6 && npip.Equal(podNPIPV6))) {
// do not program the meta entry, program each node
// separately
continue
}
err := s.applyDerived(sname, svcTypeNodePort, npInfo)
if err != nil {
log.Errorf("failed to apply NodePort %s for service %s : %s", npip, sname, err)
continue
}
// Check if this node matches the service's node selector (if any)
nodeSelector := ""
if svcAnnotated, ok := svc.(Service); ok {
nodeSelector = svcAnnotated.NodeSelector()
}
if svc.InternalPolicyLocal() {
if miss := s.expandAndApplyNodePorts(sname, svc, eps, nport, s.rt.Lookup); miss != nil {
expNPMisses = append(expNPMisses, miss)

if !s.nodeMatchesSelector(nodeSelector) {
log.WithFields(log.Fields{
"service": sname,
"selector": nodeSelector,
}).Debug("Node does not match selector, skipping NodePort programming")
} else {
for _, npip := range s.nodePortIPs {
npInfo := serviceInfoFromK8sServicePort(svc)
npInfo.clusterIP = npip
npInfo.port = nport
if svc.InternalPolicyLocal() &&
((s.ipFamily == 4 && npip.Equal(podNPIP)) || (s.ipFamily == 6 && npip.Equal(podNPIPV6))) {
// do not program the meta entry, program each node
// separately
continue
}
err := s.applyDerived(sname, svcTypeNodePort, npInfo)
if err != nil {
log.Errorf("failed to apply NodePort %s for service %s : %s", npip, sname, err)
continue
}
}
if svc.InternalPolicyLocal() {
if miss := s.expandAndApplyNodePorts(sname, svc, eps, nport, s.rt.Lookup); miss != nil {
expNPMisses = append(expNPMisses, miss)
}
}
}
}
Expand Down Expand Up @@ -1558,3 +1591,23 @@ func K8sSvcWithReapTerminatingUDP() K8sServicePortOption {
s.(*servicePort).reapTerminatingUDP = true
}
}

// NewK8sServicePortWithSelector creates a new k8s ServicePort with a node selector
func NewK8sServicePortWithSelector(clusterIP net.IP, port int, proto v1.Protocol,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it is only used in test code, so should probably live in the _test.go file

nodeSelector string, opts ...K8sServicePortOption) k8sp.ServicePort {

x := &servicePort{
ServicePort: &serviceInfo{
clusterIP: clusterIP,
port: port,
protocol: proto,
},
}

x.nodeSelector = nodeSelector

for _, o := range opts {
o(x)
}
return x
}
2 changes: 2 additions & 0 deletions felix/bpf/proxy/syncer_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func runBenchmarkServiceUpdate(b *testing.B, svcCnt, epCnt int, mockMaps bool, o
&mock.DummyMap{},
NewRTCache(),
nil,
nil,
)
Expect(err).ShouldNot(HaveOccurred())
} else {
Expand All @@ -190,6 +191,7 @@ func runBenchmarkServiceUpdate(b *testing.B, svcCnt, epCnt int, mockMaps bool, o
&mock.DummyMap{},
NewRTCache(),
nil,
nil,
)
Expect(err).ShouldNot(HaveOccurred())
}
Expand Down
Loading
Loading