Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions integration/utmp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/gravitational/teleport/lib/pam"
restricted "github.com/gravitational/teleport/lib/restrictedsession"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/srv"
"github.com/gravitational/teleport/lib/srv/regular"
"github.com/gravitational/teleport/lib/srv/uacc"
"github.com/gravitational/teleport/lib/sshutils"
Expand Down Expand Up @@ -257,8 +258,19 @@ func newSrvCtx(ctx context.Context, t *testing.T) *SrvCtx {
require.NoError(t, err)
t.Cleanup(lockWatcher.Close)

nodeSessionController, err := srv.NewSessionController(srv.SessionControllerConfig{
Semaphores: s.nodeClient,
AccessPoint: s.nodeClient,
LockEnforcer: lockWatcher,
Emitter: s.nodeClient,
Component: teleport.ComponentNode,
ServerID: s.nodeID,
})
require.NoError(t, err)

nodeDir := t.TempDir()
srv, err := regular.New(
ctx,
utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"},
s.server.ClusterName(),
[]ssh.Signer{s.signer},
Expand Down Expand Up @@ -286,6 +298,7 @@ func newSrvCtx(ctx context.Context, t *testing.T) *SrvCtx {
regular.SetClock(s.clock),
regular.SetUtmpPath(utmpPath, utmpPath),
regular.SetLockWatcher(lockWatcher),
regular.SetSessionController(nodeSessionController),
)
require.NoError(t, err)
s.srv = srv
Expand Down
116 changes: 79 additions & 37 deletions lib/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
Expand Down Expand Up @@ -84,7 +85,6 @@ type NodeClient struct {
Namespace string
Tracer oteltrace.Tracer
Client *tracessh.Client
Proxy *ProxyClient
TC *TeleportClient
OnMFA func()
FIPSEnabled bool
Expand Down Expand Up @@ -1595,6 +1595,7 @@ func (proxy *ProxyClient) ConnectToNode(ctx context.Context, nodeAddress NodeDet
return nil, trace.ConnectionProblem(err, "failed connecting to node %v. %s",
nodeName(nodeAddress.Addr), serverErrorMsg)
}

pipeNetConn := utils.NewPipeNetConn(
proxyReader,
proxyWriter,
Expand All @@ -1608,35 +1609,9 @@ func (proxy *ProxyClient) ConnectToNode(ctx context.Context, nodeAddress NodeDet
Auth: authMethods,
HostKeyCallback: proxy.hostKeyCallback,
}
conn, chans, reqs, err := newClientConn(ctx, pipeNetConn, nodeAddress.ProxyFormat(), sshConfig)
if err != nil {
if utils.IsHandshakeFailedError(err) {
proxySession.Close()
return nil, trace.AccessDenied(`access denied to %v connecting to %v`, user, nodeAddress)
}
return nil, trace.Wrap(err)
}

// We pass an empty channel which we close right away to ssh.NewClient
// because the client need to handle requests itself.
emptyCh := make(chan *ssh.Request)
close(emptyCh)

nc := &NodeClient{
Client: tracessh.NewClient(conn, chans, emptyCh),
Proxy: proxy,
Namespace: apidefaults.Namespace,
TC: proxy.teleportClient,
Tracer: proxy.Tracer,
FIPSEnabled: details.FIPSEnabled,
}

// Start a goroutine that will run for the duration of the client to process
// global requests from the client. Teleport clients will use this to update
// terminal sizes when the remote PTY size has changed.
go nc.handleGlobalRequests(ctx, reqs)

return nc, nil
nc, err := NewNodeClient(ctx, sshConfig, pipeNetConn, nodeAddress.ProxyFormat(), proxy.teleportClient, details.FIPSEnabled)
return nc, trace.Wrap(err)
}

// PortForwardToNode connects to the ssh server via Proxy
Expand Down Expand Up @@ -1680,11 +1655,28 @@ func (proxy *ProxyClient) PortForwardToNode(ctx context.Context, nodeAddress Nod
Auth: authMethods,
HostKeyCallback: proxy.hostKeyCallback,
}
conn, chans, reqs, err := newClientConn(ctx, proxyConn, nodeAddress.Addr, sshConfig)

nc, err := NewNodeClient(ctx, sshConfig, proxyConn, nodeAddress.Addr, proxy.teleportClient, details.FIPSEnabled)
return nc, trace.Wrap(err)
}

// NewNodeClient constructs a NodeClient that is connected to the node at nodeAddress
func NewNodeClient(ctx context.Context, sshConfig *ssh.ClientConfig, conn net.Conn, nodeAddress string, tc *TeleportClient, fipsEnabled bool) (*NodeClient, error) {
ctx, span := tc.Tracer.Start(
ctx,
"NewNodeClient",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(
attribute.String("node", nodeAddress),
),
)
defer span.End()

sshconn, chans, reqs, err := newClientConn(ctx, conn, nodeAddress, sshConfig)
if err != nil {
if utils.IsHandshakeFailedError(err) {
proxyConn.Close()
return nil, trace.AccessDenied(`access denied to %v connecting to %v`, user, nodeAddress)
conn.Close()
return nil, trace.AccessDenied(`access denied to %v connecting to %v`, sshConfig.User, nodeAddress)
}
return nil, trace.Wrap(err)
}
Expand All @@ -1695,11 +1687,11 @@ func (proxy *ProxyClient) PortForwardToNode(ctx context.Context, nodeAddress Nod
close(emptyCh)

nc := &NodeClient{
Client: tracessh.NewClient(conn, chans, emptyCh),
Proxy: proxy,
Namespace: apidefaults.Namespace,
TC: proxy.teleportClient,
Tracer: proxy.Tracer,
Client: tracessh.NewClient(sshconn, chans, emptyCh),
Namespace: apidefaults.Namespace,
TC: tc,
Tracer: tc.Tracer,
FIPSEnabled: fipsEnabled,
}

// Start a goroutine that will run for the duration of the client to process
Expand All @@ -1710,6 +1702,56 @@ func (proxy *ProxyClient) PortForwardToNode(ctx context.Context, nodeAddress Nod
return nc, nil
}

// RunInteractiveShell creates an interactive shell on the node and copies stdin/stdout/stderr
// to and from the node and local shell. This will block until the interactive shell on the node
// is terminated.
func (c *NodeClient) RunInteractiveShell(ctx context.Context, mode types.SessionParticipantMode, sessToJoin types.SessionTracker) error {
ctx, span := c.Tracer.Start(
ctx,
"nodeClient/RunInteractiveShell",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
)
defer span.End()

env := make(map[string]string)
env[teleport.EnvSSHJoinMode] = string(mode)
env[teleport.EnvSSHSessionReason] = c.TC.Config.Reason
env[teleport.EnvSSHSessionDisplayParticipantRequirements] = strconv.FormatBool(c.TC.Config.DisplayParticipantRequirements)
encoded, err := json.Marshal(&c.TC.Config.Invited)
if err != nil {
return trace.Wrap(err)
}

env[teleport.EnvSSHSessionInvited] = string(encoded)
for key, value := range c.TC.Env {
env[key] = value
}

nodeSession, err := newSession(ctx, c, sessToJoin, env, c.TC.Stdin, c.TC.Stdout, c.TC.Stderr, c.TC.EnableEscapeSequences)
if err != nil {
return trace.Wrap(err)
}

if err = nodeSession.runShell(ctx, mode, nil, c.TC.OnShellCreated); err != nil {
switch e := trace.Unwrap(err).(type) {
case *ssh.ExitError:
c.TC.ExitStatus = e.ExitStatus()
case *ssh.ExitMissingError:
c.TC.ExitStatus = 1
}

return trace.Wrap(err)
}

if nodeSession.ExitMsg == "" {
fmt.Fprintln(c.TC.Stderr, "the connection was closed on the remote side at ", time.Now().Format(time.RFC822))
} else {
fmt.Fprintln(c.TC.Stderr, nodeSession.ExitMsg)
}

return nil
}

func (c *NodeClient) handleGlobalRequests(ctx context.Context, requestCh <-chan *ssh.Request) {
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion lib/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (ns *NodeSession) createServerSession(ctx context.Context) (*tracessh.Sessi

// if agent forwarding was requested (and we have a agent to forward),
// forward the agent to endpoint.
tc := ns.nodeClient.Proxy.teleportClient
tc := ns.nodeClient.TC
targetAgent := selectKeyAgent(tc)

if targetAgent != nil {
Expand Down
85 changes: 68 additions & 17 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2285,7 +2285,29 @@ func (process *TeleportProcess) initSSH() error {

storagePresence := local.NewPresenceService(process.storage.BackendStorage)

s, err := regular.New(cfg.SSH.Addr,
// read the host UUID:
serverID, err := utils.ReadOrMakeHostUUID(cfg.DataDir)
if err != nil {
return trace.Wrap(err)
}

sessionController, err := srv.NewSessionController(srv.SessionControllerConfig{
Semaphores: authClient,
AccessPoint: authClient,
LockEnforcer: lockWatcher,
Emitter: &events.StreamerAndEmitter{Emitter: asyncEmitter, Streamer: streamer},
Component: teleport.ComponentNode,
Logger: process.log.WithField(trace.Component, "sessionctrl"),
TracerProvider: process.TracingProvider,
ServerID: serverID,
})
if err != nil {
return trace.Wrap(err)
}

s, err := regular.New(
process.ExitContext(),
cfg.SSH.Addr,
cfg.Hostname,
[]ssh.Signer{conn.ServerIdentity.KeySigner},
authClient,
Expand Down Expand Up @@ -2317,6 +2339,8 @@ func (process *TeleportProcess) initSSH() error {
regular.SetCreateHostUser(!cfg.SSH.DisableCreateHostUser),
regular.SetStoragePresenceService(storagePresence),
regular.SetInventoryControlHandle(process.inventoryHandle),
regular.SetTracerProvider(process.TracingProvider),
regular.SetSessionController(sessionController),
)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -3447,6 +3471,42 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
}
}

var proxyRouter *proxy.Router
if !process.Config.Proxy.DisableReverseTunnel {
router, err := proxy.NewRouter(proxy.RouterConfig{
ClusterName: clusterName,
Log: process.log.WithField(trace.Component, "router"),
RemoteClusterGetter: accessPoint,
SiteGetter: tsrv,
TracerProvider: process.TracingProvider,
})
if err != nil {
return trace.Wrap(err)
}

proxyRouter = router
}

// read the host UUID:
serverID, err := utils.ReadOrMakeHostUUID(cfg.DataDir)
if err != nil {
return trace.Wrap(err)
}

sessionController, err := srv.NewSessionController(srv.SessionControllerConfig{
Semaphores: accessPoint,
AccessPoint: accessPoint,
LockEnforcer: lockWatcher,
Emitter: &events.StreamerAndEmitter{Emitter: asyncEmitter, Streamer: streamer},
Component: teleport.ComponentProxy,
Logger: process.log.WithField(trace.Component, "sessionctrl"),
TracerProvider: process.TracingProvider,
ServerID: serverID,
})
if err != nil {
return trace.Wrap(err)
}

// Register web proxy server
alpnHandlerForWeb := &alpnproxy.ConnectionHandlerWrapper{}
var webServer *http.Server
Expand Down Expand Up @@ -3512,6 +3572,8 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
ALPNHandler: alpnHandlerForWeb.HandleConnection,
ProxyKubeAddr: proxyKubeAddr,
TraceClient: traceClt,
Router: proxyRouter,
SessionControl: sessionController,
}
webHandler, err = web.NewHandler(webConfig)
if err != nil {
Expand Down Expand Up @@ -3609,22 +3671,9 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
})
}

var proxyRouter *proxy.Router
if !process.Config.Proxy.DisableReverseTunnel {
router, err := proxy.NewRouter(proxy.RouterConfig{
ClusterName: clusterName,
Log: process.log.WithField(trace.Component, "router"),
RemoteClusterGetter: accessPoint,
SiteGetter: tsrv,
TracerProvider: process.TracingProvider,
})
if err != nil {
return trace.Wrap(err)
}
proxyRouter = router
}

sshProxy, err := regular.New(cfg.Proxy.SSHAddr,
sshProxy, err := regular.New(
process.ExitContext(),
cfg.SSH.Addr,
cfg.Hostname,
[]ssh.Signer{conn.ServerIdentity.KeySigner},
accessPoint,
Expand All @@ -3648,6 +3697,8 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
// accurately checked later when an SCP/SFTP request hits the
// destination Node.
regular.SetAllowFileCopying(true),
regular.SetTracerProvider(process.TracingProvider),
regular.SetSessionController(sessionController),
)
if err != nil {
return trace.Wrap(err)
Expand Down
Loading