Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 39 additions & 29 deletions pkg/network/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -27,14 +28,21 @@ import (
"github.com/openshift/sdn/pkg/network/common"
)

// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
type EndpointsConfigHandler interface {
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
// service on any of the configuration sources. An example is when a new
// service comes up, or when containers come up or down for an existing service.
OnEndpointsUpdate(endpoints []*corev1.Endpoints)
// NoopEndpointsHandler is a no-op implementation of
// kubeproxyconfig.EndpointsHandler: every callback has an empty body. It is
// meant to be embedded by proxiers that do not act on Endpoints events.
type NoopEndpointsHandler struct{}

// OnEndpointsAdd ignores the added Endpoints object.
func (*NoopEndpointsHandler) OnEndpointsAdd(*corev1.Endpoints) {}

// OnEndpointsUpdate ignores both the old and the new Endpoints object.
func (*NoopEndpointsHandler) OnEndpointsUpdate(old, new *corev1.Endpoints) {
}

// OnEndpointsDelete ignores the deleted Endpoints object.
func (*NoopEndpointsHandler) OnEndpointsDelete(*corev1.Endpoints) {}

// OnEndpointsSynced does nothing.
func (*NoopEndpointsHandler) OnEndpointsSynced() {}

// Compile-time check that *NoopEndpointsHandler satisfies
// kubeproxyconfig.EndpointsHandler.
var _ kubeproxyconfig.EndpointsHandler = &NoopEndpointsHandler{}

type firewallItem struct {
ruleType networkv1.EgressNetworkPolicyRuleType
net *net.IPNet
Expand All @@ -46,12 +54,12 @@ type proxyFirewallItem struct {
}

type proxyEndpoints struct {
endpoints *corev1.Endpoints
endpoints *discoveryv1beta1.EndpointSlice
blocked bool
}

type OsdnProxy struct {
kubeproxyconfig.NoopEndpointSliceHandler
NoopEndpointsHandler
sync.Mutex

kClient kubernetes.Interface
Expand Down Expand Up @@ -264,12 +272,12 @@ func (proxy *OsdnProxy) updateEgressNetworkPolicy(policy networkv1.EgressNetwork
}

wasBlocked := pep.blocked
pep.blocked = proxy.endpointsBlocked(pep.endpoints)
pep.blocked = proxy.endpointSliceBlocked(pep.endpoints)
switch {
case wasBlocked && !pep.blocked:
proxy.baseProxy.OnEndpointsAdd(pep.endpoints)
proxy.baseProxy.OnEndpointSliceAdd(pep.endpoints)
case !wasBlocked && pep.blocked:
proxy.baseProxy.OnEndpointsDelete(pep.endpoints)
proxy.baseProxy.OnEndpointSliceDelete(pep.endpoints)
}
}
}
Expand All @@ -290,13 +298,13 @@ func (proxy *OsdnProxy) firewallBlocksIP(namespace string, ip net.IP) bool {
return false
}

func (proxy *OsdnProxy) endpointsBlocked(ep *corev1.Endpoints) bool {
for _, ss := range ep.Subsets {
func (proxy *OsdnProxy) endpointSliceBlocked(ep *discoveryv1beta1.EndpointSlice) bool {
for _, ss := range ep.Endpoints {
for _, addr := range ss.Addresses {
IP := net.ParseIP(addr.IP)
if _, contains := common.ClusterNetworkListContains(proxy.networkInfo.ClusterNetworks, IP); !contains && !proxy.networkInfo.ServiceNetwork.Contains(IP) {
if proxy.firewallBlocksIP(ep.Namespace, IP) {
klog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.Name, ep.Namespace, addr.IP)
ip := net.ParseIP(addr)
if _, contains := common.ClusterNetworkListContains(proxy.networkInfo.ClusterNetworks, ip); !contains && !proxy.networkInfo.ServiceNetwork.Contains(ip) {
if proxy.firewallBlocksIP(ep.Namespace, ip) {
klog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.Name, ep.Namespace, addr)
return true
}
}
Expand All @@ -314,18 +322,18 @@ func (proxy *OsdnProxy) checkInitialized() {
}
}

func (proxy *OsdnProxy) OnEndpointsAdd(ep *corev1.Endpoints) {
func (proxy *OsdnProxy) OnEndpointSliceAdd(ep *discoveryv1beta1.EndpointSlice) {
proxy.Lock()
defer proxy.Unlock()

pep := &proxyEndpoints{ep, proxy.endpointsBlocked(ep)}
pep := &proxyEndpoints{ep, proxy.endpointSliceBlocked(ep)}
proxy.allEndpoints[ep.UID] = pep
if !pep.blocked {
proxy.baseProxy.OnEndpointsAdd(ep)
proxy.baseProxy.OnEndpointSliceAdd(ep)
}
}

func (proxy *OsdnProxy) OnEndpointsUpdate(old, ep *corev1.Endpoints) {
func (proxy *OsdnProxy) OnEndpointSliceUpdate(old, ep *discoveryv1beta1.EndpointSlice) {
proxy.Lock()
defer proxy.Unlock()

Expand All @@ -337,19 +345,19 @@ func (proxy *OsdnProxy) OnEndpointsUpdate(old, ep *corev1.Endpoints) {
}
wasBlocked := pep.blocked
pep.endpoints = ep
pep.blocked = proxy.endpointsBlocked(ep)
pep.blocked = proxy.endpointSliceBlocked(ep)

switch {
case wasBlocked && !pep.blocked:
proxy.baseProxy.OnEndpointsAdd(ep)
proxy.baseProxy.OnEndpointSliceAdd(ep)
case !wasBlocked && !pep.blocked:
proxy.baseProxy.OnEndpointsUpdate(old, ep)
proxy.baseProxy.OnEndpointSliceUpdate(old, ep)
case !wasBlocked && pep.blocked:
proxy.baseProxy.OnEndpointsDelete(ep)
proxy.baseProxy.OnEndpointSliceDelete(ep)
}
}

func (proxy *OsdnProxy) OnEndpointsDelete(ep *corev1.Endpoints) {
func (proxy *OsdnProxy) OnEndpointSliceDelete(ep *discoveryv1beta1.EndpointSlice) {
proxy.Lock()
defer proxy.Unlock()

Expand All @@ -360,13 +368,15 @@ func (proxy *OsdnProxy) OnEndpointsDelete(ep *corev1.Endpoints) {
}
delete(proxy.allEndpoints, ep.UID)
if !pep.blocked {
proxy.baseProxy.OnEndpointsDelete(ep)
proxy.baseProxy.OnEndpointSliceDelete(ep)
}
}

func (proxy *OsdnProxy) OnEndpointsSynced() {
proxy.baseProxy.OnEndpointsSynced()
func (proxy *OsdnProxy) OnEndpointSlicesSynced() {
klog.Infof("DEBUG: OsdnProxy OnEndpointSlicesSynced")
proxy.baseProxy.OnEndpointSlicesSynced()
proxy.endpointsSynced = true
klog.Infof("DEBUG: OsdnProxy OnEndpointSlicesSynced true")
proxy.checkInitialized()
}

Expand Down
74 changes: 38 additions & 36 deletions pkg/network/proxyimpl/hybrid/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
corev1listers "k8s.io/client-go/listers/core/v1"
Expand All @@ -27,11 +28,28 @@ type RunnableProxy interface {
SetSyncRunner(b *async.BoundedFrequencyRunner)
}

// NoopEndpointsHandler is a no-op implementation of
// proxyconfig.EndpointsHandler: every callback has an empty body. It is
// meant to be embedded by proxiers that do not act on Endpoints events.
type NoopEndpointsHandler struct{}

// OnEndpointsAdd ignores the added Endpoints object.
func (*NoopEndpointsHandler) OnEndpointsAdd(*corev1.Endpoints) {}

// OnEndpointsUpdate ignores both the old and the new Endpoints object.
func (*NoopEndpointsHandler) OnEndpointsUpdate(old, new *corev1.Endpoints) {
}

// OnEndpointsDelete ignores the deleted Endpoints object.
func (*NoopEndpointsHandler) OnEndpointsDelete(*corev1.Endpoints) {}

// OnEndpointsSynced does nothing.
func (*NoopEndpointsHandler) OnEndpointsSynced() {}

// Compile-time check that *NoopEndpointsHandler satisfies
// proxyconfig.EndpointsHandler.
var _ proxyconfig.EndpointsHandler = &NoopEndpointsHandler{}

// HybridProxier runs an unidling proxy and a primary proxy at the same time,
// delegating idled services to the unidling proxy and other services to the
// primary proxy.
type HybridProxier struct {
proxyconfig.NoopEndpointSliceHandler
// TODO implement https://github.com/kubernetes/enhancements/pull/640
proxyconfig.NoopNodeHandler
NoopEndpointsHandler

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HybridProxier can't no-op Endpoints handling; it has to pass EndpointSlice events down to the iptables proxier and Endpoints events to the userspace proxier. And since OsdnProxy acts as a filter on top of HybridProxier, it needs to also pass both sets of events down to the proxier it's wrapping.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the userspace proxier to use EndpointSlice; I thought we were already going to have to switch to using Service instead of Endpoints for idling.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I see what you mean. Why wasn't userspace proxier updated? Just no one signed up for it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wasn't userspace proxier updated? Just no one signed up for it?

Upstream doesn't care about the userspace proxier any more (Tim would probably have already deleted it if OCP wasn't using it for unidling) and Red Hat had thought we weren't going to have to use EndpointSlice in openshift-sdn, so we didn't care about updating it either.

At any rate, I think we don't actually need to update userspace to use EndpointSlice; we just need to make HybridProxier and OsdnProxy pass both endpoint events and endpointslice events down to their wrapped proxiers, and then eventually the iptables proxy will act on the endpointslice events and the userspace proxy will act on the endpoint events.


mainProxy RunnableProxy
unidlingProxy RunnableProxy
Expand Down Expand Up @@ -89,22 +107,6 @@ func NewHybridProxier(
return p, nil
}

func (proxier *HybridProxier) OnNodeAdd(node *corev1.Node) {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (proxier *HybridProxier) OnNodeUpdate(oldNode, node *corev1.Node) {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (proxier *HybridProxier) OnNodeDelete(node *corev1.Node) {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (proxier *HybridProxier) OnNodeSynced() {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (p *HybridProxier) OnServiceAdd(service *corev1.Service) {
svcName := types.NamespacedName{
Namespace: service.Namespace,
Expand Down Expand Up @@ -190,11 +192,11 @@ func (p *HybridProxier) OnServiceSynced() {
klog.V(6).Infof("hybrid proxy: services synced")
}

// shouldEndpointsUseUserspace checks to see if the given endpoints have the correct
// shouldEndpointSliceUseUserspace checks to see if the given endpoints have the correct
// annotations and size to use the unidling proxy.
func (p *HybridProxier) shouldEndpointsUseUserspace(endpoints *corev1.Endpoints) bool {
func (p *HybridProxier) shouldEndpointSliceUseUserspace(endpoints *discoveryv1beta1.EndpointSlice) bool {
hasEndpoints := false
for _, subset := range endpoints.Subsets {
for _, subset := range endpoints.Endpoints {
if len(subset.Addresses) > 0 {
hasEndpoints = true
break
Expand Down Expand Up @@ -244,11 +246,11 @@ func (p *HybridProxier) switchService(name types.NamespacedName) {
p.switchedToUserspace[name] = p.usingUserspace[name]
}

func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) {
func (p *HybridProxier) OnEndpointSliceAdd(endpoints *discoveryv1beta1.EndpointSlice) {
// we track all endpoints in the unidling endpoints handler so that we can successfully
// detect when a service becomes unidling
klog.V(6).Infof("hybrid proxy: (always) add ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name)
p.unidlingProxy.OnEndpointsAdd(endpoints)
p.unidlingProxy.OnEndpointSliceAdd(endpoints)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar at all with the sdn/proxy implementation, so maybe this information is redundant, but there can be multiple slices for the same service, and each slice can have duplicate endpoints; kube-proxy uses a cache:
https://github.com/kubernetes/kubernetes/blob/2bcbc527a760106ec89647fcf6852f37c804f4ed/pkg/proxy/endpointslicecache.go#L43-L49

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit of endpoints per slice is 100, so if you have more than 100 endpoints (let's say 110 for service X), you'll receive two slices, Y1 and Y2, perhaps with 100 and 10 endpoints respectively.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really should have an e2e test upstream, then, that creates a service with > 100 endpoints to exercise this. Is there one you know of that I can crib?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, for idling, I'm wondering whether we even need to support more than three or four endpoints. The only time the userspace proxy should be in play is on an idle service, which has no endpoints.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whether we even need to support > three or four endpoints.

As I said, I'm not familiar with this code; I'm just raising some points that I think may need to be taken into consideration. If that is the case, it seems we shouldn't worry about this.

We really should have an e2e test in upstream that creates a service with > 100 endpoints

this is well tested upstream

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unidling proxy can just ignore the endpointslices and work with the endpoints like it always did. Endpoints objects always contain the full set of endpoints, even in cases where the EndpointSlice controller would start splitting things up; it just means that code working with the Endpoints objects doesn't get the efficiency wins that code working with the EndpointSlice objects would get.


p.usingUserspaceLock.Lock()
defer p.usingUserspaceLock.Unlock()
Expand All @@ -259,11 +261,11 @@ func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) {
}

wasUsingUserspace, knownEndpoints := p.usingUserspace[svcName]
p.usingUserspace[svcName] = p.shouldEndpointsUseUserspace(endpoints)
p.usingUserspace[svcName] = p.shouldEndpointSliceUseUserspace(endpoints)

if !p.usingUserspace[svcName] {
klog.V(6).Infof("hybrid proxy: add ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsAdd(endpoints)
p.mainProxy.OnEndpointSliceAdd(endpoints)
}

// a service could appear before endpoints, so we have to treat this as a potential
Expand All @@ -273,11 +275,11 @@ func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) {
}
}

func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoints) {
func (p *HybridProxier) OnEndpointSliceUpdate(oldEndpoints, endpoints *discoveryv1beta1.EndpointSlice) {
// we track all endpoints in the unidling endpoints handler so that we can successfully
// detect when a service becomes unidling
klog.V(6).Infof("hybrid proxy: (always) update ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name)
p.unidlingProxy.OnEndpointsUpdate(oldEndpoints, endpoints)
p.unidlingProxy.OnEndpointSliceUpdate(oldEndpoints, endpoints)

p.usingUserspaceLock.Lock()
defer p.usingUserspaceLock.Unlock()
Expand All @@ -288,7 +290,7 @@ func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoi
}

wasUsingUserspace, knownEndpoints := p.usingUserspace[svcName]
p.usingUserspace[svcName] = p.shouldEndpointsUseUserspace(endpoints)
p.usingUserspace[svcName] = p.shouldEndpointSliceUseUserspace(endpoints)

if !knownEndpoints {
utilruntime.HandleError(fmt.Errorf("received update for unknown endpoints %s", svcName.String()))
Expand All @@ -299,26 +301,26 @@ func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoi

if !isSwitch && !p.usingUserspace[svcName] {
klog.V(6).Infof("hybrid proxy: update ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsUpdate(oldEndpoints, endpoints)
p.mainProxy.OnEndpointSliceUpdate(oldEndpoints, endpoints)
return
}

if p.usingUserspace[svcName] {
klog.V(6).Infof("hybrid proxy: del ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsDelete(oldEndpoints)
p.mainProxy.OnEndpointSliceDelete(oldEndpoints)
} else {
klog.V(6).Infof("hybrid proxy: add ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsAdd(endpoints)
p.mainProxy.OnEndpointSliceAdd(endpoints)
}

p.switchService(svcName)
}

func (p *HybridProxier) OnEndpointsDelete(endpoints *corev1.Endpoints) {
func (p *HybridProxier) OnEndpointSliceDelete(endpoints *discoveryv1beta1.EndpointSlice) {
// we track all endpoints in the unidling endpoints handler so that we can successfully
// detect when a service becomes unidling
klog.V(6).Infof("hybrid proxy: (always) del ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name)
p.unidlingProxy.OnEndpointsDelete(endpoints)
p.unidlingProxy.OnEndpointSliceDelete(endpoints)

// Careful - there is the potential for deadlocks here,
// except that we always get usingUserspaceLock first, then
Expand All @@ -340,15 +342,15 @@ func (p *HybridProxier) OnEndpointsDelete(endpoints *corev1.Endpoints) {

if !usingUserspace {
klog.V(6).Infof("hybrid proxy: del ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsDelete(endpoints)
p.mainProxy.OnEndpointSliceDelete(endpoints)
}

p.cleanupState(svcName)
}

func (p *HybridProxier) OnEndpointsSynced() {
p.unidlingProxy.OnEndpointsSynced()
p.mainProxy.OnEndpointsSynced()
func (p *HybridProxier) OnEndpointSlicesSynced() {
p.unidlingProxy.OnEndpointSlicesSynced()
p.mainProxy.OnEndpointSlicesSynced()
klog.V(6).Infof("hybrid proxy: endpoints synced")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/openshift-sdn/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewOpenShiftSDNCommand(basename string, errout io.Writer) *cobra.Command {
Short: "Start OpenShiftSDN",
Long: networkLong,
Run: func(c *cobra.Command, _ []string) {
c.Flags().Lookup("v").Value.Set("5")
ch := make(chan struct{})
interrupt.New(func(s os.Signal) {
fmt.Fprintf(errout, "interrupt: Gracefully shutting down ...\n")
Expand Down Expand Up @@ -124,7 +125,6 @@ func (sdn *OpenShiftSDN) ValidateAndParse() error {
if err != nil {
return err
}

return nil
}

Expand Down
13 changes: 2 additions & 11 deletions pkg/openshift-sdn/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ import (
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/scheme"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
kubeproxyoptions "k8s.io/kubernetes/cmd/kube-proxy/app"
"k8s.io/kubernetes/pkg/features"
proxy "k8s.io/kubernetes/pkg/proxy"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
pconfig "k8s.io/kubernetes/pkg/proxy/config"
Expand Down Expand Up @@ -67,13 +65,6 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) {
return
}

if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) ||
utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
klog.Warningf("kube-proxy has unsupported EndpointSlice/EndpointSliceProxying gates enabled")
close(waitChan)
return
}

bindAddr := net.ParseIP(sdn.ProxyConfig.BindAddress)
nodeAddr := bindAddr

Expand Down Expand Up @@ -214,8 +205,8 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) {
}
}

endpointsConfig := pconfig.NewEndpointsConfig(
sdn.informers.KubeInformers.Core().V1().Endpoints(),
endpointsConfig := pconfig.NewEndpointSliceConfig(
sdn.informers.KubeInformers.Discovery().V1beta1().EndpointSlices(),
sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
)
// customized handling registration that inserts a filter if needed
Expand Down
Loading