Skip to content

Commit

Permalink
Only acquire semaphore lease if maxconnections is configured (#12462) (
Browse files Browse the repository at this point in the history
…#12468)

Co-authored-by: Roman Tkachenko <[email protected]>
  • Loading branch information
xacrimon and r0mant authored May 5, 2022
1 parent aa25efb commit 883ccd5
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 101 deletions.
80 changes: 0 additions & 80 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func TestKube(t *testing.T) {
t.Run("TrustedClustersSNI", suite.bind(testKubeTrustedClustersSNI))
t.Run("Disconnect", suite.bind(testKubeDisconnect))
t.Run("Join", suite.bind(testKubeJoin))
t.Run("ConnectionLimit", suite.bind(testKubeConnectionLimit))
}

// TestKubeExec tests kubernetes Exec command set
Expand Down Expand Up @@ -1612,82 +1611,3 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) {
require.Contains(t, participantOutput, []byte("echo hi"))
require.Contains(t, out.String(), []byte("echo hi2"))
}

// testKubeConnectionLimit checks that the `max_kubernetes_connections` role option is enforced.
func testKubeConnectionLimit(t *testing.T, suite *KubeSuite) {
teleport := NewInstance(InstanceConfig{
ClusterName: Site,
HostID: HostID,
NodeName: Host,
Priv: suite.priv,
Pub: suite.pub,
log: suite.log,
})

const maxConnections = 10
hostUsername := suite.me.Username
kubeGroups := []string{testImpersonationGroup}
kubeUsers := []string{"[email protected]"}
role, err := types.NewRole("kubemaster", types.RoleSpecV5{
Allow: types.RoleConditions{
Logins: []string{hostUsername},
KubeGroups: kubeGroups,
KubeUsers: kubeUsers,
},
Options: types.RoleOptions{
MaxKubernetesConnections: maxConnections,
},
})
require.NoError(t, err)
teleport.AddUserWithRole(hostUsername, role)

err = teleport.Start()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, teleport.StopAll()) })

// set up kube configuration using proxy
proxyClient, proxyClientConfig, err := kubeProxyClient(kubeProxyConfig{
t: teleport,
username: hostUsername,
kubeUsers: kubeUsers,
kubeGroups: kubeGroups,
})
require.NoError(t, err)

ctx := context.Background()
// try get request to fetch available pods
pod, err := proxyClient.CoreV1().Pods(testNamespace).Get(ctx, testPod, metav1.GetOptions{})
require.NoError(t, err)

openExec := func() error {
// interactive command, allocate pty
term := NewTerminal(250)
out := &bytes.Buffer{}

return kubeExec(proxyClientConfig, kubeExecArgs{
podName: pod.Name,
podNamespace: pod.Namespace,
container: pod.Spec.Containers[0].Name,
command: []string{"/bin/sh", "-c", "sleep 300"},
stdout: out,
tty: true,
stdin: term,
})
}

// Create and maintain the maximum amount of open connections
for i := 0; i < maxConnections; i++ {
go openExec()
}

// Wait for the connections to open and check for any errors
require.Eventually(t, func() bool {
trackers, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(ctx)
require.NoError(t, err)
return len(trackers) == maxConnections
}, time.Second*30, time.Second)

