Skip to content

Commit

Permalink
feature: Set ExternalTrafficPolicy in LB Services, fixes #47
Browse files Browse the repository at this point in the history
  • Loading branch information
rg0now committed May 18, 2024
1 parent d7c0a10 commit 04f89a8
Show file tree
Hide file tree
Showing 5 changed files with 639 additions and 27 deletions.
84 changes: 59 additions & 25 deletions internal/renderer/service_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
Expand Down Expand Up @@ -335,7 +334,7 @@ func (r *Renderer) createLbService4Gateway(c *RenderContext, gw *gwapiv1.Gateway
}
}

// update service type if necessary
// Service type
svcType := string(opdefault.DefaultServiceType)
if t, ok := c.gwConf.Spec.LoadBalancerServiceAnnotations[opdefault.ServiceTypeAnnotationKey]; ok {
svcType = t
Expand All @@ -357,11 +356,30 @@ func (r *Renderer) createLbService4Gateway(c *RenderContext, gw *gwapiv1.Gateway
svc.Spec.Type = opdefault.DefaultServiceType
}

// MixedProtocolLB (we use the annotations from the svc: already merged from the gwConf and gw)
mixedProto := false
if isMixedProtocolEnabled, found := svc.GetAnnotations()[opdefault.MixedProtocolAnnotationKey]; found {
mixedProto = isMixedProtocolEnabled == opdefault.MixedProtocolAnnotationValue
}

// ExternalTrafficPolicy
extTrafficPolicy := ""
if p, ok := c.gwConf.Spec.LoadBalancerServiceAnnotations[opdefault.ExternalTrafficPolicyAnnotationKey]; ok {
extTrafficPolicy = p
}

if p, ok := gw.GetAnnotations()[opdefault.ExternalTrafficPolicyAnnotationKey]; ok {
extTrafficPolicy = p
}

if strings.ToLower(extTrafficPolicy) == opdefault.ExternalTrafficPolicyAnnotationValue {
svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyLocal
} else {
svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyCluster
}

// copy all listener ports/protocols from the gateway
ports := []corev1.ServicePort{}
serviceProto := ""
for _, l := range gw.Spec.Listeners {
var proto string
Expand All @@ -385,29 +403,15 @@ func (r *Renderer) createLbService4Gateway(c *RenderContext, gw *gwapiv1.Gateway
continue
}

servicePortExists := false
// search for existing port
for i := range svc.Spec.Ports {
s := &svc.Spec.Ports[i]
if string(l.Name) == s.Name {
// found one, let's update it and move on
s.Protocol = corev1.Protocol(serviceProto)
s.Port = int32(l.Port)
s.TargetPort = intstr.FromInt(int(l.Port))
servicePortExists = true
break
}
}

if !servicePortExists {
svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
Name: string(l.Name),
Protocol: corev1.Protocol(serviceProto),
Port: int32(l.Port),
})
}
ports = append(ports, corev1.ServicePort{
Name: string(l.Name),
Protocol: corev1.Protocol(serviceProto),
Port: int32(l.Port),
})
}

svc.Spec.Ports = mergeServicePorts(svc.Spec.Ports, ports)

// Open the health-check port for LoadBalancer Services only
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
healthCheckPort, err := setHealthCheck(svc.GetAnnotations(), svc)
Expand All @@ -429,8 +433,8 @@ func (r *Renderer) createLbService4Gateway(c *RenderContext, gw *gwapiv1.Gateway

// no valid listener in gateway: refuse to create a service
if len(svc.Spec.Ports) == 0 {
c.log.V(1).Info("createLbService4Gateway: refusing to create a LB service as there "+
"is no valid listener found", "gateway", store.GetObjectKey(gw))
c.log.V(1).Info("createLbService4Gateway: refusing to create a LB service as "+
"there is no valid listener found", "gateway", store.GetObjectKey(gw))
return nil
}

Expand Down Expand Up @@ -467,6 +471,36 @@ func (r *Renderer) getServiceProtocol(proto gwapiv1.ProtocolType) (string, error
return serviceProto, nil
}

// TODO: understand and refactor
// merge serviceports wth existing svc
// - p2 overrides ps1 on conflict
// - service-ports not existing in ps2 are deleted from result
func mergeServicePorts(ps1, ps2 []corev1.ServicePort) []corev1.ServicePort {
// init
ret := make([]corev1.ServicePort, len(ps2))
for i := range ps2 {
ps2[i].DeepCopyInto(&ret[i])
}

// if a service-port exists in ps1, then merge
for i := range ret {
for j := range ps1 {
if ret[i].Name == ps1[j].Name {
tmp := ret[i].DeepCopy()
// copy ps1
ps1[j].DeepCopyInto(&ret[i])
// then update
ret[i].Protocol = tmp.Protocol
ret[i].Port = tmp.Port
ret[i].TargetPort = tmp.TargetPort
break
}
}
}

return ret
}

func setHealthCheck(annotations map[string]string, svc *corev1.Service) (int32, error) {
var healthCheckPort int32
var healthCheckProtocol string
Expand Down
Loading

0 comments on commit 04f89a8

Please sign in to comment.