Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,7 @@ func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice,
updatedAddressSet.Addresses[id] = address
}

oldAddressSet := pp.endpointSliceToAddresses(oldSlice)
for id := range oldAddressSet.Addresses {
for _, id := range pp.endpointSliceToIDs(oldSlice) {
delete(updatedAddressSet.Addresses, id)
}

Expand Down Expand Up @@ -771,6 +770,55 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
}
}

// endpointSliceToIDs is similar to endpointSliceToAddresses but instead returns
// only the IDs of the endpoints rather than the addresses themselves.
func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID {
resolvedPort := pp.resolveESTargetPort(es.Ports)
if resolvedPort == undefinedEndpointPort {
return []ID{}
}

serviceID, err := getEndpointSliceServiceID(es)
if err != nil {
pp.log.Errorf("Could not fetch resource service name:%v", err)
}

ids := []ID{}
for _, endpoint := range es.Endpoints {
if endpoint.Hostname != nil {
if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
continue
}
}
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
continue
}

if endpoint.TargetRef == nil {
for _, IPAddr := range endpoint.Addresses {
ids = append(ids, ServiceID{
Name: strings.Join([]string{
serviceID.Name,
IPAddr,
fmt.Sprint(resolvedPort),
}, "-"),
Namespace: es.Namespace,
})
}
continue
}

if endpoint.TargetRef.Kind == endpointTargetRefPod {
ids = append(ids, PodID{
Name: endpoint.TargetRef.Name,
Namespace: endpoint.TargetRef.Namespace,
})
}

}
return ids
}

func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet {
addresses := make(map[ID]Address)
for _, subset := range endpoints.Subsets {
Expand Down
135 changes: 135 additions & 0 deletions controller/api/destination/watcher/endpoints_watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package watcher

import (
"context"
"errors"
"fmt"
"sort"
"sync"
"testing"
"time"

"github.com/linkerd/linkerd2/controller/k8s"
consts "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/testutil"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
dv1 "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -2046,3 +2051,133 @@ status:
})
}
}

