diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index cfc99b1c691ac..25657a0702a43 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -634,8 +634,7 @@ func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, updatedAddressSet.Addresses[id] = address } - oldAddressSet := pp.endpointSliceToAddresses(oldSlice) - for id := range oldAddressSet.Addresses { + for _, id := range pp.endpointSliceToIDs(oldSlice) { delete(updatedAddressSet.Addresses, id) } @@ -771,6 +770,55 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A } } +// endpointSliceToIDs is similar to endpointSliceToAddresses but instead returns +// only the IDs of the endpoints rather than the addresses themselves. +func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID { + resolvedPort := pp.resolveESTargetPort(es.Ports) + if resolvedPort == undefinedEndpointPort { + return []ID{} + } + + serviceID, err := getEndpointSliceServiceID(es) + if err != nil { + pp.log.Errorf("Could not fetch resource service name:%v", err) + } + + ids := []ID{} + for _, endpoint := range es.Endpoints { + if endpoint.Hostname != nil { + if pp.hostname != "" && pp.hostname != *endpoint.Hostname { + continue + } + } + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + if endpoint.TargetRef == nil { + for _, IPAddr := range endpoint.Addresses { + ids = append(ids, ServiceID{ + Name: strings.Join([]string{ + serviceID.Name, + IPAddr, + fmt.Sprint(resolvedPort), + }, "-"), + Namespace: es.Namespace, + }) + } + continue + } + + if endpoint.TargetRef.Kind == endpointTargetRefPod { + ids = append(ids, PodID{ + Name: endpoint.TargetRef.Name, + Namespace: endpoint.TargetRef.Namespace, + }) + } + + } + return ids +} + func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet { addresses := make(map[ID]Address) for _, subset := range endpoints.Subsets { diff --git a/controller/api/destination/watcher/endpoints_watcher_test.go b/controller/api/destination/watcher/endpoints_watcher_test.go index 0e2019532d04d..8d1ac0366431b 100644 --- a/controller/api/destination/watcher/endpoints_watcher_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_test.go @@ -1,16 +1,21 @@ package watcher import ( + "context" + "errors" "fmt" "sort" "sync" "testing" + "time" "github.com/linkerd/linkerd2/controller/k8s" consts "github.com/linkerd/linkerd2/pkg/k8s" + "github.com/linkerd/linkerd2/testutil" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" dv1 "k8s.io/api/discovery/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -2046,3 +2051,133 @@ status: }) } } + +// Test that when an EndpointSlice is scaled down, the EndpointsWatcher sends +// all of the Remove events, even if the associated pod is no longer available +// from the API. +func TestEndpointSliceScaleDown(t *testing.T) { + k8sConfigsWithES := []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1 +resources: +- name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 172.17.0.12 + conditions: + ready: true + targetRef: + kind: Pod + name: name1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-es + namespace: ns +ports: +- name: "" + port: 8989`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name1-1 + namespace: ns +status: + phase: Running + podIP: 172.17.0.12`} + + // Create an EndpointSlice with one endpoint, backed by a pod. + + k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...) + if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } + + watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true) + + k8sAPI.Sync(nil) + + listener := newBufferingEndpointListener() + + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + + listener.ExpectAdded([]string{"172.17.0.12:8989"}, t) + + // Delete the backing pod and scale the EndpointSlice to 0 endpoints. + + err = k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "name1-1", metav1.DeleteOptions{}) + if err != nil { + t.Fatal(err) + } + + // It may take some time before the pod deletion is recognized by the + // lister. We wait until the lister sees the pod as deleted. + err = testutil.RetryFor(time.Second*30, func() error { + _, err := k8sAPI.Pod().Lister().Pods("ns").Get("name1-1") + if kerrors.IsNotFound(err) { + return nil + } + if err == nil { + return errors.New("pod should be deleted, but still exists in lister") + } + return err + }) + if err != nil { + t.Fatal(err) + } + + ES, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + emptyES := &dv1.EndpointSlice{ + AddressType: "IPv4", + ObjectMeta: metav1.ObjectMeta{ + Name: "name1-es", Namespace: "ns", + Labels: map[string]string{dv1.LabelServiceName: "name1"}, + }, + Endpoints: []dv1.Endpoint{}, + Ports: []dv1.EndpointPort{}, + } + + watcher.updateEndpointSlice(ES, emptyES) + + // Ensure the watcher emits a remove event. + + listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t) +} diff --git a/test/integration/deep/endpoints/endpoints_test.go b/test/integration/deep/endpoints/endpoints_test.go index 7e76078b898a7..88f6e6bb646b6 100644 --- a/test/integration/deep/endpoints/endpoints_test.go +++ b/test/integration/deep/endpoints/endpoints_test.go @@ -53,7 +53,7 @@ func TestGoodEndpoints(t *testing.T) { testName := fmt.Sprintf("expect endpoints created for %s", endpointCase.name) t.Run(testName, func(t *testing.T) { - err = TestHelper.RetryFor(5*time.Second, func() error { + err = testutil.RetryFor(5*time.Second, func() error { out, err = TestHelper.LinkerdRun("diagnostics", "endpoints", endpointCase.authority, "-ojson") if err != nil { return fmt.Errorf("failed to get endpoints for %s: %w", endpointCase.authority, err) diff --git a/test/integration/deep/install_test.go b/test/integration/deep/install_test.go index c5da3a59cf7ac..2124d797c5c77 100644 --- a/test/integration/deep/install_test.go +++ b/test/integration/deep/install_test.go @@ -73,7 +73,7 @@ func TestInstallCNIPlugin(t *testing.T) { // perform a linkerd check with --linkerd-cni-enabled timeout := time.Minute - err = TestHelper.RetryFor(timeout, func() error { + err = testutil.RetryFor(timeout, func() error { out, err = TestHelper.LinkerdRun("check", "--pre", "--linkerd-cni-enabled", "--wait=60m") if err != nil { return err diff --git a/test/integration/deep/localhost/localhost_test.go b/test/integration/deep/localhost/localhost_test.go index 043df7df0491b..498ead784cded 100644 --- a/test/integration/deep/localhost/localhost_test.go +++ b/test/integration/deep/localhost/localhost_test.go @@ -95,7 +95,7 @@ func TestLocalhostServer(t *testing.T) { } } - err = TestHelper.RetryFor(50*time.Second, func() error { + err = testutil.RetryFor(50*time.Second, func() error { // Use a short time window so that transient errors at startup // fall out of the window. metrics, err := TestHelper.LinkerdRun("diagnostics", "proxy-metrics", "-n", ns, "deploy/slow-cooker") diff --git a/test/integration/deep/opaqueports/opaque_ports_test.go b/test/integration/deep/opaqueports/opaque_ports_test.go index b6b3f0d63345c..363d01f077ea7 100644 --- a/test/integration/deep/opaqueports/opaque_ports_test.go +++ b/test/integration/deep/opaqueports/opaque_ports_test.go @@ -229,7 +229,7 @@ func runTests(ctx context.Context, t *testing.T, ns string, tcs []testCase) { t.Helper() for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - err := TestHelper.RetryFor(30*time.Second, func() error { + err := testutil.RetryFor(30*time.Second, func() error { if err := checkPodMetrics(ctx, ns, tc.scName, tc.scChecks); err != nil { return fmt.Errorf("failed to check metrics for client pod: %w", err) } diff --git a/test/integration/deep/skipports/skip_ports_test.go b/test/integration/deep/skipports/skip_ports_test.go index c78eb4906388c..87f11c5d5fbfa 100644 --- a/test/integration/deep/skipports/skip_ports_test.go +++ b/test/integration/deep/skipports/skip_ports_test.go @@ -80,7 +80,7 @@ func TestSkipInboundPorts(t *testing.T) { t.Run("check webapp metrics", func(t *testing.T) { // Wait for slow-cookers to start sending requests by using a short // time window through RetryFor. - err := TestHelper.RetryFor(30*time.Second, func() error { + err := testutil.RetryFor(30*time.Second, func() error { pods, err := TestHelper.GetPods(ctx, ns, map[string]string{"app": "webapp"}) if err != nil { return fmt.Errorf("error getting pods\n%w", err) diff --git a/test/integration/external/externalissuer/external_issuer_test.go b/test/integration/external/externalissuer/external_issuer_test.go index ef66b95a00b68..ee1f4a7120d05 100644 --- a/test/integration/external/externalissuer/external_issuer_test.go +++ b/test/integration/external/externalissuer/external_issuer_test.go @@ -70,7 +70,7 @@ func verifyInstallApp(ctx context.Context, t *testing.T) { } func checkAppWoks(t *testing.T, timeout time.Duration) error { - return TestHelper.RetryFor(timeout, func() error { + return testutil.RetryFor(timeout, func() error { args := []string{"viz", "stat", "deploy", "-n", TestHelper.GetTestNamespace(TestAppNamespaceSuffix), "--from", "deploy/slow-cooker", "-t", "1m"} out, err := TestHelper.LinkerdRun(args...) if err != nil { @@ -119,7 +119,7 @@ func verifyRotateExternalCerts(ctx context.Context, t *testing.T) { func verifyIdentityServiceReloadsIssuerCert(t *testing.T) { // check that the identity service has received an IssuerUpdated event timeout := 90 * time.Second - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { out, err := TestHelper.Kubectl("", "--namespace", TestHelper.GetLinkerdNamespace(), "get", "events", "--field-selector", "reason=IssuerUpdated", "-ojson", diff --git a/test/integration/external/externalresources/rabbitmq_test.go b/test/integration/external/externalresources/rabbitmq_test.go index 661578ef4674b..10c3895f955d7 100644 --- a/test/integration/external/externalresources/rabbitmq_test.go +++ b/test/integration/external/externalresources/rabbitmq_test.go @@ -65,7 +65,7 @@ func TestRabbitMQDeploy(t *testing.T) { // Verify client output golden := "check.rabbitmq.golden" timeout := 50 * time.Second - err = TestHelper.RetryFor(timeout, func() error { + err = testutil.RetryFor(timeout, func() error { out, err := TestHelper.Kubectl("", "-n", testNamespace, "logs", "-lapp=rabbitmq-client", "-crabbitmq-client") if err != nil { return fmt.Errorf("'kubectl logs -l app=rabbitmq-client -c rabbitmq-client' command failed\n%w", err) diff --git a/test/integration/external/stat/stat_test.go b/test/integration/external/stat/stat_test.go index 27405e0747370..f0c6b96b95f6d 100644 --- a/test/integration/external/stat/stat_test.go +++ b/test/integration/external/stat/stat_test.go @@ -233,7 +233,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) { tt := tt // pin timeout := 20 * time.Second t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) { - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { // Use a short time window so that transient errors at startup // fall out of the window. tt.args = append(tt.args, "-t", "30s") diff --git a/test/integration/install/inject/inject_test.go b/test/integration/install/inject/inject_test.go index 8c1afa6e0a306..c00425004a680 100644 --- a/test/integration/install/inject/inject_test.go +++ b/test/integration/install/inject/inject_test.go @@ -162,7 +162,7 @@ func TestInjectAutoParams(t *testing.T) { } var pod *v1.Pod - err = TestHelper.RetryFor(30*time.Second, func() error { + err = testutil.RetryFor(30*time.Second, func() error { pods, err := TestHelper.GetPodsForDeployment(ctx, ns, deployName) if err != nil { return fmt.Errorf("failed to get pods for namespace %s", ns) diff --git a/test/integration/install/smoke/install_smoke_test.go b/test/integration/install/smoke/install_smoke_test.go index f164902d9a96e..204e15b6680a2 100644 --- a/test/integration/install/smoke/install_smoke_test.go +++ b/test/integration/install/smoke/install_smoke_test.go @@ -75,7 +75,7 @@ func TestSmoke(t *testing.T) { // Use a short time window for check tests to get rid of transient // errors timeout := 5 * time.Minute - err = TestHelper.RetryFor(timeout, func() error { + err = testutil.RetryFor(timeout, func() error { out, err := TestHelper.LinkerdRun(cmd...) if err != nil { return fmt.Errorf("'linkerd check' command failed\n%w\n%s", err, out) diff --git a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go index 1a5e88a914494..0ca4693c15a80 100644 --- a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go +++ b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go @@ -90,7 +90,7 @@ func TestGateways(t *testing.T) { }) timeout := time.Minute - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { out, err := TestHelper.LinkerdRun("--context="+contexts[testutil.SourceContextKey], "multicluster", "gateways") if err != nil { return err @@ -178,7 +178,7 @@ func TestTargetTraffic(t *testing.T) { }) timeout := time.Minute - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { out, err := TestHelper.KubectlWithContext("", targetCtx, "--namespace", ns, @@ -259,7 +259,7 @@ func TestMulticlusterStatefulSetTargetTraffic(t *testing.T) { t.Run("expect open outbound TCP connection from gateway to nginx", func(t *testing.T) { // Use a short time window so that slow-cooker can warm-up and send // requests. - err := TestHelper.RetryFor(1*time.Minute, func() error { + err := testutil.RetryFor(1*time.Minute, func() error { // Check gateway metrics metrics, err := TestHelper.LinkerdRun(dgCmd...) if err != nil { diff --git a/test/integration/viz/edges/edges_test.go b/test/integration/viz/edges/edges_test.go index f096c5f7ac090..845261bbb8ed6 100644 --- a/test/integration/viz/edges/edges_test.go +++ b/test/integration/viz/edges/edges_test.go @@ -49,7 +49,7 @@ func TestEdges(t *testing.T) { "-ojson", } r := regexp.MustCompile(b.String()) - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { out, err := TestHelper.LinkerdRun(cmd...) if err != nil { t.Fatal(err) @@ -141,7 +141,7 @@ func TestDirectEdges(t *testing.T) { // check edges timeout := 50 * time.Second testDataPath := "testdata" - err = TestHelper.RetryFor(timeout, func() error { + err = testutil.RetryFor(timeout, func() error { out, err = TestHelper.LinkerdRun("-n", testNamespace, "-o", "json", "viz", "edges", "deploy") if err != nil { return err diff --git a/test/integration/viz/policy/policy_test.go b/test/integration/viz/policy/policy_test.go index 933fc06b62854..21c8b982d42ea 100644 --- a/test/integration/viz/policy/policy_test.go +++ b/test/integration/viz/policy/policy_test.go @@ -115,7 +115,7 @@ func TestPolicy(t *testing.T) { tt := tt // pin timeout := 3 * time.Minute t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) { - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { // Use a short time window so that transient errors at startup // fall out of the window. tt.args = append(tt.args, "-t", "30s") diff --git a/test/integration/viz/serviceprofiles/serviceprofiles_test.go b/test/integration/viz/serviceprofiles/serviceprofiles_test.go index 7a1e860a1a4a1..c5d8105831b8f 100644 --- a/test/integration/viz/serviceprofiles/serviceprofiles_test.go +++ b/test/integration/viz/serviceprofiles/serviceprofiles_test.go @@ -226,7 +226,7 @@ func testMetrics(t *testing.T) { func assertRouteStat(upstream, namespace, downstream string, t *testing.T, assertFn func(stat *cmd2.JSONRouteStats) error) { const routePath = "GET /testpath" timeout := 2 * time.Minute - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { routes, err := getRoutes(upstream, namespace, []string{"--to", downstream}) if err != nil { return fmt.Errorf("'linkerd routes' command failed: %w", err) @@ -286,7 +286,7 @@ func getRoutes(deployName, namespace string, additionalArgs []string) ([]*cmd2.J cmd = append(cmd, "--output", "json") var results map[string][]*cmd2.JSONRouteStats - err := TestHelper.RetryFor(2*time.Minute, func() error { + err := testutil.RetryFor(2*time.Minute, func() error { out, err := TestHelper.LinkerdRun(cmd...) if err != nil { return err diff --git a/test/integration/viz/stat/stat_test.go b/test/integration/viz/stat/stat_test.go index 048437523aaf0..de23b0cbe6a9d 100644 --- a/test/integration/viz/stat/stat_test.go +++ b/test/integration/viz/stat/stat_test.go @@ -233,7 +233,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) { tt := tt // pin timeout := 20 * time.Second t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) { - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { // Use a short time window so that transient errors at startup // fall out of the window. tt.args = append(tt.args, "-t", "30s") diff --git a/test/integration/viz/tracing/tracing_test.go b/test/integration/viz/tracing/tracing_test.go index 9c40010ae4dab..08ea94c676d0e 100644 --- a/test/integration/viz/tracing/tracing_test.go +++ b/test/integration/viz/tracing/tracing_test.go @@ -68,7 +68,7 @@ func TestTracing(t *testing.T) { checkCmd := []string{"jaeger", "check", "--wait=0"} golden := "check.jaeger.golden" timeout := time.Minute - err = TestHelper.RetryFor(timeout, func() error { + err = testutil.RetryFor(timeout, func() error { out, err := TestHelper.LinkerdRun(checkCmd...) if err != nil { return fmt.Errorf("'linkerd jaeger check' command failed\n%w\n%s", err, out) @@ -159,7 +159,7 @@ func TestTracing(t *testing.T) { t.Run("expect full trace", func(t *testing.T) { timeout := 3 * time.Minute - err = TestHelper.RetryFor(timeout, func() error { + err = testutil.RetryFor(timeout, func() error { url, err := TestHelper.URLFor(ctx, tracingNs, "jaeger", 16686) if err != nil { return err diff --git a/test/integration/viz/trafficsplit/trafficsplit_test.go b/test/integration/viz/trafficsplit/trafficsplit_test.go index 07c3a02b159f2..a38dafc368d34 100644 --- a/test/integration/viz/trafficsplit/trafficsplit_test.go +++ b/test/integration/viz/trafficsplit/trafficsplit_test.go @@ -98,7 +98,7 @@ func TestTrafficSplitCliWithSP(t *testing.T) { t.Run(fmt.Sprintf("ensure traffic is sent to one backend only for %s", version), func(t *testing.T) { timeout := 40 * time.Second - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { out, err := TestHelper.LinkerdRun("viz", "stat", "deploy", "--namespace", prefixedNs, "--from", "deploy/slow-cooker", "-t", "30s") if err != nil { return err @@ -148,7 +148,7 @@ func TestTrafficSplitCliWithSP(t *testing.T) { t.Run(fmt.Sprintf("ensure traffic is sent to both backends for %s", version), func(t *testing.T) { timeout := 40 * time.Second - err := TestHelper.RetryFor(timeout, func() error { + err := testutil.RetryFor(timeout, func() error { out, err := TestHelper.LinkerdRun("viz", "stat", "deploy", "-n", prefixedNs, "--from", "deploy/slow-cooker", "-t", "30s") if err != nil { diff --git a/testutil/test_helper.go b/testutil/test_helper.go index 8b100f83100dd..37cd6e856c3cd 100644 --- a/testutil/test_helper.go +++ b/testutil/test_helper.go @@ -260,7 +260,7 @@ func NewTestHelper() *TestHelper { } testHelper.version = strings.TrimSpace(version) - kubernetesHelper, err := NewKubernetesHelper(*k8sContext, testHelper.RetryFor) + kubernetesHelper, err := NewKubernetesHelper(*k8sContext, RetryFor) if err != nil { exit(1, fmt.Sprintf("error creating kubernetes helper: %s", err.Error())) } @@ -618,7 +618,7 @@ func (h *TestHelper) CheckVersion(serverVersion string) error { // RetryFor retries a given function every second until the function returns // without an error, or a timeout is reached. If the timeout is reached, it // returns the last error received from the function. -func (h *TestHelper) RetryFor(timeout time.Duration, fn func() error) error { +func RetryFor(timeout time.Duration, fn func() error) error { err := fn() if err == nil { return nil @@ -648,7 +648,7 @@ func (h *TestHelper) RetryFor(timeout time.Duration, fn func() error) error { // giving pods time to start. func (h *TestHelper) HTTPGetURL(url string) (string, error) { var body string - err := h.RetryFor(time.Minute, func() error { + err := RetryFor(time.Minute, func() error { resp, err := h.httpClient.Get(url) if err != nil { return err diff --git a/testutil/test_helper_check.go b/testutil/test_helper_check.go index 6944e08cdf236..3d2fa53f4535b 100644 --- a/testutil/test_helper_check.go +++ b/testutil/test_helper_check.go @@ -63,7 +63,7 @@ func (h *TestHelper) TestCheckProxy(expectedVersion, namespace string) error { func (h *TestHelper) testCheck(cmd []string, categories []healthcheck.CategoryID) error { timeout := time.Minute * 10 - return h.RetryFor(timeout, func() error { + return RetryFor(timeout, func() error { res, err := h.LinkerdRun(cmd...) if err != nil { return fmt.Errorf("'linkerd check' command failed\n%w\n%s", err, res)