Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions go-controller/pkg/node/gateway_shared_intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

kapi "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
ktypes "k8s.io/apimachinery/pkg/types"
apierrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -774,7 +775,8 @@ func (npw *nodePortWatcher) AddEndpointSlice(epSlice *discovery.EndpointSlice) e
// Here we make sure the correct rules are programmed whenever an AddEndpointSlice
// event is received, only alter flows if we need to, i.e if cache wasn't
// set or if it was and hasLocalHostNetworkEp state changed, to prevent flow churn
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)

if err != nil {
return fmt.Errorf("cannot add %s/%s to nodePortWatcher: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand Down Expand Up @@ -814,7 +816,7 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice

klog.V(5).Infof("Deleting endpointslice %s in namespace %s", epSlice.Name, epSlice.Namespace)
// remove rules for endpoints and add back normal ones
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
if err != nil {
return fmt.Errorf("cannot delete %s/%s from nodePortWatcher: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand All @@ -835,10 +837,11 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
return nil
}

func getEndpointAddresses(endpointSlice *discovery.EndpointSlice) []string {
func getEndpointAddresses(endpointSlice *discovery.EndpointSlice, service *kapi.Service) []string {
endpointsAddress := make([]string, 0)
includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
for _, endpoint := range endpointSlice.Endpoints {
if isEndpointReady(endpoint) {
if util.IsEndpointValid(endpoint, includeTerminating) {
for _, ip := range endpoint.Addresses {
endpointsAddress = append(endpointsAddress, utilnet.ParseIPSloppy(ip).String())
}
Expand All @@ -851,18 +854,23 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
var err error
var errors []error

oldEpAddr := getEndpointAddresses(oldEpSlice)
newEpAddr := getEndpointAddresses(newEpSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(newEpSlice)
if err != nil {
return fmt.Errorf("cannot update %s/%s in nodePortWatcher: %v", newEpSlice.Namespace, newEpSlice.Name, err)
}
svc, err := npw.watchFactory.GetService(namespacedName.Namespace, namespacedName.Name)
if err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error while retrieving service for endpointslice %s/%s during update: %v",
oldEpSlice.Namespace, oldEpSlice.Name, err)
}

oldEpAddr := getEndpointAddresses(oldEpSlice, svc)
newEpAddr := getEndpointAddresses(newEpSlice, svc)
if reflect.DeepEqual(oldEpAddr, newEpAddr) {
return nil
}

klog.V(5).Infof("Updating endpointslice %s in namespace %s", oldEpSlice.Name, oldEpSlice.Namespace)

namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
if err != nil {
return fmt.Errorf("cannot update %s/%s in nodePortWatcher: %v", newEpSlice.Namespace, newEpSlice.Name, err)
}
if _, exists := npw.getServiceInfo(namespacedName); !exists {
// When a service is updated from externalName to nodeport type, it won't be
// in nodePortWatcher cache (npw): in this case, have the new nodeport IPtable rules
Expand Down
8 changes: 4 additions & 4 deletions go-controller/pkg/node/healthcheck_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (l *loadBalancerHealthChecker) SyncServices(svcs []interface{}) error {
}

func (l *loadBalancerHealthChecker) SyncEndPointSlices(epSlice *discovery.EndpointSlice) error {
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
if err != nil {
return fmt.Errorf("skipping %s/%s: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand All @@ -124,7 +124,7 @@ func (l *loadBalancerHealthChecker) SyncEndPointSlices(epSlice *discovery.Endpoi
}

func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.EndpointSlice) error {
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
if err != nil {
return fmt.Errorf("cannot add %s/%s to loadBalancerHealthChecker: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand All @@ -137,7 +137,7 @@ func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.Endpoint
}

func (l *loadBalancerHealthChecker) UpdateEndpointSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice) error {
namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(newEpSlice)
if err != nil {
return fmt.Errorf("cannot update %s/%s in loadBalancerHealthChecker: %v",
newEpSlice.Namespace, newEpSlice.Name, err)
Expand Down Expand Up @@ -209,7 +209,7 @@ func isEndpointReady(endpoint discovery.Endpoint) bool {
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
}

func namespacedNameFromEPSlice(epSlice *discovery.EndpointSlice) (ktypes.NamespacedName, error) {
func serviceNamespacedNameFromEndpointSlice(epSlice *discovery.EndpointSlice) (ktypes.NamespacedName, error) {
// Return the namespaced name of the corresponding service
var serviceNamespacedName ktypes.NamespacedName
svcName := epSlice.Labels[discovery.LabelServiceName]
Expand Down
24 changes: 17 additions & 7 deletions go-controller/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

kapi "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -715,7 +716,15 @@ func (n *OvnNode) reconcileConntrackUponEndpointSliceEvents(oldEndpointSlice, ne
// nothing to do upon an add event
return nil
}

namespacedName, err := serviceNamespacedNameFromEndpointSlice(oldEndpointSlice)
if err != nil {
return fmt.Errorf("cannot reconcile conntrack: %v", err)
}
svc, err := n.watchFactory.GetService(namespacedName.Namespace, namespacedName.Name)
if err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error while retrieving service for endpointslice %s/%s when reconciling conntrack: %v",
newEndpointSlice.Namespace, newEndpointSlice.Name, err)
}
for _, oldPort := range oldEndpointSlice.Ports {
if *oldPort.Protocol != kapi.ProtocolUDP { // flush conntrack only for UDP
continue
Expand All @@ -725,7 +734,7 @@ func (n *OvnNode) reconcileConntrackUponEndpointSliceEvents(oldEndpointSlice, ne
oldIPStr := utilnet.ParseIPSloppy(oldIP).String()
// upon an update event, remove conntrack entries for IP addresses that are no longer
// in the endpointslice, skip otherwise
if newEndpointSlice != nil && doesEPSliceContainReadyEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol) {
if newEndpointSlice != nil && doesEndpointSliceContainValidEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
continue
}
// upon update and delete events, flush conntrack only for UDP
Expand Down Expand Up @@ -876,13 +885,14 @@ func (n *OvnNode) validateVTEPInterfaceMTU() error {
return nil
}

// doesEPSliceContainEndpoint checks whether the endpointslice
// contains a specific endpoint with IP/Port/Protocol and this endpoint is ready
func doesEPSliceContainReadyEndpoint(epSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol) bool {
// doesEndpointSliceContainValidEndpoint returns true if the endpointslice
// contains an endpoint with the given IP/Port/Protocol and this endpoint is considered valid
func doesEndpointSliceContainValidEndpoint(epSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
for _, port := range epSlice.Ports {
for _, endpoint := range epSlice.Endpoints {
if !isEndpointReady(endpoint) {
if !util.IsEndpointValid(endpoint, includeTerminating) {
continue
}
for _, ip := range endpoint.Addresses {
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/ovn/controller/services/load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var protos = []v1.Protocol{
func buildServiceLBConfigs(service *v1.Service, endpointSlices []*discovery.EndpointSlice) (perNodeConfigs []lbConfig, clusterConfigs []lbConfig) {
// For each svcPort, determine if it will be applied per-node or cluster-wide
for _, svcPort := range service.Spec.Ports {
eps := util.GetLbEndpoints(endpointSlices, svcPort)
eps := util.GetLbEndpoints(endpointSlices, svcPort, service.Spec.PublishNotReadyAddresses)

// if ExternalTrafficPolicy or InternalTrafficPolicy is local, then we need to do things a bit differently
externalTrafficLocal := (service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal)
Expand Down
22 changes: 11 additions & 11 deletions go-controller/pkg/util/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,31 +306,31 @@ type LbEndpoints struct {
Port int32
}

// GetLbEndpoints return the endpoints that belong to the IPFamily as a slice of IPs
func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort) LbEndpoints {
// GetLbEndpoints returns the IPv4 and IPv6 addresses of valid endpoints as slices inside a struct
func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort, includeTerminating bool) LbEndpoints {
v4ips := sets.NewString()
v6ips := sets.NewString()

out := LbEndpoints{}
// return an empty object so the caller don't have to check for nil and can use it as an iterator
// return an empty object so the caller doesn't have to check for nil and can use it as an iterator
if len(slices) == 0 {
return out
}

for _, slice := range slices {
klog.V(4).Infof("Getting endpoints for slice %s/%s", slice.Namespace, slice.Name)

// build the list of endpoints in the slice
// build the list of valid endpoints in the slice
for _, port := range slice.Ports {
// If Service port name set it must match the name field in the endpoint
// If Service port name is not set we just use the endpoint port
// If Service port name is set, it must match the name field in the endpoint
// If Service port name is not set, we just use the endpoint port
if svcPort.Name != "" && svcPort.Name != *port.Name {
klog.V(5).Infof("Slice %s with different Port name, requested: %s received: %s",
slice.Name, svcPort.Name, *port.Name)
continue
}

// Skip ports that doesn't match the protocol
// Skip ports that don't match the protocol
if *port.Protocol != svcPort.Protocol {
klog.V(5).Infof("Slice %s with different Port protocol, requested: %s received: %s",
slice.Name, svcPort.Protocol, *port.Protocol)
Expand All @@ -339,13 +339,13 @@ func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort)

out.Port = *port.Port
for _, endpoint := range slice.Endpoints {
// Skip endpoints that are not ready
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
klog.V(4).Infof("Slice endpoints Not Ready")
// Skip endpoint if it's not valid
if !IsEndpointValid(endpoint, includeTerminating) {
klog.V(4).Infof("Slice endpoint not valid")
continue
}
for _, ip := range endpoint.Addresses {
klog.V(4).Infof("Adding slice %s endpoints: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
klog.V(4).Infof("Adding slice %s endpoint: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
ipStr := utilnet.ParseIPSloppy(ip).String()
switch slice.AddressType {
case discovery.AddressTypeIPv4:
Expand Down
76 changes: 75 additions & 1 deletion go-controller/pkg/util/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,84 @@ func Test_getLbEndpoints(t *testing.T) {
},
want: LbEndpoints{[]string{"10.0.0.2", "10.1.1.2", "10.2.2.2"}, []string{}, 80},
},
{
name: "slices with non-ready but serving endpoints",
args: args{
slices: []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-ab23",
Namespace: "ns",
Labels: map[string]string{discovery.LabelServiceName: "svc"},
},
Ports: []discovery.EndpointPort{
{
Name: utilpointer.StringPtr("tcp-example"),
Protocol: protoPtr(v1.ProtocolTCP),
Port: utilpointer.Int32Ptr(int32(80)),
},
},
AddressType: discovery.AddressTypeIPv6,
Endpoints: []discovery.Endpoint{
{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
},
Addresses: []string{"2001:db2::2"},
},
},
},
},
svcPort: v1.ServicePort{
Name: "tcp-example",
TargetPort: intstr.FromInt(80),
Protocol: v1.ProtocolTCP,
},
},
want: LbEndpoints{[]string{}, []string{"2001:db2::2"}, 80},
},
{
name: "slices with non-ready non-serving endpoints",
args: args{
slices: []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-ab23",
Namespace: "ns",
Labels: map[string]string{discovery.LabelServiceName: "svc"},
},
Ports: []discovery.EndpointPort{
{
Name: utilpointer.StringPtr("tcp-example"),
Protocol: protoPtr(v1.ProtocolTCP),
Port: utilpointer.Int32Ptr(int32(80)),
},
},
AddressType: discovery.AddressTypeIPv6,
Endpoints: []discovery.Endpoint{
{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
},
Addresses: []string{"2001:db2::2"},
},
},
},
},
svcPort: v1.ServicePort{
Name: "tcp-example",
TargetPort: intstr.FromInt(80),
Protocol: v1.ProtocolTCP,
},
},
want: LbEndpoints{[]string{}, []string{}, 80},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetLbEndpoints(tt.args.slices, tt.args.svcPort)
got := GetLbEndpoints(tt.args.slices, tt.args.svcPort, false)
assert.Equal(t, tt.want, got)
})
}
Expand Down
28 changes: 28 additions & 0 deletions go-controller/pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/urfave/cli/v2"

discovery "k8s.io/api/discovery/v1"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)
Expand Down Expand Up @@ -310,3 +311,30 @@ func UpdateNodeSwitchExcludeIPs(nbClient libovsdbclient.Client, nodeName string,

return nil
}

// IsEndpointReady takes as input an endpoint from an endpoint slice and returns true if the endpoint is
// to be considered ready. Considering as ready an endpoint with Conditions.Ready==nil
// as per doc: "In most cases consumers should interpret this unknown state as ready"
// https://github.com/kubernetes/api/blob/0478a3e95231398d8b380dc2a1905972be8ae1d5/discovery/v1/types.go#L129-L131
func IsEndpointReady(endpoint discovery.Endpoint) bool {
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
}

// IsEndpointServing takes as input an endpoint from an endpoint slice and returns true if the endpoint is
// to be considered serving. Falling back to IsEndpointReady when Serving field is nil, as per doc:
// "If nil, consumers should defer to the ready condition.
// https://github.com/kubernetes/api/blob/0478a3e95231398d8b380dc2a1905972be8ae1d5/discovery/v1/types.go#L138-L139
func IsEndpointServing(endpoint discovery.Endpoint) bool {
if endpoint.Conditions.Serving != nil {
return *endpoint.Conditions.Serving
} else {
return IsEndpointReady(endpoint)
}
}

// IsEndpointValid takes as input an endpoint from an endpoint slice and a boolean that indicates whether to include
// all terminating endpoints, as per the PublishNotReadyAddresses feature in kubernetes service spec. It always returns true
// if includeTerminating is true and falls back to IsEndpointServing otherwise.
func IsEndpointValid(endpoint discovery.Endpoint, includeTerminating bool) bool {
return includeTerminating || IsEndpointServing(endpoint)
}