Merged
38 changes: 35 additions & 3 deletions integration/port_forwarding_test.go
@@ -22,10 +22,12 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os/user"
"strconv"
"testing"
"time"

"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/stretchr/testify/require"

@@ -71,19 +73,49 @@ func waitForSessionToBeEstablished(ctx context.Context, namespace string, site a
}

func testPortForwarding(t *testing.T, suite *integrationTestSuite) {
invalidOSLogin := uuid.NewString()[:12]
notFound := false
for i := 0; i < 10; i++ {
if _, err := user.Lookup(invalidOSLogin); err == nil {
invalidOSLogin = uuid.NewString()[:12]
continue
}
notFound = true
break
}
require.True(t, notFound, "unable to locate invalid os user")

// Providing our own logins to Teleport so we can verify that a user
// that exists within Teleport but does not exist on the local node
// cannot port forward.
logins := []string{
invalidOSLogin,
suite.Me.Username,
}

testCases := []struct {
desc string
portForwardingAllowed bool
expectSuccess bool
login string
}{
{
desc: "Enabled",
portForwardingAllowed: true,
expectSuccess: true,
}, {
login: suite.Me.Username,
},
{
desc: "Disabled",
portForwardingAllowed: false,
expectSuccess: false,
login: suite.Me.Username,
},
{
desc: "Enabled with invalid user",
portForwardingAllowed: true,
expectSuccess: false,
login: invalidOSLogin,
},
}

@@ -106,7 +138,7 @@ func testPortForwarding(t *testing.T, suite *integrationTestSuite) {
cfg.SSH.Enabled = true
cfg.SSH.AllowTCPForwarding = tt.portForwardingAllowed

teleport := suite.NewTeleportWithConfig(t, nil, nil, cfg)
teleport := suite.NewTeleportWithConfig(t, logins, nil, cfg)
defer teleport.StopAll()

site := teleport.GetSiteAPI(helpers.Site)
@@ -127,7 +159,7 @@

nodeSSHPort := helpers.Port(t, teleport.SSH)
cl, err := teleport.NewClient(helpers.ClientConfig{
Login: suite.Me.Username,
Login: tt.login,
Cluster: helpers.Site,
Host: Host,
Port: nodeSSHPort,
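The test above needs an OS login that is guaranteed not to exist on the host, which it finds by probing random UUID-derived names with user.Lookup. Below is a minimal standalone sketch of that idea; the errors.As check against user.UnknownUserError is an extra refinement assumed here for illustration, not something the test itself does.

package main

import (
	"errors"
	"fmt"
	"os/user"

	"github.com/google/uuid"
)

func findNonexistentLogin() (string, error) {
	for i := 0; i < 10; i++ {
		candidate := uuid.NewString()[:12]
		_, err := user.Lookup(candidate)
		var unknown user.UnknownUserError
		switch {
		case err == nil:
			// Extremely unlikely: the random name exists on this host, try another.
			continue
		case errors.As(err, &unknown):
			// Confirmed absent on this host.
			return candidate, nil
		default:
			// Lookup failed for some other reason; surface it.
			return "", err
		}
	}
	return "", errors.New("unable to find an unused login")
}

func main() {
	login, err := findNonexistentLogin()
	fmt.Println(login, err)
}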
2 changes: 1 addition & 1 deletion lib/authz/permissions.go
@@ -748,7 +748,7 @@ func definitionForBuiltinRole(clusterName string, recConfig types.SessionRecordi
types.NewRule(types.KindRole, services.RO()),
types.NewRule(types.KindNamespace, services.RO()),
types.NewRule(types.KindLock, services.RO()),
types.NewRule(types.KindKubernetesCluster, services.RW()),
types.NewRule(types.KindKubernetesCluster, services.RO()),
types.NewRule(types.KindSemaphore, services.RW()),
},
},
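For context on the permissions change above: switching the rule from services.RW() to services.RO() narrows the built-in role's grant on KindKubernetesCluster from full CRUD to read and list only. The verb helpers are assumed to expand roughly as sketched below (paraphrased, not copied from this PR; the real definitions live in lib/services and may differ in detail).

package services

import "github.com/gravitational/teleport/api/types"

// RO returns the read-only verbs (sketch).
func RO() []string {
	return []string{types.VerbList, types.VerbRead}
}

// RW returns the full read-write verb set (sketch).
func RW() []string {
	return []string{types.VerbList, types.VerbRead, types.VerbCreate, types.VerbUpdate, types.VerbDelete}
}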
28 changes: 28 additions & 0 deletions lib/cache/cache_test.go
@@ -1622,6 +1622,34 @@ func TestApplicationServers(t *testing.T) {
})
}

// TestKubernetesServers tests that CRUD operations on kube servers are
// replicated from the backend to the cache.
func TestKubernetesServers(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForProxy)
t.Cleanup(p.Close)

testResources(t, p, testFuncs[types.KubeServer]{
newResource: func(name string) (types.KubeServer, error) {
app, err := types.NewKubernetesClusterV3(types.Metadata{Name: name}, types.KubernetesClusterSpecV3{})
require.NoError(t, err)
return types.NewKubernetesServerV3FromCluster(app, "host", uuid.New().String())
},
create: withKeepalive(p.presenceS.UpsertKubernetesServer),
list: func(ctx context.Context) ([]types.KubeServer, error) {
return p.presenceS.GetKubernetesServers(ctx)
},
cacheList: func(ctx context.Context) ([]types.KubeServer, error) {
return p.cache.GetKubernetesServers(ctx)
},
update: withKeepalive(p.presenceS.UpsertKubernetesServer),
deleteAll: func(ctx context.Context) error {
return p.presenceS.DeleteAllKubernetesServers(ctx)
},
})
}

// TestApps tests that CRUD operations on application resources are
// replicated from the backend to the cache.
func TestApps(t *testing.T) {
28 changes: 20 additions & 8 deletions lib/client/api.go
@@ -22,6 +22,7 @@ import (
"crypto/x509"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"io"
"net"
@@ -1587,16 +1588,27 @@ func (tc *TeleportClient) runShellOrCommandOnSingleNode(ctx context.Context, nod
return trace.Wrap(err)
}

// If no remote command execution was requested, block on the context which
// will unblock upon error or SIGINT.
// If no remote command execution was requested, block on whichever comes first:
// 1) the context which will unblock upon error or user terminating the process
// 2) ssh.Client.Wait which will unblock when the connection has shut down
if tc.NoRemoteExec {
log.Debugf("Connected to node, no remote command execution was requested, blocking until context closes.")
<-ctx.Done()

// Only return an error if the context was canceled by something other than SIGINT.
if ctx.Err() != context.Canceled {
return ctx.Err()
connClosed := make(chan error, 1)
go func() {
connClosed <- nodeClient.Client.Wait()
}()
log.Debugf("Connected to node, no remote command execution was requested, blocking indefinitely.")
select {
case <-ctx.Done():
// Only return an error if the context was canceled by something other than SIGINT.
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
return trace.Wrap(err)
}
case err := <-connClosed:
if !errors.Is(err, io.EOF) {
return trace.Wrap(err)
}
}

return nil
}

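A small detail in the hunk above: the cancellation check moves from comparing ctx.Err() directly against context.Canceled to errors.Is, which also matches a cancellation error that has been wrapped. A standalone illustration, not part of the PR:

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// errors.Is walks the wrap chain, so a wrapped cancellation still matches.
	err := fmt.Errorf("session aborted: %w", context.Canceled)

	fmt.Println(err == context.Canceled)          // false: different error value
	fmt.Println(errors.Is(err, context.Canceled)) // true: matches through the wrap
}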
82 changes: 30 additions & 52 deletions lib/client/client.go
@@ -48,6 +48,7 @@ import (
tracessh "github.com/gravitational/teleport/api/observability/tracing/ssh"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
@@ -1840,75 +1841,52 @@ func (c *NodeClient) TransferFiles(ctx context.Context, cfg *sftp.Config) error
}

type netDialer interface {
Dial(string, string) (net.Conn, error)
DialContext(context.Context, string, string) (net.Conn, error)
}

func proxyConnection(ctx context.Context, conn net.Conn, remoteAddr string, dialer netDialer) error {
defer conn.Close()
defer log.Debugf("Finished proxy from %v to %v.", conn.RemoteAddr(), remoteAddr)

var (
remoteConn net.Conn
err error
)

var remoteConn net.Conn
log.Debugf("Attempting to connect proxy from %v to %v.", conn.RemoteAddr(), remoteAddr)
for attempt := 1; attempt <= 5; attempt++ {
remoteConn, err = dialer.Dial("tcp", remoteAddr)
if err != nil {
log.Debugf("Proxy connection attempt %v: %v.", attempt, err)

timer := time.NewTimer(time.Duration(100*attempt) * time.Millisecond)
defer timer.Stop()

// Wait and attempt to connect again, if the context has closed, exit
// right away.
select {
case <-ctx.Done():
return trace.Wrap(ctx.Err())
case <-timer.C:
continue
}
}
// Connection established, break out of the loop.
break
}

retry, err := retryutils.NewLinear(retryutils.LinearConfig{
First: 100 * time.Millisecond,
Step: 100 * time.Millisecond,
Max: time.Second,
Jitter: retryutils.NewHalfJitter(),
})
if err != nil {
return trace.BadParameter("failed to connect to node: %v", remoteAddr)
return trace.Wrap(err)
}
defer remoteConn.Close()

// Start proxying, close the connection if a problem occurs on either leg.
errCh := make(chan error, 2)
go func() {
defer conn.Close()
defer remoteConn.Close()

_, err := io.Copy(conn, remoteConn)
errCh <- err
}()
go func() {
defer conn.Close()
defer remoteConn.Close()

_, err := io.Copy(remoteConn, conn)
errCh <- err
}()
for attempt := 1; attempt <= 5; attempt++ {
conn, err := dialer.DialContext(ctx, "tcp", remoteAddr)
if err == nil {
// Connection established, break out of the loop.
remoteConn = conn
break
}

var errs []error
for i := 0; i < 2; i++ {
log.Debugf("Proxy connection attempt %v: %v.", attempt, err)
// Wait and attempt to connect again, if the context has closed, exit
// right away.
select {
case err := <-errCh:
if err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
log.Warnf("Failed to proxy connection: %v.", err)
errs = append(errs, err)
}
case <-ctx.Done():
return trace.Wrap(ctx.Err())
case <-retry.After():
retry.Inc()
continue
}
}
if remoteConn == nil {
return trace.BadParameter("failed to connect to node: %v", remoteAddr)
}
defer remoteConn.Close()

return trace.NewAggregate(errs...)
// Start proxying, close the connection if a problem occurs on either leg.
return trace.Wrap(utils.ProxyConn(ctx, remoteConn, conn))
}

// acceptWithContext calls "Accept" on the listener but will unblock when the
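The hunk above drops the inline pair of io.Copy goroutines in favor of utils.ProxyConn. The sketch below shows the general bidirectional-copy pattern such a helper is assumed to implement; the package name, signature, and error filtering here are illustrative only, not Teleport's implementation.

package proxysketch

import (
	"context"
	"errors"
	"io"
	"net"
)

// proxyConn copies data in both directions until either side closes or the
// context is canceled, mirroring the logic the removed inline goroutines performed.
func proxyConn(ctx context.Context, a, b io.ReadWriteCloser) error {
	errCh := make(chan error, 2)

	copyAndClose := func(dst io.WriteCloser, src io.Reader) {
		_, err := io.Copy(dst, src)
		// Closing both ends unblocks the copy running in the other direction.
		a.Close()
		b.Close()
		errCh <- err
	}

	go copyAndClose(a, b)
	go copyAndClose(b, a)

	var errs []error
	for i := 0; i < 2; i++ {
		select {
		case err := <-errCh:
			// Expected shutdown errors are not reported.
			if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, net.ErrClosed) {
				errs = append(errs, err)
			}
		case <-ctx.Done():
			a.Close()
			b.Close()
			return ctx.Err()
		}
	}
	return errors.Join(errs...)
}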