Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) {
loop:
for {
select {
case event := <-main.UploadEventsC:
case event := <-aux.UploadEventsC:
sessionID = event.SessionID
break loop
case <-timeoutC:
Expand All @@ -767,7 +767,7 @@ loop:
}

// read back the entire session and verify that it matches the stated output
capturedStream, err := main.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes)
capturedStream, err := aux.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes)
require.NoError(t, err)

require.Equal(t, sessionStream, string(capturedStream))
Expand Down Expand Up @@ -1583,7 +1583,6 @@ func waitForContainer(ctx context.Context, podClient corev1client.PodInterface,
}

s := getContainerStatusByName(p, containerName)
fmt.Println("test", s)
if s == nil {
return false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/ephemeral_containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri
f.log.Errorf("Failed to set up forwarding headers: %v.", err)
return nil, trace.Wrap(err)
}
if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if !sess.isLocalKubernetesCluster {
sess.forwarder.ServeHTTP(w, req)
return nil, nil
}
Expand Down
43 changes: 23 additions & 20 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ type authContext struct {
kubeServers []types.KubeServer
// apiResource holds the information about the requested API resource.
apiResource apiResource
// isLocalKubernetesCluster is true if the target cluster is served by this teleport service.
// It is false if the target cluster is served by another teleport service or a different
// Teleport cluster.
isLocalKubernetesCluster bool
}

func (c authContext) String() string {
Expand Down Expand Up @@ -775,7 +779,8 @@ func (f *Forwarder) setupContext(
return nil, trace.NotFound("Kubernetes cluster %q not found", kubeCluster)
}
}
if f.isLocalKubeCluster(isRemoteCluster, kubeCluster) {
isLocalKubernetesCluster := f.isLocalKubeCluster(isRemoteCluster, kubeCluster)
if isLocalKubernetesCluster {
kubeResource, apiResource, err = f.parseResourceFromRequest(req, kubeCluster)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -809,10 +814,11 @@ func (f *Forwarder) setupContext(
remoteAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: req.RemoteAddr},
isRemote: isRemoteCluster,
},
kubeServers: kubeServers,
requestVerb: apiResource.getVerb(req),
apiResource: apiResource,
kubeResource: kubeResource,
kubeServers: kubeServers,
requestVerb: apiResource.getVerb(req),
apiResource: apiResource,
kubeResource: kubeResource,
isLocalKubernetesCluster: isLocalKubernetesCluster,
}, nil
}

Expand Down Expand Up @@ -865,9 +871,11 @@ func (f *Forwarder) emitAuditEvent(req *http.Request, sess *clusterSession, stat
)
defer span.End()

if sess.noAuditEvents {
// If the session is not local, don't emit the event.
if !sess.isLocalKubernetesCluster {
return
}

r := sess.apiResource
if r.skipEvent {
return
Expand Down Expand Up @@ -1161,7 +1169,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
return nil, trace.Wrap(err)
}

if !f.isLocalKubeCluster(ctx.teleportCluster.isRemote, ctx.kubeClusterName) {
if !sess.isLocalKubernetesCluster {
return f.remoteJoin(ctx, w, req, p, sess)
}

Expand Down Expand Up @@ -1658,7 +1666,7 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
}
// proxy.Close closes the underlying connection and releases the resources.
defer proxy.Close()
if sess.noAuditEvents {
if !sess.isLocalKubernetesCluster {
// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
return f.remoteExec(authCtx, w, req, p, sess, request, proxy)
}
Expand Down Expand Up @@ -1777,7 +1785,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
}

onPortForward := func(addr string, success bool) {
if sess.noAuditEvents {
if !sess.isLocalKubernetesCluster {
return
}
portForward := &apievents.PortForward{
Expand Down Expand Up @@ -2048,7 +2056,7 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h
return nil, trace.Wrap(err)
}

isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster := sess.isLocalKubernetesCluster
isListRequest := authCtx.requestVerb == types.KubeVerbList
// Watch requests can be send to a single resource or to a collection of resources.
// isWatchingCollectionRequest is true when the request is a watch request and
Expand Down Expand Up @@ -2145,10 +2153,8 @@ type clusterSession struct {
// nil otherwise.
kubeAPICreds kubeCreds
forwarder *reverseproxy.Forwarder
// noAuditEvents is true if this teleport service should leave audit event
// logging to another service.
noAuditEvents bool
targetAddr string
// targetAddr is the address of the target cluster.
targetAddr string
// kubeAddress is the address of this session's active connection (if there is one)
kubeAddress string
// upgradeToHTTP2 indicates whether the transport should be configured to use HTTP2.
Expand Down Expand Up @@ -2357,11 +2363,8 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont
func (f *Forwarder) newClusterSessionDirect(ctx context.Context, authCtx authContext) (*clusterSession, error) {
connCtx, cancel := context.WithCancelCause(ctx)
return &clusterSession{
parent: f,
authContext: authCtx,
// This session talks to a kubernetes_service, which should handle
// audit logging. Avoid duplicate logging.
noAuditEvents: true,
parent: f,
authContext: authCtx,
requestContext: ctx,
connCtx: connCtx,
connMonitorCancel: cancel,
Expand All @@ -2385,7 +2388,7 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*reverseproxy.Fo
reverseproxy.WithLogger(f.log),
reverseproxy.WithErrorHandler(f.formatForwardResponseError),
}
if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if sess.isLocalKubernetesCluster {
// If the target cluster is local, i.e. the cluster that is served by this
// teleport service, then we set up the forwarder to allow re-writing
// the response to the client to include user friendly error messages.
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_deletecollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (f *Forwarder) deleteResourcesCollection(sess *clusterSession, w http.Respo
defer span.End()
req = req.WithContext(ctx)
var (
isLocalKubeCluster = f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster = sess.isLocalKubernetesCluster
kubeObjType string
namespace string
)
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (f *Forwarder) listResources(sess *clusterSession, w http.ResponseWriter, r

req = req.WithContext(ctx)

isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster := sess.isLocalKubernetesCluster
supportsType := false
if isLocalKubeCluster {
_, supportsType = sess.rbacSupportedResources.getTeleportResourceKindFromAPIResource(sess.apiResource)
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/self_subject_reviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon

// only allow self subject access reviews for the service that proxies the
// request to the kubernetes API server.
if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if sess.isLocalKubernetesCluster {
if err := f.validateSelfSubjectAccessReview(sess, w, req); trace.IsAccessDenied(err) {
return nil, nil
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta
s.started = true
sessionStart := s.forwarder.cfg.Clock.Now().UTC()

if !s.sess.noAuditEvents {
if s.sess.isLocalKubernetesCluster {
s.terminalSizeQueue.callback = func(termSize terminalResizeMessage) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down