Skip to content

Commit

Permalink
Serialize requests for certificates.
Browse files Browse the repository at this point in the history
This commit serializes requests for certificates
arrigiving for the same user, concurrent requests
will wait for the first request to finish.

This is done to fix kubectl usage problem that tends
to issue many requests in parallel on first use.
  • Loading branch information
klizhentas committed Jun 28, 2018
1 parent bd29843 commit 9fc6c04
Showing 1 changed file with 68 additions and 2 deletions.
70 changes: 68 additions & 2 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type ForwarderConfig struct {
// ClusterOverride if set, routes all requests
// to the cluster name, used in tests
ClusterOverride string
// Context passes the optional external context
// passing global close to all forwarder operations
Context context.Context
}

// CheckAndSetDefaults checks and sets default values
Expand Down Expand Up @@ -100,6 +103,9 @@ func (f *ForwarderConfig) CheckAndSetDefaults() error {
if f.Namespace == "" {
f.Namespace = defaults.Namespace
}
if f.Context == nil {
f.Context = context.TODO()
}
return nil
}

Expand All @@ -113,13 +119,17 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
if err != nil {
return nil, trace.Wrap(err)
}
closeCtx, close := context.WithCancel(cfg.Context)
fwd := &Forwarder{
Entry: logrus.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentKube),
}),
Router: *httprouter.New(),
ForwarderConfig: cfg,
clusterSessions: clusterSessions,
activeRequests: make(map[string]context.Context),
ctx: closeCtx,
close: close,
}

fwd.POST("/api/:ver/namespaces/:podNamespace/pods/:podName/exec", fwd.withAuth(fwd.exec))
Expand Down Expand Up @@ -153,6 +163,19 @@ type Forwarder struct {
// if user changes kubernetes groups via RBAC or cache has expired
// TODO(klizhentas): flush certs on teleport CA rotation?
clusterSessions *ttlmap.TTLMap
// activeRequests is a map used to serialize active CSR requests to the auth server
activeRequests map[string]context.Context
// close is a close function
close context.CancelFunc
// ctx is a global context signalling exit
ctx context.Context
}

// Close signals close to all outstanding or background operations
// to complete
func (f *Forwarder) Close() error {
f.close()
return nil
}

// authContext is a context of authenticated user,
Expand Down Expand Up @@ -563,8 +586,7 @@ func (f *Forwarder) getOrCreateClusterSession(ctx authContext) (*clusterSession,
if client != nil {
return client, nil
}
f.Debugf("Requesting new creds for %v.", ctx)
return f.newClusterSession(ctx)
return f.serializedNewClusterSession(ctx)
}

func (f *Forwarder) getClusterSession(ctx authContext) *clusterSession {
Expand All @@ -577,6 +599,28 @@ func (f *Forwarder) getClusterSession(ctx authContext) *clusterSession {
return nil
}

func (f *Forwarder) serializedNewClusterSession(authContext authContext) (*clusterSession, error) {
ctx, cancel := f.getOrCreateRequestContext(authContext.key())
if cancel != nil {
f.Debugf("Requesting new creds for %v.", authContext)
defer cancel()
return f.newClusterSession(authContext)
}
// cancel == nil means that another request is in progress, so simply wait until
// it finishes or fails
f.Debugf("Another request is in progress for %v, waiting until it gets completed.", authContext)
select {
case <-ctx.Done():
sess := f.getClusterSession(authContext)
if sess == nil {
return nil, trace.BadParameter("failed to request certificate, try again")
}
return sess, nil
case <-f.ctx.Done():
return nil, trace.BadParameter("forwarder is closing, aborting the request")
}
}

func (f *Forwarder) newClusterSession(ctx authContext) (*clusterSession, error) {
response, err := f.requestCertificate(ctx)
if err != nil {
Expand Down Expand Up @@ -654,6 +698,28 @@ type bundle struct {
certAuthorities [][]byte
}

// getOrCreateRequestContext creates a new certificate request for a given context,
// if there is no active CSR request in progress, or returns an existing one.
// if the new context has been created, cancel function is returned as a
// second argument. Caller should call this function to signal that CSR has been
// completed or failed.
func (f *Forwarder) getOrCreateRequestContext(key string) (context.Context, context.CancelFunc) {
f.Lock()
defer f.Unlock()
ctx, ok := f.activeRequests[key]
if ok {
return ctx, nil
}
ctx, cancel := context.WithCancel(context.TODO())
f.activeRequests[key] = ctx
return ctx, func() {
cancel()
f.Lock()
defer f.Unlock()
delete(f.activeRequests, key)
}
}

func (f *Forwarder) requestCertificate(ctx authContext) (*bundle, error) {
f.Debugf("Requesting K8s cert for %v.", ctx)
keyPEM, _, err := f.Keygen.GenerateKeyPair("")
Expand Down

0 comments on commit 9fc6c04

Please sign in to comment.