Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go-controller/pkg/ovn/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (ovn *Controller) getLbEndpoints(ep *kapi.Endpoints) map[kapi.Protocol]map[

// AddEndpoints adds endpoints and creates corresponding resources in OVN
func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {
klog.V(5).Infof("Adding endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
Comment thread
trozet marked this conversation as resolved.
klog.Infof("Adding endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
// get service
// TODO: cache the service
svc, err := ovn.watchFactory.GetService(ep.Namespace, ep.Name)
Expand Down Expand Up @@ -157,7 +157,7 @@ func (ovn *Controller) handleNodePortLB(node *kapi.Node) error {
}

func (ovn *Controller) deleteEndpoints(ep *kapi.Endpoints) error {
klog.V(5).Infof("Deleting endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
klog.Infof("Deleting endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
svc, err := ovn.watchFactory.GetService(ep.Namespace, ep.Name)
if err != nil {
// This is not necessarily an error. For e.g when a service is deleted,
Expand All @@ -183,7 +183,7 @@ func (ovn *Controller) deleteEndpoints(ep *kapi.Endpoints) error {
if err != nil {
klog.Errorf("Failed to create reject ACL for load balancer: %s, error: %v", lb, err)
}
klog.V(5).Infof("Reject ACL created for load balancer: %s, %s", lb, aclUUID)
klog.Infof("Reject ACL created for load balancer: %s, %s", lb, aclUUID)
}

// clear endpoints from the LB
Expand Down
1 change: 1 addition & 0 deletions go-controller/pkg/ovn/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (e endpoints) delNodePortPortCmds(fexec *ovntest.FakeExec, service v1.Servi
})
fexec.AddFakeCmdsNoOutputNoError([]string{
fmt.Sprintf("ovn-nbctl --timeout=15 --if-exists remove load_balancer load_balancer_%s vips \"%s:%v\"", strconv.Itoa(idx), "169.254.33.2", service.Spec.Ports[0].NodePort),
fmt.Sprintf("ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=load_balancer_%s-169.254.33.2\\:%d", strconv.Itoa(idx), service.Spec.Ports[0].NodePort),
})
}
}
Expand Down
47 changes: 38 additions & 9 deletions go-controller/pkg/ovn/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (ovn *Controller) getLogicalSwitchesForLoadBalancer(lb string) ([]string, e
}

// TODO: Add unittest for function.
func generateACLName(lb string, sourceIP string, sourcePort int32) string {
func (ovn *Controller) generateACLName(lb string, sourceIP string, sourcePort int32) string {
aclName := fmt.Sprintf("%s-%s:%d", lb, sourceIP, sourcePort)
aclName = strings.ReplaceAll(aclName, ":", "\\:")
// ACL names are limited to 63 characters
Expand Down Expand Up @@ -221,7 +221,7 @@ func (ovn *Controller) createLoadBalancerRejectACL(lb string, sourceIP string, s
}
vip := util.JoinHostPortInt32(sourceIP, sourcePort)
// NOTE: doesn't use vip, to avoid having brackets in the name with IPv6
aclName := generateACLName(lb, sourceIP, sourcePort)
aclName := ovn.generateACLName(lb, sourceIP, sourcePort)
// If ovn-k8s was restarted, we lost the cache, and an ACL may already exist in OVN. In that case we need to check
// using ACL name
aclUUID, stderr, err := util.RunOVNNbctl("--data=bare", "--no-heading", "--columns=_uuid", "find", "acl",
Expand Down Expand Up @@ -267,12 +267,43 @@ func (ovn *Controller) createLoadBalancerRejectACL(lb string, sourceIP string, s
}

func (ovn *Controller) deleteLoadBalancerRejectACL(lb, vip string) {
acl, _ := ovn.getServiceLBInfo(lb, vip)
Comment thread
trozet marked this conversation as resolved.
if acl == "" {
aclUUID, hasEndpoints := ovn.getServiceLBInfo(lb, vip)
if aclUUID == "" && !hasEndpoints {
// If no ACL and does not have endpoints, we can assume there is no valid entry in the cache here as
// this is an illegal state.
// Determine and remove ACL by name.
ip, port, err := util.SplitHostPortInt32(vip)
if err != nil {
klog.Errorf("Unable to parse vip for Reject ACL deletion: %v", err)
return
}
ovn.removeStaleRejectACL(lb, ip, port)
} else if aclUUID == "" {
// Must have endpoints and no reject ACL to remove
klog.V(5).Infof("No reject ACL found to remove for load balancer: %s, vip: %s", lb, vip)
return
} else {
ovn.removeACLFromNodeSwitches(lb, aclUUID)
}
ovn.removeServiceACL(lb, vip)
}

func (ovn *Controller) removeStaleRejectACL(lb, ip string, port int32) {
aclName := ovn.generateACLName(lb, ip, port)
aclUUID, stderr, err := util.RunOVNNbctl("--data=bare", "--no-heading", "--columns=_uuid", "find", "acl",
fmt.Sprintf("name=%s", aclName))
if err != nil {
klog.Errorf("Error while querying ACLs by name: %s, %v", stderr, err)
return
} else if len(aclUUID) == 0 {
klog.Infof("Reject ACL not found to remove for name: %s", aclName)
return
}

ovn.removeACLFromNodeSwitches(lb, aclUUID)
}

func (ovn *Controller) removeACLFromNodeSwitches(lb, aclUUID string) {
switches, err := ovn.getLogicalSwitchesForLoadBalancer(lb)
if err != nil {
klog.Errorf("Could not retrieve logical switches associated with load balancer %s", lb)
Expand All @@ -281,17 +312,15 @@ func (ovn *Controller) deleteLoadBalancerRejectACL(lb, vip string) {

args := []string{}
for _, ls := range switches {
args = append(args, "--", "--if-exists", "remove", "logical_switch", ls, "acl", acl)
args = append(args, "--", "--if-exists", "remove", "logical_switch", ls, "acl", aclUUID)
}

if len(args) > 0 {
_, _, err = util.RunOVNNbctl(args...)
if err != nil {
klog.Errorf("Error while removing ACL: %s, from switches, error: %v", acl, err)
klog.Errorf("Error while removing ACL: %s, from switches, error: %v", aclUUID, err)
} else {
klog.V(5).Infof("ACL: %s, removed from switches: %s", acl, switches)
klog.Infof("ACL: %s, removed from switches: %s", aclUUID, switches)
}
}

ovn.removeServiceACL(lb, vip)
}
118 changes: 113 additions & 5 deletions go-controller/pkg/ovn/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ovn

import (
"encoding/json"
"fmt"
"net"
"reflect"
Expand Down Expand Up @@ -30,6 +31,9 @@ func (ovn *Controller) syncServices(services []interface{}) {
// with load balancer type services based on each protocol.
lbServices := make(map[kapi.Protocol][]string)

// Track which services found should have reject ACLs. Format is name, load balancer, and value is if service has endpoints
svcRejectACLs := make(map[string]map[string]bool)

// Go through the k8s services and populate 'clusterServices',
// 'nodeportServices' and 'lbServices'
for _, serviceInterface := range services {
Expand All @@ -48,6 +52,16 @@ func (ovn *Controller) syncServices(services []interface{}) {
continue
}

// detect if service has endpoints for stale reject ACL check. If there are endpoints, we need to wipe any
// old stale ACLs
ep, err := ovn.watchFactory.GetEndpoint(service.Namespace, service.Name)
Comment thread
dcbw marked this conversation as resolved.
hasEndpoints := false
if err == nil {
if len(ep.Subsets) > 0 {
hasEndpoints = true
}
}

for _, svcPort := range service.Spec.Ports {
if err := util.ValidatePort(svcPort.Protocol, svcPort.Port); err != nil {
klog.Errorf("Error validating port %s: %v", svcPort.Name, err)
Expand All @@ -57,14 +71,107 @@ func (ovn *Controller) syncServices(services []interface{}) {
if util.ServiceTypeHasNodePort(service) {
port := fmt.Sprintf("%d", svcPort.NodePort)
nodeportServices[svcPort.Protocol] = append(nodeportServices[svcPort.Protocol], port)
gatewayRouters, _, err := ovn.getOvnGateways()
if err == nil {
for _, gatewayRouter := range gatewayRouters {
lb, err := ovn.getGatewayLoadBalancer(gatewayRouter, svcPort.Protocol)
if err != nil {
klog.Warningf("Service Sync: Gateway router %s does not have load balancer (%v)",
gatewayRouter, err)
continue
}
physicalIPs, err := ovn.getGatewayPhysicalIPs(gatewayRouter)
if err != nil {
klog.Warningf("Service Sync: Gateway router %s does not have physical ips: %v",
gatewayRouter, err)
continue
}
for _, physicalIP := range physicalIPs {
name := ovn.generateACLName(lb, physicalIP, svcPort.NodePort)
Comment thread
trozet marked this conversation as resolved.
if _, ok := svcRejectACLs[name]; !ok {
svcRejectACLs[name] = make(map[string]bool)
}
svcRejectACLs[name][lb] = hasEndpoints
}
}
}
}

key := util.JoinHostPortInt32(service.Spec.ClusterIP, svcPort.Port)
clusterServices[svcPort.Protocol] = append(clusterServices[svcPort.Protocol], key)

lb, err := ovn.getLoadBalancer(svcPort.Protocol)
if err != nil {
klog.Warningf("Unable to get existing load balancer from ovn. Reject ACLs may not be synced!")
} else {
name := ovn.generateACLName(lb, service.Spec.ClusterIP, svcPort.Port)
if _, ok := svcRejectACLs[name]; !ok {
svcRejectACLs[name] = make(map[string]bool)
}
svcRejectACLs[name][lb] = hasEndpoints
}
for _, extIP := range service.Spec.ExternalIPs {
key := util.JoinHostPortInt32(extIP, svcPort.Port)
lbServices[svcPort.Protocol] = append(lbServices[svcPort.Protocol], key)
gateways, _, err := ovn.getOvnGateways()
if err != nil {
continue
}
for _, gateway := range gateways {
lb, err := ovn.getGatewayLoadBalancer(gateway, svcPort.Protocol)
if err != nil {
klog.Errorf("Service Sync: Gateway router %s does not have load balancer (%v)",
gateway, err)
continue
}
name := ovn.generateACLName(lb, extIP, svcPort.Port)
if _, ok := svcRejectACLs[name]; !ok {
svcRejectACLs[name] = make(map[string]bool)
}
svcRejectACLs[name][lb] = hasEndpoints
}
}
}
}

// Get OVN's current reject ACLs. Note, currently only services use reject ACLs.
type ovnACLData struct {
Data [][]interface{}
}
data, stderr, err := util.RunOVNNbctl("--columns=name,_uuid", "--format=json", "find", "acl", "action=reject")
if err != nil {
klog.Errorf("Error while querying ACLs with reject action: %s, %v", stderr, err)
} else {
x := ovnACLData{}
if err := json.Unmarshal([]byte(data), &x); err != nil {
klog.Errorf("Unable to get current OVN reject ACLs. Unable to sync reject ACLs!: %v", err)
} else if len(x.Data) == 0 {
klog.Infof("Service Sync: No reject ACLs currently configured in OVN")
} else {
for _, entry := range x.Data {
// ACL entry format is a slice: [<aclName>, ["_uuid", <uuid>]]
if len(entry) != 2 {
continue
}
name, ok := entry[0].(string)
if !ok {
continue
}
uuidData, ok := entry[1].([]interface{})
if !ok || len(uuidData) != 2 {
continue
}
uuid, ok := uuidData[1].(string)
if !ok {
continue
}
if svcCacheEntry, ok := svcRejectACLs[name]; ok {
for lb, hasEps := range svcCacheEntry {
if hasEps {
klog.Infof("Service Sync: Removing OVN stale reject ACL: %s", name)
ovn.removeACLFromNodeSwitches(lb, uuid)
}
}
}
}
}
}
Expand Down Expand Up @@ -134,7 +241,7 @@ func (ovn *Controller) syncServices(services []interface{}) {
}

func (ovn *Controller) createService(service *kapi.Service) error {
klog.V(5).Infof("Creating service %s", service.Name)
klog.Infof("Creating service %s", service.Name)
if !util.IsClusterIPSet(service) {
klog.V(5).Infof("Skipping service create: No cluster IP for service %s found", service.Name)
return nil
Expand Down Expand Up @@ -216,7 +323,7 @@ func (ovn *Controller) createService(service *kapi.Service) error {
if err != nil {
return fmt.Errorf("failed to create service ACL: %v", err)
}
klog.V(5).Infof("Service Reject ACL created for gateway router: %s", aclUUID)
klog.Infof("Service Reject ACL created for gateway router: %s", aclUUID)
}
}
}
Expand All @@ -242,7 +349,7 @@ func (ovn *Controller) createService(service *kapi.Service) error {
if err != nil {
return fmt.Errorf("failed to create service ACL: %v", err)
}
klog.V(5).Infof("Service Reject ACL created for cluster IP: %s", aclUUID)
klog.Infof("Service Reject ACL created for cluster IP: %s", aclUUID)
}
if len(service.Spec.ExternalIPs) > 0 {
gateways, _, err := ovn.getOvnGateways()
Expand All @@ -265,7 +372,7 @@ func (ovn *Controller) createService(service *kapi.Service) error {
if err != nil {
return fmt.Errorf("failed to create service ACL for external IP")
}
klog.V(5).Infof("Service Reject ACL created for external IP: %s", aclUUID)
klog.Infof("Service Reject ACL created for external IP: %s", aclUUID)
}
}
}
Expand All @@ -292,6 +399,7 @@ func (ovn *Controller) updateService(oldSvc, newSvc *kapi.Service) error {
}

func (ovn *Controller) deleteService(service *kapi.Service) {
klog.Infof("Deleting service %s", service.Name)
if !util.IsClusterIPSet(service) {
return
}
Expand Down
10 changes: 10 additions & 0 deletions go-controller/pkg/ovn/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ func (s service) baseCmds(fexec *ovntest.FakeExec, service v1.Service) {
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:k8s-cluster-lb-tcp=yes",
Output: k8sTCPLoadBalancerIP,
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --columns=name,_uuid --format=json find acl action=reject",
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: fmt.Sprintf("ovn-nbctl --timeout=15 --data=bare --no-heading get load_balancer %s vips", k8sTCPLoadBalancerIP),
Output: "{\"172.30.0.10:53\"=\"10.128.0.18:5353,10.129.0.3:5353\"}",
})
fexec.AddFakeCmdsNoOutputNoError([]string{
fmt.Sprintf("ovn-nbctl --timeout=15 --if-exists remove load_balancer %s vips \"172.30.0.10:53\"", k8sTCPLoadBalancerIP),
fmt.Sprintf("ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=%s-172.30.0.10\\:53", k8sTCPLoadBalancerIP),
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:k8s-cluster-lb-udp=yes",
Expand All @@ -62,6 +66,7 @@ func (s service) baseCmds(fexec *ovntest.FakeExec, service v1.Service) {
})
fexec.AddFakeCmdsNoOutputNoError([]string{
fmt.Sprintf("ovn-nbctl --timeout=15 --if-exists remove load_balancer %s vips \"172.30.0.10:53\"", k8sUDPLoadBalancerIP),
fmt.Sprintf("ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=%s-172.30.0.10\\:53", k8sUDPLoadBalancerIP),
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:k8s-cluster-lb-sctp=yes",
Expand All @@ -73,6 +78,7 @@ func (s service) baseCmds(fexec *ovntest.FakeExec, service v1.Service) {
})
fexec.AddFakeCmdsNoOutputNoError([]string{
fmt.Sprintf("ovn-nbctl --timeout=15 --if-exists remove load_balancer %s vips \"172.30.0.10:53\"", k8sSCTPLoadBalancerIP),
fmt.Sprintf("ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=%s-172.30.0.10\\:53", k8sSCTPLoadBalancerIP),
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_router options:chassis!=null",
Expand All @@ -88,6 +94,7 @@ func (s service) baseCmds(fexec *ovntest.FakeExec, service v1.Service) {
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --if-exists remove load_balancer tcp_load_balancer_id_1 vips \"172.30.0.10:53\"",
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=tcp_load_balancer_id_1-172.30.0.10\\:53",
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:UDP_lb_gateway_router=gateway1",
Expand All @@ -99,6 +106,7 @@ func (s service) baseCmds(fexec *ovntest.FakeExec, service v1.Service) {
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --if-exists remove load_balancer udp_load_balancer_id_1 vips \"172.30.0.10:53\"",
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=udp_load_balancer_id_1-172.30.0.10\\:53",
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:SCTP_lb_gateway_router=gateway1",
Expand All @@ -110,6 +118,7 @@ func (s service) baseCmds(fexec *ovntest.FakeExec, service v1.Service) {
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --if-exists remove load_balancer sctp_load_balancer_id_1 vips \"172.30.0.10:53\"",
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=sctp_load_balancer_id_1-172.30.0.10\\:53",
})
fexec.AddFakeCmdsNoOutputNoError([]string{
fmt.Sprintf("ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find logical_switch load_balancer{>=}k8s_tcp_load_balancer"),
Expand All @@ -125,6 +134,7 @@ func (s service) delCmds(fexec *ovntest.FakeExec, service v1.Service) {
for _, port := range service.Spec.Ports {
fexec.AddFakeCmdsNoOutputNoError([]string{
fmt.Sprintf("ovn-nbctl --timeout=15 --if-exists remove load_balancer %s vips \"%s:%v\"", k8sTCPLoadBalancerIP, service.Spec.ClusterIP, port.Port),
fmt.Sprintf("ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find acl name=%s-%s\\:%v", k8sTCPLoadBalancerIP, service.Spec.ClusterIP, port.Port),
})
}
}
Expand Down
13 changes: 13 additions & 0 deletions go-controller/pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@ func JoinHostPortInt32(host string, port int32) string {
return net.JoinHostPort(host, strconv.Itoa(int(port)))
}

// SplitHostPortInt32 splits a vip into its host and port counterparts
Comment thread
trozet marked this conversation as resolved.
func SplitHostPortInt32(vip string) (string, int32, error) {
ip, portRaw, err := net.SplitHostPort(vip)
if err != nil {
return "", 0, err
}
port, err := strconv.ParseInt(portRaw, 10, 32)
if err != nil {
return "", 0, err
}
return ip, int32(port), nil
}

// IPAddrToHWAddr takes the four octets of IPv4 address (aa.bb.cc.dd, for example) and uses them in creating
// a MAC address (0A:58:AA:BB:CC:DD). For IPv6, create a hash from the IPv6 string and use that for MAC Address.
// Assumption: the caller will ensure that an empty net.IP{} will NOT be passed.
Expand Down