Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
kubeexec "k8s.io/client-go/util/exec"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/observability/tracing"
Expand Down Expand Up @@ -171,8 +172,14 @@ type ForwarderConfig struct {
// the upstream Teleport proxy or Kubernetes service when forwarding requests
// using the forward identity (i.e. proxy impersonating a user) method.
ConnTLSConfig *tls.Config
// ClusterFeaturesGetter is a function that returns the Teleport cluster licensed features.
// It is used to determine if the cluster is licensed for Kubernetes usage.
ClusterFeatures ClusterFeaturesGetter
}

// ClusterFeaturesGetter is a function that returns the Teleport cluster licensed features.
type ClusterFeaturesGetter func() proto.Features

// CheckAndSetDefaults checks and sets default values
func (f *ForwarderConfig) CheckAndSetDefaults() error {
if f.AuthClient == nil {
Expand All @@ -199,6 +206,9 @@ func (f *ForwarderConfig) CheckAndSetDefaults() error {
if f.HostID == "" {
return trace.BadParameter("missing parameter ServerID")
}
if f.ClusterFeatures == nil {
return trace.BadParameter("missing parameter ClusterFeatures")
}
if f.KubeServiceType != KubeService && f.PROXYSigner == nil {
return trace.BadParameter("missing parameter PROXYSigner")
}
Expand Down Expand Up @@ -274,7 +284,6 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
}

closeCtx, close := context.WithCancel(cfg.Context)

fwd := &Forwarder{
log: cfg.log,
cfg: cfg,
Expand All @@ -290,6 +299,7 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
clusterDetails: make(map[string]*kubeDetails),
cachedTransport: transportClients,
}

router := httprouter.New()

router.UseRawPath = true
Expand Down Expand Up @@ -339,6 +349,7 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
return nil, trace.Wrap(err)
}
}

return fwd, nil
}

Expand Down Expand Up @@ -498,6 +509,11 @@ const accessDeniedMsg = "[00] access denied"

