Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/2293.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
sync-catalog: add ability to support weighted loadbalancing by service annotation `consul.hashicorp.com/service-weight: <number>`
```
6 changes: 6 additions & 0 deletions control-plane/catalog/to-consul/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ const (
// annotationServiceMetaPrefix is the prefix for setting meta key/value
// for a service. The remainder of the key is the meta key.
annotationServiceMetaPrefix = "consul.hashicorp.com/service-meta-"

// annotationServiceWeight is the key of the annotation that determines
// the traffic weight of the service which is spanned over multiple k8s cluster.
// e.g. Service `backend` in k8s cluster `A` receives 25% of the traffic
// compared to same `backend` service in k8s cluster `B`.
annotationServiceWeight = "consul.hashicorp.com/service-weight"
)
41 changes: 41 additions & 0 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,19 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, ip)
r.Service.Address = ip
// Adding information about service weight.
// Overrides the existing weight if present.
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
weightI, err := getServiceWeight(weight)
if err == nil {
r.Service.Weights = consulapi.AgentWeights{
Passing: weightI,
}
} else {
t.Log.Debug("[generateRegistrations] service weight err: ", err)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}

Expand Down Expand Up @@ -547,6 +560,19 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr

// Adding information about service weight.
// Overrides the existing weight if present.
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
Copy link
Copy Markdown
Contributor

@asheshvidyut asheshvidyut Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these lines 567 - 577 and lines 518 - 528 are exactly the same. can we take it out in a function as well?

Copy link
Copy Markdown
Contributor Author

@srahul3 srahul3 Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code at both place is constructing the CatalogRegistration object, there is no logic which is duplicate. Read the comment #2293 (comment)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok to keep as is. it appears that the only behavior here is an assignment which IMO is not really duplication.

weightI, err := getServiceWeight(weight)
if err == nil {
r.Service.Weights = consulapi.AgentWeights{
Passing: weightI,
}
} else {
t.Log.Debug("[generateRegistrations] service weight err: ", err)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
Expand Down Expand Up @@ -999,3 +1025,18 @@ func (t *ServiceResource) isIngressService(key string) bool {
func consulHealthCheckID(k8sNS string, serviceID string) string {
return fmt.Sprintf("%s/%s", k8sNS, serviceID)
}

// Calculates the passing service weight.
func getServiceWeight(weight string) (int, error) {
// error validation if the input param is a number.
weightI, err := strconv.Atoi(weight)
if err != nil {
return -1, err
}

if weightI <= 1 {
return -1, fmt.Errorf("expecting the service annotation %s value to be greater than 1", annotationServiceWeight)
Comment on lines +1037 to +1038
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add some additional checks here. Can the service weight be greater than 100? if it can, ignore this comment, but just wanted to ensure we have completeness here wrt failing for all invalid service weight!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey thanks for sharing the thoughts. I couldn't find any documentation which mentions the weight to be percentage. To start with all the service default weight is set to 1 hence it is 1:1 which can be considered as equal ratio diversion. I guess in that case it is allowed to go beyond 100.

I have seen some example on stackoverflow, users configuring it to 10000:1, not sure if it is valid scenario unless someone wants to do super safe canary test.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! thanks for investigating this!

}

return weightI, nil
}
133 changes: 133 additions & 0 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,139 @@ func TestServiceResource_createDelete(t *testing.T) {
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_ingress(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "3.3.3.3"},
)

svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "4.4.4.4"},
)

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 3)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "3.3.3.3", actual[1].Service.Address)
require.Equal(r, 22, actual[1].Service.Weights.Passing)
require.Equal(r, "foo", actual[2].Service.Service)
require.Equal(r, "4.4.4.4", actual[2].Service.Address)
require.Equal(r, 22, actual[2].Service.Weights.Passing)
require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID)
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_externalIP(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"}

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, "foo", actual[0].Service.Service)
require.Equal(r, "3.3.3.3", actual[0].Service.Address)
require.Equal(r, 22, actual[0].Service.Weights.Passing)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "4.4.4.4", actual[1].Service.Address)
require.Equal(r, 22, actual[1].Service.Weights.Passing)
require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
})
}

// Test service weight.
func TestServiceWeight(t *testing.T) {
t.Parallel()
cases := map[string]struct {
Weight string
ExpectError bool
ExtectedWeight int
}{
"external-IP": {
Weight: "22",
ExpectError: false,
ExtectedWeight: 22,
},
"non-int-weight": {
Weight: "non-int",
ExpectError: true,
ExtectedWeight: 0,
},
"one-weight": {
Weight: "1",
ExpectError: true,
ExtectedWeight: 0,
},
"zero-weight": {
Weight: "0",
ExpectError: true,
ExtectedWeight: 0,
},
"negative-weight": {
Weight: "-2",
ExpectError: true,
ExtectedWeight: 0,
},
"greater-than-100-is-allowed": {
Weight: "1000",
ExpectError: false,
ExtectedWeight: 1000,
},
}

for name, c := range cases {
t.Run(name, func(tt *testing.T) {
weightI, err := getServiceWeight(c.Weight)
// Verify what we got
retry.Run(tt, func(r *retry.R) {
if c.ExpectError {
require.Error(r, err)
} else {
require.Equal(r, c.ExtectedWeight, weightI)
}
})
})
}
}

// Test that we're default enabled.
func TestServiceResource_defaultEnable(t *testing.T) {
t.Parallel()
Expand Down