Skip to content

Commit

Permalink
feat(platform): support proxy CR through websocket (#1791)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo Ryu authored Mar 16, 2022
1 parent db367bc commit 1392241
Showing 1 changed file with 61 additions and 17 deletions.
78 changes: 61 additions & 17 deletions pkg/platform/registry/cluster/storage/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,25 @@ package storage
import (
"context"
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/url"
"path"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
k8sproxy "k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
clientrest "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"k8s.io/klog"
platforminternalclient "tkestack.io/tke/api/client/clientset/internalversion/typed/platform/internalversion"
"tkestack.io/tke/api/platform"
"tkestack.io/tke/pkg/apiserver/authentication"
"tkestack.io/tke/pkg/platform/apiserver/filter"
"tkestack.io/tke/pkg/platform/proxy"
"tkestack.io/tke/pkg/platform/util"
Expand Down Expand Up @@ -86,7 +90,6 @@ func (r *ProxyREST) Connect(ctx context.Context, clusterName string, opts runtim
return nil, errors.NewInternalError(err)
}

userName, tenantID := authentication.UsernameAndTenantID(ctx)
uri, err := makeURL(config.Host, proxyOpts.Path)
if err != nil {
return nil, errors.NewBadRequest(err.Error())
Expand All @@ -96,28 +99,23 @@ func (r *ProxyREST) Connect(ctx context.Context, clusterName string, opts runtim
if err != nil {
return nil, err
}
upgradeTransport, err := makeUpgradeTransport(config, 30*time.Second)
if err != nil {
return nil, err
}
responders := &responders{}
proxy := k8sproxy.NewUpgradeAwareHandler(uri, transport, false, false, responders)
proxy.UpgradeTransport = upgradeTransport
proxy.Location = uri

return &httputil.ReverseProxy{
Director: makeDirector(cluster.ObjectMeta.Name, userName, tenantID, uri, config.BearerToken),
Transport: transport,
}, nil
return redirect(uri, cluster.ObjectMeta.Name, config.BearerToken, proxy), nil
}

// New creates a new helm proxy options object
func (r *ProxyREST) New() runtime.Object {
return &platform.ProxyOptions{}
}

func makeDirector(clusterName, userName, tenantID string, uri *url.URL, token string) func(req *http.Request) {
return func(req *http.Request) {
req.Header.Set(filter.ClusterNameHeaderKey, clusterName)
if token != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
}
req.URL = uri
}
}

//proxyPath have been decoded somewhere before passing to makeURL
func makeURL(host, proxyPath string) (*url.URL, error) {
u, err := url.Parse(host) //will returen error if a host not contains a schema
Expand All @@ -143,3 +141,49 @@ func makeURL(host, proxyPath string) (*url.URL, error) {
}
return u, nil
}

type responders struct{}

func (r *responders) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.Errorf("Error while proxying request: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}

func makeUpgradeTransport(config *clientrest.Config, keepalive time.Duration) (k8sproxy.UpgradeRequestRoundTripper, error) {
transportConfig, err := config.TransportConfig()
if err != nil {
return nil, err
}
tlsConfig, err := transport.TLSConfigFor(transportConfig)
if err != nil {
return nil, err
}
rt := utilnet.SetOldTransportDefaults(&http.Transport{
TLSClientConfig: tlsConfig,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: keepalive,
}).DialContext,
})

upgrader, err := transport.HTTPWrappersForConfig(transportConfig, k8sproxy.MirrorRequest)
if err != nil {
return nil, err
}
return k8sproxy.NewUpgradeRequestRoundTripper(rt, upgrader), nil
}

func redirect(uri *url.URL, clusterName, token string, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req.Header.Set(filter.ClusterNameHeaderKey, clusterName)
if token != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
}
reqClone := utilnet.CloneRequest(req)
reqClone.URL.Host = uri.Host
reqClone.URL.Path = uri.Path
reqClone.URL.RawPath = uri.RawPath
reqClone.URL.RawQuery = uri.RawQuery
h.ServeHTTP(w, reqClone)
})
}

0 comments on commit 1392241

Please sign in to comment.