diff --git a/integration/helpers/instance.go b/integration/helpers/instance.go index 6da3e45daa79f..2f0d00f3d7e1e 100644 --- a/integration/helpers/instance.go +++ b/integration/helpers/instance.go @@ -66,6 +66,7 @@ import ( "github.com/gravitational/teleport/lib/service" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" + "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/sshca" "github.com/gravitational/teleport/lib/sshutils" "github.com/gravitational/teleport/lib/tlsca" @@ -1658,6 +1659,58 @@ func (w *WebClient) SSH(termReq web.TerminalRequest) (*terminal.Stream, error) { return terminal.NewStream(context.Background(), terminal.StreamConfig{WS: ws}), nil } +func (w *WebClient) JoinKubernetesSession(id string, mode types.SessionParticipantMode) (*terminal.Stream, error) { + u := url.URL{ + Host: w.i.Web, + Scheme: client.WSS, + Path: fmt.Sprintf("/v1/webapi/sites/%v/kube/exec/ws", w.tc.SiteName), + } + + params := struct { + // Term is the initial PTY size. + Term session.TerminalParams `json:"term"` + // SessionID is a Teleport session ID to join as. + SessionID session.ID `json:"sid"` + // ParticipantMode is the mode that determines what you can do when you join an active session. 
+ ParticipantMode types.SessionParticipantMode `json:"mode"` + }{ + SessionID: session.ID(id), + ParticipantMode: mode, + } + + data, err := json.Marshal(params) + if err != nil { + return nil, err + } + + q := u.Query() + q.Set("params", string(data)) + u.RawQuery = q.Encode() + + header := http.Header{} + header.Add("Origin", "http://localhost") + for _, cookie := range w.cookies { + header.Add("Cookie", cookie.String()) + } + + dialer := websocket.Dialer{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + ws, resp, err := dialer.Dial(u.String(), header) + if err != nil { + return nil, trace.Wrap(err) + } + defer resp.Body.Close() + + if err := makeAuthReqOverWS(ws, w.token); err != nil { + return nil, trace.NewAggregate(err, ws.Close()) + } + + return terminal.NewStream(context.Background(), terminal.StreamConfig{WS: ws}), nil +} + // AddClientCredentials adds authenticated credentials to a client. // (server CAs and signed session key). func (i *TeleInstance) AddClientCredentials(tc *client.TeleportClient, cfg ClientConfig) (*client.TeleportClient, error) { diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index a6854a8d81748..5062c891d131a 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -24,6 +24,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "errors" "fmt" "io" "net" @@ -33,7 +34,6 @@ import ( "os/user" "strconv" "strings" - "sync" "testing" "time" @@ -47,7 +47,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/sync/errgroup" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" @@ -70,12 +70,14 @@ import ( "github.com/gravitational/teleport/api/breaker" "github.com/gravitational/teleport/api/constants" apidefaults "github.com/gravitational/teleport/api/defaults" + 
"github.com/gravitational/teleport/api/mfa" "github.com/gravitational/teleport/api/profile" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/entitlements" "github.com/gravitational/teleport/integration/helpers" "github.com/gravitational/teleport/integration/kube" "github.com/gravitational/teleport/lib" + "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/testauthority" "github.com/gravitational/teleport/lib/client" "github.com/gravitational/teleport/lib/defaults" @@ -148,12 +150,12 @@ func newKubeSuite(t *testing.T) *KubeSuite { ns := newNamespace(testNamespace) _, err = suite.CoreV1().Namespaces().Create(context.Background(), ns, metav1.CreateOptions{}) if err != nil { - require.True(t, errors.IsAlreadyExists(err), "Failed to create namespace: %v:", err) + require.True(t, kubeerrors.IsAlreadyExists(err), "Failed to create namespace: %v:", err) } p := newPod(testNamespace, testPod) _, err = suite.CoreV1().Pods(testNamespace).Create(context.Background(), p, metav1.CreateOptions{}) if err != nil { - require.True(t, errors.IsAlreadyExists(err), "Failed to create test pod: %v", err) + require.True(t, kubeerrors.IsAlreadyExists(err), "Failed to create test pod: %v", err) } // Wait for pod to be running. require.Eventually(t, func() bool { @@ -187,6 +189,7 @@ func TestKube(t *testing.T) { t.Run("TrustedClustersSNI", suite.bind(testKubeTrustedClustersSNI)) t.Run("Disconnect", suite.bind(testKubeDisconnect)) t.Run("Join", suite.bind(testKubeJoin)) + t.Run("JoinWeb", suite.bind(testKubeJoinWeb)) t.Run("IPPinning", suite.bind(testIPPinning)) // ExecWithNoAuth tests that a user can get the pod and exec into it when // moderated session is not enforced. 
@@ -1565,12 +1568,18 @@ func testKubeEphemeralContainers(t *testing.T, suite *KubeSuite) { return trace.Wrap(err) } - stream, err := kubeJoin(kube.ProxyConfig{ - T: teleport, - Username: moderatorUser, - KubeUsers: kubeUsers, - KubeGroups: kubeGroups, - }, tc, session, types.SessionModeratorMode) + stream, err := kubeJoin( + ctx, + kube.ProxyConfig{ + T: teleport, + Username: moderatorUser, + KubeUsers: kubeUsers, + KubeGroups: kubeGroups, + }, + tc, + session, + types.SessionModeratorMode, + ) if err != nil { return trace.Wrap(err) } @@ -1619,7 +1628,7 @@ func waitForContainer(ctx context.Context, podClient corev1client.PodInterface, ev, err := watchtools.UntilWithSync(ctx, lw, &v1.Pod{}, nil, func(ev watch.Event) (bool, error) { switch ev.Type { case watch.Deleted: - return false, errors.NewNotFound(schema.GroupResource{Resource: "pods"}, "") + return false, kubeerrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "") } p, ok := ev.Object.(*v1.Pod) @@ -2103,13 +2112,39 @@ func kubeExec(kubeConfig *rest.Config, mode execMode, args kubeExecArgs) error { return executor.StreamWithContext(context.Background(), opts) } -func kubeJoin(kubeConfig kube.ProxyConfig, tc *client.TeleportClient, meta types.SessionTracker, mode types.SessionParticipantMode) (*client.KubeSession, error) { +func kubeJoin(ctx context.Context, kubeConfig kube.ProxyConfig, tc *client.TeleportClient, meta types.SessionTracker, mode types.SessionParticipantMode) (*client.KubeSession, error) { tlsConfig, err := kubeProxyTLSConfig(kubeConfig) if err != nil { return nil, trace.Wrap(err) } - sess, err := client.NewKubeSession(context.TODO(), tc, meta, tc.KubeProxyAddr, "", mode, tlsConfig) + sess, err := client.NewKubeSession(ctx, + client.KubeSessionConfig{ + KubeProxyAddr: tc.Config.KubeProxyAddr, + WebProxyAddr: tc.Config.WebProxyAddr, + TLSRoutingConnUpgradeRequired: tc.Config.TLSRoutingConnUpgradeRequired, + EnableEscapeSequences: tc.Config.EnableEscapeSequences, + Tracker: meta, + TLSConfig: 
tlsConfig, + Mode: mode, + AuthClient: func(context.Context) (authclient.ClientI, error) { + clt, err := tc.ConnectToCluster(ctx) + if err != nil { + return nil, trace.Wrap(err) + } + + auth, err := clt.ConnectToCluster(ctx, meta.GetClusterName()) + if err != nil { + return nil, trace.Wrap(err) + } + + return authClientCloser{ClientI: auth, clusterClient: clt}, nil + }, + Ceremony: tc.NewMFAPrompt(mfa.WithQuiet()), + Stdin: tc.Config.Stdin, + Stdout: tc.Config.Stdout, + Stderr: tc.Config.Stderr, + }) if err != nil { return nil, trace.Wrap(err) } @@ -2117,6 +2152,15 @@ func kubeJoin(kubeConfig kube.ProxyConfig, tc *client.TeleportClient, meta types return sess, nil } +type authClientCloser struct { + authclient.ClientI + clusterClient *client.ClusterClient +} + +func (a authClientCloser) Close() error { + return trace.NewAggregate(a.ClientI.Close(), a.clusterClient.Close()) +} + // testKubeJoin tests that that joining an interactive exec session works. func testKubeJoin(t *testing.T, suite *KubeSuite) { tconf := suite.teleKubeConfig(Host) @@ -2132,7 +2176,9 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { // fooey hostUsername := suite.me.Username - participantUsername := suite.me.Username + "-participant" + peerUsername := suite.me.Username + "-peer" + observer1Username := suite.me.Username + "-observer1" + observer2Username := suite.me.Username + "-observer2" kubeGroups := []string{kube.TestImpersonationGroup} kubeUsers := []string{"alice@example.com"} role, err := types.NewRole("kubemaster", types.RoleSpecV6{ @@ -2163,7 +2209,9 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { }) require.NoError(t, err) teleport.AddUserWithRole(hostUsername, role) - teleport.AddUserWithRole(participantUsername, joinRole) + teleport.AddUserWithRole(peerUsername, joinRole) + teleport.AddUserWithRole(observer1Username, joinRole) + teleport.AddUserWithRole(observer2Username, joinRole) err = teleport.CreateEx(t, nil, tconf) require.NoError(t, err) @@ -2172,7 +2220,8 @@ func 
testKubeJoin(t *testing.T, suite *KubeSuite) { require.NoError(t, err) defer teleport.StopAll() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // set up kube configuration using proxy proxyClient, proxyClientConfig, err := kube.ProxyClient(kube.ProxyConfig{ @@ -2211,30 +2260,32 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { // We need to wait for the exec request to be handled here for the session to be // created. Sadly though the k8s API doesn't give us much indication of when that is. var session types.SessionTracker - require.Eventually(t, func() bool { + require.EventuallyWithT(t, func(t *assert.CollectT) { // We need to wait for the session to be created here. We can't use the // session manager's WaitUntilExists method because it doesn't work for // kubernetes sessions. - sessions, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(context.Background()) - if err != nil || len(sessions) == 0 { - return false + sessions, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(ctx) + assert.NoError(t, err) + if assert.Len(t, sessions, 1) { + session = sessions[0] } - - session = sessions[0] - return true }, 10*time.Second, time.Second) participantStdinR, participantStdinW, err := os.Pipe() require.NoError(t, err) participantStdoutR, participantStdoutW, err := os.Pipe() require.NoError(t, err) - streamsMu := &sync.Mutex{} - streams := make([]*client.KubeSession, 0, 3) - observerCaptures := make([]*bytes.Buffer, 0, 2) + + observerCaptures := make([]*bytes.Buffer, 2) albProxy := helpers.MustStartMockALBProxy(t, teleport.Config.Proxy.WebAddr.Addr) // join peer by KubeProxyAddr group.Go(func() error { + defer func() { + // close participant stdout so that we can read it after till EOF + participantStdoutW.Close() + }() + tc, err := teleport.NewClient(helpers.ClientConfig{ Login: hostUsername, Cluster: helpers.Site, @@ -2247,52 +2298,62 @@ func testKubeJoin(t *testing.T, suite 
*KubeSuite) { tc.Stdin = participantStdinR tc.Stdout = participantStdoutW - stream, err := kubeJoin(kube.ProxyConfig{ - T: teleport, - Username: participantUsername, - KubeUsers: kubeUsers, - KubeGroups: kubeGroups, - }, tc, session, types.SessionPeerMode) + stream, err := kubeJoin( + ctx, + kube.ProxyConfig{ + T: teleport, + Username: peerUsername, + KubeUsers: kubeUsers, + KubeGroups: kubeGroups, + }, + tc, + session, + types.SessionPeerMode, + ) if err != nil { return trace.Wrap(err) } - streamsMu.Lock() - streams = append(streams, stream) - streamsMu.Unlock() + + t.Cleanup(func() { + _ = stream.Close() + }) stream.Wait() - // close participant stdout so that we can read it after till EOF - participantStdoutW.Close() + return nil }) // join observer by WebProxyAddr group.Go(func() error { - stream, capture := kubeJoinByWebAddr(t, teleport, participantUsername, kubeUsers, kubeGroups) - streamsMu.Lock() - streams = append(streams, stream) - observerCaptures = append(observerCaptures, capture) - streamsMu.Unlock() + stream, capture := kubeJoinByWebAddr(ctx, t, teleport, observer1Username, kubeUsers, kubeGroups) + t.Cleanup(func() { + _ = stream.Close() + }) + + observerCaptures[0] = capture stream.Wait() return nil }) // join observer with ALPN conn upgrade group.Go(func() error { - stream, capture := kubeJoinByALBAddr(t, teleport, participantUsername, kubeUsers, kubeGroups, albProxy.Addr().String()) - streamsMu.Lock() - streams = append(streams, stream) - observerCaptures = append(observerCaptures, capture) - streamsMu.Unlock() + stream, capture := kubeJoinByALBAddr(ctx, t, teleport, observer2Username, kubeUsers, kubeGroups, albProxy.Addr().String()) + t.Cleanup(func() { + _ = stream.Close() + }) + + observerCaptures[1] = capture stream.Wait() return nil }) - // We wait again for the second user to finish joining the session. - // We allow a bit of time to pass here to give the session manager time to recognize the - // new IO streams of the second client. 
- time.Sleep(time.Second * 5) + // Wait for all users to finish joining the session. + require.EventuallyWithT(t, func(t *assert.CollectT) { + session, err := teleport.Process.GetAuthServer().GetSessionTracker(ctx, session.GetName()) + assert.NoError(t, err) + assert.Len(t, session.GetParticipants(), 4) + }, 30*time.Second, 500*time.Millisecond) - // sent a test message from the participant + // send a test message from the participant participantStdinW.Write([]byte("\ahi from peer\n\r")) // lets type "echo hi" followed by "enter" and then "exit" + "enter": @@ -2317,12 +2378,235 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { // Verify observers. for _, capture := range observerCaptures { - require.Contains(t, capture.String(), "hi from peer") - require.Contains(t, capture.String(), "hi from term") + assert.Contains(t, capture.String(), "hi from peer") + assert.Contains(t, capture.String(), "hi from term") } } -func kubeJoinByWebAddr(t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string) (*client.KubeSession, *bytes.Buffer) { +// testKubeJoinWeb tests that joining an interactive exec session +// via the web ui works. 
+func testKubeJoinWeb(t *testing.T, suite *KubeSuite) { + tconf := suite.teleKubeConfig(Host) + + teleport := helpers.NewInstance(t, helpers.InstanceConfig{ + ClusterName: helpers.Site, + HostID: helpers.HostID, + NodeName: Host, + Priv: suite.priv, + Pub: suite.pub, + Log: suite.log, + }) + + hostUsername := suite.me.Username + peerUsername := suite.me.Username + "-peer" + observerUsername := suite.me.Username + "-observer" + moderatorUsername := suite.me.Username + "-moderator" + kubeGroups := []string{"system:masters"} + kubeUsers := []string{"alice@example.com"} + role, err := types.NewRole("kubemaster", types.RoleSpecV6{ + Allow: types.RoleConditions{ + Logins: []string{hostUsername}, + KubeGroups: kubeGroups, + KubeUsers: kubeUsers, + KubernetesLabels: types.Labels{ + types.Wildcard: []string{types.Wildcard}, + }, + KubernetesResources: []types.KubernetesResource{ + { + Kind: types.KindKubePod, Name: types.Wildcard, Namespace: types.Wildcard, Verbs: []string{types.Wildcard}, + }, + }, + }, + }) + require.NoError(t, err) + joinRole, err := types.NewRole("participant", types.RoleSpecV6{ + Allow: types.RoleConditions{ + JoinSessions: []*types.SessionJoinPolicy{{ + Name: "foo", + Roles: []string{"kubemaster"}, + Kinds: []string{string(types.KubernetesSessionKind)}, + Modes: []string{string(types.SessionPeerMode), string(types.SessionObserverMode), string(types.SessionModeratorMode)}, + }}, + }, + }) + require.NoError(t, err) + teleport.AddUserWithRole(hostUsername, role) + teleport.AddUserWithRole(peerUsername, joinRole) + teleport.AddUserWithRole(observerUsername, joinRole) + teleport.AddUserWithRole(moderatorUsername, joinRole) + + ap := types.DefaultAuthPreference() + ap.SetSecondFactor(constants.SecondFactorOff) + tconf.Auth.Preference = ap + src := types.DefaultSessionRecordingConfig() + src.SetMode(types.RecordAtNodeSync) + tconf.Auth.SessionRecordingConfig = src + + err = teleport.CreateEx(t, nil, tconf) + require.NoError(t, err) + + err = teleport.Start() 
+ require.NoError(t, err) + defer teleport.StopAll() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // set up kube configuration using proxy + proxyClient, proxyClientConfig, err := kube.ProxyClient(kube.ProxyConfig{ + T: teleport, + Username: hostUsername, + KubeUsers: kubeUsers, + KubeGroups: kubeGroups, + }) + require.NoError(t, err) + + // try get request to fetch available pods + pod, err := proxyClient.CoreV1().Pods(testNamespace).Get(ctx, testPod, metav1.GetOptions{}) + require.NoError(t, err) + + term := NewTerminal(250) + peerTerm := NewTerminal(250) + + var out bytes.Buffer + var group errgroup.Group + + // Start the main session. + group.Go(func() error { + err := kubeExec(proxyClientConfig, execInContainer, kubeExecArgs{ + podName: pod.Name, + podNamespace: pod.Namespace, + container: pod.Spec.Containers[0].Name, + command: []string{"/bin/sh"}, + stdout: &out, + tty: true, + stdin: term, + }) + return trace.Wrap(err) + }) + + // We need to wait for the exec request to be handled here for the session to be + // created. Sadly though the k8s API doesn't give us much indication of when that is. + var tracker types.SessionTracker + require.EventuallyWithT(t, func(t *assert.CollectT) { + // We need to wait for the session to be created here. We can't use the + // session manager's WaitUntilExists method because it doesn't work for + // kubernetes sessions. 
+ sessions, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(ctx) + assert.NoError(t, err) + if assert.Len(t, sessions, 1) { + tracker = sessions[0] + } + }, 10*time.Second, time.Second) + + var observerOut, peerOut, moderatorOut bytes.Buffer + // join an observer + group.Go(func() error { + stream := kubeJoinByWebAPI(ctx, t, teleport, observerUsername, kubeUsers, kubeGroups, types.SessionObserverMode) + + t.Cleanup(func() { _ = stream.Close() }) + + if _, err := io.Copy(&observerOut, stream); err != nil && !errors.Is(err, io.EOF) { + return trace.Wrap(err) + } + + return trace.Wrap(peerTerm.Close()) + }) + + // join a moderator + group.Go(func() error { + stream := kubeJoinByWebAPI(ctx, t, teleport, moderatorUsername, kubeUsers, kubeGroups, types.SessionModeratorMode) + + t.Cleanup(func() { _ = stream.Close() }) + + if _, err := io.Copy(&moderatorOut, stream); err != nil && !errors.Is(err, io.EOF) { + return trace.Wrap(err) + } + + return trace.Wrap(peerTerm.Close()) + }) + + // join a peer + group.Go(func() error { + stream := kubeJoinByWebAPI(ctx, t, teleport, peerUsername, kubeUsers, kubeGroups, types.SessionPeerMode) + + t.Cleanup(func() { _ = stream.Close() }) + + group.Go(func() error { + if _, err := io.Copy(stream, peerTerm); err != nil && !errors.Is(err, io.EOF) { + return trace.Wrap(err) + } + + return nil + }) + + if _, err := io.Copy(&peerOut, stream); err != nil && !errors.Is(err, io.EOF) { + return trace.Wrap(err) + } + + return nil + }) + + // Wait for all users to finish joining the session. 
+ require.EventuallyWithT(t, func(t *assert.CollectT) { + session, err := teleport.Process.GetAuthServer().GetSessionTracker(ctx, tracker.GetName()) + if !assert.NoError(t, err) { + return + } + assert.Len(t, session.GetParticipants(), 4) + }, 30*time.Second, 500*time.Millisecond) + + // enter a command from the session creator + term.Type("\ahi from term\n\r") + + // send a test message from the peer + peerTerm.Type("\ahi from peer\n\r") + + // Terminate the session after a moment to allow for the IO to reach the clients. + time.AfterFunc(5*time.Second, func() { + // send exit command to close the session + term.Type("exit 0\n\r\a") + }) + + // wait for all clients to finish + require.NoError(t, group.Wait()) + + for _, output := range []string{out.String(), observerOut.String(), peerOut.String(), moderatorOut.String()} { + require.Contains(t, output, "hi from term") + require.Contains(t, output, "hi from peer") + } +} + +func kubeJoinByWebAPI(ctx context.Context, t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string, mode types.SessionParticipantMode) *terminal.Stream { + t.Helper() + + sessions, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(ctx) + require.NoError(t, err) + require.NotEmpty(t, sessions) + + password := uuid.NewString() + err = teleport.Process.GetAuthServer().UpsertPassword(username, []byte(password)) + require.NoError(t, err) + + wc, err := teleport.NewWebClient(helpers.ClientConfig{ + Login: username, + Password: password, + Cluster: helpers.Site, + Host: Host, + Proxy: &helpers.ProxyConfig{ + WebAddr: teleport.Config.Proxy.WebAddr.Addr, + KubeAddr: teleport.Config.Proxy.WebAddr.Addr, + }, + }) + require.NoError(t, err) + + stream, err := wc.JoinKubernetesSession(sessions[0].GetSessionID(), mode) + require.NoError(t, err) + + return stream +} + +func kubeJoinByWebAddr(ctx context.Context, t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string) 
(*client.KubeSession, *bytes.Buffer) { t.Helper() tc, err := teleport.NewClient(helpers.ClientConfig{ @@ -2336,12 +2620,20 @@ func kubeJoinByWebAddr(t *testing.T, teleport *helpers.TeleInstance, username st }) require.NoError(t, err) - buffer := new(bytes.Buffer) - tc.Stdout = buffer - return kubeJoinObserverWithSNISet(t, tc, teleport, kubeUsers, kubeGroups), buffer + stdinR, stdinW, err := os.Pipe() + require.NoError(t, err) + + t.Cleanup(func() { + _ = stdinW.Close() + }) + + var out bytes.Buffer + tc.Stdout = &out + tc.Stdin = stdinR + return kubeJoinObserverWithSNISet(ctx, t, tc, teleport, kubeUsers, kubeGroups), &out } -func kubeJoinByALBAddr(t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string, albAddr string) (*client.KubeSession, *bytes.Buffer) { +func kubeJoinByALBAddr(ctx context.Context, t *testing.T, teleport *helpers.TeleInstance, username string, kubeUsers, kubeGroups []string, albAddr string) (*client.KubeSession, *bytes.Buffer) { t.Helper() tc, err := teleport.NewClient(helpers.ClientConfig{ @@ -2352,25 +2644,39 @@ func kubeJoinByALBAddr(t *testing.T, teleport *helpers.TeleInstance, username st }) require.NoError(t, err) - buffer := new(bytes.Buffer) - tc.Stdout = buffer - return kubeJoinObserverWithSNISet(t, tc, teleport, kubeUsers, kubeGroups), buffer + stdinR, stdinW, err := os.Pipe() + require.NoError(t, err) + + t.Cleanup(func() { + _ = stdinW.Close() + }) + + var out bytes.Buffer + tc.Stdout = &out + tc.Stdin = stdinR + return kubeJoinObserverWithSNISet(ctx, t, tc, teleport, kubeUsers, kubeGroups), &out } -func kubeJoinObserverWithSNISet(t *testing.T, tc *client.TeleportClient, teleport *helpers.TeleInstance, kubeUsers, kubeGroups []string) *client.KubeSession { +func kubeJoinObserverWithSNISet(ctx context.Context, t *testing.T, tc *client.TeleportClient, teleport *helpers.TeleInstance, kubeUsers, kubeGroups []string) *client.KubeSession { t.Helper() - sessions, err := 
teleport.Process.GetAuthServer().GetActiveSessionTrackers(context.Background()) + sessions, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(ctx) require.NoError(t, err) require.NotEmpty(t, sessions) - stream, err := kubeJoin(kube.ProxyConfig{ - T: teleport, - Username: tc.Username, - KubeUsers: kubeUsers, - KubeGroups: kubeGroups, - CustomTLSServerName: constants.KubeTeleportProxyALPNPrefix + Host, - }, tc, sessions[0], types.SessionObserverMode) + stream, err := kubeJoin( + ctx, + kube.ProxyConfig{ + T: teleport, + Username: tc.Username, + KubeUsers: kubeUsers, + KubeGroups: kubeGroups, + CustomTLSServerName: constants.KubeTeleportProxyALPNPrefix + Host, + }, + tc, + sessions[0], + types.SessionObserverMode, + ) require.NoError(t, err) return stream } diff --git a/lib/client/kubesession.go b/lib/client/kubesession.go index 1182364d6a42d..d3fa3a1e996e7 100644 --- a/lib/client/kubesession.go +++ b/lib/client/kubesession.go @@ -34,6 +34,7 @@ import ( "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/mfa" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/client/terminal" "github.com/gravitational/teleport/lib/kube/proxy/streamproto" "github.com/gravitational/teleport/lib/utils" @@ -49,18 +50,31 @@ type KubeSession struct { wg sync.WaitGroup } +// KubeSessionConfig contains configuration parameters used to join +// an existing Kubernetes session by [NewKubeSession]. +type KubeSessionConfig struct { + KubeProxyAddr string + WebProxyAddr string + TLSRoutingConnUpgradeRequired bool + EnableEscapeSequences bool + Tracker types.SessionTracker + TLSConfig *tls.Config + Mode types.SessionParticipantMode + AuthClient func(context.Context) (authclient.ClientI, error) + Ceremony mfa.Prompt + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + // NewKubeSession joins a live kubernetes session. 
-func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionTracker, kubeAddr string, tlsServer string, mode types.SessionParticipantMode, tlsConfig *tls.Config) (*KubeSession, error) { +func NewKubeSession(ctx context.Context, cfg KubeSessionConfig) (*KubeSession, error) { ctx, cancel := context.WithCancel(ctx) - joinEndpoint := "wss://" + kubeAddr + "/api/v1/teleport/join/" + meta.GetSessionID() - - if tlsServer != "" { - tlsConfig.ServerName = tlsServer - } + joinEndpoint := "wss://" + cfg.KubeProxyAddr + "/api/v1/teleport/join/" + cfg.Tracker.GetSessionID() dialer := &websocket.Dialer{ - NetDialContext: kubeSessionNetDialer(ctx, tc, kubeAddr).DialContext, - TLSClientConfig: tlsConfig, + NetDialContext: kubeSessionNetDialer(ctx, cfg).DialContext, + TLSClientConfig: cfg.TLSConfig, } ws, resp, err := dialer.DialContext(ctx, joinEndpoint, nil) @@ -87,14 +101,27 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT return nil, trace.BadParameter("failed to decode remote error: %v", string(body)) } + defer func() { + if err == nil { + return + } - stream, err := streamproto.NewSessionStream(ws, streamproto.ClientHandshake{Mode: mode}) + if err := ws.Close(); err != nil { + log.Debugf("Close stream in response to context termination %v", err) + } + }() + + stream, err := streamproto.NewSessionStream(ws, streamproto.ClientHandshake{Mode: cfg.Mode}) if err != nil { cancel() return nil, trace.Wrap(err) } - term, err := terminal.New(tc.Stdin, tc.Stdout, tc.Stderr) + context.AfterFunc(ctx, func() { + _ = stream.Close() + }) + + term, err := terminal.New(cfg.Stdin, cfg.Stdout, cfg.Stderr) if err != nil { cancel() return nil, trace.Wrap(err) @@ -109,28 +136,27 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT stdout := utils.NewSyncWriter(term.Stdout()) go handleOutgoingResizeEvents(ctx, stream, term) - go handleIncomingResizeEvents(stream, term) + go handleIncomingResizeEvents(ctx, stream, 
term) - s := &KubeSession{stream, term, ctx, cancel, meta, sync.WaitGroup{}} - err = s.handleMFA(ctx, tc, mode, stdout) - if err != nil { + s := &KubeSession{stream, term, ctx, cancel, cfg.Tracker, sync.WaitGroup{}} + if err := s.handleMFA(ctx, cfg.AuthClient, cfg.Ceremony, cfg.Mode, stdout); err != nil { return nil, trace.Wrap(err) } - s.pipeInOut(stdout, tc.EnableEscapeSequences, mode) + s.pipeInOut(ctx, stdout, cfg.EnableEscapeSequences, cfg.Mode) return s, nil } -func kubeSessionNetDialer(ctx context.Context, tc *TeleportClient, kubeAddr string) client.ContextDialer { +func kubeSessionNetDialer(ctx context.Context, cfg KubeSessionConfig) client.ContextDialer { dialOpts := []client.DialOption{ - client.WithInsecureSkipVerify(tc.InsecureSkipVerify), + client.WithInsecureSkipVerify(cfg.TLSConfig.InsecureSkipVerify), } // Add options for ALPN connection upgrade only if kube is served at Proxy // web address. - if tc.WebProxyAddr == kubeAddr && tc.TLSRoutingConnUpgradeRequired { + if cfg.WebProxyAddr == cfg.KubeProxyAddr && cfg.TLSRoutingConnUpgradeRequired { dialOpts = append(dialOpts, - client.WithALPNConnUpgrade(tc.TLSRoutingConnUpgradeRequired), + client.WithALPNConnUpgrade(cfg.TLSRoutingConnUpgradeRequired), client.WithALPNConnUpgradePing(true), // Use Ping protocol for long-lived connections. 
) } @@ -158,47 +184,44 @@ func handleOutgoingResizeEvents(ctx context.Context, stream *streamproto.Session } } -func handleIncomingResizeEvents(stream *streamproto.SessionStream, term *terminal.Terminal) { +func handleIncomingResizeEvents(ctx context.Context, stream *streamproto.SessionStream, term *terminal.Terminal) { events := term.Subscribe() for { - event, more := <-events - _, ok := event.(terminal.ResizeEvent) - if ok { - w, h, err := term.Size() - if err != nil { - fmt.Printf("Error attempting to fetch terminal size: %v\n\r", err) - } + select { + case <-ctx.Done(): + return + case event, more := <-events: + _, ok := event.(terminal.ResizeEvent) + if ok { + w, h, err := term.Size() + if err != nil { + fmt.Printf("Error attempting to fetch terminal size: %v\n\r", err) + } - size := remotecommand.TerminalSize{Width: uint16(w), Height: uint16(h)} - err = stream.Resize(&size) - if err != nil { - fmt.Printf("Error attempting to resize terminal: %v\n\r", err) + size := remotecommand.TerminalSize{Width: uint16(w), Height: uint16(h)} + if err := stream.Resize(&size); err != nil { + fmt.Printf("Error attempting to resize terminal: %v\n\r", err) + } } - } - if !more { - break + if !more { + return + } } } } -func (s *KubeSession) handleMFA(ctx context.Context, tc *TeleportClient, mode types.SessionParticipantMode, stdout io.Writer) error { +func (s *KubeSession) handleMFA(ctx context.Context, authFn func(context.Context) (authclient.ClientI, error), ceremony mfa.Prompt, mode types.SessionParticipantMode, stdout io.Writer) error { if s.stream.MFARequired && mode == types.SessionModeratorMode { - clt, err := tc.ConnectToCluster(ctx) - if err != nil { - return trace.Wrap(err) - } - - auth, err := clt.ConnectToCluster(ctx, s.meta.GetClusterName()) + auth, err := authFn(ctx) if err != nil { return trace.Wrap(err) } go func() { - RunPresenceTask(ctx, stdout, auth, s.meta.GetSessionID(), tc.NewMFAPrompt(mfa.WithQuiet())) - auth.Close() - clt.Close() + defer auth.Close() + 
RunPresenceTask(ctx, stdout, auth, s.meta.GetSessionID(), ceremony) }() } @@ -206,14 +229,15 @@ func (s *KubeSession) handleMFA(ctx context.Context, tc *TeleportClient, mode ty } // pipeInOut starts background tasks that copy input to and from the terminal. -func (s *KubeSession) pipeInOut(stdout io.Writer, enableEscapeSequences bool, mode types.SessionParticipantMode) { +func (s *KubeSession) pipeInOut(ctx context.Context, stdout io.Writer, enableEscapeSequences bool, mode types.SessionParticipantMode) { // wait for the session to copy everything s.wg.Add(1) go func() { - defer s.wg.Done() - defer s.cancel() - _, err := io.Copy(stdout, s.stream) - if err != nil { + defer func() { + s.wg.Done() + s.cancel() + }() + if _, err := io.Copy(stdout, s.stream); err != nil { fmt.Printf("Error while reading remote stream: %v\n\r", err.Error()) } }() @@ -226,8 +250,7 @@ func (s *KubeSession) pipeInOut(stdout io.Writer, enableEscapeSequences bool, mo handlePeerControls(s.term, enableEscapeSequences, s.stream) default: handleNonPeerControls(mode, s.term, func() { - err := s.stream.ForceTerminate() - if err != nil { + if err := s.stream.ForceTerminate(); err != nil { log.Debugf("Error sending force termination request: %v", err) fmt.Print("\n\rError while sending force termination request\n\r") } diff --git a/lib/web/apiserver.go b/lib/web/apiserver.go index e4ed3b8c48a9f..bab9499b827c2 100644 --- a/lib/web/apiserver.go +++ b/lib/web/apiserver.go @@ -3654,6 +3654,10 @@ func (h *Handler) siteNodeConnect( type podConnectParams struct { // Term is the initial PTY size. Term session.TerminalParams `json:"term"` + // SessionID is a Teleport session ID to join as. + SessionID session.ID `json:"sid"` + // ParticipantMode is the mode that determines what you can do when you join an active session. 
+ ParticipantMode types.SessionParticipantMode `json:"mode"` } func (h *Handler) podConnect( @@ -3673,6 +3677,20 @@ func (h *Handler) podConnect( return nil, trace.Wrap(err) } + // If a session is provided, then join an existing session + // instead of creating a new one. + if !params.SessionID.IsZero() { + return nil, trace.Wrap(h.joinKubernetesSession( + r.Context(), + params.SessionID.String(), + params.ParticipantMode, + sctx, + site, + ws, + )) + } + + // Wait for the user to supply the pod information. execReq, err := readPodExecRequestFromWS(ws) if err != nil { if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || terminal.IsOKWebsocketCloseError(trace.Unwrap(err)) { @@ -3691,26 +3709,11 @@ func (h *Handler) podConnect( return nil, trace.Wrap(err) } - clt, err := sctx.GetUserClient(r.Context(), site) - if err != nil { - return nil, trace.Wrap(err) - } - - clusterName := site.GetName() - - accessChecker, err := sctx.GetUserAccessChecker() - if err != nil { - return session.Session{}, trace.Wrap(err) - } - policySets := accessChecker.SessionPolicySets() - accessEvaluator := auth.NewSessionAccessEvaluator(policySets, types.KubernetesSessionKind, sctx.GetUser()) - sess := session.Session{ Kind: types.KubernetesSessionKind, Login: "root", - ClusterName: clusterName, + ClusterName: site.GetName(), KubernetesClusterName: execReq.KubeCluster, - Moderated: accessEvaluator.IsModerated(), ID: session.NewID(), Created: h.clock.Now().UTC(), LastActive: h.clock.Now().UTC(), @@ -3732,8 +3735,6 @@ func (h *Handler) podConnect( return nil, trace.Wrap(err) } - keepAliveInterval := netConfig.GetKeepAliveInterval() - serverAddr, tlsServerName, err := h.getKubeExecClusterData(netConfig) if err != nil { return nil, trace.Wrap(err) @@ -3747,13 +3748,18 @@ func (h *Handler) podConnect( return nil, trace.Wrap(err) } - ph := podHandler{ + clt, err := sctx.GetUserClient(r.Context(), site) + if err != nil { + return nil, trace.Wrap(err) + } + + ph := podExecHandler{ req: 
execReq, sess: sess, sctx: sctx, teleportCluster: site.GetName(), ws: ws, - keepAliveInterval: keepAliveInterval, + keepAliveInterval: netConfig.GetKeepAliveInterval(), log: h.log.WithField(teleport.ComponentKey, "pod"), userClient: clt, localCA: hostCA, diff --git a/lib/web/kube.go b/lib/web/kube.go index 6df327358b3bc..dc4643b77e887 100644 --- a/lib/web/kube.go +++ b/lib/web/kube.go @@ -21,6 +21,8 @@ package web import ( "bytes" "context" + "crypto/tls" + "crypto/x509" "encoding/json" "errors" "net/http" @@ -29,6 +31,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + gogoproto "github.com/gogo/protobuf/proto" "github.com/gorilla/websocket" "github.com/gravitational/trace" "github.com/sirupsen/logrus" @@ -46,13 +49,14 @@ import ( "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/client" "github.com/gravitational/teleport/lib/defaults" + "github.com/gravitational/teleport/lib/reversetunnelclient" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/web/terminal" ) -// podHandler connects Kube exec session and web-based terminal via a websocket. -type podHandler struct { +// podExecHandler connects Kube exec session and web-based terminal via a websocket. +type podExecHandler struct { teleportCluster string configTLSServerName string configServerAddr string @@ -120,7 +124,7 @@ func (r *PodExecRequest) Validate() error { // ServeHTTP sends session metadata to web UI to signal beginning of the session, then // handles Kube exec request and connects it to web based terminal input/output. -func (p *podHandler) ServeHTTP(_ http.ResponseWriter, r *http.Request) { +func (p *podExecHandler) ServeHTTP(_ http.ResponseWriter, r *http.Request) { // Allow closing websocket if the user logs out before exiting // the session. 
p.sctx.AddClosers(p) @@ -128,7 +132,7 @@ func (p *podHandler) ServeHTTP(_ http.ResponseWriter, r *http.Request) { sessionMetadataResponse, err := json.Marshal(siteSessionGenerateResponse{Session: p.sess}) if err != nil { - p.sendAndLogError(err) + p.sendErrorMessage(err) return } @@ -140,28 +144,26 @@ func (p *podHandler) ServeHTTP(_ http.ResponseWriter, r *http.Request) { envelopeBytes, err := proto.Marshal(envelope) if err != nil { - p.sendAndLogError(err) + p.sendErrorMessage(err) return } err = p.ws.WriteMessage(websocket.BinaryMessage, envelopeBytes) if err != nil { - p.sendAndLogError(err) + p.sendErrorMessage(err) return } if err := p.handler(r); err != nil { - p.sendAndLogError(err) + p.sendErrorMessage(err) } } -func (p *podHandler) Close() error { +func (p *podExecHandler) Close() error { return trace.Wrap(p.ws.Close()) } -func (p *podHandler) sendAndLogError(err error) { - p.log.Error(err) - +func (p *podExecHandler) sendErrorMessage(err error) { if p.closedByClient.Load() { return } @@ -182,7 +184,7 @@ func (p *podHandler) sendAndLogError(err error) { } } -func (p *podHandler) handler(r *http.Request) error { +func (p *podExecHandler) handler(r *http.Request) error { p.log.Debug("Creating websocket stream for a kube exec request") // Create a context for signaling when the terminal session is over and @@ -329,7 +331,7 @@ func (p *podHandler) handler(r *http.Request) error { return nil } -func (p *podHandler) handleResize(termSizeQueue *termSizeQueue) func(context.Context, terminal.Envelope) { +func (p *podExecHandler) handleResize(termSizeQueue *termSizeQueue) func(context.Context, terminal.Envelope) { return func(ctx context.Context, envelope terminal.Envelope) { var e map[string]any if err := json.Unmarshal([]byte(envelope.Payload), &e); err != nil { @@ -406,3 +408,188 @@ func createKubeRestConfig(serverAddr, tlsServerName string, ca types.CertAuthori }, }, nil } + +func (h *Handler) joinKubernetesSession( + ctx context.Context, + sessionID string, + 
mode types.SessionParticipantMode, + sctx *SessionContext, + site reversetunnelclient.RemoteSite, + ws *websocket.Conn, +) error { + h.logger.InfoContext(ctx, "Attempting to join kubernetes existing session", + "session_id", sessionID, + "mode", mode, + "user", sctx.GetUser(), + ) + + if _, err := session.ParseID(sessionID); err != nil { + return trace.Wrap(err) + } + + clt, err := sctx.GetUserClient(ctx, site) + if err != nil { + return trace.Wrap(err) + } + + tracker, err := clt.GetSessionTracker(ctx, sessionID) + if err != nil { + return trace.Wrap(err) + } + + if tracker.GetSessionKind() != types.KubernetesSessionKind || tracker.GetState() == types.SessionState_SessionStateTerminated { + return trace.NotFound("Kubernetes session %v not found", sessionID) + } + + sessionMetadataResponse, err := json.Marshal(siteSessionGenerateResponse{Session: session.Session{ + Kind: types.KubernetesSessionKind, + ID: session.ID(tracker.GetName()), + Login: tracker.GetLogin(), + KubernetesClusterName: tracker.GetKubeCluster(), + ServerHostname: tracker.GetHostname(), + }}) + if err != nil { + return trace.Wrap(err) + } + + stream := terminal.NewStream(ctx, terminal.StreamConfig{ + WS: ws, + Logger: h.log, + // Disable all out of band handling of requests + Handlers: map[string]terminal.WSHandlerFunc{ + defaults.WebsocketResize: func(ctx context.Context, envelope terminal.Envelope) {}, + defaults.WebsocketFileTransferRequest: func(ctx context.Context, envelope terminal.Envelope) {}, + defaults.WebsocketFileTransferDecision: func(ctx context.Context, envelope terminal.Envelope) {}, + }, + }) + + envelopeBytes, err := gogoproto.Marshal(&terminal.Envelope{ + Version: defaults.WebsocketVersion, + Type: defaults.WebsocketSessionMetadata, + Payload: string(sessionMetadataResponse), + }) + if err != nil { + return trace.Wrap(err) + } + + if err := stream.WriteMessage(websocket.BinaryMessage, envelopeBytes); err != nil { + return trace.Wrap(err) + } + + authAccessPoint, err := 
site.CachingAccessPoint() + if err != nil { + return trace.Wrap(err) + } + + netConfig, err := authAccessPoint.GetClusterNetworkingConfig(ctx) + if err != nil { + return trace.Wrap(err) + } + + kubeAddr, tlsServerName, err := h.getKubeExecClusterData(netConfig) + if err != nil { + return trace.Wrap(err) + } + + pk, err := keys.ParsePrivateKey(sctx.cfg.Session.GetPriv()) + if err != nil { + return trace.Wrap(err, "failed getting user private key from the session") + } + userKey := &client.Key{ + PrivateKey: pk, + Cert: sctx.cfg.Session.GetPub(), + TLSCert: sctx.cfg.Session.GetTLSCert(), + } + + certsReq := clientproto.UserCertsRequest{ + PublicKey: userKey.MarshalSSHPublicKey(), + Username: sctx.GetUser(), + Expires: sctx.cfg.Session.GetExpiryTime(), + Format: constants.CertificateFormatStandard, + RouteToCluster: tracker.GetClusterName(), + KubernetesCluster: tracker.GetKubeCluster(), + Usage: clientproto.UserCertsRequest_Kubernetes, + } + + _, certs, err := client.PerformSessionMFACeremony(ctx, client.PerformSessionMFACeremonyParams{ + CurrentAuthClient: clt, + RootAuthClient: sctx.cfg.RootClient, + MFACeremony: newMFACeremony(stream.WSStream, sctx.cfg.RootClient.CreateAuthenticateChallenge), + MFAAgainstRoot: sctx.cfg.RootClusterName == tracker.GetClusterName(), + MFARequiredReq: &clientproto.IsMFARequiredRequest{ + Target: &clientproto.IsMFARequiredRequest_KubernetesCluster{KubernetesCluster: tracker.GetKubeCluster()}, + }, + CertsReq: &certsReq, + }) + if err != nil && !errors.Is(err, services.ErrSessionMFANotRequired) { + return trace.Wrap(err, "failed performing mfa ceremony") + } + + if certs == nil { + certs, err = sctx.cfg.RootClient.GenerateUserCerts(ctx, certsReq) + if err != nil { + return trace.Wrap(err, "failed issuing user certs") + } + } + + hostCA, err := h.auth.accessPoint.GetCertAuthority(ctx, types.CertAuthID{ + Type: types.HostCA, + DomainName: h.auth.clusterName, + }, false) + if err != nil { + return trace.Wrap(err) + } + + certPool := 
x509.NewCertPool() + for _, keyPair := range hostCA.GetTrustedTLSKeyPairs() { + if ok := certPool.AppendCertsFromPEM(keyPair.Cert); !ok { + return trace.BadParameter("invalid ca format") + } + } + + session, err := client.NewKubeSession(ctx, + client.KubeSessionConfig{ + KubeProxyAddr: strings.TrimPrefix(kubeAddr, "https://"), + WebProxyAddr: h.cfg.ProxyWebAddr.String(), + TLSRoutingConnUpgradeRequired: netConfig.GetProxyListenerMode() == types.ProxyListenerMode_Multiplex, + EnableEscapeSequences: true, + Tracker: tracker, + TLSConfig: &tls.Config{ + RootCAs: certPool, + ServerName: tlsServerName, + GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { + cert, err := tls.X509KeyPair(certs.TLS, pk.PrivateKeyPEM()) + if err != nil { + return nil, trace.Wrap(err) + } + + return &cert, nil + }, + }, + Mode: mode, + AuthClient: func(ctx context.Context) (authclient.ClientI, error) { + return noopAuthClientCloser{clt}, nil + }, + Ceremony: newMFAPrompt(stream.WSStream), + Stdin: stream, + Stdout: stream, + Stderr: stderrWriter{stream: stream}, + }) + if err != nil { + return trace.Wrap(err) + } + + session.Wait() + + if err := session.Detach(); err != nil && !terminal.IsOKWebsocketCloseError(err) { + return trace.Wrap(err) + } + + return nil +} + +type noopAuthClientCloser struct { + authclient.ClientI +} + +func (noopAuthClientCloser) Close() error { return nil } diff --git a/lib/web/terminal/terminal.go b/lib/web/terminal/terminal.go index 6c4d1ff4d2a47..2aaf8fca9d367 100644 --- a/lib/web/terminal/terminal.go +++ b/lib/web/terminal/terminal.go @@ -196,7 +196,7 @@ func (t *WSStream) processMessages(ctx context.Context) { handler, ok := t.handlers[envelope.Type] if !ok { - t.log.Warnf("Received web socket envelope with unknown type %v", envelope.Type) + t.log.Warnf("Received web socket envelope with unknown type %v", envelope) continue } diff --git a/tool/tsh/common/kube.go b/tool/tsh/common/kube.go index ce76d434a810f..c2523c3e4a706 100644 
--- a/tool/tsh/common/kube.go +++ b/tool/tsh/common/kube.go @@ -61,6 +61,7 @@ import ( apiclient "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" apidefaults "github.com/gravitational/teleport/api/defaults" + "github.com/gravitational/teleport/api/mfa" "github.com/gravitational/teleport/api/profile" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/keypaths" @@ -209,7 +210,34 @@ func (c *kubeJoinCommand) run(cf *CLIConf) error { } tlsConfig.InsecureSkipVerify = cf.InsecureSkipVerify - session, err := client.NewKubeSession(cf.Context, tc, meta, tc.KubeProxyAddr, kubeStatus.tlsServerName, types.SessionParticipantMode(c.mode), tlsConfig) + tlsConfig.ServerName = kubeStatus.tlsServerName + session, err := client.NewKubeSession(cf.Context, + client.KubeSessionConfig{ + KubeProxyAddr: tc.Config.KubeProxyAddr, + WebProxyAddr: tc.Config.WebProxyAddr, + TLSRoutingConnUpgradeRequired: tc.Config.TLSRoutingConnUpgradeRequired, + EnableEscapeSequences: tc.Config.EnableEscapeSequences, + Tracker: meta, + TLSConfig: tlsConfig, + Mode: types.SessionParticipantMode(c.mode), + AuthClient: func(ctx context.Context) (authclient.ClientI, error) { + clt, err := tc.ConnectToCluster(ctx) + if err != nil { + return nil, trace.Wrap(err) + } + + auth, err := clt.ConnectToCluster(ctx, meta.GetClusterName()) + if err != nil { + return nil, trace.Wrap(err) + } + + return authClientCloser{ClientI: auth, clusterClient: clt}, nil + }, + Ceremony: tc.NewMFAPrompt(mfa.WithQuiet()), + Stdin: tc.Config.Stdin, + Stdout: tc.Config.Stdout, + Stderr: tc.Config.Stderr, + }) if err != nil { return trace.Wrap(err) } @@ -218,6 +246,15 @@ func (c *kubeJoinCommand) run(cf *CLIConf) error { return trace.Wrap(session.Detach()) } +type authClientCloser struct { + authclient.ClientI + clusterClient *client.ClusterClient +} + +func (a authClientCloser) Close() error { + return trace.NewAggregate(a.ClientI.Close(), 
a.clusterClient.Close()) +} + // RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing type RemoteExecutor interface { Execute(ctx context.Context, method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error diff --git a/web/packages/teleport/src/Console/DocumentKubeExec/useKubeExecSession.tsx b/web/packages/teleport/src/Console/DocumentKubeExec/useKubeExecSession.tsx index 4e2c29b56c188..6f236976bb5fb 100644 --- a/web/packages/teleport/src/Console/DocumentKubeExec/useKubeExecSession.tsx +++ b/web/packages/teleport/src/Console/DocumentKubeExec/useKubeExecSession.tsx @@ -92,13 +92,27 @@ export default function useKubeExecSession(doc: DocumentKubeExec) { const data = JSON.parse(payload); data.session.kind = 'k8s'; setSession(data.session); + + if (doc.mode !== undefined) { + ctx.updateKubeExecDocument(doc.id, { + title: `${data.session.server_hostname}@${data.session.kubernetes_cluster_name}`, + kubeNamespace: data.session, + }); + + setStatus('initialized'); + } + handleTtyConnect(ctx, data.session, doc.id); }); // assign tty reference so it can be passed down to xterm ttyRef.current = tty; setSession(session); - setStatus('waiting-for-exec-data'); + if (doc.mode === undefined) { + setStatus('waiting-for-exec-data'); + } else { + setStatus('initialized'); + } span.end(); } ); diff --git a/web/packages/teleport/src/Console/consoleContext.tsx b/web/packages/teleport/src/Console/consoleContext.tsx index daf8b3fba12d9..9f2a2e3bf17a6 100644 --- a/web/packages/teleport/src/Console/consoleContext.tsx +++ b/web/packages/teleport/src/Console/consoleContext.tsx @@ -119,8 +119,8 @@ export default class ConsoleContext { title: params.kubeId, url, created: new Date(), - mode: null, - + mode: params.mode, + sid: params.sid, kubeCluster: params.kubeId, kubeNamespace: '', pod: '', @@ -170,7 +170,9 @@ export default class ConsoleContext { 
} getKubeExecDocumentUrl(kubeExecParams: UrlKubeExecParams) { - return cfg.getKubeExecConnectRoute(kubeExecParams); + return kubeExecParams.sid + ? cfg.getKubeExecSessionRoute(kubeExecParams) + : cfg.getKubeExecConnectRoute(kubeExecParams); } refreshParties() { @@ -235,6 +237,10 @@ export default class ConsoleContext { }; break; case 'k8s': + ttyParams = { + sid, + mode, + }; break; } diff --git a/web/packages/teleport/src/Console/useTabRouting.ts b/web/packages/teleport/src/Console/useTabRouting.ts index fe78aed6790a3..8c2c4f987a26b 100644 --- a/web/packages/teleport/src/Console/useTabRouting.ts +++ b/web/packages/teleport/src/Console/useTabRouting.ts @@ -35,6 +35,9 @@ export default function useRouting(ctx: ConsoleContext) { const joinSshRouteMatch = useRouteMatch( cfg.routes.consoleSession ); + const joinKubeExecRouteMatch = useRouteMatch( + cfg.routes.kubeExecSession + ); // Ensure that each URL has corresponding document React.useMemo(() => { @@ -42,22 +45,22 @@ export default function useRouting(ctx: ConsoleContext) { return; } + const participantMode = getParticipantMode(search); + // When no document matches current URL that means we need to // create one base on URL parameters. if (sshRouteMatch) { ctx.addSshDocument(sshRouteMatch.params); } else if (joinSshRouteMatch) { - // Extract the mode param from the URL if it is present. 
- const searchParams = new URLSearchParams(search); - const mode = searchParams.get('mode'); - if (mode) { - joinSshRouteMatch.params.mode = mode as ParticipantMode; - } + joinSshRouteMatch.params.mode = participantMode; ctx.addSshDocument(joinSshRouteMatch.params); } else if (nodesRouteMatch) { ctx.addNodeDocument(clusterId); } else if (kubeExecRouteMatch) { ctx.addKubeExecDocument(kubeExecRouteMatch.params); + } else if (joinKubeExecRouteMatch) { + joinKubeExecRouteMatch.params.mode = participantMode; + ctx.addKubeExecDocument(joinKubeExecRouteMatch.params); } }, [ctx, pathname]); @@ -66,3 +69,11 @@ export default function useRouting(ctx: ConsoleContext) { activeDocId: ctx.getActiveDocId(pathname), }; } + +function getParticipantMode(search: string): ParticipantMode | undefined { + const searchParams = new URLSearchParams(search); + const mode = searchParams.get('mode'); + if (mode === 'observer' || mode === 'moderator' || mode === 'peer') { + return mode; + } +} diff --git a/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.test.tsx b/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.test.tsx index f3b751fe0b15a..de8db7e53602b 100644 --- a/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.test.tsx +++ b/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.test.tsx @@ -36,6 +36,7 @@ test('all participant modes are properly listed and in the correct order', () => participantModes={['moderator', 'peer', 'observer']} showCTA={false} showModeratedCTA={false} + kind="ssh" /> ); @@ -84,6 +85,7 @@ test('showCTA does not render a join link for any sessions', () => { participantModes={['moderator', 'peer', 'observer']} showCTA={true} showModeratedCTA={false} + kind="ssh" /> ); @@ -112,6 +114,7 @@ test('showModeratedCTA does not render a join link for moderated sessions', () = participantModes={['moderator', 'peer', 'observer']} showCTA={false} showModeratedCTA={true} + kind="ssh" /> ); diff --git 
a/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.tsx b/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.tsx index b3dc0f9905a71..f49424afbc826 100644 --- a/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.tsx +++ b/web/packages/teleport/src/Sessions/SessionList/SessionJoinBtn.tsx @@ -23,17 +23,19 @@ import { ChevronDown, Warning } from 'design/Icon'; import { ButtonLockedFeature } from 'teleport/components/ButtonLockedFeature'; import cfg from 'teleport/config'; -import { ParticipantMode } from 'teleport/services/session'; +import { ParticipantMode, SessionKind } from 'teleport/services/session'; import { CtaEvent } from 'teleport/services/userEvent'; export const SessionJoinBtn = ({ sid, + kind, clusterId, participantModes, showCTA, showModeratedCTA, }: { sid: string; + kind: SessionKind; clusterId: string; participantModes: ParticipantMode[]; showCTA: boolean; @@ -45,6 +47,17 @@ export const SessionJoinBtn = ({ setAnchorEl(null); } + function joinSession(joinMode: ParticipantMode): string { + switch (kind) { + case 'ssh': + return cfg.getSshSessionRoute({ sid, clusterId }, joinMode); + case 'k8s': + return cfg.getKubeExecSessionRoute({ sid, clusterId }, joinMode); + default: + return ''; + } + } + return ( {showCTA && ( @@ -61,7 +74,7 @@ export const SessionJoinBtn = ({ JSX.Element; joinable: boolean }; } = { ssh: { icon: Icons.Cli, joinable: true }, - k8s: { icon: Icons.Kubernetes, joinable: false }, + k8s: { icon: Icons.Kubernetes, joinable: true }, desktop: { icon: Icons.Desktop, joinable: false }, app: { icon: Icons.Application, joinable: false }, db: { icon: Icons.Database, joinable: false }, @@ -141,6 +141,7 @@ const renderJoinCell = ({ + > +
+ +
+ @@ -1831,7 +1860,36 @@ exports[`loaded 1`] = ` + > +
+ +
+ @@ -2905,7 +2963,36 @@ exports[`moderated sessions CTA for non-enterprise 1`] = ` + > +
+ +
+ diff --git a/web/packages/teleport/src/config.ts b/web/packages/teleport/src/config.ts index 9359b90ffeb8e..8686fa7d6b3b8 100644 --- a/web/packages/teleport/src/config.ts +++ b/web/packages/teleport/src/config.ts @@ -171,7 +171,7 @@ const cfg = { consoleConnect: '/web/cluster/:clusterId/console/node/:serverId/:login', consoleSession: '/web/cluster/:clusterId/console/session/:sid', kubeExec: '/web/cluster/:clusterId/console/kube/exec/:kubeId/', - kubeExecSession: '/web/cluster/:clusterId/console/kube/exec/:sid', + kubeExecSession: '/web/cluster/:clusterId/console/kube/session/:sid', player: '/web/cluster/:clusterId/session/:sid', // ?recordingType=ssh|desktop|k8s&durationMs=1234 login: '/web/login', loginSuccess: '/web/msg/info/login_success', @@ -1239,6 +1239,8 @@ export interface UrlSshParams { export interface UrlKubeExecParams { clusterId: string; kubeId: string; + sid?: string; + mode?: ParticipantMode; } export interface UrlSessionRecordingsParams {