Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions integration/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,16 @@ func MakeTestDatabaseServer(t *testing.T, proxyAddr utils.NetAddr, token string,

return db
}

// MustCreateListener creates a tcp listener at 127.0.0.1 with random port.
func MustCreateListener(t *testing.T) net.Listener {
t.Helper()

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

t.Cleanup(func() {
listener.Close()
})
return listener
}
7 changes: 7 additions & 0 deletions integration/helpers/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,8 @@ type ProxyConfig struct {
SSHAddr string
// WebAddr the address the web service should listen on
WebAddr string
// KubeAddr is the kube proxy address.
KubeAddr string
// ReverseTunnelAddr the address the reverse proxy service should listen on
ReverseTunnelAddr string
// Disable the web service
Expand Down Expand Up @@ -1281,17 +1283,21 @@ func (i *TeleInstance) NewUnauthenticatedClient(cfg ClientConfig) (tc *client.Te

var webProxyAddr string
var sshProxyAddr string
var kubeProxyAddr string

switch {
case cfg.Proxy != nil:
webProxyAddr = cfg.Proxy.WebAddr
sshProxyAddr = cfg.Proxy.SSHAddr
kubeProxyAddr = cfg.Proxy.KubeAddr
case cfg.ALBAddr != "":
webProxyAddr = cfg.ALBAddr
sshProxyAddr = cfg.ALBAddr
kubeProxyAddr = cfg.ALBAddr
default:
webProxyAddr = i.Web
sshProxyAddr = i.SSHProxy
kubeProxyAddr = i.Config.Proxy.Kube.ListenAddr.Addr
}

fwdAgentMode := client.ForwardAgentNo
Expand All @@ -1311,6 +1317,7 @@ func (i *TeleInstance) NewUnauthenticatedClient(cfg ClientConfig) (tc *client.Te
Labels: cfg.Labels,
WebProxyAddr: webProxyAddr,
SSHProxyAddr: sshProxyAddr,
KubeProxyAddr: kubeProxyAddr,
InteractiveCommand: cfg.Interactive,
TLSRoutingEnabled: i.IsSinglePortSetup,
TLSRoutingConnUpgradeRequired: cfg.ALBAddr != "",
Expand Down
78 changes: 78 additions & 0 deletions integration/helpers/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@
package helpers

import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sync"
"testing"
"time"

"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/fixtures"
"github.com/gravitational/teleport/lib/utils"
)

type ProxyHandler struct {
Expand Down Expand Up @@ -201,3 +209,73 @@ func MakeProxyAddr(user, pass, host string) string {
userPass := url.UserPassword(user, pass).String()
return fmt.Sprintf("%v@%v", userPass, host)
}

// MockAWSALBProxy is a mock proxy server that simulates an AWS application
// load balancer where ALPN is not supported. Note that this mock does not
// actually balance traffic.
type MockAWSALBProxy struct {
net.Listener
proxyAddr string
cert tls.Certificate
}

func (m *MockAWSALBProxy) serve(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}

conn, err := m.Accept()
if err != nil {
logrus.WithError(err).Debugf("Failed to accept conn.")
return
}

go func() {
defer conn.Close()

// Handshake with incoming client and drops ALPN.
downstreamConn := tls.Server(conn, &tls.Config{
Certificates: []tls.Certificate{m.cert},
})

// api.Client may try different connection methods. Just close the
// connection when something goes wrong.
if err := downstreamConn.HandshakeContext(ctx); err != nil {
logrus.WithError(err).Debugf("Failed to handshake.")
return
}

// Make a connection to the proxy server with ALPN protos.
upstreamConn, err := tls.Dial("tcp", m.proxyAddr, &tls.Config{
InsecureSkipVerify: true,
})
if err != nil {
logrus.WithError(err).Debugf("Failed to dial upstream.")
return
}
utils.ProxyConn(ctx, downstreamConn, upstreamConn)
}()
}
}

// MustStartMockALBProxy creates and starts a MockAWSALBProxy.
func MustStartMockALBProxy(t *testing.T, proxyAddr string) *MockAWSALBProxy {
t.Helper()

cert, err := tls.X509KeyPair([]byte(fixtures.TLSCACertPEM), []byte(fixtures.TLSCAKeyPEM))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

m := &MockAWSALBProxy{
proxyAddr: proxyAddr,
Listener: MustCreateListener(t),
cert: cert,
}
go m.serve(ctx)
return m
}
133 changes: 110 additions & 23 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/breaker"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/profile"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -1508,13 +1509,13 @@ func kubeExec(kubeConfig *rest.Config, args kubeExecArgs) error {
return executor.StreamWithContext(context.Background(), opts)
}

func kubeJoin(kubeConfig kube.ProxyConfig, tc *client.TeleportClient, meta types.SessionTracker) (*client.KubeSession, error) {
func kubeJoin(kubeConfig kube.ProxyConfig, tc *client.TeleportClient, meta types.SessionTracker, mode types.SessionParticipantMode) (*client.KubeSession, error) {
tlsConfig, err := kubeProxyTLSConfig(kubeConfig)
if err != nil {
return nil, trace.Wrap(err)
}

sess, err := client.NewKubeSession(context.TODO(), tc, meta, kubeConfig.T.Config.Proxy.Kube.ListenAddr.Addr, "", types.SessionPeerMode, tlsConfig)
sess, err := client.NewKubeSession(context.TODO(), tc, meta, tc.KubeProxyAddr, "", mode, tlsConfig)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1561,7 +1562,7 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) {
Name: "foo",
Roles: []string{"kubemaster"},
Kinds: []string{string(types.KubernetesSessionKind)},
Modes: []string{string(types.SessionPeerMode)},
Modes: []string{string(types.SessionPeerMode), string(types.SessionObserverMode)},
}},
},
})
Expand Down Expand Up @@ -1628,45 +1629,131 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) {

participantStdinR, participantStdinW := io.Pipe()
participantStdoutR, participantStdoutW := io.Pipe()
streams := make([]*client.KubeSession, 0, 3)
observerCaptures := make([]*bytes.Buffer, 0, 2)
albProxy := helpers.MustStartMockALBProxy(t, teleport.Config.Proxy.WebAddr.Addr)

t.Run("join peer by KubeProxyAddr", func(t *testing.T) {
tc, err := teleport.NewClient(helpers.ClientConfig{
Login: hostUsername,
Cluster: helpers.Site,
Host: Host,
})
require.NoError(t, err)

tc, err := teleport.NewClient(helpers.ClientConfig{
Login: hostUsername,
Cluster: helpers.Site,
Host: Host,
tc.Stdin = participantStdinR
tc.Stdout = participantStdoutW

stream, err := kubeJoin(kube.ProxyConfig{
T: teleport,
Username: participantUsername,
KubeUsers: kubeUsers,
KubeGroups: kubeGroups,
}, tc, session, types.SessionPeerMode)
require.NoError(t, err)
streams = append(streams, stream)
})
require.NoError(t, err)

tc.Stdin = participantStdinR
tc.Stdout = participantStdoutW
t.Run("join observer by WebProxyAddr", func(t *testing.T) {
stream, capture := kubeJoinByWebAddr(t, teleport, participantUsername, kubeUsers, kubeGroups)
streams = append(streams, stream)
observerCaptures = append(observerCaptures, capture)
})
t.Run("join observer with ALPN conn upgrade", func(t *testing.T) {
stream, capture := kubeJoinByALBAddr(t, teleport, participantUsername, kubeUsers, kubeGroups, albProxy.Addr().String())
streams = append(streams, stream)
observerCaptures = append(observerCaptures, capture)
})

stream, err := kubeJoin(kube.ProxyConfig{
T: teleport,
Username: participantUsername,
KubeUsers: kubeUsers,
KubeGroups: kubeGroups,
}, tc, session)
require.NoError(t, err)
defer stream.Close()
require.Len(t, observerCaptures, 2)
require.Len(t, streams, 3)
for _, stream := range streams {
defer stream.Close()
}

// We wait again for the second user to finish joining the session.
// We allow a bit of time to pass here to give the session manager time to recognize the
// new IO streams of the second client.
time.Sleep(time.Second * 5)

// sent a test message from the participant
participantStdinW.Write([]byte("\aecho hi2\n\r"))
participantStdinW.Write([]byte("\ahi from peer\n\r"))

// lets type "echo hi" followed by "enter" and then "exit" + "enter":
term.Type("\aecho hi\n\r")
term.Type("\ahi from term\n\r")

// Terminate the session after a moment to allow for the IO to reach the second client.
time.AfterFunc(5*time.Second, func() {
term.Type("\aexit\n\r\a")
participantStdoutW.Close()
})

participantOutput, err := io.ReadAll(participantStdoutR)
t.Run("verify output", func(t *testing.T) {
// Verify peer.
participantOutput, err := io.ReadAll(participantStdoutR)
require.NoError(t, err)
require.Contains(t, string(participantOutput), "hi from term")

// Verify original session.
require.Contains(t, out.String(), "hi from peer")

// Verify observers.
for _, capture := range observerCaptures {
require.Contains(t, capture.String(), "hi from peer")
require.Contains(t, capture.String(), "hi from term")
}
})
}

func kubeJoinByWebAddr(t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string) (*client.KubeSession, *bytes.Buffer) {
t.Helper()

tc, err := teleport.NewClient(helpers.ClientConfig{
Login: username,
Cluster: helpers.Site,
Host: Host,
Proxy: &helpers.ProxyConfig{
WebAddr: teleport.Config.Proxy.WebAddr.Addr,
KubeAddr: teleport.Config.Proxy.WebAddr.Addr,
},
})
require.NoError(t, err)
require.Contains(t, string(participantOutput), "echo hi")
require.Contains(t, out.String(), "echo hi2")

buffer := new(bytes.Buffer)
tc.Stdout = buffer
return kubeJoinObserverWithSNISet(t, tc, teleport, kubeUsers, kubeGroups), buffer
}

func kubeJoinByALBAddr(t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string, albAddr string) (*client.KubeSession, *bytes.Buffer) {
t.Helper()

tc, err := teleport.NewClient(helpers.ClientConfig{
Login: username,
Cluster: helpers.Site,
Host: Host,
ALBAddr: albAddr,
})
require.NoError(t, err)

buffer := new(bytes.Buffer)
tc.Stdout = buffer
return kubeJoinObserverWithSNISet(t, tc, teleport, kubeUsers, kubeGroups), buffer
}

func kubeJoinObserverWithSNISet(t *testing.T, tc *client.TeleportClient, teleport *helpers.TeleInstance, kubeUsers, kubeGroups []string) *client.KubeSession {
t.Helper()

sessions, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(context.Background())
require.NoError(t, err)
require.Greater(t, len(sessions), 0)

stream, err := kubeJoin(kube.ProxyConfig{
T: teleport,
Username: tc.Username,
KubeUsers: kubeUsers,
KubeGroups: kubeGroups,
CustomTLSServerName: constants.KubeTeleportProxyALPNPrefix + Host,
}, tc, sessions[0], types.SessionObserverMode)
require.NoError(t, err)
return stream
}
Loading