// Test that when an EndpointSlice is scaled down, the EndpointsWatcher sends
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: godoc should start with the function name

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we typically use godoc for tests, do we? (note the // rather than ///)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok never mind, I thought one could use go doc to show docs from test functions, but that doesn't appear to be the case. OTOH, I'm not aware of the usage of ///, can you clarify?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, /// isn't a go thing. I got confused; holiday brain.

// all of the Remove events, even if the associated pod is no longer available
// from the API.
func TestEndpointSliceScaleDown(t *testing.T) {
k8sConfigsWithES := []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`, `
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
ready: true
targetRef:
kind: Pod
name: name1-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name1
name: name1-es
namespace: ns
ports:
- name: ""
port: 8989`, `
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
status:
phase: Running
podIP: 172.17.0.12`}

// Create an EndpointSlice with one endpoint, backed by a pod.

k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)

k8sAPI.Sync(nil)

listener := newBufferingEndpointListener()

err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
if err != nil {
t.Fatal(err)
}

k8sAPI.Sync(nil)

listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)

// Delete the backing pod and scale the EndpointSlice to 0 endpoints.

err = k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "name1-1", metav1.DeleteOptions{})
if err != nil {
t.Fatal(err)
}

// It may take some time before the pod deletion is recognized by the
// lister. We wait until the lister sees the pod as deleted.
err = testutil.RetryFor(time.Second*30, func() error {
_, err := k8sAPI.Pod().Lister().Pods("ns").Get("name1-1")
if kerrors.IsNotFound(err) {
return nil
}
if err == nil {
return errors.New("pod should be deleted, but still exists in lister")
}
return err
})
if err != nil {
t.Fatal(err)
}

ES, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

emptyES := &dv1.EndpointSlice{
AddressType: "IPv4",
ObjectMeta: metav1.ObjectMeta{
Name: "name1-es", Namespace: "ns",
Labels: map[string]string{dv1.LabelServiceName: "name1"},
},
Endpoints: []dv1.Endpoint{},
Ports: []dv1.EndpointPort{},
}

watcher.updateEndpointSlice(ES, emptyES)

// Ensure the watcher emits a remove event.

listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t)
}
2 changes: 1 addition & 1 deletion test/integration/deep/endpoints/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestGoodEndpoints(t *testing.T) {
testName := fmt.Sprintf("expect endpoints created for %s", endpointCase.name)

t.Run(testName, func(t *testing.T) {
err = TestHelper.RetryFor(5*time.Second, func() error {
err = testutil.RetryFor(5*time.Second, func() error {
out, err = TestHelper.LinkerdRun("diagnostics", "endpoints", endpointCase.authority, "-ojson")
if err != nil {
return fmt.Errorf("failed to get endpoints for %s: %w", endpointCase.authority, err)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/deep/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestInstallCNIPlugin(t *testing.T) {

// perform a linkerd check with --linkerd-cni-enabled
timeout := time.Minute
err = TestHelper.RetryFor(timeout, func() error {
err = testutil.RetryFor(timeout, func() error {
out, err = TestHelper.LinkerdRun("check", "--pre", "--linkerd-cni-enabled", "--wait=60m")
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion test/integration/deep/localhost/localhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestLocalhostServer(t *testing.T) {
}
}

err = TestHelper.RetryFor(50*time.Second, func() error {
err = testutil.RetryFor(50*time.Second, func() error {
// Use a short time window so that transient errors at startup
// fall out of the window.
metrics, err := TestHelper.LinkerdRun("diagnostics", "proxy-metrics", "-n", ns, "deploy/slow-cooker")
Expand Down
2 changes: 1 addition & 1 deletion test/integration/deep/opaqueports/opaque_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func runTests(ctx context.Context, t *testing.T, ns string, tcs []testCase) {
t.Helper()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
err := TestHelper.RetryFor(30*time.Second, func() error {
err := testutil.RetryFor(30*time.Second, func() error {
if err := checkPodMetrics(ctx, ns, tc.scName, tc.scChecks); err != nil {
return fmt.Errorf("failed to check metrics for client pod: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/deep/skipports/skip_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestSkipInboundPorts(t *testing.T) {
t.Run("check webapp metrics", func(t *testing.T) {
// Wait for slow-cookers to start sending requests by using a short
// time window through RetryFor.
err := TestHelper.RetryFor(30*time.Second, func() error {
err := testutil.RetryFor(30*time.Second, func() error {
pods, err := TestHelper.GetPods(ctx, ns, map[string]string{"app": "webapp"})
if err != nil {
return fmt.Errorf("error getting pods\n%w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func verifyInstallApp(ctx context.Context, t *testing.T) {
}

func checkAppWoks(t *testing.T, timeout time.Duration) error {
return TestHelper.RetryFor(timeout, func() error {
return testutil.RetryFor(timeout, func() error {
args := []string{"viz", "stat", "deploy", "-n", TestHelper.GetTestNamespace(TestAppNamespaceSuffix), "--from", "deploy/slow-cooker", "-t", "1m"}
out, err := TestHelper.LinkerdRun(args...)
if err != nil {
Expand Down Expand Up @@ -119,7 +119,7 @@ func verifyRotateExternalCerts(ctx context.Context, t *testing.T) {
func verifyIdentityServiceReloadsIssuerCert(t *testing.T) {
// check that the identity service has received an IssuerUpdated event
timeout := 90 * time.Second
err := TestHelper.RetryFor(timeout, func() error {
err := testutil.RetryFor(timeout, func() error {
out, err := TestHelper.Kubectl("",
"--namespace", TestHelper.GetLinkerdNamespace(),
"get", "events", "--field-selector", "reason=IssuerUpdated", "-ojson",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestRabbitMQDeploy(t *testing.T) {
// Verify client output
golden := "check.rabbitmq.golden"
timeout := 50 * time.Second
err = TestHelper.RetryFor(timeout, func() error {
err = testutil.RetryFor(timeout, func() error {
out, err := TestHelper.Kubectl("", "-n", testNamespace, "logs", "-lapp=rabbitmq-client", "-crabbitmq-client")
if err != nil {
return fmt.Errorf("'kubectl logs -l app=rabbitmq-client -c rabbitmq-client' command failed\n%w", err)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/external/stat/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
tt := tt // pin
timeout := 20 * time.Second
t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) {
err := TestHelper.RetryFor(timeout, func() error {
err := testutil.RetryFor(timeout, func() error {
// Use a short time window so that transient errors at startup
// fall out of the window.
tt.args = append(tt.args, "-t", "30s")
Expand Down
2 changes: 1 addition & 1 deletion test/integration/install/inject/inject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestInjectAutoParams(t *testing.T) {
}

var pod *v1.Pod
err = TestHelper.RetryFor(30*time.Second, func() error {
err = testutil.RetryFor(30*time.Second, func() error {
pods, err := TestHelper.GetPodsForDeployment(ctx, ns, deployName)
if err != nil {
return fmt.Errorf("failed to get pods for namespace %s", ns)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/install/smoke/install_smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestSmoke(t *testing.T) {
// Use a short time window for check tests to get rid of transient
// errors
timeout := 5 * time.Minute
err = TestHelper.RetryFor(timeout, func() error {
err = testutil.RetryFor(timeout, func() error {
out, err := TestHelper.LinkerdRun(cmd...)
if err != nil {
return fmt.Errorf("'linkerd check' command failed\n%w\n%s", err, out)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestGateways(t *testing.T) {
})

timeout := time.Minute
err := TestHelper.RetryFor(timeout, func() error {
err := testutil.RetryFor(timeout, func() error {
out, err := TestHelper.LinkerdRun("--context="+contexts[testutil.SourceContextKey], "multicluster", "gateways")
if err != nil {
return err
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestTargetTraffic(t *testing.T) {
})

timeout := time.Minute
err := TestHelper.RetryFor(timeout, func() error {
err := testutil.RetryFor(timeout, func() error {
out, err := TestHelper.KubectlWithContext("",
targetCtx,
"--namespace", ns,
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestMulticlusterStatefulSetTargetTraffic(t *testing.T) {
t.Run("expect open outbound TCP connection from gateway to nginx", func(t *testing.T) {
// Use a short time window so that slow-cooker can warm-up and send
// requests.
err := TestHelper.RetryFor(1*time.Minute, func() error {
err := testutil.RetryFor(1*time.Minute, func() error {
// Check gateway metrics
metrics, err := TestHelper.LinkerdRun(dgCmd...)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/viz/edges/edges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestEdges(t *testing.T) {
"-ojson",
}
r := regexp.MustCompile(b.String())
err := TestHelper.RetryFor(timeout, func() error {
err := testutil.RetryFor(timeout, func() error {
out, err := TestHelper.LinkerdRun(cmd...)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestDirectEdges(t *testing.T) {
// check edges
timeout := 50 * time.Second
testDataPath := "testdata"
err = TestHelper.RetryFor(timeout, func() error {
err = testutil.RetryFor(timeout, func() error {
out, err = TestHelper.LinkerdRun("-n", testNamespace, "-o", "json", "viz", "edges", "deploy")
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion test/integration/viz/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestPolicy(t *testing.T) {
tt := tt // pin
timeout := 3 * time.Minute
t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) {
err := TestHelper.RetryFor(timeout, func() error {
err := testutil.RetryFor(timeout, func() error {
// Use a short time window so that transient errors at startup
// fall out of the window.
tt.args = append(tt.args, "-t", "30s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func testMetrics(t *testing.T) {
func assertRouteStat(upstream, namespace, downstream string, t *testing.T, assertFn func(stat *cmd2.JSONRouteStats) error) {
const routePath = "GET /testpath"
timeout := 2 * time.Minute
err := TestHelper.RetryFor(timeout, func() error {
err := testutil.RetryFor(timeout, func() error {
routes, err := getRoutes(upstream, namespace, []string{"--to", downstream})
if err != nil {
return fmt.Errorf("'linkerd routes' command failed: %w", err)
Expand Down Expand Up @@ -286,7 +286,7 @@ func getRoutes(deployName, namespace string, additionalArgs []string) ([]*cmd2.J

cmd = append(cmd, "--output", "json")
var results map[string][]*cmd2.JSONRouteStats
err := TestHelper.RetryFor(2*time.Minute, func() error {
err := testutil.RetryFor(2*time.Minute, func() error {
out, err := TestHelper.LinkerdRun(cmd...)
if err != nil {
return err
Expand Down
Loading