Skip to content
This repository was archived by the owner on Dec 1, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 45 additions & 26 deletions test/e2e/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

corev1 "k8s.io/api/core/v1"

Expand All @@ -49,7 +50,6 @@ import (
const (
grpcContainerConcurrency = 1
grpcMinScale = 3
defaultPort = "80"
)

type grpcTest func(*TestContext, string, string)
Expand All @@ -64,30 +64,35 @@ func hasPort(u string) bool {
return err == nil
}

func dial(host, domain string) (*grpc.ClientConn, error) {
func dial(ctx *TestContext, host, domain string) (*grpc.ClientConn, error) {
defaultPort := "80"
if test.ServingFlags.HTTPS {
defaultPort = "443"
}
if !hasPort(host) {
host = net.JoinHostPort(host, defaultPort)
}
if !hasPort(domain) {
domain = net.JoinHostPort(domain, defaultPort)
}

if host != domain {
// The host to connect and the domain accepted differ.
// We need to do grpc.WithAuthority(...) here.
return grpc.Dial(
host,
grpc.WithAuthority(domain),
grpc.WithInsecure(),
// Retrying DNS errors to avoid .xip.io issues.
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
secureOpt := grpc.WithInsecure()
if test.ServingFlags.HTTPS {
tlsConfig := test.TLSClientConfig(context.Background(), ctx.t.Logf, ctx.clients)
// Set ServerName for pseudo hostname with TLS.
var err error
tlsConfig.ServerName, _, err = net.SplitHostPort(domain)
if err != nil {
return nil, err
}
secureOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
// This is a more preferred usage of the go-grpc client.

return grpc.Dial(
host,
grpc.WithInsecure(),
// Retrying DNS errors to avoid .xip.io issues.
grpc.WithAuthority(domain),
secureOpt,
// Retrying DNS errors to avoid .sslip.io issues.
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
}
Expand All @@ -96,7 +101,7 @@ func unaryTest(ctx *TestContext, host, domain string) {
ctx.t.Helper()
ctx.t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain)
const want = "Hello!"
got, err := pingGRPC(host, domain, want)
got, err := pingGRPC(ctx, host, domain, want)
if err != nil {
ctx.t.Fatal("gRPC ping =", err)
}
Expand Down Expand Up @@ -150,7 +155,7 @@ func loadBalancingTest(ctx *TestContext, host, domain string) {
case <-stopChan:
return nil
default:
got, err := pingGRPC(host, domain, wantPrefix)
got, err := pingGRPC(ctx, host, domain, wantPrefix)
if err != nil {
return fmt.Errorf("ping gRPC error: %w", err)
}
Expand Down Expand Up @@ -190,7 +195,7 @@ func loadBalancingTest(ctx *TestContext, host, domain string) {
}
}

func generateGRPCTraffic(concurrentRequests int, host, domain string, stopChan chan struct{}) error {
func generateGRPCTraffic(ctx *TestContext, concurrentRequests int, host, domain string, stopChan chan struct{}) error {
var grp errgroup.Group

for i := 0; i < concurrentRequests; i++ {
Expand All @@ -202,7 +207,7 @@ func generateGRPCTraffic(concurrentRequests int, host, domain string, stopChan c
return nil
default:
want := fmt.Sprintf("Hello! stream:%d request: %d", i, j)
got, err := pingGRPC(host, domain, want)
got, err := pingGRPC(ctx, host, domain, want)

if err != nil {
return fmt.Errorf("ping gRPC error: %w", err)
Expand All @@ -220,8 +225,8 @@ func generateGRPCTraffic(concurrentRequests int, host, domain string, stopChan c
return nil
}

func pingGRPC(host, domain, message string) (string, error) {
conn, err := dial(host, domain)
func pingGRPC(ctx *TestContext, host, domain, message string) (string, error) {
conn, err := dial(ctx, host, domain)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -251,7 +256,7 @@ func assertGRPCAutoscaleUpToNumPods(ctx *TestContext, curPods, targetPods float6
var grp errgroup.Group

grp.Go(func() error {
return generateGRPCTraffic(int(targetPods*grpcContainerConcurrency), host, domain, stopChan)
return generateGRPCTraffic(ctx, int(targetPods*grpcContainerConcurrency), host, domain, stopChan)
})

grp.Go(func() error {
Expand All @@ -267,7 +272,7 @@ func assertGRPCAutoscaleUpToNumPods(ctx *TestContext, curPods, targetPods float6
func streamTest(tc *TestContext, host, domain string) {
tc.t.Helper()
tc.t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain)
conn, err := dial(host, domain)
conn, err := dial(tc, host, domain)
if err != nil {
tc.t.Fatal("Fail to dial:", err)
}
Expand Down Expand Up @@ -317,15 +322,25 @@ func streamTest(tc *TestContext, host, domain string) {

func testGRPC(t *testing.T, f grpcTest, fopts ...rtesting.ServiceOption) {
t.Helper()
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}

// Setup
clients := Setup(t)

t.Log("Creating service for grpc-ping")

svcName := test.ObjectNameForTest(t)
// Long name hits this issue https://github.com/knative-sandbox/net-certmanager/issues/214
if t.Name() == "TestGRPCStreamingPingViaActivator" {
svcName = test.AppendRandomString("grpc-streaming-pig-act")
}

names := &test.ResourceNames{
Service: test.ObjectNameForTest(t),
Service: svcName,
Image: "grpc-ping",
}

Expand Down Expand Up @@ -357,7 +372,11 @@ func testGRPC(t *testing.T, f grpcTest, fopts ...rtesting.ServiceOption) {
if err != nil {
t.Fatal("Could not get service endpoint:", err)
}
host = net.JoinHostPort(addr, mapper("80"))
if test.ServingFlags.HTTPS {
host = net.JoinHostPort(addr, mapper("443"))
} else {
host = net.JoinHostPort(addr, mapper("80"))
}
}

f(&TestContext{
Expand Down
29 changes: 21 additions & 8 deletions test/e2e/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,33 @@ func connect(t *testing.T, clients *test.Clients, domain string) (*websocket.Con
address string
)

address, mapper, err := ingress.GetIngressEndpoint(context.Background(), clients.KubeClient, pkgTest.Flags.IngressEndpoint)
if err != nil {
return nil, err
}
if test.ServingFlags.ResolvableDomain {
address = domain
mapper = func(in string) string { return in }
address = domain
mapper := func(in string) string { return in }
if !test.ServingFlags.ResolvableDomain {
address, mapper, err = ingress.GetIngressEndpoint(context.Background(), clients.KubeClient, pkgTest.Flags.IngressEndpoint)
if err != nil {
return nil, err
}
}

u := url.URL{Scheme: "ws", Host: net.JoinHostPort(address, mapper("80")), Path: "/"}
if test.ServingFlags.HTTPS {
u = url.URL{Scheme: "wss", Host: net.JoinHostPort(address, mapper("443")), Path: "/"}
}

var conn *websocket.Conn
waitErr := wait.PollImmediate(connectRetryInterval, connectTimeout, func() (bool, error) {
t.Logf("Connecting using websocket: url=%s, host=%s", u.String(), domain)
c, resp, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"Host": {domain}})
dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
}
if test.ServingFlags.HTTPS {
dialer.TLSClientConfig = test.TLSClientConfig(context.Background(), t.Logf, clients)
dialer.TLSClientConfig.ServerName = domain // Set ServerName for pseudo hostname with TLS.
}

c, resp, err := dialer.Dial(u.String(), http.Header{"Host": {domain}})
if err == nil {
t.Log("WebSocket connection established.")
conn = c
Expand Down
26 changes: 22 additions & 4 deletions test/e2e/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ func webSocketResponseFreqs(t *testing.T, clients *test.Clients, url string, num
// (3) sends a message, and
// (4) verifies that we receive back the same message.
func TestWebSocket(t *testing.T) {
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}

clients := Setup(t)

Expand All @@ -146,7 +150,11 @@ func TestWebSocket(t *testing.T) {

// and with -1 as target burst capacity and then validates that we can still serve.
func TestWebSocketViaActivator(t *testing.T) {
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}

clients := Setup(t)

Expand Down Expand Up @@ -182,12 +190,22 @@ func TestWebSocketViaActivator(t *testing.T) {
}

func TestWebSocketBlueGreenRoute(t *testing.T) {
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}
clients := test.Setup(t)

svcName := test.ObjectNameForTest(t)
// Long name hits this issue https://github.com/knative-sandbox/net-certmanager/issues/214
if test.ServingFlags.HTTPS {
svcName = test.AppendRandomString("web-socket-blue-green")
}

names := test.ResourceNames{
// Set Service and Image for names to create the initial service
Service: test.ObjectNameForTest(t),
Service: svcName,
Image: wsServerTestImageName,
}

Expand Down
12 changes: 8 additions & 4 deletions test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,21 @@ func AddRootCAtoTransport(ctx context.Context, logf logging.FormatLogger, client
return transport
}
}
return func(transport *http.Transport) *http.Transport {
transport.TLSClientConfig = TLSClientConfig(ctx, logf, clients)
return transport
}
}

func TLSClientConfig(ctx context.Context, logf logging.FormatLogger, clients *Clients) *tls.Config {
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
if !rootCAs.AppendCertsFromPEM(PemDataFromSecret(ctx, logf, clients, caSecretNamespace, caSecretName)) {
logf("Failed to add the certificate to the root CA")
}
return func(transport *http.Transport) *http.Transport {
transport.TLSClientConfig = &tls.Config{RootCAs: rootCAs}
return transport
}
return &tls.Config{RootCAs: rootCAs}
}

// PemDataFromSecret gets pem data from secret.
Expand Down