// authenticate function authenticates request
func (f *Forwarder) authenticate(req *http.Request) (*authContext, error) {
// If the cluster is not licensed for Kubernetes, return an error to the client.
if !f.cfg.ClusterFeatures().Kubernetes {
// If the cluster is not licensed for Kubernetes, return an error to the client.
return nil, trace.AccessDenied("Teleport cluster is not licensed for Kubernetes")
}
ctx, span := f.cfg.tracer.Start(
req.Context(),
"kube.Forwarder/authenticate",
Expand Down Expand Up @@ -711,14 +727,15 @@ func (f *Forwarder) writeResponseErrorToBody(rw http.ResponseWriter, respErr err

// formatStatusResponseError formats the error response into a kube Status object.
func (f *Forwarder) formatStatusResponseError(rw http.ResponseWriter, respErr error) {
code := trace.ErrorToCode(respErr)
status := &metav1.Status{
Status: metav1.StatusFailure,
// Don't trace.Unwrap the error, in case it was wrapped with a
// user-friendly message. The underlying root error is likely too
// low-level to be useful.
Message: respErr.Error(),
Code: int32(trace.ErrorToCode(respErr)),
Reason: errorToKubeStatusReason(respErr),
Code: int32(code),
Reason: errorToKubeStatusReason(respErr, code),
}
data, err := runtime.Encode(kubeCodecs.LegacyCodec(), status)
if err != nil {
Expand Down Expand Up @@ -2991,7 +3008,7 @@ func getRequestVerb(method string) string {

// errorToKubeStatusReason returns an appropriate StatusReason based on the
// provided error type.
func errorToKubeStatusReason(err error) metav1.StatusReason {
func errorToKubeStatusReason(err error, code int) metav1.StatusReason {
switch {
case trace.IsAggregate(err):
return metav1.StatusReasonTimeout
Expand All @@ -3011,6 +3028,8 @@ func errorToKubeStatusReason(err error) metav1.StatusReason {
return metav1.StatusReasonTooManyRequests
case trace.IsConnectionProblem(err):
return metav1.StatusReasonTimeout
case code == http.StatusInternalServerError:
return metav1.StatusReasonInternalError
default:
return metav1.StatusReasonUnknown
}
Expand Down
87 changes: 87 additions & 0 deletions lib/kube/proxy/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,20 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/transport"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/fixtures"
testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local"
Expand Down Expand Up @@ -124,6 +128,12 @@ func TestRequestCertificate(t *testing.T) {
require.Empty(t, cmp.Diff(*idFromCSR, ctx.UnmappedIdentity.GetIdentity()))
}

func fakeClusterFeatures() proto.Features {
return proto.Features{
Kubernetes: true,
}
}

func TestAuthenticate(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -163,6 +173,7 @@ func TestAuthenticate(t *testing.T) {
CachingAuthClient: ap,
TracerProvider: otel.GetTracerProvider(),
tracer: otel.Tracer(teleport.ComponentKube),
ClusterFeatures: fakeClusterFeatures,
},
getKubernetesServersForKubeCluster: func(ctx context.Context, name string) ([]types.KubeServer, error) {
servers, err := ap.GetKubernetesServers(ctx)
Expand Down Expand Up @@ -1266,6 +1277,7 @@ func newMockForwader(ctx context.Context, t *testing.T) *Forwarder {
Context: ctx,
TracerProvider: otel.GetTracerProvider(),
tracer: otel.Tracer(teleport.ComponentKube),
ClusterFeatures: fakeClusterFeatures,
},
clientCredentials: clientCreds,
activeRequests: make(map[string]context.Context),
Expand Down Expand Up @@ -1821,3 +1833,78 @@ func Test_copyImpersonationHeaders(t *testing.T) {
})
}
}

func TestKubernetesLicenseEnforcement(t *testing.T) {
t.Parallel()
// kubeMock is a Kubernetes API mock for the session tests.
kubeMock, err := testingkubemock.NewKubeAPIMock()
require.NoError(t, err)
t.Cleanup(func() { kubeMock.Close() })

tests := []struct {
name string
features proto.Features
assertErrFunc require.ErrorAssertionFunc
}{
{
name: "kubernetes agent is licensed",
features: proto.Features{
Kubernetes: true,
},
assertErrFunc: require.NoError,
},
{
name: "kubernetes isn't licensed",
features: proto.Features{
Kubernetes: false,
},
assertErrFunc: func(tt require.TestingT, err error, i ...interface{}) {
require.Error(tt, err)
var kubeErr *kubeerrors.StatusError
require.ErrorAs(tt, err, &kubeErr)
require.Equal(tt, kubeErr.ErrStatus.Code, int32(http.StatusForbidden))
require.Equal(tt, kubeErr.ErrStatus.Reason, metav1.StatusReasonForbidden)
require.Equal(tt, kubeErr.ErrStatus.Message, "Teleport cluster is not licensed for Kubernetes")
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
// creates a Kubernetes service with a configured cluster pointing to mock api server
testCtx := SetupTestContext(
context.Background(),
t,
TestConfig{
Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}},
ClusterFeatures: func() proto.Features {
return tt.features
},
},
)
// close tests
t.Cleanup(func() { require.NoError(t, testCtx.Close()) })

_, _ = testCtx.CreateUserAndRole(
testCtx.Context,
t,
username,
RoleSpec{
Name: roleName,
KubeUsers: roleKubeUsers,
KubeGroups: roleKubeGroups,
})

// generate a kube client with user certs for auth
client, _ := testCtx.GenTestKubeClientTLSCert(
t,
username,
kubeCluster,
)

_, err = client.CoreV1().Pods(metav1.NamespaceDefault).List(context.Background(), metav1.ListOptions{})
tt.assertErrFunc(t, err)
})
}
}
10 changes: 9 additions & 1 deletion lib/kube/proxy/utils_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
Expand Down Expand Up @@ -82,6 +83,7 @@ type TestConfig struct {
ResourceMatchers []services.ResourceMatcher
OnReconcile func(types.KubeClusters)
OnEvent func(apievents.AuditEvent)
ClusterFeatures func() proto.Features
}

// SetupTestContext creates a kube service with clusters configured.
Expand Down Expand Up @@ -173,6 +175,11 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo
// heartbeatsWaitChannel waits for clusters heartbeats to start.
heartbeatsWaitChannel := make(chan struct{}, len(cfg.Clusters)+1)
client := newAuthClientWithStreamer(testCtx)

features := func() proto.Features { return proto.Features{Kubernetes: true} }
if cfg.ClusterFeatures != nil {
features = cfg.ClusterFeatures
}
// Create kubernetes service server.
testCtx.KubeServer, err = NewTLSServer(TLSServerConfig{
ForwarderConfig: ForwarderConfig{
Expand Down Expand Up @@ -202,7 +209,8 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo
CheckImpersonationPermissions: func(ctx context.Context, clusterName string, sarClient authztypes.SelfSubjectAccessReviewInterface) error {
return nil
},
Clock: clockwork.NewRealClock(),
Clock: clockwork.NewRealClock(),
ClusterFeatures: features,
},
DynamicLabels: nil,
TLS: tlsConfig,
Expand Down
1 change: 1 addition & 0 deletions lib/service/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
LockWatcher: lockWatcher,
CheckImpersonationPermissions: cfg.Kube.CheckImpersonationPermissions,
PublicAddr: publicAddr,
ClusterFeatures: process.getClusterFeatures,
},
TLS: tlsConfig,
AccessPoint: accessPoint,
Expand Down
3 changes: 2 additions & 1 deletion lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4070,7 +4070,8 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
// using Impersonation headers. The upstream service will validate if
// the provided connection certificate is from a proxy server and
// will impersonate the identity of the user that is making the request.
ConnTLSConfig: tlsConfig.Clone(),
ConnTLSConfig: tlsConfig.Clone(),
ClusterFeatures: process.getClusterFeatures,
},
TLS: tlsConfig.Clone(),
LimiterConfig: cfg.Proxy.Limiter,
Expand Down
5 changes: 5 additions & 0 deletions lib/web/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7943,6 +7943,11 @@ func startKubeWithoutCleanup(ctx context.Context, t *testing.T, cfg startKubeOpt
},
ConnTLSConfig: tlsConfig,
Clock: clockwork.NewRealClock(),
ClusterFeatures: func() clientproto.Features {
return clientproto.Features{
Kubernetes: true,
}
},
},
TLS: tlsConfig,
AccessPoint: client,
Expand Down