From e5b7216574d2e92e8dd3b9e36eb59c0c3864d052 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Fri, 20 Sep 2024 09:46:18 +0100 Subject: [PATCH] [kube] return Kubernetes API errors when using Websocket API Kubernetes 1.30 introduced a new Remote Command protocol that uses WebSockets, replacing the deprecated SPDY protocol. The new executor protocol didn't return the status error when an API call failed. For Teleport, this creates an issue because permission errors are absorbed by the WebSocket executor and were never passed to users. This pull request implements a status parser for websocket Kubernetes API errors. Signed-off-by: Tiago Silva --- lib/kube/proxy/auth_test.go | 31 +++- lib/kube/proxy/exec_test.go | 144 ++++++++++++++++++ lib/kube/proxy/roundtrip.go | 39 ++--- lib/kube/proxy/roundtrip_websocket.go | 3 + .../proxy/testing/kube_server/kube_mock.go | 39 +++++ 5 files changed, 238 insertions(+), 18 deletions(-) diff --git a/lib/kube/proxy/auth_test.go b/lib/kube/proxy/auth_test.go index 565981138c575..ba88bdfecb607 100644 --- a/lib/kube/proxy/auth_test.go +++ b/lib/kube/proxy/auth_test.go @@ -34,6 +34,7 @@ import ( "golang.org/x/exp/maps" authzapi "k8s.io/api/authorization/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachineryversion "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/kubernetes" authztypes "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/rest" @@ -212,7 +213,11 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, - kubeCluster: mustCreateKubernetesClusterV3(t, "foo"), + kubeCluster: mustCreateKubernetesClusterV3(t, "foo"), + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, rbacSupportedTypes: rbacSupportedTypes, }, "bar": { @@ -222,6 +227,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: 
mustCreateKubernetesClusterV3(t, "bar"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -232,6 +241,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "baz"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -257,6 +270,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, teleClusterName), rbacSupportedTypes: rbacSupportedTypes, }, @@ -275,6 +292,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "foo"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -285,6 +306,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "bar"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -295,6 +320,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "baz"), rbacSupportedTypes: rbacSupportedTypes, }, diff --git a/lib/kube/proxy/exec_test.go b/lib/kube/proxy/exec_test.go index 8fc4949c4652b..3aca4f095b157 100644 --- a/lib/kube/proxy/exec_test.go +++ b/lib/kube/proxy/exec_test.go @@ -34,6 +34,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + apimachineryversion "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" 
"k8s.io/kubectl/pkg/scheme" @@ -455,3 +456,146 @@ func TestExecMissingGETPermissionError(t *testing.T) { }) } } + +func TestExecWebsocketEndToEndErrReturn(t *testing.T) { + t.Parallel() + + const ( + errorMessage = "pods \"api-1\" is forbidden: User \"bar\" cannot %s resource " + + "\"pods/exec\" in API group \"\" in the namespace \"ns\"" + ) + + const errorCode = http.StatusForbidden + + kubeMock, err := testingkubemock.NewKubeAPIMock( + testingkubemock.WithExecError( + metav1.Status{ + Status: metav1.StatusFailure, + Message: fmt.Sprintf(errorMessage, "get"), + Reason: metav1.StatusReasonForbidden, + Code: errorCode, + }, + ), + testingkubemock.WithVersion( + &apimachineryversion.Info{ + Major: "1", + Minor: "31", + GitVersion: "v1.31.0", + }), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.EqualValues(t, 0, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests") + require.EqualValues(t, 2, kubeMock.KubeExecRequests.Websocket.Load(), "expected two websocket requests") + kubeMock.Close() + }) + var ( + execEvent *apievents.Exec + eventsLock sync.Mutex + ) + + // creates a Kubernetes service with a configured cluster pointing to mock api server + testCtx := SetupTestContext( + context.Background(), + t, + TestConfig{ + Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}}, + OnEvent: func(evt apievents.AuditEvent) { + eventsLock.Lock() + defer eventsLock.Unlock() + if exec, ok := evt.(*apievents.Exec); ok { + execEvent = exec + } + }, + }, + ) + + t.Cleanup(func() { require.NoError(t, testCtx.Close()) }) + + // create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified) + user, _ := testCtx.CreateUserAndRole( + testCtx.Context, + t, + username, + RoleSpec{ + Name: roleName, + KubeUsers: roleKubeUsers, + KubeGroups: roleKubeGroups, + }) + + // generate a kube client with user certs for auth + _, userRestConfig := testCtx.GenTestKubeClientTLSCert( + t, + user.GetName(), + kubeCluster, + ) 
+ + tests := []struct { + name string + interactive bool + }{ + { + name: "error propagation in non-interactive session", + }, + { + name: "error propagation in interactive session", + interactive: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var streamOpts remotecommand.StreamOptions + if !tt.interactive { + streamOpts = remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &bytes.Buffer{}, + Stderr: &bytes.Buffer{}, + Tty: false, + } + } else { + stdinReader, _ := io.Pipe() + t.Cleanup(func() { stdinReader.Close() }) + streamOpts = remotecommand.StreamOptions{ + Stdin: stdinReader, + Stdout: &bytes.Buffer{}, + Stderr: nil, + Tty: true, + } + } + req, err := generateExecRequest( + generateExecRequestConfig{ + addr: testCtx.KubeProxyAddress(), + podName: podName, + podNamespace: podNamespace, + containerName: podContainerName, + cmd: containerCommmandExecute, // placeholder for commands to execute in the dummy pod + options: streamOpts, + }, + ) + require.NoError(t, err) + + exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL()) + require.NoError(t, err) + err = exec.StreamWithContext(context.Background(), streamOpts) + require.Error(t, err) + require.Contains(t, err.Error(), kubernetes130BreakingChangeHint) + + require.Eventually(t, func() bool { + eventsLock.Lock() + defer eventsLock.Unlock() + return execEvent != nil + }, 5*time.Second, 100*time.Millisecond, "expected exec event to be recorded") + + eventsLock.Lock() + require.Equal(t, events.ExecFailureCode, execEvent.Code) + require.Equal(t, "403", execEvent.ExitCode) + require.NotEmpty(t, execEvent.Error) + eventsLock.Unlock() + + }) + } + +} diff --git a/lib/kube/proxy/roundtrip.go b/lib/kube/proxy/roundtrip.go index ed68364f5fe2d..f2d172f990b0e 100644 --- a/lib/kube/proxy/roundtrip.go +++ b/lib/kube/proxy/roundtrip.go @@ -259,23 +259,7 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) 
(httpstream.Connec connectionHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderConnection)) upgradeHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderUpgrade)) if (resp.StatusCode != http.StatusSwitchingProtocols) || !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(streamspdy.HeaderSpdy31)) { - defer resp.Body.Close() - responseError := "" - responseErrorBytes, err := io.ReadAll(resp.Body) - if err != nil { - responseError = "unable to read error from server response" - } else { - // TODO: I don't belong here, I should be abstracted from this class - if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil { - if status, ok := obj.(*metav1.Status); ok { - return nil, &apierrors.StatusError{ErrStatus: *status} - } - } - responseError = string(responseErrorBytes) - responseError = strings.TrimSpace(responseError) - } - - return nil, fmt.Errorf("unable to upgrade connection: %s", responseError) + return nil, trace.Wrap(extractKubeAPIStatusFromReq(resp)) } return streamspdy.NewClientConnectionWithPings(s.conn, s.pingPeriod) @@ -292,3 +276,24 @@ func init() { &metav1.Status{}, ) } + +// extractKubeAPIStatusFromReq extracts the status from the response body and returns it as an error. 
+func extractKubeAPIStatusFromReq(rsp *http.Response) error { + defer func() { + _ = rsp.Body.Close() + }() + responseError := "" + responseErrorBytes, err := io.ReadAll(rsp.Body) + if err != nil { + responseError = "unable to read error from server response" + } else { + if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil { + if status, ok := obj.(*metav1.Status); ok { + return &apierrors.StatusError{ErrStatus: *status} + } + } + responseError = string(responseErrorBytes) + responseError = strings.TrimSpace(responseError) + } + return fmt.Errorf("unable to upgrade connection: %s", responseError) +} diff --git a/lib/kube/proxy/roundtrip_websocket.go b/lib/kube/proxy/roundtrip_websocket.go index 6e232b46eb61f..1ee847384c826 100644 --- a/lib/kube/proxy/roundtrip_websocket.go +++ b/lib/kube/proxy/roundtrip_websocket.go @@ -103,6 +103,9 @@ func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, er wsConn, wsResp, err := wsDialer.DialContext(w.ctx, clone.URL.String(), clone.Header) if err != nil { + if wsResp != nil { + return nil, trace.Wrap(extractKubeAPIStatusFromReq(wsResp)) + } return nil, &httpstream.UpgradeFailureError{Cause: err} } w.conn = wsConn diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go index ff43040d4ae37..eb1cc97edacce 100644 --- a/lib/kube/proxy/testing/kube_server/kube_mock.go +++ b/lib/kube/proxy/testing/kube_server/kube_mock.go @@ -30,6 +30,7 @@ import ( "net/http/httptest" "strings" "sync" + "sync/atomic" "time" "github.com/gravitational/trace" @@ -44,6 +45,7 @@ import ( spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" apiremotecommand "k8s.io/apimachinery/pkg/util/remotecommand" + apimachineryversion "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/endpoints/responsewriter" "k8s.io/client-go/tools/remotecommand" @@ -116,10 +118,26 @@ func 
WithExecError(status metav1.Status) Option { } } +// WithVersion sets the version of the server +func WithVersion(version *apimachineryversion.Info) Option { + return func(s *KubeMockServer) { + s.version = version + } +} + type deletedResource struct { requestID string kind string } + +// KubeExecRequests keeps track of the number of exec requests +type KubeExecRequests struct { + // SPDY is the number of SPDY exec requests + SPDY atomic.Int32 + // Websocket is the number of Websocket exec requests + Websocket atomic.Int32 +} + type KubeMockServer struct { router *httprouter.Router log *log.Entry @@ -132,6 +150,8 @@ type KubeMockServer struct { getPodError *metav1.Status execPodError *metav1.Status mu sync.Mutex + version *apimachineryversion.Info + KubeExecRequests } // NewKubeAPIMock creates Kubernetes API server for handling exec calls. @@ -146,6 +166,10 @@ func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) { router: httprouter.New(), log: log.NewEntry(log.New()), deletedResources: make(map[deletedResource][]string), + version: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, } for _, o := range opts { @@ -191,6 +215,8 @@ func (s *KubeMockServer) setup() { s.router.GET("/apis/resources.teleport.dev/v6/namespaces/:namespace/teleportroles/:name", s.withWriter(s.getTeleportRole)) s.router.DELETE("/apis/resources.teleport.dev/v6/namespaces/:namespace/teleportroles/:name", s.withWriter(s.deleteTeleportRole)) + s.router.GET("/version", s.withWriter(s.versionEndpoint)) + for _, endpoint := range []string{"/api", "/api/:ver", "/apis", "/apis/resources.teleport.dev/v6"} { s.router.GET(endpoint, s.withWriter(s.discoveryEndpoint)) } @@ -238,6 +264,12 @@ func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr erro } func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp any, err error) { + if wsstream.IsWebSocketRequest(req) { + s.KubeExecRequests.Websocket.Add(1) + } else { + 
s.KubeExecRequests.SPDY.Add(1) + } + q := req.URL.Query() if s.execPodError != nil { s.writeResponseError(w, nil, s.execPodError) @@ -778,3 +810,10 @@ func httpStreamReceived(ctx context.Context, streams chan httpstream.Stream) fun } } } + +func (s *KubeMockServer) versionEndpoint(_ http.ResponseWriter, _ *http.Request, _ httprouter.Params) (resp any, err error) { + if s.version == nil { + return nil, trace.BadParameter("version not set") + } + return s.version, nil +}