Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 45 additions & 26 deletions test/e2e/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

corev1 "k8s.io/api/core/v1"

Expand All @@ -49,7 +50,6 @@ import (
const (
grpcContainerConcurrency = 1
grpcMinScale = 3
defaultPort = "80"
)

type grpcTest func(*TestContext, string, string)
Expand All @@ -64,30 +64,35 @@ func hasPort(u string) bool {
return err == nil
}

func dial(host, domain string) (*grpc.ClientConn, error) {
func dial(ctx *TestContext, host, domain string) (*grpc.ClientConn, error) {
defaultPort := "80"
if test.ServingFlags.HTTPS {
defaultPort = "443"
}
if !hasPort(host) {
host = net.JoinHostPort(host, defaultPort)
}
if !hasPort(domain) {
domain = net.JoinHostPort(domain, defaultPort)
}

if host != domain {
// The host to connect and the domain accepted differ.
// We need to do grpc.WithAuthority(...) here.
return grpc.Dial(
host,
grpc.WithAuthority(domain),
grpc.WithInsecure(),
// Retrying DNS errors to avoid .xip.io issues.
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
secureOpt := grpc.WithInsecure()
if test.ServingFlags.HTTPS {
tlsConfig := test.TLSClientConfig(context.Background(), ctx.t.Logf, ctx.clients)
// Set ServerName for pseudo hostname with TLS.
var err error
tlsConfig.ServerName, _, err = net.SplitHostPort(domain)
if err != nil {
return nil, err
}
secureOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
// This is a more preferred usage of the go-grpc client.

return grpc.Dial(
host,
grpc.WithInsecure(),
// Retrying DNS errors to avoid .xip.io issues.
grpc.WithAuthority(domain),
secureOpt,
// Retrying DNS errors to avoid .sslip.io issues.
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
}
Expand All @@ -96,7 +101,7 @@ func unaryTest(ctx *TestContext, host, domain string) {
ctx.t.Helper()
ctx.t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain)
const want = "Hello!"
got, err := pingGRPC(host, domain, want)
got, err := pingGRPC(ctx, host, domain, want)
if err != nil {
ctx.t.Fatal("gRPC ping =", err)
}
Expand Down Expand Up @@ -150,7 +155,7 @@ func loadBalancingTest(ctx *TestContext, host, domain string) {
case <-stopChan:
return nil
default:
got, err := pingGRPC(host, domain, wantPrefix)
got, err := pingGRPC(ctx, host, domain, wantPrefix)
if err != nil {
return fmt.Errorf("ping gRPC error: %w", err)
}
Expand Down Expand Up @@ -190,7 +195,7 @@ func loadBalancingTest(ctx *TestContext, host, domain string) {
}
}

func generateGRPCTraffic(concurrentRequests int, host, domain string, stopChan chan struct{}) error {
func generateGRPCTraffic(ctx *TestContext, concurrentRequests int, host, domain string, stopChan chan struct{}) error {
var grp errgroup.Group

for i := 0; i < concurrentRequests; i++ {
Expand All @@ -202,7 +207,7 @@ func generateGRPCTraffic(concurrentRequests int, host, domain string, stopChan c
return nil
default:
want := fmt.Sprintf("Hello! stream:%d request: %d", i, j)
got, err := pingGRPC(host, domain, want)
got, err := pingGRPC(ctx, host, domain, want)

if err != nil {
return fmt.Errorf("ping gRPC error: %w", err)
Expand All @@ -220,8 +225,8 @@ func generateGRPCTraffic(concurrentRequests int, host, domain string, stopChan c
return nil
}

func pingGRPC(host, domain, message string) (string, error) {
conn, err := dial(host, domain)
func pingGRPC(ctx *TestContext, host, domain, message string) (string, error) {
conn, err := dial(ctx, host, domain)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -251,7 +256,7 @@ func assertGRPCAutoscaleUpToNumPods(ctx *TestContext, curPods, targetPods float6
var grp errgroup.Group

grp.Go(func() error {
return generateGRPCTraffic(int(targetPods*grpcContainerConcurrency), host, domain, stopChan)
return generateGRPCTraffic(ctx, int(targetPods*grpcContainerConcurrency), host, domain, stopChan)
})

grp.Go(func() error {
Expand All @@ -267,7 +272,7 @@ func assertGRPCAutoscaleUpToNumPods(ctx *TestContext, curPods, targetPods float6
func streamTest(tc *TestContext, host, domain string) {
tc.t.Helper()
tc.t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain)
conn, err := dial(host, domain)
conn, err := dial(tc, host, domain)
if err != nil {
tc.t.Fatal("Fail to dial:", err)
}
Expand Down Expand Up @@ -317,15 +322,25 @@ func streamTest(tc *TestContext, host, domain string) {

func testGRPC(t *testing.T, f grpcTest, fopts ...rtesting.ServiceOption) {
t.Helper()
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}

// Setup
clients := Setup(t)

t.Log("Creating service for grpc-ping")

svcName := test.ObjectNameForTest(t)
// Long name hits this issue https://github.com/knative-sandbox/net-certmanager/issues/214
if t.Name() == "TestGRPCStreamingPingViaActivator" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This very specific check makes me somewhat uncomfortable. Can we do something more generic? Do we need to make sure that the entire domain is < 64 bytes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to make sure that the entire domain is 64 bytes...
So, I don't have good idea to make it generic. I updated but it would not be good code I think 😓

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted my change. I think that the current specific workaround is simple and better. (But please let me know if you have a better idea 😅 )

svcName = test.AppendRandomString("grpc-streaming-pig-act")
}

names := &test.ResourceNames{
Service: test.ObjectNameForTest(t),
Service: svcName,
Image: "grpc-ping",
}

Expand Down Expand Up @@ -357,7 +372,11 @@ func testGRPC(t *testing.T, f grpcTest, fopts ...rtesting.ServiceOption) {
if err != nil {
t.Fatal("Could not get service endpoint:", err)
}
host = net.JoinHostPort(addr, mapper("80"))
if test.ServingFlags.HTTPS {
host = net.JoinHostPort(addr, mapper("443"))
} else {
host = net.JoinHostPort(addr, mapper("80"))
}
}

f(&TestContext{
Expand Down
15 changes: 14 additions & 1 deletion test/e2e/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,23 @@ func connect(t *testing.T, clients *test.Clients, domain string) (*websocket.Con
}

u := url.URL{Scheme: "ws", Host: net.JoinHostPort(address, mapper("80")), Path: "/"}
if test.ServingFlags.HTTPS {
u = url.URL{Scheme: "wss", Host: net.JoinHostPort(address, mapper("443")), Path: "/"}
}

var conn *websocket.Conn
waitErr := wait.PollImmediate(connectRetryInterval, connectTimeout, func() (bool, error) {
t.Logf("Connecting using websocket: url=%s, host=%s", u.String(), domain)
c, resp, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"Host": {domain}})
dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
}
if test.ServingFlags.HTTPS {
dialer.TLSClientConfig = test.TLSClientConfig(context.Background(), t.Logf, clients)
dialer.TLSClientConfig.ServerName = domain // Set ServerName for pseudo hostname with TLS.
}

c, resp, err := dialer.Dial(u.String(), http.Header{"Host": {domain}})
if err == nil {
t.Log("WebSocket connection established.")
conn = c
Expand Down
26 changes: 22 additions & 4 deletions test/e2e/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ func webSocketResponseFreqs(t *testing.T, clients *test.Clients, url string, num
// (3) sends a message, and
// (4) verifies that we receive back the same message.
func TestWebSocket(t *testing.T) {
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}

clients := Setup(t)

Expand All @@ -146,7 +150,11 @@ func TestWebSocket(t *testing.T) {

// and with -1 as target burst capacity and then validates that we can still serve.
func TestWebSocketViaActivator(t *testing.T) {
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}

clients := Setup(t)

Expand Down Expand Up @@ -182,12 +190,22 @@ func TestWebSocketViaActivator(t *testing.T) {
}

func TestWebSocketBlueGreenRoute(t *testing.T) {
t.Parallel()
// TODO: https option with parallel leads to flakes.
// https://github.com/knative/serving/issues/11387
if !test.ServingFlags.HTTPS {
t.Parallel()
}
clients := test.Setup(t)

svcName := test.ObjectNameForTest(t)
// Long name hits this issue https://github.com/knative-sandbox/net-certmanager/issues/214
if test.ServingFlags.HTTPS {
svcName = test.AppendRandomString("web-socket-blue-green")
}

names := test.ResourceNames{
// Set Service and Image for names to create the initial service
Service: test.ObjectNameForTest(t),
Service: svcName,
Image: wsServerTestImageName,
}

Expand Down
12 changes: 8 additions & 4 deletions test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,21 @@ func AddRootCAtoTransport(ctx context.Context, logf logging.FormatLogger, client
return transport
}
}
return func(transport *http.Transport) *http.Transport {
transport.TLSClientConfig = TLSClientConfig(ctx, logf, clients)
return transport
}
}

func TLSClientConfig(ctx context.Context, logf logging.FormatLogger, clients *Clients) *tls.Config {
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
if !rootCAs.AppendCertsFromPEM(PemDataFromSecret(ctx, logf, clients, caSecretNamespace, caSecretName)) {
logf("Failed to add the certificate to the root CA")
}
return func(transport *http.Transport) *http.Transport {
transport.TLSClientConfig = &tls.Config{RootCAs: rootCAs}
return transport
}
return &tls.Config{RootCAs: rootCAs}
}

// PemDataFromSecret gets pem data from secret.
Expand Down