diff --git a/integration/helpers/helpers.go b/integration/helpers/helpers.go index d2c14bdbfb28e..9a6a27686fbd2 100644 --- a/integration/helpers/helpers.go +++ b/integration/helpers/helpers.go @@ -448,3 +448,16 @@ func MakeTestDatabaseServer(t *testing.T, proxyAddr utils.NetAddr, token string, return db } + +// MustCreateListener creates a tcp listener at 127.0.0.1 with random port. +func MustCreateListener(t *testing.T) net.Listener { + t.Helper() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + t.Cleanup(func() { + listener.Close() + }) + return listener +} diff --git a/integration/helpers/instance.go b/integration/helpers/instance.go index 2d9a36361a8cb..4245f19a98fc3 100644 --- a/integration/helpers/instance.go +++ b/integration/helpers/instance.go @@ -1017,6 +1017,8 @@ type ProxyConfig struct { SSHAddr string // WebAddr the address the web service should listen on WebAddr string + // KubeAddr is the kube proxy address. + KubeAddr string // ReverseTunnelAddr the address the reverse proxy service should listen on ReverseTunnelAddr string // Disable the web service @@ -1281,17 +1283,21 @@ func (i *TeleInstance) NewUnauthenticatedClient(cfg ClientConfig) (tc *client.Te var webProxyAddr string var sshProxyAddr string + var kubeProxyAddr string switch { case cfg.Proxy != nil: webProxyAddr = cfg.Proxy.WebAddr sshProxyAddr = cfg.Proxy.SSHAddr + kubeProxyAddr = cfg.Proxy.KubeAddr case cfg.ALBAddr != "": webProxyAddr = cfg.ALBAddr sshProxyAddr = cfg.ALBAddr + kubeProxyAddr = cfg.ALBAddr default: webProxyAddr = i.Web sshProxyAddr = i.SSHProxy + kubeProxyAddr = i.Config.Proxy.Kube.ListenAddr.Addr } fwdAgentMode := client.ForwardAgentNo @@ -1311,6 +1317,7 @@ func (i *TeleInstance) NewUnauthenticatedClient(cfg ClientConfig) (tc *client.Te Labels: cfg.Labels, WebProxyAddr: webProxyAddr, SSHProxyAddr: sshProxyAddr, + KubeProxyAddr: kubeProxyAddr, InteractiveCommand: cfg.Interactive, TLSRoutingEnabled: i.IsSinglePortSetup, TLSRoutingConnUpgradeRequired: cfg.ALBAddr != "", diff --git a/integration/helpers/proxy.go b/integration/helpers/proxy.go index e0a25822d4996..359f4caf1c0cd 100644 --- a/integration/helpers/proxy.go +++ b/integration/helpers/proxy.go @@ -15,15 +15,23 @@ package helpers import ( + "context" + "crypto/tls" "fmt" "io" "net" "net/http" "net/url" "sync" + "testing" "time" "github.com/gravitational/trace" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + + "github.com/gravitational/teleport/api/fixtures" + "github.com/gravitational/teleport/lib/utils" ) type ProxyHandler struct { @@ -201,3 +209,73 @@ func MakeProxyAddr(user, pass, host string) string { userPass := url.UserPassword(user, pass).String() return fmt.Sprintf("%v@%v", userPass, host) } + +// MockAWSALBProxy is a mock proxy server that simulates an AWS application +// load balancer where ALPN is not supported. Note that this mock does not +// actually balance traffic. +type MockAWSALBProxy struct { + net.Listener + proxyAddr string + cert tls.Certificate +} + +func (m *MockAWSALBProxy) serve(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + } + + conn, err := m.Accept() + if err != nil { + logrus.WithError(err).Debugf("Failed to accept conn.") + return + } + + go func() { + defer conn.Close() + + // Handshake with incoming client and drops ALPN. + downstreamConn := tls.Server(conn, &tls.Config{ + Certificates: []tls.Certificate{m.cert}, + }) + + // api.Client may try different connection methods. Just close the + // connection when something goes wrong. + if err := downstreamConn.HandshakeContext(ctx); err != nil { + logrus.WithError(err).Debugf("Failed to handshake.") + return + } + + // Make a connection to the proxy server with ALPN protos. + upstreamConn, err := tls.Dial("tcp", m.proxyAddr, &tls.Config{ + InsecureSkipVerify: true, + }) + if err != nil { + logrus.WithError(err).Debugf("Failed to dial upstream.") + return + } + utils.ProxyConn(ctx, downstreamConn, upstreamConn) + }() + } +} + +// MustStartMockALBProxy creates and starts a MockAWSALBProxy. +func MustStartMockALBProxy(t *testing.T, proxyAddr string) *MockAWSALBProxy { + t.Helper() + + cert, err := tls.X509KeyPair([]byte(fixtures.TLSCACertPEM), []byte(fixtures.TLSCAKeyPEM)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + m := &MockAWSALBProxy{ + proxyAddr: proxyAddr, + Listener: MustCreateListener(t), + cert: cert, + } + go m.serve(ctx) + return m +} diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index 473a9b4808f37..59f029e3f0f10 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -50,6 +50,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/breaker" + "github.com/gravitational/teleport/api/constants" apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/profile" "github.com/gravitational/teleport/api/types" @@ -1508,13 +1509,13 @@ func kubeExec(kubeConfig *rest.Config, args kubeExecArgs) error { return executor.StreamWithContext(context.Background(), opts) } -func kubeJoin(kubeConfig kube.ProxyConfig, tc *client.TeleportClient, meta types.SessionTracker) (*client.KubeSession, error) { +func kubeJoin(kubeConfig kube.ProxyConfig, tc *client.TeleportClient, meta types.SessionTracker, mode types.SessionParticipantMode) (*client.KubeSession, error) { tlsConfig, err := kubeProxyTLSConfig(kubeConfig) if err != nil { return nil, trace.Wrap(err) } - sess, err := client.NewKubeSession(context.TODO(), tc, meta, kubeConfig.T.Config.Proxy.Kube.ListenAddr.Addr, "", types.SessionPeerMode, tlsConfig) + sess, err := client.NewKubeSession(context.TODO(), tc, meta, tc.KubeProxyAddr, "", mode, tlsConfig) if err != nil { return nil, trace.Wrap(err) } @@ -1561,7 +1562,7 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { Name: "foo", Roles: []string{"kubemaster"}, Kinds: []string{string(types.KubernetesSessionKind)}, - Modes: []string{string(types.SessionPeerMode)}, + Modes: []string{string(types.SessionPeerMode), string(types.SessionObserverMode)}, }}, }, }) @@ -1628,25 +1629,47 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { participantStdinR, participantStdinW := io.Pipe() participantStdoutR, participantStdoutW := io.Pipe() + streams := make([]*client.KubeSession, 0, 3) + observerCaptures := make([]*bytes.Buffer, 0, 2) + albProxy := helpers.MustStartMockALBProxy(t, teleport.Config.Proxy.WebAddr.Addr) + + t.Run("join peer by KubeProxyAddr", func(t *testing.T) { + tc, err := teleport.NewClient(helpers.ClientConfig{ + Login: hostUsername, + Cluster: helpers.Site, + Host: Host, + }) + require.NoError(t, err) - tc, err := teleport.NewClient(helpers.ClientConfig{ - Login: hostUsername, - Cluster: helpers.Site, - Host: Host, + tc.Stdin = participantStdinR + tc.Stdout = participantStdoutW + + stream, err := kubeJoin(kube.ProxyConfig{ + T: teleport, + Username: participantUsername, + KubeUsers: kubeUsers, + KubeGroups: kubeGroups, + }, tc, session, types.SessionPeerMode) + require.NoError(t, err) + streams = append(streams, stream) }) - require.NoError(t, err) - tc.Stdin = participantStdinR - tc.Stdout = participantStdoutW + t.Run("join observer by WebProxyAddr", func(t *testing.T) { + stream, capture := kubeJoinByWebAddr(t, teleport, participantUsername, kubeUsers, kubeGroups) + streams = append(streams, stream) + observerCaptures = append(observerCaptures, capture) + }) + t.Run("join observer with ALPN conn upgrade", func(t *testing.T) { + stream, capture := kubeJoinByALBAddr(t, teleport, participantUsername, kubeUsers, kubeGroups, albProxy.Addr().String()) + streams = append(streams, stream) + observerCaptures = append(observerCaptures, capture) + }) - stream, err := kubeJoin(kube.ProxyConfig{ - T: teleport, - Username: participantUsername, - KubeUsers: kubeUsers, - KubeGroups: kubeGroups, - }, tc, session) - require.NoError(t, err) - defer stream.Close() + require.Len(t, observerCaptures, 2) + require.Len(t, streams, 3) + for _, stream := range streams { + defer stream.Close() + } // We wait again for the second user to finish joining the session. // We allow a bit of time to pass here to give the session manager time to recognize the @@ -1654,10 +1677,10 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { time.Sleep(time.Second * 5) // sent a test message from the participant - participantStdinW.Write([]byte("\aecho hi2\n\r")) + participantStdinW.Write([]byte("\ahi from peer\n\r")) // lets type "echo hi" followed by "enter" and then "exit" + "enter": - term.Type("\aecho hi\n\r") + term.Type("\ahi from term\n\r") // Terminate the session after a moment to allow for the IO to reach the second client. time.AfterFunc(5*time.Second, func() { @@ -1665,8 +1688,72 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { participantStdoutW.Close() }) - participantOutput, err := io.ReadAll(participantStdoutR) + t.Run("verify output", func(t *testing.T) { + // Verify peer. + participantOutput, err := io.ReadAll(participantStdoutR) + require.NoError(t, err) + require.Contains(t, string(participantOutput), "hi from term") + + // Verify original session. + require.Contains(t, out.String(), "hi from peer") + + // Verify observers. + for _, capture := range observerCaptures { + require.Contains(t, capture.String(), "hi from peer") + require.Contains(t, capture.String(), "hi from term") + } + }) +} + +func kubeJoinByWebAddr(t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string) (*client.KubeSession, *bytes.Buffer) { + t.Helper() + + tc, err := teleport.NewClient(helpers.ClientConfig{ + Login: username, + Cluster: helpers.Site, + Host: Host, + Proxy: &helpers.ProxyConfig{ + WebAddr: teleport.Config.Proxy.WebAddr.Addr, + KubeAddr: teleport.Config.Proxy.WebAddr.Addr, + }, + }) require.NoError(t, err) - require.Contains(t, string(participantOutput), "echo hi") - require.Contains(t, out.String(), "echo hi2") + + buffer := new(bytes.Buffer) + tc.Stdout = buffer + return kubeJoinObserverWithSNISet(t, tc, teleport, kubeUsers, kubeGroups), buffer +} + +func kubeJoinByALBAddr(t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string, albAddr string) (*client.KubeSession, *bytes.Buffer) { + t.Helper() + + tc, err := teleport.NewClient(helpers.ClientConfig{ + Login: username, + Cluster: helpers.Site, + Host: Host, + ALBAddr: albAddr, + }) + require.NoError(t, err) + + buffer := new(bytes.Buffer) + tc.Stdout = buffer + return kubeJoinObserverWithSNISet(t, tc, teleport, kubeUsers, kubeGroups), buffer +} + +func kubeJoinObserverWithSNISet(t *testing.T, tc *client.TeleportClient, teleport *helpers.TeleInstance, kubeUsers, kubeGroups []string) *client.KubeSession { + t.Helper() + + sessions, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(context.Background()) + require.NoError(t, err) + require.Greater(t, len(sessions), 0) + + stream, err := kubeJoin(kube.ProxyConfig{ + T: teleport, + Username: tc.Username, + KubeUsers: kubeUsers, + KubeGroups: kubeGroups, + CustomTLSServerName: constants.KubeTeleportProxyALPNPrefix + Host, + }, tc, sessions[0], types.SessionObserverMode) + require.NoError(t, err) + return stream } diff --git a/integration/proxy/proxy_helpers.go b/integration/proxy/proxy_helpers.go index eee6649fd9214..ee42ca1bf82f3 100644 --- a/integration/proxy/proxy_helpers.go +++ b/integration/proxy/proxy_helpers.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "crypto/tls" - "crypto/x509/pkix" "encoding/json" "fmt" "io" @@ -38,7 +37,6 @@ import ( "github.com/gravitational/trace" "github.com/jackc/pgconn" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh" @@ -425,7 +423,7 @@ func withTrustedClusterBehindALB() proxySuiteOptionsFunc { } require.NotNil(t, options.trustedCluster) - albProxy := mustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) + albProxy := helpers.MustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) options.trustedCluster.SetProxyAddress(albProxy.Addr().String()) options.trustedCluster.SetReverseTunnelAddress(albProxy.Addr().String()) } @@ -511,18 +509,6 @@ func mustCreateKubeConfigFile(t *testing.T, config clientcmdapi.Config) string { return configPath } -func mustCreateListener(t *testing.T) net.Listener { - t.Helper() - - listener, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - - t.Cleanup(func() { - listener.Close() - }) - return listener -} - func mustCreateKubeLocalProxyListener(t *testing.T, teleportCluster string, caCert, caKey []byte) net.Listener { t.Helper() @@ -550,7 +536,7 @@ func mustStartALPNLocalProxyWithConfig(t *testing.T, config alpnproxy.LocalProxy t.Helper() if config.Listener == nil { - config.Listener = mustCreateListener(t) + config.Listener = helpers.MustCreateListener(t) } if config.ParentContext == nil { config.ParentContext = context.TODO() @@ -615,90 +601,6 @@ func makeNodeConfig(nodeName, proxyAddr string) *servicecfg.Config { return nodeConfig } -func mustCreateSelfSignedCert(t *testing.T) tls.Certificate { - t.Helper() - - caKey, caCert, err := tlsca.GenerateSelfSignedCA(pkix.Name{ - CommonName: "localhost", - }, []string{"localhost"}, defaults.CATTL) - require.NoError(t, err) - - cert, err := tls.X509KeyPair(caCert, caKey) - require.NoError(t, err) - return cert -} - -// mockAWSALBProxy is a mock proxy server that simulates an AWS application -// load balancer where ALPN is not supported. Note that this mock does not -// actually balance traffic. -type mockAWSALBProxy struct { - net.Listener - proxyAddr string - cert tls.Certificate -} - -func (m *mockAWSALBProxy) serve(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - } - - conn, err := m.Accept() - if err != nil { - if utils.IsUseOfClosedNetworkError(err) { - continue - } - - logrus.WithError(err).Debugf("Failed to accept conn.") - return - } - - go func() { - defer conn.Close() - - // Handshake with incoming client and drops ALPN. - downstreamConn := tls.Server(conn, &tls.Config{ - Certificates: []tls.Certificate{m.cert}, - ClientAuth: tls.NoClientCert, - }) - - // api.Client may try different connection methods. Just close the - // connection when something goes wrong. - if err := downstreamConn.HandshakeContext(ctx); err != nil { - logrus.WithError(err).Debugf("Failed to handshake.") - return - } - - // Make a connection to the proxy server with ALPN protos. - upstreamConn, err := tls.Dial("tcp", m.proxyAddr, &tls.Config{ - InsecureSkipVerify: true, - }) - if err != nil { - logrus.WithError(err).Debugf("Failed to dial upstream.") - return - } - utils.ProxyConn(ctx, downstreamConn, upstreamConn) - }() - } -} - -func mustStartMockALBProxy(t *testing.T, proxyAddr string) *mockAWSALBProxy { - t.Helper() - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - m := &mockAWSALBProxy{ - proxyAddr: proxyAddr, - Listener: mustCreateListener(t), - cert: mustCreateSelfSignedCert(t), - } - go m.serve(ctx) - return m -} - // waitForActivePeerProxyConnections waits for remote cluster to report a minimum number of active proxy peer connections func waitForActivePeerProxyConnections(t *testing.T, tunnel reversetunnel.Server, expectedCount int) { //nolint:unused // Only used by skipped test TestProxyTunnelStrategyProxyPeering require.Eventually(t, func() bool { diff --git a/integration/proxy/proxy_test.go b/integration/proxy/proxy_test.go index 1b1c046d650bb..460edc8aed746 100644 --- a/integration/proxy/proxy_test.go +++ b/integration/proxy/proxy_test.go @@ -141,7 +141,7 @@ func TestALPNSNIProxyMultiCluster(t *testing.T) { if tc.testALPNConnUpgrade { t.Run("ALPN conn upgrade", func(t *testing.T) { // Make a mock ALB which points to the Teleport Proxy Service. - albProxy := mustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) + albProxy := helpers.MustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) // Run command in root through ALB address. suite.mustConnectToClusterAndRunSSHCommand(t, helpers.ClientConfig{ @@ -400,7 +400,7 @@ func TestALPNSNIProxyKube(t *testing.T) { // Make a mock ALB which points to the Teleport Proxy Service. Then // ALPN local proxies will point to this ALB instead. - albProxy := mustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) + albProxy := helpers.MustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) // Generate a self-signed CA for kube local proxy. localCAKey, localCACert, err := tlsca.GenerateSelfSignedCA(pkix.Name{ @@ -888,7 +888,7 @@ func TestALPNSNIProxyDatabaseAccess(t *testing.T) { t.Run("ALPN connection upgrade", func(t *testing.T) { // Make a mock ALB which points to the Teleport Proxy Service. Then // ALPN local proxies will point to this ALB instead. - albProxy := mustStartMockALBProxy(t, pack.Root.Cluster.Web) + albProxy := helpers.MustStartMockALBProxy(t, pack.Root.Cluster.Web) // Test a protocol in the alpncommon.IsDBTLSProtocol list where // the database client will perform a native TLS handshake. @@ -1104,7 +1104,7 @@ func TestALPNSNIProxyAppAccess(t *testing.T) { // Make a mock ALB which points to the Teleport Proxy Service. Then // ALPN local proxies will point to this ALB instead. - albProxy := mustStartMockALBProxy(t, pack.RootWebAddr()) + albProxy := helpers.MustStartMockALBProxy(t, pack.RootWebAddr()) lp := mustStartALPNLocalProxyWithConfig(t, alpnproxy.LocalProxyConfig{ RemoteProxyAddr: albProxy.Addr().String(), @@ -1214,7 +1214,7 @@ func TestALPNProxyAuthClientConnectWithUserIdentity(t *testing.T) { // Make a mock ALB which points to the Teleport Proxy Service. Then // client can point to this ALB instead. - albProxy := mustStartMockALBProxy(t, rc.Web) + albProxy := helpers.MustStartMockALBProxy(t, rc.Web) tests := []struct { name string @@ -1535,7 +1535,7 @@ func TestALPNSNIProxyGRPCInsecure(t *testing.T) { // Test register through Proxy behind a L7 load balancer. t.Run("ALPN conn upgrade", func(t *testing.T) { - albProxy := mustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) + albProxy := helpers.MustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) albAddr, err := utils.ParseAddr(albProxy.Addr().String()) require.NoError(t, err) @@ -1601,7 +1601,7 @@ func TestALPNSNIProxyGRPCSecure(t *testing.T) { }) t.Run("ALPN conn upgrade", func(t *testing.T) { // Make a mock ALB which points to the Teleport Proxy Service. - albProxy := mustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) + albProxy := helpers.MustStartMockALBProxy(t, suite.root.Config.Proxy.WebAddr.Addr) tc, err := suite.root.NewClient(helpers.ClientConfig{ Login: username, diff --git a/integration/proxy/teleterm_test.go b/integration/proxy/teleterm_test.go index 94b1e9afe26cd..dd735bfa82110 100644 --- a/integration/proxy/teleterm_test.go +++ b/integration/proxy/teleterm_test.go @@ -66,7 +66,7 @@ func testTeletermGatewaysCertRenewal(t *testing.T, pack *dbhelpers.DatabasePack) t.Run("ALPN connection upgrade", func(t *testing.T) { // Make a mock ALB which points to the Teleport Proxy Service. Then // ALPN local proxies will point to this ALB instead. - albProxy := mustStartMockALBProxy(t, pack.Root.Cluster.Web) + albProxy := helpers.MustStartMockALBProxy(t, pack.Root.Cluster.Web) databaseURI := uri.NewClusterURI(rootClusterName). AppendDB(pack.Root.MysqlService.Name) diff --git a/lib/client/kubesession.go b/lib/client/kubesession.go index 0e260e286dae0..8b07e99b0b1c8 100644 --- a/lib/client/kubesession.go +++ b/lib/client/kubesession.go @@ -28,6 +28,8 @@ import ( "github.com/gravitational/trace" "k8s.io/client-go/tools/remotecommand" + "github.com/gravitational/teleport/api/client" + "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/client/terminal" "github.com/gravitational/teleport/lib/kube/proxy/streamproto" @@ -55,6 +57,7 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT } dialer := &websocket.Dialer{ + NetDialContext: kubeSessionNetDialer(ctx, tc, kubeAddr).DialContext, TLSClientConfig: tlsConfig, } @@ -118,6 +121,28 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT return s, nil } +func kubeSessionNetDialer(ctx context.Context, tc *TeleportClient, kubeAddr string) client.ContextDialer { + dialOpts := []client.DialOption{ + client.WithInsecureSkipVerify(tc.InsecureSkipVerify), + } + + // Add options for ALPN connection upgrade only if kube is served at Proxy + // web address. + if tc.WebProxyAddr == kubeAddr && tc.TLSRoutingConnUpgradeRequired { + dialOpts = append(dialOpts, + client.WithALPNConnUpgrade(tc.TLSRoutingConnUpgradeRequired), + client.WithALPNConnUpgradePing(true), // Use Ping protocol for long-lived connections. + ) + } + + return client.NewDialer( + ctx, + defaults.DefaultIdleTimeout, + defaults.DefaultIOTimeout, + dialOpts..., + ) +} + func handleOutgoingResizeEvents(ctx context.Context, stream *streamproto.SessionStream, term *terminal.Terminal) { queue := stream.ResizeQueue()