Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/kube/proxy/moderated_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/entitlements"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/events"
testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
"github.com/gravitational/teleport/lib/modules"
Expand Down Expand Up @@ -527,12 +528,20 @@ func TestInteractiveSessionsNoAuth(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { kubeMock.Close() })

authClient := &fakeClient{
// ClientI is left nil intentionally because we want to set it via WrapAuthClient.
closeC: make(chan struct{}),
}
// creates a Kubernetes service with a configured cluster pointing to mock api server
testCtx := SetupTestContext(
context.Background(),
t,
TestConfig{
Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}},
WrapAuthClient: func(client authclient.ClientI) authclient.ClientI {
authClient.ClientI = client
return authClient
},
},
)
// close tests
Expand Down Expand Up @@ -620,7 +629,7 @@ func TestInteractiveSessionsNoAuth(t *testing.T) {
// Mark the lock as stale so that the session with strict locking is denied.
close(testCtx.lockWatcher.StaleC)
// force the auth client to return an error when trying to create a session.
close(testCtx.closeSessionTrackers)
close(authClient.closeC)

require.Eventually(t, func() bool {
return testCtx.lockWatcher.IsStale()
Expand Down
56 changes: 30 additions & 26 deletions lib/kube/proxy/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,22 @@ import (
)

type TestContext struct {
HostID string
ClusterName string
TLSServer *authtest.TLSServer
AuthServer *auth.Server
AuthClient *authclient.Client
Authz authz.Authorizer
KubeServer *TLSServer
KubeProxy *TLSServer
Emitter *eventstest.ChannelEmitter
Context context.Context
kubeServerListener net.Listener
kubeProxyListener net.Listener
cancel context.CancelFunc
heartbeatCtx context.Context
heartbeatCancel context.CancelFunc
lockWatcher *services.LockWatcher
closeSessionTrackers chan struct{}
HostID string
ClusterName string
TLSServer *authtest.TLSServer
AuthServer *auth.Server
AuthClient *authclient.Client
Authz authz.Authorizer
KubeServer *TLSServer
KubeProxy *TLSServer
Emitter *eventstest.ChannelEmitter
Context context.Context
kubeServerListener net.Listener
kubeProxyListener net.Listener
cancel context.CancelFunc
heartbeatCtx context.Context
heartbeatCancel context.CancelFunc
lockWatcher *services.LockWatcher
}

// KubeClusterConfig defines the cluster to be created
Expand All @@ -107,20 +106,20 @@ type TestConfig struct {
OnEvent func(apievents.AuditEvent)
ClusterFeatures func() proto.Features
CreateAuditStreamErr error
WrapAuthClient func(authclient.ClientI) authclient.ClientI
}

// SetupTestContext creates a kube service with clusters configured.
func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestContext {
ctx, cancel := context.WithCancel(ctx)
heartbeatCtx, heartbeatCancel := context.WithCancel(ctx)
testCtx := &TestContext{
ClusterName: "root.example.com",
HostID: uuid.New().String(),
Context: ctx,
cancel: cancel,
heartbeatCtx: heartbeatCtx,
heartbeatCancel: heartbeatCancel,
closeSessionTrackers: make(chan struct{}),
ClusterName: "root.example.com",
HostID: uuid.New().String(),
Context: ctx,
cancel: cancel,
heartbeatCtx: heartbeatCtx,
heartbeatCancel: heartbeatCancel,
}
t.Cleanup(func() { testCtx.Close() })

Expand Down Expand Up @@ -255,6 +254,11 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, healthCheckManager.Close()) })

var authClient authclient.ClientI = client
if cfg.WrapAuthClient != nil {
authClient = cfg.WrapAuthClient(client)
}

// Create kubernetes service server.
testCtx.KubeServer, err = NewTLSServer(TLSServerConfig{
ForwarderConfig: ForwarderConfig{
Expand All @@ -268,7 +272,7 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo
// directly to AuthClient solves the issue.
// We wrap the AuthClient with an events.TeeStreamer to send non-disk
// events like session.end to testCtx.emitter as well.
AuthClient: &fakeClient{ClientI: client, closeC: testCtx.closeSessionTrackers},
AuthClient: authClient,
// StreamEmitter is required although not used because we are using
// "node-sync" as session recording mode.
Emitter: testCtx.Emitter,
Expand Down Expand Up @@ -351,7 +355,7 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo
// directly to AuthClient solves the issue.
// We wrap the AuthClient with an events.TeeStreamer to send non-disk
// events like session.end to testCtx.emitter as well.
AuthClient: &fakeClient{ClientI: client, closeC: testCtx.closeSessionTrackers},
AuthClient: authClient,
// StreamEmitter is required although not used because we are using
// "node-sync" as session recording mode.
Emitter: testCtx.Emitter,
Expand Down
11 changes: 7 additions & 4 deletions lib/kube/proxy/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ type mockAuthClient struct {
deleteCh chan string
}

func newMockAuthClient(client authclient.ClientI) *mockAuthClient {
func newMockAuthClient() *mockAuthClient {
return &mockAuthClient{
ClientI: client,
deleteCh: make(chan string, 1),
}
}
Expand All @@ -70,6 +69,8 @@ func TestWatcher(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
reconcileCh := make(chan types.KubeClusters)

authClient := newMockAuthClient()
// Setup kubernetes server that proxies one static kube cluster and
// watches for kube_clusters with label group=a.
testCtx := SetupTestContext(ctx, t, TestConfig{
Expand All @@ -86,9 +87,11 @@ func TestWatcher(t *testing.T) {
return
}
},
WrapAuthClient: func(client authclient.ClientI) authclient.ClientI {
authClient.ClientI = client
return authClient
},
})
authClient := newMockAuthClient(testCtx.KubeServer.AuthClient)
testCtx.KubeServer.AuthClient = authClient

require.Len(t, testCtx.KubeServer.fwd.kubeClusters(), 1)
kube0 := testCtx.KubeServer.fwd.kubeClusters()[0]
Expand Down
Loading