diff --git a/lib/kube/proxy/exec_test.go b/lib/kube/proxy/exec_test.go index 477bcdaeb8d86..ffd3194e9ad12 100644 --- a/lib/kube/proxy/exec_test.go +++ b/lib/kube/proxy/exec_test.go @@ -126,7 +126,7 @@ func TestExecKubeService(t *testing.T) { }, }, { - name: "Websocket protocol", + name: "Websocket protocol v4", args: args{ // We can delete the dummy client once https://github.com/kubernetes/kubernetes/pull/110142 // is merged into k8s go-client. @@ -137,6 +137,15 @@ func TestExecKubeService(t *testing.T) { config: configWithSingleKubeUser, }, }, + { + name: "Websocket protocol v5", + args: args{ + executorBuilder: func(c *rest.Config, s string, u *url.URL) (remotecommand.Executor, error) { + return remotecommand.NewWebSocketExecutor(c, s, u.String()) + }, + config: configWithSingleKubeUser, + }, + }, { name: "SPDY protocol for user with multiple kubernetes users", args: args{ @@ -146,7 +155,7 @@ func TestExecKubeService(t *testing.T) { }, }, { - name: "Websocket protocol for user with multiple kubernetes users", + name: "Websocket protocol v4 for user with multiple kubernetes users", args: args{ // We can delete the dummy client once https://github.com/kubernetes/kubernetes/pull/110142 // is merged into k8s go-client. @@ -158,6 +167,16 @@ func TestExecKubeService(t *testing.T) { impersonateUser: "admin", }, }, + { + name: "Websocket protocol v5 for user with multiple kubernetes users", + args: args{ + executorBuilder: func(c *rest.Config, s string, u *url.URL) (remotecommand.Executor, error) { + return remotecommand.NewWebSocketExecutor(c, s, u.String()) + }, + config: configMultiKubeUsers, + impersonateUser: "admin", + }, + }, { name: "SPDY protocol for user with multiple kubernetes users without specifying impersonate user", args: args{ @@ -166,6 +185,16 @@ func TestExecKubeService(t *testing.T) { }, wantErr: true, }, + { + name: "Websocket protocol v5 for user with multiple kubernetes users without specifying impersonate user", + args: args{ + executorBuilder: func(c *rest.Config, s string, u *url.URL) (remotecommand.Executor, error) { + return remotecommand.NewWebSocketExecutor(c, s, u.String()) + }, + config: configMultiKubeUsers, + }, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/lib/kube/proxy/remotecommand_websocket.go b/lib/kube/proxy/remotecommand_websocket.go index 885c3f376df13..5b08407f79cc2 100644 --- a/lib/kube/proxy/remotecommand_websocket.go +++ b/lib/kube/proxy/remotecommand_websocket.go @@ -22,21 +22,17 @@ import ( "github.com/go-logr/logr" "github.com/gravitational/trace" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" + "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apiserver/pkg/endpoints/responsewriter" "k8s.io/klog/v2" ) const ( - stdinChannel = iota - stdoutChannel - stderrChannel - errorChannel - resizeChannel - preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol + v5BinaryWebsocketProtocol = remotecommand.StreamProtocolV5Name ) func init() { @@ -59,11 +55,11 @@ func init() { func createChannels(req remoteCommandRequest) []wsstream.ChannelType { // open the requested channels, and always open the error channel channels := make([]wsstream.ChannelType, 5) - channels[stdinChannel] = readChannel(req.stdin) - channels[stdoutChannel] = writeChannel(req.stdout) - channels[stderrChannel] = writeChannel(req.stderr) - channels[errorChannel] = wsstream.WriteChannel - channels[resizeChannel] = wsstream.ReadChannel + channels[remotecommand.StreamStdIn] = readChannel(req.stdin) + channels[remotecommand.StreamStdOut] = writeChannel(req.stdout) + channels[remotecommand.StreamStdErr] = writeChannel(req.stderr) + channels[remotecommand.StreamErr] = wsstream.WriteChannel + channels[remotecommand.StreamResize] = wsstream.ReadChannel return channels } @@ -108,6 +104,10 @@ func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, erro Binary: false, Channels: channels, }, + v5BinaryWebsocketProtocol: { + Binary: true, + Channels: channels, + }, }) conn.SetIdleTimeout(IdleTimeout) negotiatedProtocol, streams, err := conn.Open( @@ -121,20 +121,20 @@ func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, erro // Send an empty message to the lowest writable channel to notify the client the connection is established switch { case req.stdout: - streams[stdoutChannel].Write([]byte{}) + streams[remotecommand.StreamStdOut].Write([]byte{}) case req.stderr: - streams[stderrChannel].Write([]byte{}) + streams[remotecommand.StreamStdErr].Write([]byte{}) default: - streams[errorChannel].Write([]byte{}) + streams[streamErr].Write([]byte{}) } proxy := &remoteCommandProxy{ conn: conn, - stdinStream: streams[stdinChannel], - stdoutStream: streams[stdoutChannel], - stderrStream: streams[stderrChannel], + stdinStream: streams[remotecommand.StreamStdIn], + stdoutStream: streams[remotecommand.StreamStdOut], + stderrStream: streams[remotecommand.StreamStdErr], tty: req.tty, - resizeStream: streams[resizeChannel], + resizeStream: streams[remotecommand.StreamResize], } // When stdin, stdout or stderr are not enabled, websocket creates a io.Pipe @@ -153,10 +153,10 @@ func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, erro } switch negotiatedProtocol { - case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol: - proxy.writeStatus = v4WriteStatusFunc(streams[errorChannel]) + case v5BinaryWebsocketProtocol, v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol: + proxy.writeStatus = v4WriteStatusFunc(streams[remotecommand.StreamErr]) default: - proxy.writeStatus = v1WriteStatusFunc(streams[errorChannel]) + proxy.writeStatus = v1WriteStatusFunc(streams[remotecommand.StreamErr]) } return proxy, nil diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go index d67775ade50a1..a415617d9a5fc 100644 --- a/lib/kube/proxy/testing/kube_server/kube_mock.go +++ b/lib/kube/proxy/testing/kube_server/kube_mock.go @@ -43,6 +43,8 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream" spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" + apiremotecommand "k8s.io/apimachinery/pkg/util/remotecommand" + "k8s.io/apiserver/pkg/endpoints/responsewriter" "k8s.io/client-go/tools/remotecommand" "github.com/gravitational/teleport/lib/defaults" @@ -76,6 +78,12 @@ const ( // Value for streamType header for terminal resize stream StreamTypeResize = "resize" + preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol + preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol + v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol + v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol + v5BinaryWebsocketProtocol = "v5." + wsstream.ChannelWebSocketProtocol + // CloseStreamMessage is an expected keyword if stdin is enable and the // underlying protocol does not support half closed streams. // It is only required for websockets. @@ -335,12 +343,15 @@ func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, er err error ) if wsstream.IsWebSocketRequest(req.httpRequest) { - return nil, fmt.Errorf("only SPDY streams upgrades are supported") - } - - proxy, err = createSPDYStreams(req) - if err != nil { - return nil, trace.Wrap(err) + proxy, err = createWebSocketStreams(req) + if err != nil { + return nil, trace.Wrap(err) + } + } else { + proxy, err = createSPDYStreams(req) + if err != nil { + return nil, trace.Wrap(err) + } } if proxy.resizeStream != nil { @@ -350,6 +361,95 @@ func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, er return proxy, nil } +func channelOrIgnore(channel wsstream.ChannelType, real bool) wsstream.ChannelType { + if real { + return channel + } + return wsstream.IgnoreChannel +} + +func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, error) { + channels := make([]wsstream.ChannelType, 5) + channels[apiremotecommand.StreamStdIn] = channelOrIgnore(wsstream.ReadChannel, req.stdin) + channels[apiremotecommand.StreamStdOut] = channelOrIgnore(wsstream.WriteChannel, req.stdout) + channels[apiremotecommand.StreamStdErr] = channelOrIgnore(wsstream.WriteChannel, req.stderr) + channels[apiremotecommand.StreamErr] = wsstream.WriteChannel + channels[apiremotecommand.StreamResize] = wsstream.ReadChannel + + conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{ + "": { + Binary: true, + Channels: channels, + }, + preV4BinaryWebsocketProtocol: { + Binary: true, + Channels: channels, + }, + preV4Base64WebsocketProtocol: { + Binary: false, + Channels: channels, + }, + v4BinaryWebsocketProtocol: { + Binary: true, + Channels: channels, + }, + v4Base64WebsocketProtocol: { + Binary: false, + Channels: channels, + }, + v5BinaryWebsocketProtocol: { + Binary: true, + Channels: channels, + }, + }) + conn.SetIdleTimeout(IdleTimeout) + _, streams, err := conn.Open( + responsewriter.GetOriginal(req.httpResponseWriter), + req.httpRequest, + ) + if err != nil { + return nil, trace.Wrap(err, "unable to upgrade websocket connection") + } + + // Send an empty message to the lowest writable channel to notify the client the connection is established + switch { + case req.stdout: + streams[apiremotecommand.StreamStdOut].Write([]byte{}) + case req.stderr: + streams[apiremotecommand.StreamStdErr].Write([]byte{}) + default: + streams[apiremotecommand.StreamErr].Write([]byte{}) + } + + proxy := &remoteCommandProxy{ + conn: conn, + stdinStream: streams[apiremotecommand.StreamStdIn], + stdoutStream: streams[apiremotecommand.StreamStdOut], + stderrStream: streams[apiremotecommand.StreamStdErr], + tty: req.tty, + resizeStream: streams[apiremotecommand.StreamResize], + } + + // When stdin, stdout or stderr are not enabled, websocket creates a io.Pipe + // for them so they are not nil. + // Since we need to forward to another k8s server (Teleport or real k8s API), + // we must disabled the readers, otherwise the SPDY executor will wait for + // read/write into the streams and will hang. + if !req.stdin { + proxy.stdinStream = nil + } + if !req.stdout { + proxy.stdoutStream = nil + } + if !req.stderr { + proxy.stderrStream = nil + } + + proxy.writeStatus = v4WriteStatusFunc(streams[apiremotecommand.StreamErr]) + + return proxy, nil +} + func createSPDYStreams(req remoteCommandRequest) (*remoteCommandProxy, error) { protocol, err := httpstream.Handshake(req.httpRequest, req.httpResponseWriter, []string{StreamProtocolV4Name}) if err != nil {