From c59f3c9bb7a78d201a8abd7668f2ffe08062eb96 Mon Sep 17 00:00:00 2001 From: joerger Date: Mon, 25 Aug 2025 14:03:23 -0700 Subject: [PATCH 01/26] Generalize PrepareToReceiveSessionID. --- lib/sshutils/utils.go | 89 +++++++++++++++++++++++++++++++++++++++++++ lib/web/sessions.go | 81 --------------------------------------- lib/web/terminal.go | 12 +++--- 3 files changed, 96 insertions(+), 86 deletions(-) diff --git a/lib/sshutils/utils.go b/lib/sshutils/utils.go index 7dcd762629e74..d1e57dba887a8 100644 --- a/lib/sshutils/utils.go +++ b/lib/sshutils/utils.go @@ -19,12 +19,20 @@ package sshutils import ( + "context" + "log/slog" "net" "strconv" + "sync/atomic" + "time" "github.com/gravitational/trace" "golang.org/x/crypto/ssh" + "github.com/gravitational/teleport" + + tracessh "github.com/gravitational/teleport/api/observability/tracing/ssh" + "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/utils" ) @@ -62,3 +70,84 @@ func NewSSHConnMetadataWithUser(conn ssh.ConnMetadata, user string) SSHConnMetad func (s SSHConnMetadataWithUser) User() string { return s.user } + +// SessionIDStatus indicates whether the session ID was received from +// the server or not, and if not why +type SessionIDStatus int + +const ( + // SessionIDReceived indicates the the session ID was received + SessionIDReceived SessionIDStatus = iota + 1 + // SessionIDNotSent indicates that the server set the session ID + // but didn't send it to us + SessionIDNotSent + // SessionIDNotModified indicates that the server used the session + // ID that was set by us + SessionIDNotModified +) + +// PrepareToReceiveSessionID configures the TeleportClient to listen for +// the server to send the session ID it's using. The returned function +// will return the current session ID from the server or a reason why +// one wasn't received. +func PrepareToReceiveSessionID(ctx context.Context, log *slog.Logger, client *tracessh.Client) func() (session.ID, SessionIDStatus) { + // send the session ID received from the server + var gotSessionID atomic.Bool + sessionIDFromServer := make(chan session.ID, 1) + + client.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { + // only handle the first session ID request + if gotSessionID.Load() { + return + } + + sid, err := session.ParseID(string(req.Payload)) + if err != nil { + log.WarnContext(ctx, "Unable to parse session ID", "error", err) + return + } + + if gotSessionID.CompareAndSwap(false, true) { + sessionIDFromServer <- *sid + } + }) + + // If the session is about to close and we haven't received a session + // ID yet, ask if the server even supports sending one. Send the + // request in a new goroutine so session establishment won't be + // blocked on making this request + serverWillSetSessionID := make(chan bool, 1) + go func() { + resp, _, err := client.SendRequest(ctx, teleport.SessionIDQueryRequest, true, nil) + if err != nil { + log.WarnContext(ctx, "Failed to send session ID query request", "error", err) + serverWillSetSessionID <- false + } else { + serverWillSetSessionID <- resp + } + }() + + waitForSessionID := func() (session.ID, SessionIDStatus) { + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + + for { + select { + case sessionID := <-sessionIDFromServer: + return sessionID, SessionIDReceived + case sessionIDIsComing := <-serverWillSetSessionID: + if !sessionIDIsComing { + return "", SessionIDNotModified + } + // the server will send the session ID, continue + // waiting for it + case <-ctx.Done(): + return "", SessionIDNotSent + case <-timer.C: + return "", SessionIDNotSent + } + } + } + + return waitForSessionID +} diff --git a/lib/web/sessions.go b/lib/web/sessions.go index 60d145d7c273c..c8d809202bd97 100644 --- a/lib/web/sessions.go +++ b/lib/web/sessions.go @@ -29,7 +29,6 @@ import ( "net" "slices" "sync" - "sync/atomic" "time" "github.com/gravitational/trace" @@ -58,7 +57,6 @@ import ( "github.com/gravitational/teleport/lib/multiplexer" "github.com/gravitational/teleport/lib/reversetunnelclient" "github.com/gravitational/teleport/lib/services" - "github.com/gravitational/teleport/lib/session" alpncommon "github.com/gravitational/teleport/lib/srv/alpnproxy/common" "github.com/gravitational/teleport/lib/sshca" "github.com/gravitational/teleport/lib/sshutils" @@ -1328,82 +1326,3 @@ func (c *remoteClientCache) Close() error { return trace.NewAggregate(errors...) } - -// sessionIDStatus indicates whether the session ID was received from -// the server or not, and if not why -type sessionIDStatus int - -const ( - // sessionIDReceived indicates the the session ID was received - sessionIDReceived sessionIDStatus = iota + 1 - // sessionIDNotSent indicates that the server set the session ID - // but didn't send it to us - sessionIDNotSent - // sessionIDNotModified indicates that the server used the session - // ID that was set by us - sessionIDNotModified -) - -// prepareToReceiveSessionID configures the TeleportClient to listen for -// the server to send the session ID it's using. The returned function -// will return the current session ID from the server or a reason why -// one wasn't received. -func prepareToReceiveSessionID(ctx context.Context, log *slog.Logger, nc *client.NodeClient) func() (session.ID, sessionIDStatus) { - // send the session ID received from the server - var gotSessionID atomic.Bool - sessionIDFromServer := make(chan session.ID, 1) - - nc.Client.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { - // only handle the first session ID request - if gotSessionID.Load() { - return - } - - sid, err := session.ParseID(string(req.Payload)) - if err != nil { - log.WarnContext(ctx, "Unable to parse session ID", "error", err) - return - } - - if gotSessionID.CompareAndSwap(false, true) { - sessionIDFromServer <- *sid - } - }) - - // If the session is about to close and we haven't received a session - // ID yet, ask if the server even supports sending one. Send the - // request in a new goroutine so session establishment won't be - // blocked on making this request - serverWillSetSessionID := make(chan bool, 1) - go func() { - resp, _, err := nc.Client.SendRequest(ctx, teleport.SessionIDQueryRequest, true, nil) - if err != nil { - log.WarnContext(ctx, "Failed to send session ID query request", "error", err) - serverWillSetSessionID <- false - } else { - serverWillSetSessionID <- resp - } - }() - - return func() (session.ID, sessionIDStatus) { - timer := time.NewTimer(10 * time.Second) - defer timer.Stop() - - for { - select { - case sessionID := <-sessionIDFromServer: - return sessionID, sessionIDReceived - case sessionIDIsComing := <-serverWillSetSessionID: - if !sessionIDIsComing { - return session.ID(""), sessionIDNotModified - } - // the server will send the session ID, continue - // waiting for it - case <-ctx.Done(): - return session.ID(""), sessionIDNotSent - case <-timer.C: - return session.ID(""), sessionIDNotSent - } - } - } -} diff --git a/lib/web/terminal.go b/lib/web/terminal.go index 6a604db2818cb..7f31556666854 100644 --- a/lib/web/terminal.go +++ b/lib/web/terminal.go @@ -63,6 +63,7 @@ import ( "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/sshagent" "github.com/gravitational/teleport/lib/sshca" + libsshutils "github.com/gravitational/teleport/lib/sshutils" "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/diagnostics/latency" "github.com/gravitational/teleport/lib/web/terminal" @@ -841,23 +842,24 @@ func (t *TerminalHandler) streamTerminal(ctx context.Context, tc *client.Telepor // created and the server sends us the session ID it is using writeSessionCtx, writeSessionCancel := context.WithCancel(ctx) defer writeSessionCancel() - waitForSessionID := prepareToReceiveSessionID(writeSessionCtx, t.logger, nc) + + waitForSessionID := libsshutils.PrepareToReceiveSessionID(writeSessionCtx, t.logger, nc.Client) // wait in a new goroutine because the server won't set a - // session ID until we open a shell + // session ID until we start the session. go func() { defer close(sessionDataSent) sid, status := waitForSessionID() switch status { - case sessionIDReceived: + case libsshutils.SessionIDReceived: t.sessionData.ID = sid fallthrough - case sessionIDNotModified: + case libsshutils.SessionIDNotModified: if err := t.writeSessionData(ctx); err != nil { t.logger.WarnContext(ctx, "Failure sending session data", "error", err) } - case sessionIDNotSent: + case libsshutils.SessionIDNotSent: t.logger.WarnContext(ctx, "Failed to receive session data") default: t.logger.WarnContext(ctx, "Invalid session ID status", "status", status) From 2c52f7066728966f68d30da92ba0cb5dcfe3c449 Mon Sep 17 00:00:00 2001 From: joerger Date: Mon, 25 Aug 2025 17:33:56 -0700 Subject: [PATCH 02/26] Initialize session ID in the connection context and update it from node current-session-id request. --- lib/srv/ctx.go | 41 +++++++++++++++++++++++++++++++----- lib/srv/forward/sshserver.go | 20 ++++++++++++++++++ lib/srv/regular/sshserver.go | 12 +++++++++++ lib/srv/sess.go | 8 +++---- lib/web/terminal.go | 2 ++ 5 files changed, 74 insertions(+), 9 deletions(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index ed85312232aef..9da04d2e4a509 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -52,6 +52,7 @@ import ( "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" + rsession "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/sshca" "github.com/gravitational/teleport/lib/sshutils" "github.com/gravitational/teleport/lib/sshutils/sftp" @@ -319,6 +320,13 @@ type ServerContext struct { // sessionParams are parameters associated with this server session. sessionParams *tracessh.SessionParams + // newSessionID is set if this server context is going to create a new session. + // If this is a join session, or it is not known whether this will be a join session, + // then this field should not be set. Additionally, this field should only be set + // through [ServerContext.SetNewSessionID] to ensure it notifies the client of the + // session ID. + newSessionID rsession.ID + // session holds the active session (if there's an active one). session *session @@ -692,23 +700,46 @@ func (c *ServerContext) GetSessionParams() tracessh.SessionParams { return sessionParams } -// setSession sets the context's session -func (c *ServerContext) setSession(ctx context.Context, sess *session, ch ssh.Channel) { +// SetNewSessionID sets the ID for a new session in this server context. +// This is a noop if the session ID is already set. +func (c *ServerContext) SetNewSessionID(ctx context.Context, ch ssh.Channel) { c.mu.Lock() defer c.mu.Unlock() - c.session = sess - // inform the client of the session ID that is being used in a new + // newSessionID only needs to be set once. + if c.newSessionID != "" { + return + } + + c.newSessionID = rsession.NewID() + + // inform the client of the session ID that is going to be used in a new // goroutine to reduce latency go func() { c.Logger.DebugContext(ctx, "Sending current session ID") - _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(sess.ID())) + _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(c.newSessionID)) if err != nil { c.Logger.DebugContext(ctx, "Failed to send the current session ID", "error", err) } }() } +// GetSetNewSessionID gets or sets the ID for a new session in this server context. +func (c *ServerContext) GetSetNewSessionID(ctx context.Context, ch ssh.Channel) rsession.ID { + c.mu.Lock() + defer c.mu.Unlock() + + c.SetNewSessionID(ctx, ch) + return c.newSessionID +} + +// setSession sets the context's session +func (c *ServerContext) setSession(ctx context.Context, sess *session) { + c.mu.Lock() + defer c.mu.Unlock() + c.session = sess +} + // getSession returns the context's session // // The associated session is not set in the server context until a diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 20da5ea0b1b40..e08794afeca42 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1153,6 +1153,26 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { scx.SetAllowFileCopying(true) defer scx.Close() + // If this is a Teleport node server, it should send the session ID + // right after the session channel is accepted. We should reuse this + // session ID in order to track the node session from the proxy rather + // than creating duplicate session trackers, events, etc. + if s.targetServer.GetSubKind() == types.SubKindTeleportNode { + waitForSessionID := sshutils.PrepareToReceiveSessionID(ctx, s.logger, s.remoteClient) + + // Wait for the session ID in a goroutine. + go func() { + sid, status := waitForSessionID() + switch status { + case sshutils.SessionIDReceived: + scx.ConnectionContext.SetSessionID(sid) + fallthrough + default: + s.logger.WarnContext(ctx, "Unexpected session ID status in proxy recording mode. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events.", "status", status) + } + }() + } + // Create a "session" channel on the remote host. Note that we // create the remote session channel before accepting the local // channel request; this allows us to propagate the rejection diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index 106297cd48a32..b9c2276d2f5d4 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -1646,6 +1646,18 @@ func (s *Server) handleSessionRequests(ctx context.Context, ccx *sshutils.Connec trackingChan := scx.TrackActivity(ch) + // If we are creating a new session (not joining a session), inform the + // client of the session ID that is being used. Do this in a new goroutine + // to reduce latency. + // + // TODO(Joerger): DELETE IN v20.0.0 - the nil conditional is only necessary + // for old clients which don't provide session params upfront and instead send + // them in env var requests later. Setting the new session ID late causes + // seession ID sync issues between the node and proxy in proxy recording mode. + if sessionParams != nil && sessionParams.JoinSessionID == "" { + scx.SetNewSessionID(ctx, ch) + } + // The keep-alive loop will keep pinging the remote server and after it has // missed a certain number of keep-alive requests it will cancel the // closeContext which signals the server to shutdown. diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 18b8b20396b54..14c522a886f57 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -366,7 +366,7 @@ func (s *SessionRegistry) OpenSession(ctx context.Context, ch ssh.Channel, scx * if err != nil { return trace.Wrap(err) } - scx.setSession(ctx, sess, ch) + scx.setSession(ctx, sess) s.addSession(sess) scx.Logger.InfoContext(ctx, "Creating interactive session", "session_id", sess.id) @@ -400,7 +400,7 @@ func (s *SessionRegistry) JoinSession(ctx context.Context, ch ssh.Channel, scx * return trace.BadParameter("Unrecognized session participant mode: %q", mode) } - scx.setSession(ctx, session, ch) + scx.setSession(ctx, session) // Update the in-memory data structure that a party member has joined. if err := session.join(ch, scx, mode); err != nil { @@ -440,7 +440,7 @@ func (s *SessionRegistry) OpenExecSession(ctx context.Context, channel ssh.Chann // Start a non-interactive session (TTY attached). Close the session if an error // occurs, otherwise it will be closed by the callee. - scx.setSession(ctx, sess, channel) + scx.setSession(ctx, sess) err = sess.startExec(ctx, channel, scx) if err != nil { @@ -828,7 +828,7 @@ func newSession(ctx context.Context, r *SessionRegistry, scx *ServerContext, ch startTime := time.Now().UTC() rsess := rsession.Session{ Kind: types.SSHSessionKind, - ID: rsession.NewID(), + ID: scx.GetSetNewSessionID(ctx, ch), TerminalParams: rsession.TerminalParams{ W: teleport.DefaultTerminalWidth, H: teleport.DefaultTerminalHeight, diff --git a/lib/web/terminal.go b/lib/web/terminal.go index 7f31556666854..37af1ec03bc51 100644 --- a/lib/web/terminal.go +++ b/lib/web/terminal.go @@ -847,6 +847,8 @@ func (t *TerminalHandler) streamTerminal(ctx context.Context, tc *client.Telepor // wait in a new goroutine because the server won't set a // session ID until we start the session. + // Note: before v19.0.0, the session ID isn't set until during + // the shell request, rather than right after the session request. go func() { defer close(sessionDataSent) From 3adea075f428643def8fa850d535c68277a2851e Mon Sep 17 00:00:00 2001 From: joerger Date: Mon, 25 Aug 2025 17:56:01 -0700 Subject: [PATCH 03/26] Add session-id-query-v2@goteleport.com request and ensure new session ID is correctly set in proxy recording mode during the channel request. --- constants.go | 12 +++++++++- lib/srv/ctx.go | 25 +++++++------------- lib/srv/forward/sshserver.go | 44 ++++++++++++++++++++++++------------ lib/srv/regular/sshserver.go | 28 +++++++++++++++-------- lib/srv/sess.go | 7 +++++- lib/sshutils/utils.go | 9 ++++++-- lib/web/terminal.go | 2 +- 7 files changed, 80 insertions(+), 47 deletions(-) diff --git a/constants.go b/constants.go index 47b4defc1846f..d80695aafe77f 100644 --- a/constants.go +++ b/constants.go @@ -831,9 +831,19 @@ const ( CurrentSessionIDRequest = "current-session-id@goteleport.com" // SessionIDQueryRequest is sent by clients to ask servers if they - // will generate their own session ID when a new session is created. + // will generate and share their own session ID when a new session + // is started (session and exec/shell channels accepted). + // + // TODO(Joerger): DELETE IN v20.0.0 in favor of v2 below, which allows + // the client to know the session ID earlier, which is required for the + // proxy forwarding server to coordinate the session ID with target nodes. SessionIDQueryRequest = "session-id-query@goteleport.com" + // SessionIDQueryRequestV2 is sent by clients to ask servers if they + // will generate and share their own session ID when a new session + // channel is accepted. + SessionIDQueryRequestV2 = "session-id-query-v2@goteleport.com" + // ForceTerminateRequest is an SSH request to forcefully terminate a session. ForceTerminateRequest = "x-teleport-force-terminate" diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 9da04d2e4a509..77017d968fc14 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -321,10 +321,9 @@ type ServerContext struct { sessionParams *tracessh.SessionParams // newSessionID is set if this server context is going to create a new session. - // If this is a join session, or it is not known whether this will be a join session, - // then this field should not be set. Additionally, this field should only be set - // through [ServerContext.SetNewSessionID] to ensure it notifies the client of the - // session ID. + // This field must be set through [ServerContext.SetNewSessionID] for non-join + // sessions before as soon as a session channel is accepted in order to inform + // the client of the to-be session ID. newSessionID rsession.ID // session holds the active session (if there's an active one). @@ -701,20 +700,14 @@ func (c *ServerContext) GetSessionParams() tracessh.SessionParams { } // SetNewSessionID sets the ID for a new session in this server context. -// This is a noop if the session ID is already set. -func (c *ServerContext) SetNewSessionID(ctx context.Context, ch ssh.Channel) { +func (c *ServerContext) SetNewSessionID(ctx context.Context, sid rsession.ID, ch ssh.Channel) { c.mu.Lock() defer c.mu.Unlock() - // newSessionID only needs to be set once. - if c.newSessionID != "" { - return - } - - c.newSessionID = rsession.NewID() + c.newSessionID = sid // inform the client of the session ID that is going to be used in a new - // goroutine to reduce latency + // goroutine to reduce latency. go func() { c.Logger.DebugContext(ctx, "Sending current session ID") _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(c.newSessionID)) @@ -724,12 +717,10 @@ func (c *ServerContext) SetNewSessionID(ctx context.Context, ch ssh.Channel) { }() } -// GetSetNewSessionID gets or sets the ID for a new session in this server context. -func (c *ServerContext) GetSetNewSessionID(ctx context.Context, ch ssh.Channel) rsession.ID { +// GetNewSessionID gets the ID for a new session in this server context. +func (c *ServerContext) GetNewSessionID(ctx context.Context) rsession.ID { c.mu.Lock() defer c.mu.Unlock() - - c.SetNewSessionID(ctx, ch) return c.newSessionID } diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index e08794afeca42..2a9b57085e38b 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -51,6 +51,7 @@ import ( "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" + "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/srv" "github.com/gravitational/teleport/lib/sshagent" "github.com/gravitational/teleport/lib/sshutils" @@ -968,7 +969,16 @@ func (s *Server) handleGlobalRequest(ctx context.Context, req *ssh.Request) { // Pass request on unchanged. case teleport.SessionIDQueryRequest: // Reply true to session ID query requests, we will set new - // session IDs for new sessions + // session IDs for new sessions during the shel/exec channel + // request. + if err := req.Reply(true, nil); err != nil { + s.logger.WarnContext(ctx, "Failed to reply to session ID query request", "error", err) + } + return + case teleport.SessionIDQueryRequestV2: + // Reply true to session ID query requests, we will set new + // session IDs for new sessions directly after accepting the + // session channel request. if err := req.Reply(true, nil); err != nil { s.logger.WarnContext(ctx, "Failed to reply to session ID query request", "error", err) } @@ -1156,21 +1166,10 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // If this is a Teleport node server, it should send the session ID // right after the session channel is accepted. We should reuse this // session ID in order to track the node session from the proxy rather - // than creating duplicate session trackers, events, etc. + // than creating duplicate session trackers, events, etc. on the node. + var waitForSessionID func() (session.ID, sshutils.SessionIDStatus) if s.targetServer.GetSubKind() == types.SubKindTeleportNode { - waitForSessionID := sshutils.PrepareToReceiveSessionID(ctx, s.logger, s.remoteClient) - - // Wait for the session ID in a goroutine. - go func() { - sid, status := waitForSessionID() - switch status { - case sshutils.SessionIDReceived: - scx.ConnectionContext.SetSessionID(sid) - fallthrough - default: - s.logger.WarnContext(ctx, "Unexpected session ID status in proxy recording mode. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events.", "status", status) - } - }() + waitForSessionID = sshutils.PrepareToReceiveSessionID(ctx, s.logger, s.remoteClient, true) } // Create a "session" channel on the remote host. Note that we @@ -1203,6 +1202,21 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { } scx.AddCloser(ch) + // Wait for the session ID if we expect the node to provide it. + if waitForSessionID != nil { + sid, status := waitForSessionID() + switch status { + case sshutils.SessionIDReceived: + scx.SetNewSessionID(ctx, sid, ch) + default: + s.logger.WarnContext(ctx, "Unexpected session ID status from a Teleport Node in proxy recording mode. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.", "status", status) + scx.SetNewSessionID(ctx, session.NewID(), ch) + } + } else { + // This is not a Teleport node so we don't expect session ID to be reported. + scx.SetNewSessionID(ctx, session.NewID(), ch) + } + ch = scx.TrackActivity(ch) s.logger.DebugContext(ctx, "Opening session request", "target_addr", s.sconn.RemoteAddr(), "session_id", scx.ID()) diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index b9c2276d2f5d4..345aa35a5c9f1 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -66,6 +66,7 @@ import ( authorizedkeysreporter "github.com/gravitational/teleport/lib/secretsscanner/authorizedkeys" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" + "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/srv" "github.com/gravitational/teleport/lib/srv/ingress" "github.com/gravitational/teleport/lib/sshagent" @@ -1297,7 +1298,16 @@ func (s *Server) HandleRequest(ctx context.Context, ccx *sshutils.ConnectionCont } case teleport.SessionIDQueryRequest: // Reply true to session ID query requests, we will set new - // session IDs for new sessions + // session IDs for new sessions during the shel/exec channel + // request. + if err := r.Reply(true, nil); err != nil { + s.logger.WarnContext(ctx, "Failed to reply to session ID query request", "error", err) + } + return + case teleport.SessionIDQueryRequestV2: + // Reply true to session ID query requests, we will set new + // session IDs for new sessions directly after accepting the + // session channel request. if err := r.Reply(true, nil); err != nil { s.logger.WarnContext(ctx, "Failed to reply to session ID query request", "error", err) } @@ -1646,16 +1656,14 @@ func (s *Server) handleSessionRequests(ctx context.Context, ccx *sshutils.Connec trackingChan := scx.TrackActivity(ch) - // If we are creating a new session (not joining a session), inform the - // client of the session ID that is being used. Do this in a new goroutine - // to reduce latency. + // If we are creating a new session (not joining a session), prepare a new session + // ID and inform the client. // - // TODO(Joerger): DELETE IN v20.0.0 - the nil conditional is only necessary - // for old clients which don't provide session params upfront and instead send - // them in env var requests later. Setting the new session ID late causes - // seession ID sync issues between the node and proxy in proxy recording mode. - if sessionParams != nil && sessionParams.JoinSessionID == "" { - scx.SetNewSessionID(ctx, ch) + // Note: If this is an old client ( Date: Tue, 30 Sep 2025 17:24:59 -0700 Subject: [PATCH 04/26] Replace PrepareToReceiveSessionID with simpler in-place logic. --- constants.go | 11 +++-- lib/srv/forward/sshserver.go | 60 +++++++++++++++++------ lib/srv/regular/sshserver.go | 6 +++ lib/sshutils/utils.go | 94 ------------------------------------ lib/web/terminal.go | 36 ++++++++------ 5 files changed, 79 insertions(+), 128 deletions(-) diff --git a/constants.go b/constants.go index d80695aafe77f..1a868da4cd2f0 100644 --- a/constants.go +++ b/constants.go @@ -834,14 +834,17 @@ const ( // will generate and share their own session ID when a new session // is started (session and exec/shell channels accepted). // - // TODO(Joerger): DELETE IN v20.0.0 in favor of v2 below, which allows - // the client to know the session ID earlier, which is required for the - // proxy forwarding server to coordinate the session ID with target nodes. + // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // All v17+ servers set the session ID. v19+ clients stop checking. SessionIDQueryRequest = "session-id-query@goteleport.com" // SessionIDQueryRequestV2 is sent by clients to ask servers if they // will generate and share their own session ID when a new session - // channel is accepted. + // channel is accepted, rather than when the shell/exec channel is. + // + // TODO(Joerger): DELETE IN v22.0.0 (1 extra major version grace period) + // all v19+ servers set the session ID directly after accepting the session channel. + // clients should stop checking in v21, and servers should stop responding to the query in v22. SessionIDQueryRequestV2 = "session-id-query-v2@goteleport.com" // ForceTerminateRequest is an SSH request to forcefully terminate a session. diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 2a9b57085e38b..4f434033ead51 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -28,6 +28,7 @@ import ( "net" "os" "strings" + "sync" "time" "github.com/gravitational/trace" @@ -968,6 +969,9 @@ func (s *Server) handleGlobalRequest(ctx context.Context, req *ssh.Request) { } // Pass request on unchanged. case teleport.SessionIDQueryRequest: + // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // All v17+ servers set the session ID. v19+ clients stop checking. + // Reply true to session ID query requests, we will set new // session IDs for new sessions during the shel/exec channel // request. @@ -976,6 +980,9 @@ func (s *Server) handleGlobalRequest(ctx context.Context, req *ssh.Request) { } return case teleport.SessionIDQueryRequestV2: + // TODO(Joerger): DELETE IN v22.0.0 (1 extra major version grace period) + // clients should stop checking in v21, and servers should stop responding to the query in v22. + // Reply true to session ID query requests, we will set new // session IDs for new sessions directly after accepting the // session channel request. @@ -1167,9 +1174,39 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // right after the session channel is accepted. We should reuse this // session ID in order to track the node session from the proxy rather // than creating duplicate session trackers, events, etc. on the node. - var waitForSessionID func() (session.ID, sshutils.SessionIDStatus) + var receiveSessionIDOnce sync.Once + receivedSessionID := make(chan session.ID, 1) if s.targetServer.GetSubKind() == types.SubKindTeleportNode { - waitForSessionID = sshutils.PrepareToReceiveSessionID(ctx, s.logger, s.remoteClient, true) + // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // all v19+ servers set the session ID directly after accepting the session channel. + // clients should stop checking in v21, and servers should stop responding to the query in v22. + willSendSID, _, err := s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) + if err != nil { + s.logger.WarnContext(ctx, "Failed to send session ID query request", "error", err) + } + + if willSendSID { + s.remoteClient.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { + receiveSessionIDOnce.Do(func() { + sid, err := session.ParseID(string(req.Payload)) + if err != nil { + s.logger.WarnContext(ctx, "Unable to parse session ID", "error", err) + return + } + + receivedSessionID <- *sid + close(receivedSessionID) + }) + }) + } else { + receivedSessionID <- session.NewID() + close(receivedSessionID) + s.logger.WarnContext(ctx, "Failed to query session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") + } + } else { + // This is not a Teleport node so we don't expect session ID to be reported. + receivedSessionID <- session.NewID() + close(receivedSessionID) } // Create a "session" channel on the remote host. Note that we @@ -1202,19 +1239,12 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { } scx.AddCloser(ch) - // Wait for the session ID if we expect the node to provide it. - if waitForSessionID != nil { - sid, status := waitForSessionID() - switch status { - case sshutils.SessionIDReceived: - scx.SetNewSessionID(ctx, sid, ch) - default: - s.logger.WarnContext(ctx, "Unexpected session ID status from a Teleport Node in proxy recording mode. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.", "status", status) - scx.SetNewSessionID(ctx, session.NewID(), ch) - } - } else { - // This is not a Teleport node so we don't expect session ID to be reported. - scx.SetNewSessionID(ctx, session.NewID(), ch) + // Wait for the session ID to be reported by the target node. + select { + case sid := <-receivedSessionID: + scx.SetNewSessionID(ctx, sid, ch) + case <-time.After(5 * time.Second): + s.logger.WarnContext(ctx, "Failed to receive session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") } ch = scx.TrackActivity(ch) diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index 345aa35a5c9f1..93b22a8914f6b 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -1297,6 +1297,9 @@ func (s *Server) HandleRequest(ctx context.Context, ccx *sshutils.ConnectionCont } } case teleport.SessionIDQueryRequest: + // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // All v17+ servers set the session ID. v19+ clients stop checking. + // Reply true to session ID query requests, we will set new // session IDs for new sessions during the shel/exec channel // request. @@ -1305,6 +1308,9 @@ func (s *Server) HandleRequest(ctx context.Context, ccx *sshutils.ConnectionCont } return case teleport.SessionIDQueryRequestV2: + // TODO(Joerger): DELETE IN v22.0.0 (1 extra major version grace period) + // clients should stop checking in v21, and servers should stop responding to the query in v22. + // Reply true to session ID query requests, we will set new // session IDs for new sessions directly after accepting the // session channel request. diff --git a/lib/sshutils/utils.go b/lib/sshutils/utils.go index eb2feea664737..7dcd762629e74 100644 --- a/lib/sshutils/utils.go +++ b/lib/sshutils/utils.go @@ -19,20 +19,12 @@ package sshutils import ( - "context" - "log/slog" "net" "strconv" - "sync/atomic" - "time" "github.com/gravitational/trace" "golang.org/x/crypto/ssh" - "github.com/gravitational/teleport" - - tracessh "github.com/gravitational/teleport/api/observability/tracing/ssh" - "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/utils" ) @@ -70,89 +62,3 @@ func NewSSHConnMetadataWithUser(conn ssh.ConnMetadata, user string) SSHConnMetad func (s SSHConnMetadataWithUser) User() string { return s.user } - -// SessionIDStatus indicates whether the session ID was received from -// the server or not, and if not why -type SessionIDStatus int - -const ( - // SessionIDReceived indicates the the session ID was received - SessionIDReceived SessionIDStatus = iota + 1 - // SessionIDNotSent indicates that the server set the session ID - // but didn't send it to us - SessionIDNotSent - // SessionIDNotModified indicates that the server used the session - // ID that was set by us - SessionIDNotModified -) - -// PrepareToReceiveSessionID configures the TeleportClient to listen for -// the server to send the session ID it's using. The returned function -// will return the current session ID from the server or a reason why -// one wasn't received. -func PrepareToReceiveSessionID(ctx context.Context, log *slog.Logger, client *tracessh.Client, useV2 bool) func() (session.ID, SessionIDStatus) { - // send the session ID received from the server - var gotSessionID atomic.Bool - sessionIDFromServer := make(chan session.ID, 1) - - client.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { - // only handle the first session ID request - if gotSessionID.Load() { - return - } - - sid, err := session.ParseID(string(req.Payload)) - if err != nil { - log.WarnContext(ctx, "Unable to parse session ID", "error", err) - return - } - - if gotSessionID.CompareAndSwap(false, true) { - sessionIDFromServer <- *sid - } - }) - - // If the session is about to close and we haven't received a session - // ID yet, ask if the server even supports sending one. Send the - // request in a new goroutine so session establishment won't be - // blocked on making this request - serverWillSetSessionID := make(chan bool, 1) - go func() { - reqName := teleport.SessionIDQueryRequest - if useV2 { - reqName = teleport.SessionIDQueryRequestV2 - } - - resp, _, err := client.SendRequest(ctx, reqName, true, nil) - if err != nil { - log.WarnContext(ctx, "Failed to send session ID query request", "error", err) - serverWillSetSessionID <- false - } else { - serverWillSetSessionID <- resp - } - }() - - waitForSessionID := func() (session.ID, SessionIDStatus) { - timer := time.NewTimer(10 * time.Second) - defer timer.Stop() - - for { - select { - case sessionID := <-sessionIDFromServer: - return sessionID, SessionIDReceived - case sessionIDIsComing := <-serverWillSetSessionID: - if !sessionIDIsComing { - return "", SessionIDNotModified - } - // the server will send the session ID, continue - // waiting for it - case <-ctx.Done(): - return "", SessionIDNotSent - case <-timer.C: - return "", SessionIDNotSent - } - } - } - - return waitForSessionID -} diff --git a/lib/web/terminal.go b/lib/web/terminal.go index ab7e896c40199..fa6e90d4aaefe 100644 --- a/lib/web/terminal.go +++ b/lib/web/terminal.go @@ -63,7 +63,6 @@ import ( "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/sshagent" "github.com/gravitational/teleport/lib/sshca" - libsshutils "github.com/gravitational/teleport/lib/sshutils" "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/diagnostics/latency" "github.com/gravitational/teleport/lib/web/terminal" @@ -843,28 +842,35 @@ func (t *TerminalHandler) streamTerminal(ctx context.Context, tc *client.Telepor writeSessionCtx, writeSessionCancel := context.WithCancel(ctx) defer writeSessionCancel() - waitForSessionID := libsshutils.PrepareToReceiveSessionID(writeSessionCtx, t.logger, nc.Client, false) + // only handle the first session ID request + var receiveSessionIDOnce sync.Once + receivedSessionID := make(chan struct{}) + nc.Client.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { + receiveSessionIDOnce.Do(func() { + sid, err := session.ParseID(string(req.Payload)) + if err != nil { + t.logger.WarnContext(ctx, "Unable to parse session ID", "error", err) + return + } + + t.sessionData.ID = *sid + close(receivedSessionID) + }) + }) // wait in a new goroutine because the server won't set a // session ID until we start the session. - // Note: before v19.0.0, the session ID isn't set until during - // the shell request, rather than right after the session request. go func() { - defer close(sessionDataSent) - - sid, status := waitForSessionID() - switch status { - case libsshutils.SessionIDReceived: - t.sessionData.ID = sid - fallthrough - case libsshutils.SessionIDNotModified: + ctx, cancel := context.WithTimeout(writeSessionCtx, 10*time.Second) + defer cancel() + + select { + case <-receivedSessionID: if err := t.writeSessionData(ctx); err != nil { t.logger.WarnContext(ctx, "Failure sending session data", "error", err) } - case libsshutils.SessionIDNotSent: + case <-ctx.Done(): t.logger.WarnContext(ctx, "Failed to receive session data") - default: - t.logger.WarnContext(ctx, "Invalid session ID status", "status", status) } }() } From 72e5b493a07c481ceb26c30531e1fd46dc66aec0 Mon Sep 17 00:00:00 2001 From: joerger Date: Tue, 30 Sep 2025 17:57:42 -0700 Subject: [PATCH 05/26] Don't emit session events or tracker when proxy forwarding to a Teleport Node. --- lib/srv/ctx.go | 3 +-- lib/srv/sess.go | 15 ++++++++++----- lib/srv/sess_test.go | 8 ++++---- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 77017d968fc14..a68eb90af6312 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -901,8 +901,7 @@ func (c *ServerContext) reportStats(conn *utils.TrackingConn) { // sessions are being recorded at the proxy (this would result in double // events). // Do not emit session data for git commands as they have their own events. - if c.GetServer().Component() == teleport.ComponentProxy || - c.GetServer().Component() == teleport.ComponentForwardingGit { + if c.GetServer().Component() == teleport.ComponentForwardingGit { return } if services.IsRecordAtProxy(c.SessionRecordingConfig.GetMode()) && diff --git a/lib/srv/sess.go b/lib/srv/sess.go index f923b15996b31..33033ca0b17f9 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -2386,12 +2386,12 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS Invited: s.scx.GetSessionParams().Invited, } + // Don't propagate the session tracker if: + // - This is a proxy forwarding server for a Teleport Node (tracking is handled by the target node server) + // - this is a non-interactive session + // - the session was initiated by a bot svc := s.registry.SessionTrackerService - // Only propagate the session tracker when the recording mode and component are in sync - // AND the sesssion is interactive - // AND the session was not initiated by a bot - if (s.registry.Srv.Component() == teleport.ComponentNode && services.IsRecordAtProxy(s.scx.SessionRecordingConfig.GetMode())) || - (s.registry.Srv.Component() == teleport.ComponentProxy && !services.IsRecordAtProxy(s.scx.SessionRecordingConfig.GetMode())) || + if (s.registry.Srv.Component() == teleport.ComponentForwardingNode && !s.registry.Srv.GetInfo().IsOpenSSHNode()) || sessType == sessionTypeNonInteractive || s.scx.Identity.BotName != "" { svc = nil @@ -2445,6 +2445,11 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS // emitAuditEvent emits audit events. func (s *session) emitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { + // Nodes discard audit events in cases when proxies are already emitting them. + if s.scx.srv.Component() == teleport.ComponentNode && services.IsRecordAtProxy(s.scx.SessionRecordingConfig.GetMode()) { + return nil + } + return s.emitter.EmitAuditEvent(ctx, event) } diff --git a/lib/srv/sess_test.go b/lib/srv/sess_test.go index 99a2629b29490..173d96b1244c4 100644 --- a/lib/srv/sess_test.go +++ b/lib/srv/sess_test.go @@ -964,7 +964,7 @@ func TestTrackingSession(t *testing.T) { }, { name: "proxy with proxy recording mode", - component: teleport.ComponentProxy, + component: teleport.ComponentForwardingNode, recordingMode: types.RecordAtProxy, interactive: true, assertion: require.NoError, @@ -974,7 +974,7 @@ func TestTrackingSession(t *testing.T) { }, { name: "proxy with node recording mode", - component: teleport.ComponentProxy, + component: teleport.ComponentForwardingNode, recordingMode: types.RecordAtNode, interactive: true, assertion: require.NoError, @@ -1186,7 +1186,7 @@ func TestCloseProxySession(t *testing.T) { ctx := t.Context() srv := newMockServer(t) - srv.component = teleport.ComponentProxy + srv.component = teleport.ComponentForwardingNode reg, err := NewSessionRegistry(SessionRegistryConfig{ Srv: srv, @@ -1233,7 +1233,7 @@ func TestCloseRemoteSession(t *testing.T) { ctx := t.Context() srv := newMockServer(t) - srv.component = teleport.ComponentProxy + srv.component = teleport.ComponentForwardingNode // init a session registry reg, _ := NewSessionRegistry(SessionRegistryConfig{ From 612b742c6313fac6ab228344094ba647478c4792 Mon Sep 17 00:00:00 2001 From: joerger Date: Wed, 1 Oct 2025 19:11:22 -0700 Subject: [PATCH 06/26] Fix missing session tracker for outdated Teleport Node. --- lib/srv/ctx.go | 16 ++++++++++++++++ lib/srv/forward/sshserver.go | 1 + lib/srv/sess.go | 2 +- 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index a68eb90af6312..1c22edb722757 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -326,6 +326,12 @@ type ServerContext struct { // the client of the to-be session ID. newSessionID rsession.ID + // proxyShouldCreateSessionTracker indicates that a session tracker should be created + // for a proxy forwarded session because the node failed to report its session ID after + // a session channel request. + // TODO(Joerger): DELETE IN v21.0.0 - All v19+ nodes report session ID after session channel + proxyShouldCreateSessionTracker bool + // session holds the active session (if there's an active one). session *session @@ -699,6 +705,16 @@ func (c *ServerContext) GetSessionParams() tracessh.SessionParams { return sessionParams } +// proxyShouldCreateSessionTracker indicates that a session tracker should be created +// for a proxy forwarded session because the node failed to report its session ID after +// a session channel request. +// TODO(Joerger): DELETE IN v21.0.0 - All v19+ nodes report session ID after session channel +func (c *ServerContext) SetProxyShouldCreateSessionTracker() { + c.mu.Lock() + defer c.mu.Unlock() + c.proxyShouldCreateSessionTracker = true +} + // SetNewSessionID sets the ID for a new session in this server context. func (c *ServerContext) SetNewSessionID(ctx context.Context, sid rsession.ID, ch ssh.Channel) { c.mu.Lock() diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 4f434033ead51..e22fa6a2401fb 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1200,6 +1200,7 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { }) } else { receivedSessionID <- session.NewID() + scx.SetProxyShouldCreateSessionTracker() close(receivedSessionID) s.logger.WarnContext(ctx, "Failed to query session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") } diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 33033ca0b17f9..d495b2d4ca4d0 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -2391,7 +2391,7 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS // - this is a non-interactive session // - the session was initiated by a bot svc := s.registry.SessionTrackerService - if (s.registry.Srv.Component() == teleport.ComponentForwardingNode && !s.registry.Srv.GetInfo().IsOpenSSHNode()) || + if (s.registry.Srv.Component() == teleport.ComponentForwardingNode && !s.registry.Srv.GetInfo().IsOpenSSHNode() && !s.scx.proxyShouldCreateSessionTracker) || sessType == sessionTypeNonInteractive || s.scx.Identity.BotName != "" { svc = nil From eaee2b0018867f4e96b4e8d27c99c4eb9e8e0202 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 2 Oct 2025 14:50:58 -0700 Subject: [PATCH 07/26] Remove extra major version grace period. --- constants.go | 4 ++-- lib/srv/forward/sshserver.go | 6 +++--- lib/srv/regular/sshserver.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/constants.go b/constants.go index 1a868da4cd2f0..c52e95ad3df36 100644 --- a/constants.go +++ b/constants.go @@ -834,7 +834,7 @@ const ( // will generate and share their own session ID when a new session // is started (session and exec/shell channels accepted). // - // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // TODO(Joerger): DELETE IN v20.0.0 // All v17+ servers set the session ID. v19+ clients stop checking. SessionIDQueryRequest = "session-id-query@goteleport.com" @@ -842,7 +842,7 @@ const ( // will generate and share their own session ID when a new session // channel is accepted, rather than when the shell/exec channel is. // - // TODO(Joerger): DELETE IN v22.0.0 (1 extra major version grace period) + // TODO(Joerger): DELETE IN v21.0.0 // all v19+ servers set the session ID directly after accepting the session channel. // clients should stop checking in v21, and servers should stop responding to the query in v22. SessionIDQueryRequestV2 = "session-id-query-v2@goteleport.com" diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index e22fa6a2401fb..f2f936716f64a 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -969,7 +969,7 @@ func (s *Server) handleGlobalRequest(ctx context.Context, req *ssh.Request) { } // Pass request on unchanged. case teleport.SessionIDQueryRequest: - // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // TODO(Joerger): DELETE IN v20.0.0 // All v17+ servers set the session ID. v19+ clients stop checking. // Reply true to session ID query requests, we will set new @@ -980,7 +980,7 @@ func (s *Server) handleGlobalRequest(ctx context.Context, req *ssh.Request) { } return case teleport.SessionIDQueryRequestV2: - // TODO(Joerger): DELETE IN v22.0.0 (1 extra major version grace period) + // TODO(Joerger): DELETE IN v21.0.0 // clients should stop checking in v21, and servers should stop responding to the query in v22. // Reply true to session ID query requests, we will set new @@ -1177,7 +1177,7 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { var receiveSessionIDOnce sync.Once receivedSessionID := make(chan session.ID, 1) if s.targetServer.GetSubKind() == types.SubKindTeleportNode { - // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // TODO(Joerger): DELETE IN v20.0.0 // all v19+ servers set the session ID directly after accepting the session channel. // clients should stop checking in v21, and servers should stop responding to the query in v22. willSendSID, _, err := s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index 93b22a8914f6b..0078767580da5 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -1297,7 +1297,7 @@ func (s *Server) HandleRequest(ctx context.Context, ccx *sshutils.ConnectionCont } } case teleport.SessionIDQueryRequest: - // TODO(Joerger): DELETE IN v21.0.0 (1 extra major version grace period) + // TODO(Joerger): DELETE IN v20.0.0 // All v17+ servers set the session ID. v19+ clients stop checking. // Reply true to session ID query requests, we will set new @@ -1308,7 +1308,7 @@ func (s *Server) HandleRequest(ctx context.Context, ccx *sshutils.ConnectionCont } return case teleport.SessionIDQueryRequestV2: - // TODO(Joerger): DELETE IN v22.0.0 (1 extra major version grace period) + // TODO(Joerger): DELETE IN v21.0.0 // clients should stop checking in v21, and servers should stop responding to the query in v22. // Reply true to session ID query requests, we will set new From 0784ee614fda012ab3b08f7ee62bd52c16b20daa Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 2 Oct 2025 15:46:00 -0700 Subject: [PATCH 08/26] Update integration test. --- integration/integration_test.go | 48 ++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/integration/integration_test.go b/integration/integration_test.go index 61cbd69254b18..2927c6eaed4c1 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -531,6 +531,7 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { } } + // Test streaming events and recording. capturedStream, sessionEvents := streamSession(ctx, t, site, sessionID) findByType := func(et string) apievents.AuditEvent { @@ -541,19 +542,6 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { } return nil } - // helper that asserts that a session event is also included in the - // general audit log. - requireInAuditLog := func(t *testing.T, sessionEvent apievents.AuditEvent) { - t.Helper() - auditEvents, _, err := site.SearchEvents(ctx, events.SearchEventsRequest{ - To: time.Now(), - EventTypes: []string{sessionEvent.GetType()}, - }) - require.NoError(t, err) - require.True(t, slices.ContainsFunc(auditEvents, func(ae apievents.AuditEvent) bool { - return ae.GetID() == sessionEvent.GetID() - })) - } // there should always be 'session.start' event (and it must be first) first := sessionEvents[0].(*apievents.SessionStart) @@ -561,19 +549,16 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { require.Equal(t, first, start) require.Equal(t, sessionID, start.SessionID) require.NotEmpty(t, start.TerminalSize) - requireInAuditLog(t, start) // there should always be 'session.end' event end := findByType(events.SessionEndEvent).(*apievents.SessionEnd) require.NotNil(t, end) require.Equal(t, sessionID, end.SessionID) - requireInAuditLog(t, end) // there should always be 'session.leave' event leave := findByType(events.SessionLeaveEvent).(*apievents.SessionLeave) require.NotNil(t, leave) require.Equal(t, sessionID, leave.SessionID) - requireInAuditLog(t, leave) // all of them should have a proper time for _, e := range sessionEvents { @@ -584,6 +569,37 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { recorded := replaceNewlines(capturedStream) require.Regexp(t, ".*exit.*", recorded) require.Regexp(t, ".*echo hi.*", recorded) + + // Ensure that we find the 4 primary session events with the correct session ID and order + // and without duplicates or mismatched session IDs. + sessionEvents, _, err = site.SearchEvents(ctx, events.SearchEventsRequest{ + From: time.Time{}, + To: time.Now(), + EventTypes: []string{ + events.SessionStartEvent, + events.SessionLeaveEvent, + events.SessionEndEvent, + events.SessionDataEvent, + }, + }) + require.NoError(t, err) + require.Len(t, sessionEvents, 4) + + startEvent, ok := sessionEvents[0].(*apievents.SessionStart) + require.True(t, ok, "Expected session start event but got %s", sessionEvents[0].GetType()) + require.Equal(t, sessionID, startEvent.SessionID) + + endEvent, ok := sessionEvents[1].(*apievents.SessionEnd) + require.True(t, ok, "Expected session end event but got %s", sessionEvents[1].GetType()) + require.Equal(t, sessionID, endEvent.SessionID) + + leaveEvent, ok := sessionEvents[2].(*apievents.SessionLeave) + require.True(t, ok, "Expected session leave event but got %s", sessionEvents[2].GetType()) + require.Equal(t, sessionID, leaveEvent.SessionID) + + dataEvent, ok := sessionEvents[3].(*apievents.SessionData) + require.True(t, ok, "Expected session data event but got %s", sessionEvents[3].GetType()) + require.Equal(t, sessionID, dataEvent.SessionID) }) } } From a1276348d1189b718006e9a830006cf4d1188653 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 2 Oct 2025 16:18:00 -0700 Subject: [PATCH 09/26] Cleanup current session ID handling and fix failing tests. --- lib/srv/ctx.go | 2 +- lib/srv/forward/sshserver.go | 84 ++++++++++++++++++++---------------- lib/srv/sess.go | 5 ++- 3 files changed, 53 insertions(+), 38 deletions(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 1c22edb722757..8849a58f412e8 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -705,7 +705,7 @@ func (c *ServerContext) GetSessionParams() tracessh.SessionParams { return sessionParams } -// proxyShouldCreateSessionTracker indicates that a session tracker should be created +// SetProxyShouldCreateSessionTracker indicates that a session tracker should be created // for a proxy forwarded session because the node failed to report its session ID after // a session channel request. // TODO(Joerger): DELETE IN v21.0.0 - All v19+ nodes report session ID after session channel diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index f2f936716f64a..1bb47ccfc58ec 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1172,42 +1172,42 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // If this is a Teleport node server, it should send the session ID // right after the session channel is accepted. We should reuse this - // session ID in order to track the node session from the proxy rather - // than creating duplicate session trackers, events, etc. on the node. - var receiveSessionIDOnce sync.Once - receivedSessionID := make(chan session.ID, 1) - if s.targetServer.GetSubKind() == types.SubKindTeleportNode { - // TODO(Joerger): DELETE IN v20.0.0 - // all v19+ servers set the session ID directly after accepting the session channel. - // clients should stop checking in v21, and servers should stop responding to the query in v22. - willSendSID, _, err := s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) + // session ID and delegate session responsibilities (recordings, audit + // events, and session trackers) to avoid duplicates. + willReceiveSID := s.targetServer.GetSubKind() == types.SubKindTeleportNode + + // Check if the Teleport Node is outdated and won't actually send the session ID. + // + // TODO(Joerger): DELETE IN v20.0.0 + // all v19+ servers set the session ID directly after accepting the session channel. + // clients should stop checking in v21, and servers should stop responding to the query in v22. + if willReceiveSID { + willReceiveSID, _, err = s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) if err != nil { s.logger.WarnContext(ctx, "Failed to send session ID query request", "error", err) } - if willSendSID { - s.remoteClient.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { - receiveSessionIDOnce.Do(func() { - sid, err := session.ParseID(string(req.Payload)) - if err != nil { - s.logger.WarnContext(ctx, "Unable to parse session ID", "error", err) - return - } - - receivedSessionID <- *sid - close(receivedSessionID) - }) - }) - } else { - receivedSessionID <- session.NewID() - scx.SetProxyShouldCreateSessionTracker() - close(receivedSessionID) + if !willReceiveSID { s.logger.WarnContext(ctx, "Failed to query session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") } - } else { - // This is not a Teleport node so we don't expect session ID to be reported. - receivedSessionID <- session.NewID() - close(receivedSessionID) + } + + // Register handler to receive the current session ID before starting the session. + newSessionID := make(chan session.ID, 1) + if willReceiveSID { + var receiveSessionIDOnce sync.Once + s.remoteClient.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { + receiveSessionIDOnce.Do(func() { + sid, err := session.ParseID(string(req.Payload)) + if err != nil { + s.logger.WarnContext(ctx, "Unable to parse session ID", "error", err) + return + } + + newSessionID <- *sid + close(newSessionID) + }) + }) } // Create a "session" channel on the remote host. Note that we @@ -1240,12 +1240,24 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { } scx.AddCloser(ch) - // Wait for the session ID to be reported by the target node. - select { - case sid := <-receivedSessionID: - scx.SetNewSessionID(ctx, sid, ch) - case <-time.After(5 * time.Second): - s.logger.WarnContext(ctx, "Failed to receive session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") + if willReceiveSID { + // Wait for the session ID to be reported by the target node. + select { + case sid := <-newSessionID: + scx.SetNewSessionID(ctx, sid, ch) + case <-time.After(10 * time.Second): + s.logger.WarnContext(ctx, "Failed to receive session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") + if err := nch.Reject(ssh.ConnectionFailed, "target Teleport Node failed to report session ID"); err != nil { + s.logger.WarnContext(ctx, "Failed to reject channel", "channel", nch.ChannelType(), "error", err) + } + return + } + } else { + // The target node is not expected to report session ID, either because it's + // outdated or an agentless node. Continue with a random session ID and ensure + // we create a new session tracker. + scx.SetNewSessionID(ctx, session.NewID(), ch) + scx.SetProxyShouldCreateSessionTracker() } ch = scx.TrackActivity(ch) diff --git a/lib/srv/sess.go b/lib/srv/sess.go index d495b2d4ca4d0..667ecc9e240d6 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -826,7 +826,10 @@ func newSession(ctx context.Context, r *SessionRegistry, scx *ServerContext, ch sid := scx.GetNewSessionID(ctx) if sid == "" { - return nil, nil, trace.BadParameter("newSessionID should be set prior to session creation") + // Some flows (particularly tests) do not set the session ID ahead of time. + sid = rsession.NewID() + scx.SetNewSessionID(ctx, sid, ch) + r.logger.WarnContext(ctx, "newSessionID should be set prior to session creation. If this log is seen outside of tests, this is a bug.") } serverSessions.Inc() From 28fb930d7e6c5ab6ca3d887a116df86f06ce1017 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 2 Oct 2025 17:14:08 -0700 Subject: [PATCH 10/26] Fix tests. --- lib/srv/exec_test.go | 1 + lib/srv/mock_test.go | 1 + lib/srv/regular/sshserver_test.go | 17 ++++++++++------- lib/srv/sess_test.go | 4 ++-- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lib/srv/exec_test.go b/lib/srv/exec_test.go index 086a43e7985a2..d5b38453e38ef 100644 --- a/lib/srv/exec_test.go +++ b/lib/srv/exec_test.go @@ -149,6 +149,7 @@ func newExecServerContext(t *testing.T, srv Server) *ServerContext { term: term, emitter: rec, recorder: rec, + scx: scx, } err = scx.SetSSHRequest(&ssh.Request{Type: sshutils.ExecRequest}) require.NoError(t, err) diff --git a/lib/srv/mock_test.go b/lib/srv/mock_test.go index ab5e7c2f49594..1de3f604cf44f 100644 --- a/lib/srv/mock_test.go +++ b/lib/srv/mock_test.go @@ -160,6 +160,7 @@ func newMockServer(t *testing.T) *mockServer { datadir: t.TempDir(), MockRecorderEmitter: &eventstest.MockRecorderEmitter{}, clock: clock, + component: teleport.ComponentNode, } } diff --git a/lib/srv/regular/sshserver_test.go b/lib/srv/regular/sshserver_test.go index 2d20a4257d2c9..38a707ce40408 100644 --- a/lib/srv/regular/sshserver_test.go +++ b/lib/srv/regular/sshserver_test.go @@ -3677,6 +3677,16 @@ func TestSessionParams(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, se.Close()) }) + // If this isn't a join session, wait for the server to send the session ID. + sid := baseCase.params.JoinSessionID + if sid == "" { + select { + case sid = <-sidC: + case <-time.After(time.Second): + t.Fatalf("Failed to received session ID from server") + } + } + if sessionCase.envVars != nil { err := se.SetEnvs(ctx, sessionCase.envVars) require.NoError(t, err) @@ -3684,13 +3694,6 @@ func TestSessionParams(t *testing.T) { require.NoError(t, se.Shell(ctx)) - var sid string - select { - case sid = <-sidC: - case <-time.After(time.Second): - t.Fatalf("Failed to received session ID from server") - } - sessionTracker, err := f.ssh.srv.termHandlers.SessionRegistry.SessionTrackerService.GetSessionTracker(ctx, sid) require.NoError(t, err) diff --git a/lib/srv/sess_test.go b/lib/srv/sess_test.go index 173d96b1244c4..de6ca7f422f3f 100644 --- a/lib/srv/sess_test.go +++ b/lib/srv/sess_test.go @@ -949,7 +949,7 @@ func TestTrackingSession(t *testing.T) { interactive: true, assertion: require.NoError, createAssertion: func(t *testing.T, count int) { - require.Equal(t, 0, count) + require.Equal(t, 1, count) }, }, { @@ -969,7 +969,7 @@ func TestTrackingSession(t *testing.T) { interactive: true, assertion: require.NoError, createAssertion: func(t *testing.T, count int) { - require.Equal(t, 1, count) + require.Equal(t, 0, count) }, }, { From b387aa494945d422541f5f0eaae73ff518612a23 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 9 Oct 2025 16:07:03 -0700 Subject: [PATCH 11/26] Address comments. --- lib/srv/ctx.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 8849a58f412e8..5657d6a0e9789 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -322,7 +322,7 @@ type ServerContext struct { // newSessionID is set if this server context is going to create a new session. // This field must be set through [ServerContext.SetNewSessionID] for non-join - // sessions before as soon as a session channel is accepted in order to inform + // sessions as soon as a session channel is accepted in order to inform // the client of the to-be session ID. newSessionID rsession.ID @@ -917,7 +917,8 @@ func (c *ServerContext) reportStats(conn *utils.TrackingConn) { // sessions are being recorded at the proxy (this would result in double // events). // Do not emit session data for git commands as they have their own events. - if c.GetServer().Component() == teleport.ComponentForwardingGit { + if c.GetServer().Component() == teleport.ComponentProxy || + c.GetServer().Component() == teleport.ComponentForwardingGit { return } if services.IsRecordAtProxy(c.SessionRecordingConfig.GetMode()) && From a4e934293ad2b76f45bf6726419bc4e72d1d5acf Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 9 Oct 2025 16:41:59 -0700 Subject: [PATCH 12/26] Restructure currentSessionID handling. --- lib/srv/forward/sshserver.go | 57 ++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 1bb47ccfc58ec..76d28ad0bfc8a 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1174,42 +1174,41 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // right after the session channel is accepted. We should reuse this // session ID and delegate session responsibilities (recordings, audit // events, and session trackers) to avoid duplicates. - willReceiveSID := s.targetServer.GetSubKind() == types.SubKindTeleportNode - - // Check if the Teleport Node is outdated and won't actually send the session ID. // - // TODO(Joerger): DELETE IN v20.0.0 - // all v19+ servers set the session ID directly after accepting the session channel. - // clients should stop checking in v21, and servers should stop responding to the query in v22. - if willReceiveSID { - willReceiveSID, _, err = s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) + // Register handler to receive the current session ID before starting the session. + var newSessionIDFromServer chan session.ID + if s.targetServer.GetSubKind() == types.SubKindTeleportNode { + // Check if the Teleport Node is outdated and won't actually send the session ID. + // + // TODO(Joerger): DELETE IN v20.0.0 + // all v19+ servers set and share the session ID directly after accepting the session channel. + // clients should stop checking in v21, and servers should stop responding to the query in v22. + reply, _, err := s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) if err != nil { s.logger.WarnContext(ctx, "Failed to send session ID query request", "error", err) } - if !willReceiveSID { + if reply { + newSessionIDFromServer = make(chan session.ID, 1) + var receiveSessionIDOnce sync.Once + s.remoteClient.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { + // Only handle the first request - only one is expected. + receiveSessionIDOnce.Do(func() { + sid, err := session.ParseID(string(req.Payload)) + if err != nil { + s.logger.WarnContext(ctx, "Unable to parse session ID", "error", err) + return + } + + newSessionIDFromServer <- *sid + close(newSessionIDFromServer) + }) + }) + } else { s.logger.WarnContext(ctx, "Failed to query session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") } } - // Register handler to receive the current session ID before starting the session. - newSessionID := make(chan session.ID, 1) - if willReceiveSID { - var receiveSessionIDOnce sync.Once - s.remoteClient.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { - receiveSessionIDOnce.Do(func() { - sid, err := session.ParseID(string(req.Payload)) - if err != nil { - s.logger.WarnContext(ctx, "Unable to parse session ID", "error", err) - return - } - - newSessionID <- *sid - close(newSessionID) - }) - }) - } - // Create a "session" channel on the remote host. Note that we // create the remote session channel before accepting the local // channel request; this allows us to propagate the rejection @@ -1240,10 +1239,10 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { } scx.AddCloser(ch) - if willReceiveSID { + if newSessionIDFromServer != nil { // Wait for the session ID to be reported by the target node. select { - case sid := <-newSessionID: + case sid := <-newSessionIDFromServer: scx.SetNewSessionID(ctx, sid, ch) case <-time.After(10 * time.Second): s.logger.WarnContext(ctx, "Failed to receive session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") From 8ede4db2809d22a76045125e4c435cb27b394d9e Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 9 Oct 2025 16:49:58 -0700 Subject: [PATCH 13/26] Set newSessionID in test server context. --- lib/srv/ctx.go | 2 +- lib/srv/mock_test.go | 2 ++ lib/srv/sess.go | 10 +--------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 5657d6a0e9789..fd76ecac64391 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -734,7 +734,7 @@ func (c *ServerContext) SetNewSessionID(ctx context.Context, sid rsession.ID, ch } // GetNewSessionID gets the ID for a new session in this server context. -func (c *ServerContext) GetNewSessionID(ctx context.Context) rsession.ID { +func (c *ServerContext) GetNewSessionID() rsession.ID { c.mu.Lock() defer c.mu.Unlock() return c.newSessionID diff --git a/lib/srv/mock_test.go b/lib/srv/mock_test.go index 1de3f604cf44f..39dc19074ab6a 100644 --- a/lib/srv/mock_test.go +++ b/lib/srv/mock_test.go @@ -47,6 +47,7 @@ import ( "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" + rsession "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/sshca" "github.com/gravitational/teleport/lib/sshutils" "github.com/gravitational/teleport/lib/utils" @@ -74,6 +75,7 @@ func newTestServerContext(t *testing.T, srv Server, sessionJoiningRoleSet servic clusterName := "localhost" _, connCtx := sshutils.NewConnectionContext(ctx, nil, &ssh.ServerConn{Conn: sshConn}) scx := &ServerContext{ + newSessionID: rsession.NewID(), Logger: logtest.NewLogger(), ConnectionContext: connCtx, env: make(map[string]string), diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 667ecc9e240d6..aba8525befac7 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -824,19 +824,11 @@ func newSession(ctx context.Context, r *SessionRegistry, scx *ServerContext, ch return nil, nil, trace.BadParameter("session creation only supported in context of ssh access or proxying permit") } - sid := scx.GetNewSessionID(ctx) - if sid == "" { - // Some flows (particularly tests) do not set the session ID ahead of time. - sid = rsession.NewID() - scx.SetNewSessionID(ctx, sid, ch) - r.logger.WarnContext(ctx, "newSessionID should be set prior to session creation. If this log is seen outside of tests, this is a bug.") - } - serverSessions.Inc() startTime := time.Now().UTC() rsess := rsession.Session{ Kind: types.SSHSessionKind, - ID: sid, + ID: scx.GetNewSessionID(), TerminalParams: rsession.TerminalParams{ W: teleport.DefaultTerminalWidth, H: teleport.DefaultTerminalHeight, From 94c278fe9e1fdf810977050d32335bf7ce98e4da Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 16 Oct 2025 18:44:15 -0700 Subject: [PATCH 14/26] Fix integration test. --- lib/srv/forward/sshserver.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 76d28ad0bfc8a..001c9b6684bbb 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1183,9 +1183,18 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // TODO(Joerger): DELETE IN v20.0.0 // all v19+ servers set and share the session ID directly after accepting the session channel. // clients should stop checking in v21, and servers should stop responding to the query in v22. - reply, _, err := s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) + reply, payload, err := s.remoteClient.SendRequest(ctx, teleport.SessionIDQueryRequestV2, true, nil) if err != nil { s.logger.WarnContext(ctx, "Failed to send session ID query request", "error", err) + } else if !reply && payload != nil { + // If the target node replies with a payload, this means that the connection itself has been rejected, + // presumably due to an authz error, and the server is trying to communicate the error with the first + // req/chan received. + s.logger.WarnContext(ctx, "Remote session open failed", "error", err) + if err := nch.Reject(ssh.Prohibited, fmt.Sprintf("remote session open failed: %v", string(payload))); err != nil { + s.logger.WarnContext(ctx, "Failed to reject channel", "channel", nch.ChannelType(), "error", err) + } + return } if reply { From 33cd6e9138edf601167c6249f3dac7f8cd033c54 Mon Sep 17 00:00:00 2001 From: joerger Date: Fri, 17 Oct 2025 18:10:50 -0700 Subject: [PATCH 15/26] Fix AuditOn integration test. --- integration/integration_test.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/integration/integration_test.go b/integration/integration_test.go index 2927c6eaed4c1..10f859fc2d814 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -583,23 +583,21 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { }, }) require.NoError(t, err) - require.Len(t, sessionEvents, 4) - startEvent, ok := sessionEvents[0].(*apievents.SessionStart) - require.True(t, ok, "Expected session start event but got %s", sessionEvents[0].GetType()) - require.Equal(t, sessionID, startEvent.SessionID) - - endEvent, ok := sessionEvents[1].(*apievents.SessionEnd) - require.True(t, ok, "Expected session end event but got %s", sessionEvents[1].GetType()) - require.Equal(t, sessionID, endEvent.SessionID) - - leaveEvent, ok := sessionEvents[2].(*apievents.SessionLeave) - require.True(t, ok, "Expected session leave event but got %s", sessionEvents[2].GetType()) - require.Equal(t, sessionID, leaveEvent.SessionID) - - dataEvent, ok := sessionEvents[3].(*apievents.SessionData) - require.True(t, ok, "Expected session data event but got %s", sessionEvents[3].GetType()) - require.Equal(t, sessionID, dataEvent.SessionID) + // Check that the events found above in the session stream show up in the backend. + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == start.GetID() + }), "expected session events to contain session.start event") + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == end.GetID() + }), "expected session events to contain session.end event") + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == leave.GetID() + }), "expected session events to contain session.leave event") + + // Ensure there are no duplicate events, e.g. from proxy recording mode. + // The "session data" event may or may not be available yet, so the length may be 3 or 4. + require.True(t, len(sessionEvents) <= 4, "%d unexpected duplicate events", len(sessionEvents)-4) }) } } From 3fb3d846c2c5d2d6c32ecd8a73b4c2b77d718e52 Mon Sep 17 00:00:00 2001 From: joerger Date: Wed, 5 Nov 2025 12:59:32 -0800 Subject: [PATCH 16/26] Address comment on channel close. --- lib/srv/forward/sshserver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 001c9b6684bbb..2f7a9dba011f5 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1210,7 +1210,6 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { } newSessionIDFromServer <- *sid - close(newSessionIDFromServer) }) }) } else { From 5dc9efece8201c50c3fe65197fdd8ca8e968064f Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 6 Nov 2025 12:14:19 -0800 Subject: [PATCH 17/26] Track session on forwarding node. --- integration/integration_test.go | 57 +++++++++++++++++---------------- lib/srv/ctx.go | 25 ++++++--------- lib/srv/exec.go | 8 ++--- lib/srv/forward/sshserver.go | 1 - lib/srv/sess.go | 27 ++++++++-------- lib/srv/sess_test.go | 10 +++--- lib/srv/term.go | 4 +-- 7 files changed, 61 insertions(+), 71 deletions(-) diff --git a/integration/integration_test.go b/integration/integration_test.go index 10f859fc2d814..d47793987847d 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -570,34 +570,37 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { require.Regexp(t, ".*exit.*", recorded) require.Regexp(t, ".*echo hi.*", recorded) - // Ensure that we find the 4 primary session events with the correct session ID and order - // and without duplicates or mismatched session IDs. - sessionEvents, _, err = site.SearchEvents(ctx, events.SearchEventsRequest{ - From: time.Time{}, - To: time.Now(), - EventTypes: []string{ - events.SessionStartEvent, - events.SessionLeaveEvent, - events.SessionEndEvent, - events.SessionDataEvent, - }, - }) - require.NoError(t, err) + // Ensure that we find the 4 primary session without duplicates. + require.EventuallyWithT(t, func(collect *assert.CollectT) { + sessionEvents, _, err = site.SearchEvents(ctx, events.SearchEventsRequest{ + From: time.Time{}, + To: time.Now(), + EventTypes: []string{ + events.SessionStartEvent, + events.SessionLeaveEvent, + events.SessionEndEvent, + events.SessionDataEvent, + }, + }) + require.NoError(t, err) - // Check that the events found above in the session stream show up in the backend. - require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { - return ae.GetID() == start.GetID() - }), "expected session events to contain session.start event") - require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { - return ae.GetID() == end.GetID() - }), "expected session events to contain session.end event") - require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { - return ae.GetID() == leave.GetID() - }), "expected session events to contain session.leave event") - - // Ensure there are no duplicate events, e.g. from proxy recording mode. - // The "session data" event may or may not be available yet, so the length may be 3 or 4. - require.True(t, len(sessionEvents) <= 4, "%d unexpected duplicate events", len(sessionEvents)-4) + // Check that the events found above in the session stream show up in the backend. + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == start.GetID() + }), "expected session events to contain session.start event") + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == end.GetID() + }), "expected session events to contain session.end event") + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == leave.GetID() + }), "expected session events to contain session.leave event") + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetType() == events.SessionDataEvent + }), "expected session events to contain session.data event") + + // Ensure there are no duplicate events, e.g. from proxy recording mode. + require.Len(t, sessionEvents, 4, "%d unexpected duplicate events", len(sessionEvents)-4) + }, 10*time.Second, 100*time.Millisecond) }) } } diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index fd76ecac64391..3a8d773468498 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -326,12 +326,6 @@ type ServerContext struct { // the client of the to-be session ID. newSessionID rsession.ID - // proxyShouldCreateSessionTracker indicates that a session tracker should be created - // for a proxy forwarded session because the node failed to report its session ID after - // a session channel request. - // TODO(Joerger): DELETE IN v21.0.0 - All v19+ nodes report session ID after session channel - proxyShouldCreateSessionTracker bool - // session holds the active session (if there's an active one). session *session @@ -705,16 +699,6 @@ func (c *ServerContext) GetSessionParams() tracessh.SessionParams { return sessionParams } -// SetProxyShouldCreateSessionTracker indicates that a session tracker should be created -// for a proxy forwarded session because the node failed to report its session ID after -// a session channel request. -// TODO(Joerger): DELETE IN v21.0.0 - All v19+ nodes report session ID after session channel -func (c *ServerContext) SetProxyShouldCreateSessionTracker() { - c.mu.Lock() - defer c.mu.Unlock() - c.proxyShouldCreateSessionTracker = true -} - // SetNewSessionID sets the ID for a new session in this server context. func (c *ServerContext) SetNewSessionID(ctx context.Context, sid rsession.ID, ch ssh.Channel) { c.mu.Lock() @@ -960,6 +944,15 @@ func (c *ServerContext) reportStats(conn *utils.TrackingConn) { serverRX.Add(float64(rxBytes)) } +// shouldHandleRecording returns whether this server context is responsible for +// recording session events, including session recording, audit events, and session tracking. +func (c *ServerContext) ShouldHandleSessionRecording() bool { + // The only time this server is not responsible for recording the session is when this + // is a Teleport Node with Proxy recording mode turned on, where the forwarding node will + // handle the recording. + return c.srv.Component() != teleport.ComponentNode || !services.IsRecordAtProxy(c.SessionRecordingConfig.GetMode()) +} + func (c *ServerContext) Close() error { // If the underlying connection is holding tracking information, report that // to the audit log at close. diff --git a/lib/srv/exec.go b/lib/srv/exec.go index 8daf6c7c1c31f..2fab90d5a1a1c 100644 --- a/lib/srv/exec.go +++ b/lib/srv/exec.go @@ -39,10 +39,8 @@ import ( "github.com/gravitational/teleport" tracessh "github.com/gravitational/teleport/api/observability/tracing/ssh" - "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" - "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/utils" ) @@ -102,10 +100,8 @@ func NewExecRequest(ctx *ServerContext, command string) (Exec, error) { }, nil } - // If this is a registered OpenSSH node or proxy recoding mode is - // enabled, execute the command on a remote host. This is used by - // in-memory forwarding nodes. - if types.IsOpenSSHNodeSubKind(ctx.ServerSubKind) || services.IsRecordAtProxy(ctx.SessionRecordingConfig.GetMode()) { + // If this is a forwarding node, execute the command on a remote host. + if ctx.srv.Component() == teleport.ComponentForwardingNode { return &remoteExec{ ctx: ctx, command: command, diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 2f7a9dba011f5..b756e43054717 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1264,7 +1264,6 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // outdated or an agentless node. Continue with a random session ID and ensure // we create a new session tracker. scx.SetNewSessionID(ctx, session.NewID(), ch) - scx.SetProxyShouldCreateSessionTracker() } ch = scx.TrackActivity(ch) diff --git a/lib/srv/sess.go b/lib/srv/sess.go index aba8525befac7..3963ce14fc2bc 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -884,6 +884,11 @@ func newSession(ctx context.Context, r *SessionRegistry, scx *ServerContext, ch sess.io.OnWriteError = sess.onWriteErrorCallback(sessionRecordingMode) + // Nodes discard events in cases when proxies are already recording them. + if !sess.shouldHandleRecording() { + sess.emitter = events.NewDiscardAuditLog() + } + go func() { if _, open := <-sess.io.TerminateNotifier(); open { err := sess.registry.ForceTerminate(sess.scx) @@ -1273,6 +1278,10 @@ func (s *session) emitSessionEndEvent() { } } +func (s *session) shouldHandleRecording() bool { + return s.scx.ShouldHandleSessionRecording() +} + func (s *session) sessionRecordingLocation() string { sessionRecMode := s.scx.SessionRecordingConfig.GetMode() subKind := s.serverMeta.ServerSubKind @@ -1563,9 +1572,8 @@ func newRecorder(s *session, ctx *ServerContext) (events.SessionPreparerRecorder return nil, trace.BadParameter("session recorder creation only supported in context of ssh access or proxying permit") } - // Nodes discard events in cases when proxies are already recording them. - if s.registry.Srv.Component() == teleport.ComponentNode && - services.IsRecordAtProxy(ctx.SessionRecordingConfig.GetMode()) { + // Don't record on the Node when the proxy forwarding node is already recording. + if !s.shouldHandleRecording() { s.logger.DebugContext(s.serverCtx, "Session will be recorded at proxy.") return events.WithNoOpPreparer(events.NewDiscardRecorder()), nil } @@ -2381,14 +2389,12 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS Invited: s.scx.GetSessionParams().Invited, } - // Don't propagate the session tracker if: - // - This is a proxy forwarding server for a Teleport Node (tracking is handled by the target node server) + // Don't propagate the session tracker to the backend if: + // - this is a Teleport Node and proxy recording mode is turned on (tracking is handled by the proxy forwarding node) // - this is a non-interactive session // - the session was initiated by a bot svc := s.registry.SessionTrackerService - if (s.registry.Srv.Component() == teleport.ComponentForwardingNode && !s.registry.Srv.GetInfo().IsOpenSSHNode() && !s.scx.proxyShouldCreateSessionTracker) || - sessType == sessionTypeNonInteractive || - s.scx.Identity.BotName != "" { + if !s.shouldHandleRecording() || sessType == sessionTypeNonInteractive || s.scx.Identity.BotName != "" { svc = nil } @@ -2440,11 +2446,6 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS // emitAuditEvent emits audit events. func (s *session) emitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { - // Nodes discard audit events in cases when proxies are already emitting them. - if s.scx.srv.Component() == teleport.ComponentNode && services.IsRecordAtProxy(s.scx.SessionRecordingConfig.GetMode()) { - return nil - } - return s.emitter.EmitAuditEvent(ctx, event) } diff --git a/lib/srv/sess_test.go b/lib/srv/sess_test.go index de6ca7f422f3f..f9e1f3e527c6a 100644 --- a/lib/srv/sess_test.go +++ b/lib/srv/sess_test.go @@ -949,7 +949,7 @@ func TestTrackingSession(t *testing.T) { interactive: true, assertion: require.NoError, createAssertion: func(t *testing.T, count int) { - require.Equal(t, 1, count) + require.Equal(t, 0, count) }, }, { @@ -963,23 +963,23 @@ func TestTrackingSession(t *testing.T) { }, }, { - name: "proxy with proxy recording mode", + name: "forwarding node with proxy recording mode", component: teleport.ComponentForwardingNode, recordingMode: types.RecordAtProxy, interactive: true, assertion: require.NoError, createAssertion: func(t *testing.T, count int) { - require.Equal(t, 0, count) + require.Equal(t, 1, count) }, }, { - name: "proxy with node recording mode", + name: "forwarding node with node recording mode (agentless)", component: teleport.ComponentForwardingNode, recordingMode: types.RecordAtNode, interactive: true, assertion: require.NoError, createAssertion: func(t *testing.T, count int) { - require.Equal(t, 0, count) + require.Equal(t, 1, count) }, }, { diff --git a/lib/srv/term.go b/lib/srv/term.go index 6b662606fbcb4..c0a4632497e21 100644 --- a/lib/srv/term.go +++ b/lib/srv/term.go @@ -38,8 +38,6 @@ import ( "github.com/gravitational/teleport" tracessh "github.com/gravitational/teleport/api/observability/tracing/ssh" - "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/services" rsession "github.com/gravitational/teleport/lib/session" ) @@ -120,7 +118,7 @@ func NewTerminal(ctx *ServerContext) (Terminal, error) { // If this is not a Teleport node, find out what mode the cluster is in and // return the correct terminal. - if types.IsOpenSSHNodeSubKind(ctx.ServerSubKind) || services.IsRecordAtProxy(ctx.SessionRecordingConfig.GetMode()) { + if ctx.srv.Component() == teleport.ComponentForwardingNode { return newRemoteTerminal(ctx) } return newLocalTerminal(ctx) From 497605b4cbffc2649fd9c438a16636cce8c57186 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 6 Nov 2025 12:38:43 -0800 Subject: [PATCH 18/26] Fix web shutdown. --- lib/web/terminal.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/web/terminal.go b/lib/web/terminal.go index fa6e90d4aaefe..d3c5f7d45420b 100644 --- a/lib/web/terminal.go +++ b/lib/web/terminal.go @@ -861,6 +861,8 @@ func (t *TerminalHandler) streamTerminal(ctx context.Context, tc *client.Telepor // wait in a new goroutine because the server won't set a // session ID until we start the session. go func() { + defer close(sessionDataSent) + ctx, cancel := context.WithTimeout(writeSessionCtx, 10*time.Second) defer cancel() From bc936ed13615f3f08f60ea284cb7885e15d59478 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 6 Nov 2025 15:57:37 -0800 Subject: [PATCH 19/26] Fix nil pointer dereference in test. --- lib/srv/mock_test.go | 3 +++ lib/srv/sess.go | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/srv/mock_test.go b/lib/srv/mock_test.go index 39dc19074ab6a..5b48830edc04b 100644 --- a/lib/srv/mock_test.go +++ b/lib/srv/mock_test.go @@ -95,6 +95,9 @@ func newTestServerContext(t *testing.T, srv Server, sessionJoiningRoleSet servic }, cancelContext: ctx, cancel: cancel, + // If proxy forwarding is being used (proxy recording, agentless), then remote session must be set. + // Otherwise, this field is ignored. + RemoteSession: mockSSHSession(t), } err = scx.SetExecRequest(&localExec{Ctx: scx}) diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 3963ce14fc2bc..e2d680b5fc413 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -1540,12 +1540,13 @@ func (s *session) startTerminal(ctx context.Context, scx *ServerContext) error { // allocate a terminal or take the one previously allocated via a // separate "allocate TTY" SSH request - var err error if s.term = scx.GetTerm(); s.term != nil { scx.SetTerm(nil) - } else if s.term, err = NewTerminal(scx); err != nil { + } else if term, err := NewTerminal(scx); err != nil { s.logger.InfoContext(ctx, "Unable to allocate new terminal.", "error", err) return trace.Wrap(err) + } else { + s.term = term } if err := s.term.Run(ctx); err != nil { From 7ac24834cf4c61820949649359b2db45790f42c6 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 6 Nov 2025 16:10:59 -0800 Subject: [PATCH 20/26] Fix test flake. --- integration/integration_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integration/integration_test.go b/integration/integration_test.go index d47793987847d..ea2061cc8f9ef 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -582,24 +582,24 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { events.SessionDataEvent, }, }) - require.NoError(t, err) + require.NoError(collect, err) // Check that the events found above in the session stream show up in the backend. - require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { return ae.GetID() == start.GetID() }), "expected session events to contain session.start event") - require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { return ae.GetID() == end.GetID() }), "expected session events to contain session.end event") - require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { return ae.GetID() == leave.GetID() }), "expected session events to contain session.leave event") - require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { return ae.GetType() == events.SessionDataEvent }), "expected session events to contain session.data event") // Ensure there are no duplicate events, e.g. from proxy recording mode. - require.Len(t, sessionEvents, 4, "%d unexpected duplicate events", len(sessionEvents)-4) + require.Len(collect, sessionEvents, 4, "%d unexpected duplicate events", len(sessionEvents)-4) }, 10*time.Second, 100*time.Millisecond) }) } From e77fe859984aba4d974d4cc43ca4fdf4d5af6110 Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 6 Nov 2025 17:00:06 -0800 Subject: [PATCH 21/26] Fix nil pointer in test. --- lib/srv/sess.go | 6 +-- lib/srv/sess_test.go | 115 +++++++++++++++++-------------------------- 2 files changed, 48 insertions(+), 73 deletions(-) diff --git a/lib/srv/sess.go b/lib/srv/sess.go index e2d680b5fc413..0b41213b4e82c 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -911,7 +911,7 @@ func newSession(ctx context.Context, r *SessionRegistry, scx *ServerContext, ch return nil, nil, trace.Wrap(err) } - sess.recorder, err = newRecorder(sess, scx) + sess.recorder, err = newRecorder(sess, scx, sessType) if err != nil { return nil, nil, trace.Wrap(err) } @@ -1559,7 +1559,7 @@ func (s *session) startTerminal(ctx context.Context, scx *ServerContext) error { // newRecorder creates a new [events.SessionPreparerRecorder] to be used as the recorder // of the passed in session. -func newRecorder(s *session, ctx *ServerContext) (events.SessionPreparerRecorder, error) { +func newRecorder(s *session, ctx *ServerContext, sessType sessionType) (events.SessionPreparerRecorder, error) { // determine session recording mode. in theory we could choose to only do this in the unhappy // path since thats the only place we use it, but we do it here to ensure that any test coverage // we have for this code will always enforce its permit requirements. @@ -1580,7 +1580,7 @@ func newRecorder(s *session, ctx *ServerContext) (events.SessionPreparerRecorder } // Don't record non-interactive sessions when enhanced recording is disabled. - if ctx.GetTerm() == nil && !ctx.srv.GetBPF().Enabled() { + if sessType == sessionTypeNonInteractive && !ctx.srv.GetBPF().Enabled() { return events.WithNoOpPreparer(events.NewDiscardRecorder()), nil } diff --git a/lib/srv/sess_test.go b/lib/srv/sess_test.go index f9e1f3e527c6a..542aa3583f0da 100644 --- a/lib/srv/sess_test.go +++ b/lib/srv/sess_test.go @@ -203,29 +203,20 @@ func TestSession_newRecorder(t *testing.T) { } cases := []struct { - desc string - sess *session - sctx *ServerContext - errAssertion require.ErrorAssertionFunc - recAssertion require.ValueAssertionFunc + desc string + sctx *ServerContext + noninteractive bool + errAssertion require.ErrorAssertionFunc + recAssertion require.ValueAssertionFunc }{ { desc: "discard-stream-when-proxy-recording", - sess: &session{ - id: "test", - logger: logger, - registry: &SessionRegistry{ - logger: logtest.NewLogger(), - SessionRegistryConfig: SessionRegistryConfig{ - Srv: &mockServer{ - component: teleport.ComponentNode, - }, - }, - }, - }, + sctx: &ServerContext{ SessionRecordingConfig: proxyRecording, - term: &terminal{}, + srv: &mockServer{ + component: teleport.ComponentNode, + }, Identity: IdentityContext{ AccessPermit: &decisionpb.SSHAccessPermit{}, }, @@ -234,22 +225,27 @@ func TestSession_newRecorder(t *testing.T) { recAssertion: isNotSessionWriter, }, { - desc: "discard-stream--when-proxy-sync-recording", - sess: &session{ - id: "test", - logger: logger, - registry: &SessionRegistry{ - logger: logtest.NewLogger(), - SessionRegistryConfig: SessionRegistryConfig{ - Srv: &mockServer{ - component: teleport.ComponentNode, - }, - }, + desc: "discard-stream-when-proxy-sync-recording", + sctx: &ServerContext{ + SessionRecordingConfig: proxyRecordingSync, + srv: &mockServer{ + component: teleport.ComponentNode, + }, + Identity: IdentityContext{ + AccessPermit: &decisionpb.SSHAccessPermit{}, }, }, + errAssertion: require.NoError, + recAssertion: isNotSessionWriter, + }, + { + desc: "discard-stream-when-non-interactive-non-bpf", + noninteractive: true, sctx: &ServerContext{ SessionRecordingConfig: proxyRecordingSync, - term: &terminal{}, + srv: &mockServer{ + component: teleport.ComponentNode, + }, Identity: IdentityContext{ AccessPermit: &decisionpb.SSHAccessPermit{}, }, @@ -259,24 +255,11 @@ func TestSession_newRecorder(t *testing.T) { }, { desc: "strict-err-new-audit-writer-fails", - sess: &session{ - id: "test", - logger: logger, - registry: &SessionRegistry{ - logger: logtest.NewLogger(), - SessionRegistryConfig: SessionRegistryConfig{ - Srv: &mockServer{ - component: teleport.ComponentNode, - }, - }, - }, - }, sctx: &ServerContext{ SessionRecordingConfig: nodeRecordingSync, srv: &mockServer{ component: teleport.ComponentNode, }, - term: &terminal{}, Identity: IdentityContext{ AccessPermit: &decisionpb.SSHAccessPermit{ SessionRecordingMode: string(constants.SessionRecordingModeStrict), @@ -288,18 +271,6 @@ func TestSession_newRecorder(t *testing.T) { }, { desc: "best-effort-err-new-audit-writer-succeeds", - sess: &session{ - id: "test", - logger: logger, - registry: &SessionRegistry{ - logger: logtest.NewLogger(), - SessionRegistryConfig: SessionRegistryConfig{ - Srv: &mockServer{ - component: teleport.ComponentNode, - }, - }, - }, - }, sctx: &ServerContext{ ClusterName: "test", SessionRecordingConfig: nodeRecordingSync, @@ -312,7 +283,6 @@ func TestSession_newRecorder(t *testing.T) { SessionRecordingMode: string(constants.SessionRecordingModeBestEffort), }, }, - term: &terminal{}, }, errAssertion: require.NoError, recAssertion: func(t require.TestingT, i any, _ ...any) { @@ -324,29 +294,17 @@ func TestSession_newRecorder(t *testing.T) { }, { desc: "audit-writer", - sess: &session{ - id: "test", - logger: logger, - registry: &SessionRegistry{ - logger: logtest.NewLogger(), - SessionRegistryConfig: SessionRegistryConfig{ - Srv: &mockServer{ - component: teleport.ComponentNode, - }, - }, - }, - }, sctx: &ServerContext{ ClusterName: "test", SessionRecordingConfig: nodeRecordingSync, srv: &mockServer{ MockRecorderEmitter: &eventstest.MockRecorderEmitter{}, datadir: t.TempDir(), + component: teleport.ComponentNode, }, Identity: IdentityContext{ AccessPermit: &decisionpb.SSHAccessPermit{}, }, - term: &terminal{}, }, errAssertion: require.NoError, recAssertion: func(t require.TestingT, i any, i2 ...any) { @@ -360,7 +318,24 @@ func TestSession_newRecorder(t *testing.T) { for _, tt := range cases { t.Run(tt.desc, func(t *testing.T) { - rec, err := newRecorder(tt.sess, tt.sctx) + sess := &session{ + id: "test", + logger: logger, + registry: &SessionRegistry{ + logger: logtest.NewLogger(), + SessionRegistryConfig: SessionRegistryConfig{ + Srv: tt.sctx.srv, + }, + }, + scx: tt.sctx, + } + + sessType := sessionTypeInteractive + if tt.noninteractive { + sessType = sessionTypeNonInteractive + } + + rec, err := newRecorder(sess, tt.sctx, sessType) tt.errAssertion(t, err) tt.recAssertion(t, rec) }) From 4d3a6ea55c6d3dc21c5fb0e0f915c4b2fee6b88e Mon Sep 17 00:00:00 2001 From: joerger Date: Thu, 6 Nov 2025 17:54:04 -0800 Subject: [PATCH 22/26] Fix test flake. --- integration/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/integration_test.go b/integration/integration_test.go index ea2061cc8f9ef..2c0b9f0d73255 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -600,7 +600,7 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { // Ensure there are no duplicate events, e.g. from proxy recording mode. require.Len(collect, sessionEvents, 4, "%d unexpected duplicate events", len(sessionEvents)-4) - }, 10*time.Second, 100*time.Millisecond) + }, 20*time.Second, 100*time.Millisecond) }) } } From 4ccb96910795c14b5bd3a74c4f5aeae470adb68f Mon Sep 17 00:00:00 2001 From: Brian Joerger Date: Fri, 7 Nov 2025 11:18:30 -0800 Subject: [PATCH 23/26] Update lib/srv/ctx.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/srv/ctx.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 3a8d773468498..90bef256ee70e 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -944,7 +944,7 @@ func (c *ServerContext) reportStats(conn *utils.TrackingConn) { serverRX.Add(float64(rxBytes)) } -// shouldHandleRecording returns whether this server context is responsible for +// ShouldHandleRecording returns whether this server context is responsible for // recording session events, including session recording, audit events, and session tracking. func (c *ServerContext) ShouldHandleSessionRecording() bool { // The only time this server is not responsible for recording the session is when this From e0e68881fc4f2df7f892e145f6887e2e0fdcf838 Mon Sep 17 00:00:00 2001 From: joerger Date: Mon, 10 Nov 2025 16:55:12 -0800 Subject: [PATCH 24/26] Forwarding Node accepts client connection after receiving preparing session ID from node. This way, the forwarder can reject client connections if there is an issue preparing the session ID (impossible join sessions). --- lib/srv/ctx.go | 13 +------------ lib/srv/forward/sshserver.go | 35 ++++++++++++++++++++++------------- lib/srv/regular/sshserver.go | 13 ++++++++++++- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 90bef256ee70e..6431fa246edba 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -700,21 +700,10 @@ func (c *ServerContext) GetSessionParams() tracessh.SessionParams { } // SetNewSessionID sets the ID for a new session in this server context. -func (c *ServerContext) SetNewSessionID(ctx context.Context, sid rsession.ID, ch ssh.Channel) { +func (c *ServerContext) SetNewSessionID(ctx context.Context, sid rsession.ID) { c.mu.Lock() defer c.mu.Unlock() - c.newSessionID = sid - - // inform the client of the session ID that is going to be used in a new - // goroutine to reduce latency. - go func() { - c.Logger.DebugContext(ctx, "Sending current session ID") - _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(c.newSessionID)) - if err != nil { - c.Logger.DebugContext(ctx, "Failed to send the current session ID", "error", err) - } - }() } // GetNewSessionID gets the ID for a new session in this server context. diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index b756e43054717..7f9cef0e8ce6f 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1236,22 +1236,11 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { } scx.RemoteSession = remoteSession - // Accept the session channel request - ch, in, err := nch.Accept() - if err != nil { - s.logger.WarnContext(ctx, "Unable to accept channel", "channel", nch.ChannelType(), "error", err) - if err := nch.Reject(ssh.ConnectionFailed, fmt.Sprintf("unable to accept channel: %v", err)); err != nil { - s.logger.WarnContext(ctx, "Failed to reject channel", "channel", nch.ChannelType(), "error", err) - } - return - } - scx.AddCloser(ch) - if newSessionIDFromServer != nil { // Wait for the session ID to be reported by the target node. select { case sid := <-newSessionIDFromServer: - scx.SetNewSessionID(ctx, sid, ch) + scx.SetNewSessionID(ctx, sid) case <-time.After(10 * time.Second): s.logger.WarnContext(ctx, "Failed to receive session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") if err := nch.Reject(ssh.ConnectionFailed, "target Teleport Node failed to report session ID"); err != nil { @@ -1263,11 +1252,31 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // The target node is not expected to report session ID, either because it's // outdated or an agentless node. Continue with a random session ID and ensure // we create a new session tracker. - scx.SetNewSessionID(ctx, session.NewID(), ch) + scx.SetNewSessionID(ctx, session.NewID()) } + // Accept the session channel request + ch, in, err := nch.Accept() + if err != nil { + s.logger.WarnContext(ctx, "Unable to accept channel", "channel", nch.ChannelType(), "error", err) + if err := nch.Reject(ssh.ConnectionFailed, fmt.Sprintf("unable to accept channel: %v", err)); err != nil { + s.logger.WarnContext(ctx, "Failed to reject channel", "channel", nch.ChannelType(), "error", err) + } + return + } + scx.AddCloser(ch) ch = scx.TrackActivity(ch) + // inform the client of the session ID that is going to be used in a new + // goroutine to reduce latency. + go func() { + s.logger.DebugContext(ctx, "Sending current session ID") + _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(scx.GetNewSessionID())) + if err != nil { + s.logger.DebugContext(ctx, "Failed to send the current session ID", "error", err) + } + }() + s.logger.DebugContext(ctx, "Opening session request", "target_addr", s.sconn.RemoteAddr(), "session_id", scx.ID()) defer s.logger.DebugContext(ctx, "Closing session request", "target_addr", s.sconn.RemoteAddr(), "session_id", scx.ID()) diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index 0078767580da5..6ba1f69be84ea 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -1669,7 +1669,18 @@ func (s *Server) handleSessionRequests(ctx context.Context, ccx *sshutils.Connec // from env vars. There is no harm in sending the ephemeral session ID anyways // as clients should ignore the reported session ID when joining a session. if scx.GetSessionParams().JoinSessionID == "" { - scx.SetNewSessionID(ctx, session.NewID(), ch) + sid := session.NewID() + scx.SetNewSessionID(ctx, sid) + + // inform the client of the session ID that is going to be used in a new + // goroutine to reduce latency. + go func() { + s.logger.DebugContext(ctx, "Sending current session ID") + _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(sid)) + if err != nil { + s.logger.DebugContext(ctx, "Failed to send the current session ID", "error", err) + } + }() } // The keep-alive loop will keep pinging the remote server and after it has From 0a1a7b8ecdbd6afe27824fa86943a22b09bf58c4 Mon Sep 17 00:00:00 2001 From: joerger Date: Tue, 11 Nov 2025 10:32:18 -0800 Subject: [PATCH 25/26] Remove check for session.data event which may not be emitted in time for the test. --- integration/integration_test.go | 55 ++++++++++++++------------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/integration/integration_test.go b/integration/integration_test.go index 2c0b9f0d73255..7c5bc248ed1b6 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -570,37 +570,30 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { require.Regexp(t, ".*exit.*", recorded) require.Regexp(t, ".*echo hi.*", recorded) - // Ensure that we find the 4 primary session without duplicates. - require.EventuallyWithT(t, func(collect *assert.CollectT) { - sessionEvents, _, err = site.SearchEvents(ctx, events.SearchEventsRequest{ - From: time.Time{}, - To: time.Now(), - EventTypes: []string{ - events.SessionStartEvent, - events.SessionLeaveEvent, - events.SessionEndEvent, - events.SessionDataEvent, - }, - }) - require.NoError(collect, err) - - // Check that the events found above in the session stream show up in the backend. - require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { - return ae.GetID() == start.GetID() - }), "expected session events to contain session.start event") - require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { - return ae.GetID() == end.GetID() - }), "expected session events to contain session.end event") - require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { - return ae.GetID() == leave.GetID() - }), "expected session events to contain session.leave event") - require.True(collect, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { - return ae.GetType() == events.SessionDataEvent - }), "expected session events to contain session.data event") - - // Ensure there are no duplicate events, e.g. from proxy recording mode. - require.Len(collect, sessionEvents, 4, "%d unexpected duplicate events", len(sessionEvents)-4) - }, 20*time.Second, 100*time.Millisecond) + sessionEvents, _, err = site.SearchEvents(ctx, events.SearchEventsRequest{ + From: time.Time{}, + To: time.Now(), + EventTypes: []string{ + events.SessionStartEvent, + events.SessionLeaveEvent, + events.SessionEndEvent, + }, + }) + require.NoError(t, err) + + // Check that the events found above in the session stream show up in the backend. + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == start.GetID() + }), "expected session events to contain session.start event") + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == end.GetID() + }), "expected session events to contain session.end event") + require.True(t, slices.ContainsFunc(sessionEvents, func(ae apievents.AuditEvent) bool { + return ae.GetID() == leave.GetID() + }), "expected session events to contain session.leave event") + + // Ensure there are no duplicate events, e.g. from proxy recording mode. + require.Len(t, sessionEvents, 3, "%d unexpected duplicate events", len(sessionEvents)-4) }) } } From 1d5961c3d1c8bc0d33396225233bd22417ea9a7c Mon Sep 17 00:00:00 2001 From: joerger Date: Wed, 12 Nov 2025 10:02:21 -0800 Subject: [PATCH 26/26] Address comments. --- lib/srv/forward/sshserver.go | 36 ++++++++++++++++++++++-------------- lib/srv/regular/sshserver.go | 2 +- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 7f9cef0e8ce6f..14b965299be8b 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1176,7 +1176,7 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // events, and session trackers) to avoid duplicates. // // Register handler to receive the current session ID before starting the session. - var newSessionIDFromServer chan session.ID + var newSessionIDFromServer chan string if s.targetServer.GetSubKind() == types.SubKindTeleportNode { // Check if the Teleport Node is outdated and won't actually send the session ID. // @@ -1197,19 +1197,13 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { return } - if reply { - newSessionIDFromServer = make(chan session.ID, 1) + if err == nil && reply { + newSessionIDFromServer = make(chan string, 1) var receiveSessionIDOnce sync.Once s.remoteClient.HandleSessionRequest(ctx, teleport.CurrentSessionIDRequest, func(ctx context.Context, req *ssh.Request) { // Only handle the first request - only one is expected. receiveSessionIDOnce.Do(func() { - sid, err := session.ParseID(string(req.Payload)) - if err != nil { - s.logger.WarnContext(ctx, "Unable to parse session ID", "error", err) - return - } - - newSessionIDFromServer <- *sid + newSessionIDFromServer <- string(req.Payload) }) }) } else { @@ -1239,14 +1233,27 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { if newSessionIDFromServer != nil { // Wait for the session ID to be reported by the target node. select { - case sid := <-newSessionIDFromServer: - scx.SetNewSessionID(ctx, sid) + case sidString := <-newSessionIDFromServer: + sid, err := session.ParseID(sidString) + if err != nil { + s.logger.WarnContext(ctx, "Unable to parse session ID reported by target Teleport Node", "error", err) + if err := nch.Reject(ssh.ConnectionFailed, "target Teleport Node failed to report session ID"); err != nil { + s.logger.WarnContext(ctx, "Failed to reject channel", "channel", nch.ChannelType(), "error", err) + } + return + } + scx.SetNewSessionID(ctx, *sid) case <-time.After(10 * time.Second): s.logger.WarnContext(ctx, "Failed to receive session ID from target node. Ensure the targeted Teleport Node is upgraded to v19.0.0+ to avoid duplicate events due to mismatched session IDs.") if err := nch.Reject(ssh.ConnectionFailed, "target Teleport Node failed to report session ID"); err != nil { s.logger.WarnContext(ctx, "Failed to reject channel", "channel", nch.ChannelType(), "error", err) } return + case <-ctx.Done(): + if err := nch.Reject(ssh.ConnectionFailed, "target Teleport Node failed to report session ID"); err != nil { + s.logger.WarnContext(ctx, "Failed to reject channel", "channel", nch.ChannelType(), "error", err) + } + return } } else { // The target node is not expected to report session ID, either because it's @@ -1270,8 +1277,9 @@ func (s *Server) handleSessionChannel(ctx context.Context, nch ssh.NewChannel) { // inform the client of the session ID that is going to be used in a new // goroutine to reduce latency. go func() { - s.logger.DebugContext(ctx, "Sending current session ID") - _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(scx.GetNewSessionID())) + sid := scx.GetNewSessionID() + s.logger.DebugContext(ctx, "Sending current session ID", "sid", sid) + _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(sid)) if err != nil { s.logger.DebugContext(ctx, "Failed to send the current session ID", "error", err) } diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index 6ba1f69be84ea..7b48bd44a8ee1 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -1675,7 +1675,7 @@ func (s *Server) handleSessionRequests(ctx context.Context, ccx *sshutils.Connec // inform the client of the session ID that is going to be used in a new // goroutine to reduce latency. go func() { - s.logger.DebugContext(ctx, "Sending current session ID") + s.logger.DebugContext(ctx, "Sending current session ID", "sid", sid) _, err := ch.SendRequest(teleport.CurrentSessionIDRequest, false, []byte(sid)) if err != nil { s.logger.DebugContext(ctx, "Failed to send the current session ID", "error", err)