Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend unit test for HostPort subscriptions #11439

Merged
merged 1 commit into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 89 additions & 19 deletions controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/api/util"
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/testutil"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

const fullyQualifiedName = "name1.ns.svc.mycluster.local"
Expand Down Expand Up @@ -170,7 +173,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Returns server profile", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedName, port, "ns:other")
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedName, port, "ns:other")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
Expand All @@ -189,7 +193,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return service profile when using json token", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"other"}`)
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"other"}`)
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
Expand All @@ -204,7 +209,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Returns client profile", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"client-ns"}`)
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"client-ns"}`)
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
routes := profile.GetRoutes()
Expand All @@ -219,7 +225,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile when using cluster IP", func(t *testing.T) {
stream, server := profileStream(t, clusterIP, port, "")
server := makeServer(t)
stream := profileStream(t, server, clusterIP, port, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
Expand All @@ -237,7 +244,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedPodDNS, port, "ns:ns")
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedPodDNS, port, "ns:ns")
defer stream.Cancel()

epAddr, err := toAddress(podIPStatefulSet, port)
Expand Down Expand Up @@ -277,7 +285,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) {
stream, server := profileStream(t, podIP1, port, "ns:ns")
server := makeServer(t)
stream := profileStream(t, server, podIP1, port, "ns:ns")
defer stream.Cancel()

epAddr, err := toAddress(podIP1, port)
Expand Down Expand Up @@ -317,7 +326,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
stream, server := profileStream(t, "172.0.0.0", 1234, "")
server := makeServer(t)
stream := profileStream(t, server, "172.0.0.0", 1234, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.RetryBudget == nil {
Expand All @@ -328,7 +338,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) {
stream, server := profileStream(t, podIP2, port, "")
server := makeServer(t)
stream := profileStream(t, server, podIP2, port, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.Endpoint == nil {
Expand All @@ -342,7 +353,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) {
stream, server := profileStream(t, clusterIPOpaque, opaquePort, "")
server := makeServer(t)
stream := profileStream(t, server, clusterIPOpaque, opaquePort, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedNameOpaque {
Expand All @@ -356,7 +368,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) {
stream, server := profileStream(t, podIPOpaque, opaquePort, "")
server := makeServer(t)
stream := profileStream(t, server, podIPOpaque, opaquePort, "")
defer stream.Cancel()

epAddr, err := toAddress(podIPOpaque, opaquePort)
Expand Down Expand Up @@ -396,7 +409,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedNameOpaqueService, opaquePort, "")
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedNameOpaqueService, opaquePort, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService {
Expand All @@ -410,7 +424,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) {
stream, server := profileStream(t, podIPSkipped, skippedPort, "")
server := makeServer(t)
stream := profileStream(t, server, podIPSkipped, skippedPort, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
addr := profile.GetEndpoint()
Expand All @@ -428,7 +443,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
stream, server := profileStream(t, podIPPolicy, 80, "")
server := makeServer(t)
stream := profileStream(t, server, podIPPolicy, 80, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.Endpoint == nil {
Expand All @@ -448,7 +464,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) {
stream, server := profileStream(t, externalIP, 3306, "")
server := makeServer(t)
stream := profileStream(t, server, externalIP, 3306, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if !profile.OpaqueProtocol {
Expand All @@ -459,7 +476,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) {
stream, server := profileStream(t, externalIP, 80, "")
server := makeServer(t)
stream := profileStream(t, server, externalIP, 80, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.OpaqueProtocol {
Expand All @@ -472,7 +490,8 @@ func TestGetProfiles(t *testing.T) {
t.Run("Return profile for host port pods", func(t *testing.T) {
hostPort := uint32(7777)
containerPort := uint32(80)
stream, server := profileStream(t, externalIP, hostPort, "")
server, l5dClient := getServerWithClient(t)
stream := profileStream(t, server, externalIP, hostPort, "")
defer stream.Cancel()

// HostPort maps to pod.
Expand Down Expand Up @@ -517,9 +536,21 @@ func TestGetProfiles(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "hostport-mapping-2",
Namespace: "ns",
Labels: map[string]string{
"app": "hostport-mapping-2",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: pkgk8s.ProxyContainerName,
Env: []corev1.EnvVar{
{
Name: "LINKERD2_PROXY_INBOUND_LISTEN_ADDR",
Value: "0.0.0.0:4143",
},
},
},
{
Name: "nginx",
Image: "nginx",
Expand Down Expand Up @@ -566,6 +597,46 @@ func TestGetProfiles(t *testing.T) {
if dstPod != "hostport-mapping-2" {
t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping-2", dstPod)
}
if profile.OpaqueProtocol {
t.Fatal("Expected OpaqueProtocol=false")
}

// Server is created, setting the port to opaque
(*l5dClient).ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{
ObjectMeta: metav1.ObjectMeta{
Name: "srv-hostport-mapping-2",
Namespace: "ns",
},
Spec: v1beta1.ServerSpec{
PodSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "hostport-mapping-2",
},
},
Port: intstr.IntOrString{
Type: intstr.String,
StrVal: "nginx-7777",
},
ProxyProtocol: "opaque",
},
}, metav1.CreateOptions{})

var updates []*pb.DestinationProfile
err = testutil.RetryFor(time.Second*10, func() error {
updates = stream.Updates()
if len(updates) < 4 {
return fmt.Errorf("expected 4 updates, got %d", len(updates))
}
return nil
})
if err != nil {
t.Fatal(err)
}

profile = stream.Updates()[3]
if !profile.OpaqueProtocol {
t.Fatal("Expected OpaqueProtocol=true")
}

server.clusterStore.UnregisterGauges()
})
Expand Down Expand Up @@ -713,10 +784,9 @@ func assertSingleUpdate(t *testing.T, updates []*pb.Update) *pb.Update {
return updates[0]
}

func profileStream(t *testing.T, host string, port uint32, token string) (*bufferingGetProfileStream, *server) {
func profileStream(t *testing.T, server *server, host string, port uint32, token string) *bufferingGetProfileStream {
t.Helper()

server := makeServer(t)
stream := &bufferingGetProfileStream{
updates: []*pb.DestinationProfile{},
MockServerStream: util.NewMockServerStream(),
Expand All @@ -735,5 +805,5 @@ func profileStream(t *testing.T, host string, port uint32, token string) (*buffe
// Give GetProfile some slack
time.Sleep(50 * time.Millisecond)

return stream, server
return stream
}
12 changes: 9 additions & 3 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/api/util"
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
"github.com/linkerd/linkerd2/controller/k8s"
logging "github.com/sirupsen/logrus"
)

func makeServer(t *testing.T) *server {
srv, _ := getServerWithClient(t)
return srv
}

func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) {
meshedPodResources := []string{`
apiVersion: v1
kind: Namespace
Expand Down Expand Up @@ -445,9 +451,9 @@ spec:
res = append(res, hostPortMapping...)
res = append(res, mirrorServiceResources...)
res = append(res, destinationCredentialsResources...)
k8sAPI, err := k8s.NewFakeAPI(res...)
k8sAPI, l5dClient, err := k8s.NewFakeAPIWithL5dClient(res...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
t.Fatalf("NewFakeAPIWithL5dClient returned an error: %s", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
Expand Down Expand Up @@ -513,7 +519,7 @@ spec:
metadataAPI,
log,
make(<-chan struct{}),
}
}, l5dClient
}

type bufferingGetStream struct {
Expand Down
22 changes: 20 additions & 2 deletions controller/k8s/test_helper.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package k8s

import (
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
"github.com/linkerd/linkerd2/pkg/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/metadata/fake"
)
Expand All @@ -16,10 +18,26 @@ func NewFakeAPI(configs ...string) (*API, error) {
return nil, err
}

return NewFakeClusterScopedAPI(clientSet, spClientSet), nil
}

// NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like
// NewFakeAPI, but it also returns the mock client for linkerd CRDs
func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) {
clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...)
if err != nil {
return nil, nil, err
}

return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil
}

// NewFakeClusterScopedAPI provides a mock Kubernetes API for testing.
func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrdclient.Interface) *API {
return NewClusterScopedAPI(
clientSet,
nil,
spClientSet,
l5dClientSet,
"fake",
CJ,
CM,
Expand All @@ -39,7 +57,7 @@ func NewFakeAPI(configs ...string) (*API, error) {
ES,
Srv,
Secret,
), nil
)
}

// NewFakeMetadataAPI provides a mock Kubernetes API for testing.
Expand Down
Loading