// Open one more connection. It should fail due to the limit.
err = openExec()
require.Error(t, err)
}
27 changes: 16 additions & 11 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,14 @@ func (f *Forwarder) withAuth(handler handlerWithAuthFunc) httprouter.Handle {
if err := f.authorize(req.Context(), authContext); err != nil {
return nil, trace.Wrap(err)
}
if err := f.acquireConnectionLock(req.Context(), authContext); err != nil {

user := authContext.Identity.GetIdentity().Username
roles, err := getRolesByName(f, authContext.Identity.GetIdentity().Groups)
if err != nil {
return nil, trace.Wrap(err)
}

if err := f.AcquireConnectionLock(req.Context(), user, roles); err != nil {
return nil, trace.Wrap(err)
}
return handler(authContext, w, req, p)
Expand Down Expand Up @@ -896,25 +903,23 @@ func wsProxy(wsSource *websocket.Conn, wsTarget *websocket.Conn) error {
return trace.Wrap(err)
}

// acquireConnectionLock acquires a semaphore used to limit connections to the Kubernetes agent.
// AcquireConnectionLock acquires a semaphore used to limit connections to the Kubernetes agent.
// The semaphore is releasted when the request is returned/connection is closed.
// Returns an error if a semaphore could not be acquired.
func (f *Forwarder) acquireConnectionLock(ctx context.Context, identity *authContext) error {
user := identity.Identity.GetIdentity().Username
roles, err := getRolesByName(f, identity.Identity.GetIdentity().Groups)
if err != nil {
return trace.Wrap(err)
func (f *Forwarder) AcquireConnectionLock(ctx context.Context, user string, roles services.RoleSet) error {
maxConnections := roles.MaxKubernetesConnections()
if maxConnections == 0 {
return nil
}

maxConnections := services.RoleSet(roles).MaxKubernetesConnections()
semLock, err := services.AcquireSemaphoreLock(ctx, services.SemaphoreLockConfig{
_, err := services.AcquireSemaphoreLock(ctx, services.SemaphoreLockConfig{
Service: f.cfg.AuthClient,
Expiry: sessionMaxLifetime,
Params: types.AcquireSemaphoreRequest{
SemaphoreKind: types.SemaphoreKindKubernetesConnection,
SemaphoreName: user,
MaxLeases: maxConnections,
Holder: identity.teleportCluster.name,
Holder: user,
},
})
if err != nil {
Expand All @@ -927,7 +932,7 @@ func (f *Forwarder) acquireConnectionLock(ctx context.Context, identity *authCon

return trace.Wrap(err)
}
go semLock.KeepAlive(ctx)

return nil
}

Expand Down
92 changes: 92 additions & 0 deletions lib/kube/proxy/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
)
Expand Down Expand Up @@ -1052,3 +1054,93 @@ func (m *mockWatcher) Events() <-chan types.Event {
func (m *mockWatcher) Done() <-chan struct{} {
return m.ctx.Done()
}

func newTestForwarder(ctx context.Context, cfg ForwarderConfig) *Forwarder {
return &Forwarder{
log: logrus.New(),
router: *httprouter.New(),
cfg: cfg,
activeRequests: make(map[string]context.Context),
ctx: ctx,
}
}

type mockSemaphoreClient struct {
auth.ClientI
sem types.Semaphores
}

func (m *mockSemaphoreClient) AcquireSemaphore(ctx context.Context, params types.AcquireSemaphoreRequest) (*types.SemaphoreLease, error) {
return m.sem.AcquireSemaphore(ctx, params)
}

func (m *mockSemaphoreClient) CancelSemaphoreLease(ctx context.Context, lease types.SemaphoreLease) error {
return m.sem.CancelSemaphoreLease(ctx, lease)
}

func TestKubernetesConnectionLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

type testCase struct {
name string
connections int
role types.Role
assert require.ErrorAssertionFunc
}

unlimitedRole, err := types.NewRole("unlimited", types.RoleSpecV5{})
require.NoError(t, err)

limitedRole, err := types.NewRole("unlimited", types.RoleSpecV5{
Options: types.RoleOptions{
MaxKubernetesConnections: 5,
},
})
require.NoError(t, err)

testCases := []testCase{
{
name: "unlimited",
connections: 7,
role: unlimitedRole,
assert: require.NoError,
},
{
name: "limited-success",
connections: 5,
role: limitedRole,
assert: require.NoError,
},
{
name: "limited-fail",
connections: 6,
role: limitedRole,
assert: require.Error,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
user, err := types.NewUser("bob")
require.NoError(t, err)
user.SetRoles([]string{testCase.role.GetName()})

backend, err := memory.New(memory.Config{})
require.NoError(t, err)

sem := local.NewPresenceService(backend)
client := &mockSemaphoreClient{sem: sem}
forwarder := newTestForwarder(ctx, ForwarderConfig{
AuthClient: client,
})

for i := 0; i < testCase.connections; i++ {
err = forwarder.AcquireConnectionLock(ctx, user.GetName(), services.NewRoleSet(testCase.role))
if i == testCase.connections-1 {
testCase.assert(t, err)
}
}
})
}
}
5 changes: 3 additions & 2 deletions lib/services/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (l *SemaphoreLock) Renewed() <-chan struct{} {
return l.renewalC
}

func (l *SemaphoreLock) KeepAlive(ctx context.Context) {
func (l *SemaphoreLock) keepAlive(ctx context.Context) {
var nodrop bool
var err error
lease := l.lease0
Expand Down Expand Up @@ -227,7 +227,7 @@ func AcquireSemaphoreWithRetry(ctx context.Context, req AcquireSemaphoreWithRetr
}

// AcquireSemaphoreLock attempts to acquire and hold a semaphore lease. If successfully acquired,
// background keepalive processes are started and an associated lock handle is returned. Cancelling
// background keepalive processes are started and an associated lock handle is returned. Cancelling
// the supplied context releases the semaphore.
func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*SemaphoreLock, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
Expand Down Expand Up @@ -255,6 +255,7 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph
renewalC: make(chan struct{}),
cond: sync.NewCond(&sync.Mutex{}),
}
go lock.keepAlive(ctx)
return lock, nil
}

Expand Down
9 changes: 2 additions & 7 deletions lib/services/suite/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,6 @@ func (s *ServicesTestSuite) SemaphoreFlakiness(c *check.C) {

lock, err := services.AcquireSemaphoreLock(cancelCtx, cfg)
c.Assert(err, check.IsNil)
go lock.KeepAlive(cancelCtx)

for i := 0; i < renewals; i++ {
select {
Expand Down Expand Up @@ -1294,9 +1293,8 @@ func (s *ServicesTestSuite) SemaphoreContention(c *check.C) {
wg.Add(1)
go func() {
defer wg.Done()
lock, err := services.AcquireSemaphoreLock(cancelCtx, cfg)
_, err := services.AcquireSemaphoreLock(cancelCtx, cfg)
c.Assert(err, check.IsNil)
go lock.KeepAlive(cancelCtx)
}()
}
wg.Wait()
Expand Down Expand Up @@ -1334,9 +1332,8 @@ func (s *ServicesTestSuite) SemaphoreConcurrency(c *check.C) {
for i := int64(0); i < attempts; i++ {
wg.Add(1)
go func() {
lock, err := services.AcquireSemaphoreLock(cancelCtx, cfg)
_, err := services.AcquireSemaphoreLock(cancelCtx, cfg)
if err == nil {
go lock.KeepAlive(cancelCtx)
atomic.AddInt64(&success, 1)
} else {
atomic.AddInt64(&failure, 1)
Expand Down Expand Up @@ -1366,7 +1363,6 @@ func (s *ServicesTestSuite) SemaphoreLock(c *check.C) {
defer cancel()
lock, err := services.AcquireSemaphoreLock(cancelCtx, cfg)
c.Assert(err, check.IsNil)
go lock.KeepAlive(cancelCtx)

// MaxLeases is 1, so second acquire op fails.
_, err = services.AcquireSemaphoreLock(cancelCtx, cfg)
Expand All @@ -1382,7 +1378,6 @@ func (s *ServicesTestSuite) SemaphoreLock(c *check.C) {
cfg.TickRate = time.Millisecond * 50
lock, err = services.AcquireSemaphoreLock(cancelCtx, cfg)
c.Assert(err, check.IsNil)
go lock.KeepAlive(cancelCtx)

timeout := time.After(time.Second)

Expand Down
2 changes: 1 addition & 1 deletion lib/srv/regular/sshserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ func (s *Server) HandleNewConn(ctx context.Context, ccx *sshutils.ConnectionCont
}
return ctx, trace.Wrap(err)
}
go semLock.KeepAlive(ctx)

// ensure that losing the lock closes the connection context. Under normal
// conditions, cancellation propagates from the connection context to the
// lock, but if we lose the lock due to some error (e.g. poor connectivity
Expand Down

0 comments on commit 883ccd5

Please sign in to comment.