From 9073b80047811c4af1f09e4da9c91cc84bacb9f8 Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Thu, 28 Sep 2023 10:08:55 -0500 Subject: [PATCH] Extend unit test for HostPort subscriptions Followup to https://github.com/linkerd/linkerd2/pull/11334#issuecomment-1736093592 This extends the test introduced in #11334 to excercise upgrading a Server associated to a pod's HostPort, and observing how the stream updates the OpaqueProtocol field. Helper functions were refactored a bit to allow retrieving the l5dCRDClientSet used when building the fake API. --- controller/api/destination/server_test.go | 108 ++++++++++++++++++---- controller/api/destination/test_util.go | 12 ++- controller/k8s/test_helper.go | 22 ++++- 3 files changed, 118 insertions(+), 24 deletions(-) diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index 11c00054872ea..20307c6392d57 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -10,12 +10,15 @@ import ( "github.com/linkerd/linkerd2-proxy-api/go/net" "github.com/linkerd/linkerd2/controller/api/destination/watcher" "github.com/linkerd/linkerd2/controller/api/util" + "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1" "github.com/linkerd/linkerd2/controller/k8s" "github.com/linkerd/linkerd2/pkg/addr" + pkgk8s "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/testutil" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) const fullyQualifiedName = "name1.ns.svc.mycluster.local" @@ -170,7 +173,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Returns server profile", func(t *testing.T) { - stream, server := profileStream(t, fullyQualifiedName, port, "ns:other") + server := makeServer(t) + stream := profileStream(t, server, fullyQualifiedName, port, "ns:other") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.FullyQualifiedName != fullyQualifiedName { @@ -189,7 +193,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return service profile when using json token", func(t *testing.T) { - stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"other"}`) + server := makeServer(t) + stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"other"}`) defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.FullyQualifiedName != fullyQualifiedName { @@ -204,7 +209,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Returns client profile", func(t *testing.T) { - stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"client-ns"}`) + server := makeServer(t) + stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"client-ns"}`) defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) routes := profile.GetRoutes() @@ -219,7 +225,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile when using cluster IP", func(t *testing.T) { - stream, server := profileStream(t, clusterIP, port, "") + server := makeServer(t) + stream := profileStream(t, server, clusterIP, port, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.FullyQualifiedName != fullyQualifiedName { @@ -237,7 +244,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) { - stream, server := profileStream(t, fullyQualifiedPodDNS, port, "ns:ns") + server := makeServer(t) + stream := profileStream(t, server, fullyQualifiedPodDNS, port, "ns:ns") defer stream.Cancel() epAddr, err := toAddress(podIPStatefulSet, port) @@ -277,7 +285,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) { - stream, server := profileStream(t, podIP1, port, "ns:ns") + server := makeServer(t) + stream := profileStream(t, server, podIP1, port, "ns:ns") defer stream.Cancel() epAddr, err := toAddress(podIP1, port) @@ -317,7 +326,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) { - stream, server := profileStream(t, "172.0.0.0", 1234, "") + server := makeServer(t) + stream := profileStream(t, server, "172.0.0.0", 1234, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.RetryBudget == nil { @@ -328,7 +338,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) { - stream, server := profileStream(t, podIP2, port, "") + server := makeServer(t) + stream := profileStream(t, server, podIP2, port, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.Endpoint == nil { @@ -342,7 +353,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) { - stream, server := profileStream(t, clusterIPOpaque, opaquePort, "") + server := makeServer(t) + stream := profileStream(t, server, clusterIPOpaque, opaquePort, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.FullyQualifiedName != fullyQualifiedNameOpaque { @@ -356,7 +368,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) { - stream, server := profileStream(t, podIPOpaque, opaquePort, "") + server := makeServer(t) + stream := profileStream(t, server, podIPOpaque, opaquePort, "") defer stream.Cancel() epAddr, err := toAddress(podIPOpaque, opaquePort) @@ -396,7 +409,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) { - stream, server := profileStream(t, fullyQualifiedNameOpaqueService, opaquePort, "") + server := makeServer(t) + stream := profileStream(t, server, fullyQualifiedNameOpaqueService, opaquePort, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService { @@ -410,7 +424,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) { - stream, server := profileStream(t, podIPSkipped, skippedPort, "") + server := makeServer(t) + stream := profileStream(t, server, podIPSkipped, skippedPort, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) addr := profile.GetEndpoint() @@ -428,7 +443,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) { - stream, server := profileStream(t, podIPPolicy, 80, "") + server := makeServer(t) + stream := profileStream(t, server, podIPPolicy, 80, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.Endpoint == nil { @@ -448,7 +464,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) { - stream, server := profileStream(t, externalIP, 3306, "") + server := makeServer(t) + stream := profileStream(t, server, externalIP, 3306, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if !profile.OpaqueProtocol { @@ -459,7 +476,8 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) { - stream, server := profileStream(t, externalIP, 80, "") + server := makeServer(t) + stream := profileStream(t, server, externalIP, 80, "") defer stream.Cancel() profile := assertSingleProfile(t, stream.Updates()) if profile.OpaqueProtocol { @@ -472,7 +490,8 @@ func TestGetProfiles(t *testing.T) { t.Run("Return profile for host port pods", func(t *testing.T) { hostPort := uint32(7777) containerPort := uint32(80) - stream, server := profileStream(t, externalIP, hostPort, "") + server, l5dClient := getServerWithClient(t) + stream := profileStream(t, server, externalIP, hostPort, "") defer stream.Cancel() // HostPort maps to pod. @@ -517,9 +536,21 @@ func TestGetProfiles(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "hostport-mapping-2", Namespace: "ns", + Labels: map[string]string{ + "app": "hostport-mapping-2", + }, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ + { + Name: pkgk8s.ProxyContainerName, + Env: []corev1.EnvVar{ + { + Name: "LINKERD2_PROXY_INBOUND_LISTEN_ADDR", + Value: "0.0.0.0:4143", + }, + }, + }, { Name: "nginx", Image: "nginx", @@ -566,6 +597,46 @@ func TestGetProfiles(t *testing.T) { if dstPod != "hostport-mapping-2" { t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping-2", dstPod) } + if profile.OpaqueProtocol { + t.Fatal("Expected OpaqueProtocol=false") + } + + // Server is created, setting the port to opaque + (*l5dClient).ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{ + ObjectMeta: metav1.ObjectMeta{ + Name: "srv-hostport-mapping-2", + Namespace: "ns", + }, + Spec: v1beta1.ServerSpec{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "hostport-mapping-2", + }, + }, + Port: intstr.IntOrString{ + Type: intstr.String, + StrVal: "nginx-7777", + }, + ProxyProtocol: "opaque", + }, + }, metav1.CreateOptions{}) + + var updates []*pb.DestinationProfile + err = testutil.RetryFor(time.Second*10, func() error { + updates = stream.Updates() + if len(updates) < 4 { + return fmt.Errorf("expected 4 updates, got %d", len(updates)) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + profile = stream.Updates()[3] + if !profile.OpaqueProtocol { + t.Fatal("Expected OpaqueProtocol=true") + } server.clusterStore.UnregisterGauges() }) @@ -713,10 +784,9 @@ func assertSingleUpdate(t *testing.T, updates []*pb.Update) *pb.Update { return updates[0] } -func profileStream(t *testing.T, host string, port uint32, token string) (*bufferingGetProfileStream, *server) { +func profileStream(t *testing.T, server *server, host string, port uint32, token string) *bufferingGetProfileStream { t.Helper() - server := makeServer(t) stream := &bufferingGetProfileStream{ updates: []*pb.DestinationProfile{}, MockServerStream: util.NewMockServerStream(), @@ -735,5 +805,5 @@ func profileStream(t *testing.T, host string, port uint32, token string) (*buffe // Give GetProfile some slack time.Sleep(50 * time.Millisecond) - return stream, server + return stream } diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index fc975db005d85..b8a9b5b86ea65 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -7,11 +7,17 @@ import ( pb "github.com/linkerd/linkerd2-proxy-api/go/destination" "github.com/linkerd/linkerd2/controller/api/destination/watcher" "github.com/linkerd/linkerd2/controller/api/util" + l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned" "github.com/linkerd/linkerd2/controller/k8s" logging "github.com/sirupsen/logrus" ) func makeServer(t *testing.T) *server { + srv, _ := getServerWithClient(t) + return srv +} + +func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) { meshedPodResources := []string{` apiVersion: v1 kind: Namespace @@ -445,9 +451,9 @@ spec: res = append(res, hostPortMapping...) res = append(res, mirrorServiceResources...) res = append(res, destinationCredentialsResources...) - k8sAPI, err := k8s.NewFakeAPI(res...) + k8sAPI, l5dClient, err := k8s.NewFakeAPIWithL5dClient(res...) if err != nil { - t.Fatalf("NewFakeAPI returned an error: %s", err) + t.Fatalf("NewFakeAPIWithL5dClient returned an error: %s", err) } metadataAPI, err := k8s.NewFakeMetadataAPI(nil) if err != nil { @@ -513,7 +519,7 @@ spec: metadataAPI, log, make(<-chan struct{}), - } + }, l5dClient } type bufferingGetStream struct { diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 7337cc99cacc9..725cb65da2456 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -1,10 +1,12 @@ package k8s import ( + l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned" "github.com/linkerd/linkerd2/pkg/k8s" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" clientsetscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/metadata/fake" ) @@ -16,10 +18,26 @@ func NewFakeAPI(configs ...string) (*API, error) { return nil, err } + return NewFakeClusterScopedAPI(clientSet, spClientSet), nil +} + +// NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like +// NewFakeAPI, but it also returns the mock client for linkerd CRDs +func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) { + clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...) + if err != nil { + return nil, nil, err + } + + return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil +} + +// NewFakeClusterScopedAPI provides a mock Kubernetes API for testing. +func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrdclient.Interface) *API { return NewClusterScopedAPI( clientSet, nil, - spClientSet, + l5dClientSet, "fake", CJ, CM, @@ -39,7 +57,7 @@ func NewFakeAPI(configs ...string) (*API, error) { ES, Srv, Secret, - ), nil + ) } // NewFakeMetadataAPI provides a mock Kubernetes API for testing.