-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add latency detector for desktop sessions #52827
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
86774cf
f4b581a
74450eb
fa07c79
236cbec
cd45be8
c0f2dfb
78f17d3
4906593
8b7ae4b
c9b6a01
d8b7aad
5b08181
88db008
853b91c
25a7ef0
71e14d0
2b8b31a
eabea28
58bd4f3
bbc1073
0a1e727
a3f5c80
aea59ed
8b80181
02d2ea6
8491486
aee5f35
989af81
f6f8df1
5383d6b
46c85fe
512175d
b2ba75f
31d5ee0
de874d7
4641324
5942a2b
4eb3c0a
ae258df
bf49275
3a30f62
da25c99
b5b4c66
cc98086
f21aec9
b7edc60
b5e0f90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,8 +74,10 @@ import ( | |
| "encoding/binary" | ||
| "fmt" | ||
| "log/slog" | ||
| "net" | ||
| "os" | ||
| "runtime/cgo" | ||
| "strconv" | ||
| "sync" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
@@ -414,6 +416,9 @@ func (c *Client) startInputStreaming(stopCh chan struct{}) error { | |
| c.cfg.Logger.InfoContext(context.Background(), "TDP input streaming starting") | ||
| defer c.cfg.Logger.InfoContext(context.Background(), "TDP input streaming finished") | ||
|
|
||
| // we will disable ping only if the env var is truthy | ||
| disableDesktopPing, _ := strconv.ParseBool(os.Getenv("TELEPORT_DISABLE_DESKTOP_LATENCY_DETECTOR_PING")) | ||
|
|
||
| var withheldResize *tdp.ClientScreenSpec | ||
| for { | ||
| select { | ||
|
|
@@ -432,6 +437,22 @@ func (c *Client) startInputStreaming(stopCh chan struct{}) error { | |
| c.cfg.Logger.WarnContext(context.Background(), "Failed reading TDP input message", "error", err) | ||
| return err | ||
| } | ||
| if m, ok := msg.(tdp.Ping); ok { | ||
| // Upon receiving a ping message, we make a connection | ||
| // to the host and send the same message back to the proxy. | ||
| // The proxy will then compute the round trip time. | ||
| if !disableDesktopPing { | ||
| conn, err := net.Dial("tcp", c.cfg.Addr) | ||
| if err == nil { | ||
| conn.Close() | ||
| } | ||
| } | ||
|
Comment on lines
+444
to
+449
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will just block the whole stream of messages until the dial is completed on every ping, won't it? |
||
| if err := c.cfg.Conn.WriteMessage(m); err != nil { | ||
| c.cfg.Logger.WarnContext(context.Background(), "Failed writing TDP ping message", "error", err) | ||
| return err | ||
| } | ||
| continue | ||
| } | ||
|
|
||
| if atomic.LoadUint32(&c.readyForInput) == 0 { | ||
| switch m := msg.(type) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,9 +30,12 @@ import ( | |
| "net/http" | ||
| "sync" | ||
|
|
||
| "github.com/google/uuid" | ||
| "github.com/gorilla/websocket" | ||
| "github.com/gravitational/trace" | ||
| "github.com/jonboulle/clockwork" | ||
| "github.com/julienschmidt/httprouter" | ||
| "golang.org/x/sync/errgroup" | ||
|
|
||
| "github.com/gravitational/teleport/api/client/proto" | ||
| "github.com/gravitational/teleport/api/constants" | ||
|
|
@@ -47,6 +50,7 @@ import ( | |
| "github.com/gravitational/teleport/lib/reversetunnelclient" | ||
| "github.com/gravitational/teleport/lib/srv/desktop/tdp" | ||
| "github.com/gravitational/teleport/lib/utils" | ||
| "github.com/gravitational/teleport/lib/utils/diagnostics/latency" | ||
| logutils "github.com/gravitational/teleport/lib/utils/log" | ||
| ) | ||
|
|
||
|
|
@@ -194,7 +198,7 @@ func (h *Handler) createDesktopConnection( | |
| clientSrcAddr: clientSrcAddr, | ||
| clientDstAddr: clientDstAddr, | ||
| } | ||
| serviceConn, _, err := c.connectToWindowsService(ctx, clusterName, validServiceIDs) | ||
| serviceConn, version, err := c.connectToWindowsService(ctx, clusterName, validServiceIDs) | ||
| if err != nil { | ||
| return sendTDPError(trace.Wrap(err, "cannot connect to Windows Desktop Service")) | ||
| } | ||
|
|
@@ -233,7 +237,7 @@ func (h *Handler) createDesktopConnection( | |
| // proxyWebsocketConn hangs here until connection is closed | ||
| handleProxyWebsocketConnErr( | ||
| ctx, | ||
| proxyWebsocketConn(ws, serviceConnTLS), | ||
| proxyWebsocketConn(ctx, ws, serviceConnTLS, log, version), | ||
| log, | ||
| ) | ||
|
|
||
|
|
@@ -535,19 +539,108 @@ func (c *connector) tryConnect(ctx context.Context, clusterName, desktopServiceI | |
| return conn, ver, trace.Wrap(err) | ||
| } | ||
|
|
||
| // desktopPinger measures latency between proxy and the desktop by sending tdp.Ping messages | ||
| // Windows Desktop Service and measuring the time it takes to receive message with the same UUID back. | ||
| type desktopPinger struct { | ||
| wds net.Conn | ||
| ch <-chan tdp.Ping | ||
| } | ||
|
|
||
| func (d desktopPinger) Ping(ctx context.Context) error { | ||
| ping := tdp.Ping{ | ||
| UUID: uuid.New(), | ||
| } | ||
| buf, err := ping.Encode() | ||
| if err != nil { | ||
| return trace.Wrap(err) | ||
| } | ||
| _, err = d.wds.Write(buf) | ||
| if err != nil { | ||
| return trace.Wrap(err) | ||
| } | ||
| for { | ||
| select { | ||
| case pong := <-d.ch: | ||
| if pong.UUID == ping.UUID { | ||
| return nil | ||
| } | ||
| case <-ctx.Done(): | ||
| return trace.Wrap(ctx.Err()) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // proxyWebsocketConn does a bidrectional copy between the websocket | ||
| // connection to the browser (ws) and the mTLS connection to Windows | ||
| // Desktop Serivce (wds) | ||
| func proxyWebsocketConn(ws *websocket.Conn, wds net.Conn) error { | ||
| func proxyWebsocketConn(ctx context.Context, ws *websocket.Conn, wds net.Conn, log *slog.Logger, version string) error { | ||
| ctx, cancel := context.WithCancel(ctx) | ||
| var closeOnce sync.Once | ||
| close := func() { | ||
| cancel() | ||
| ws.Close() | ||
| wds.Close() | ||
| } | ||
|
|
||
| errs := make(chan error, 2) | ||
| tdpMessagesToSend := make(chan tdp.Message) | ||
|
|
||
| latencySupported, err := utils.MinVerWithoutPreRelease(version, "17.5.0") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of relying on indirect info such as the version string (in a heartbeat, even, not even from the actual connection in use) can we start thinking about exchanging capabilities as part of the TDP "handshake", so to speak? |
||
| if err != nil { | ||
| return trace.Wrap(err) | ||
| } | ||
|
|
||
| pings := make(chan tdp.Ping) | ||
|
|
||
| if latencySupported { | ||
| pinger := desktopPinger{ | ||
| wds: wds, | ||
| ch: pings, | ||
| } | ||
|
|
||
| go monitorLatency(ctx, clockwork.NewRealClock(), ws, pinger, | ||
| latency.ReporterFunc(func(ctx context.Context, stats latency.Statistics) error { | ||
| tdpMessagesToSend <- tdp.LatencyStats{ | ||
| ClientLatency: uint32(stats.Client), | ||
| ServerLatency: uint32(stats.Server), | ||
| } | ||
| return nil | ||
| }), | ||
| ) | ||
|
|
||
| } | ||
|
|
||
| var errs errgroup.Group | ||
|
|
||
| // run a goroutine to pick TDP messages up from a channel and send | ||
| // them to the browser | ||
| errs.Go(func() error { | ||
| for msg := range tdpMessagesToSend { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this not just block forever and leak the goroutine at the end of a connection? Nothing closes
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does, could you please check 0551d44 where I tried to fix all 3 problems you mention |
||
| if ping, ok := msg.(tdp.Ping); ok { | ||
| pings <- ping | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can also block if the context for |
||
| continue | ||
| } | ||
| if ls, ok := msg.(tdp.LatencyStats); ok { | ||
| log.DebugContext(ctx, "sending latency stats", "client", ls.ClientLatency, "server", ls.ServerLatency) | ||
| } | ||
| encoded, err := msg.Encode() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = ws.WriteMessage(websocket.BinaryMessage, encoded) | ||
| if utils.IsOKNetworkError(err) { | ||
| return err | ||
| } | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| }) | ||
|
|
||
| go func() { | ||
| // run a second goroutine to read TDP messages from the Windows | ||
| // agent and write them to our send channel | ||
| errs.Go(func() error { | ||
| defer closeOnce.Do(close) | ||
|
|
||
| // we avoid using io.Copy here, as we want to make sure | ||
|
|
@@ -563,8 +656,7 @@ func proxyWebsocketConn(ws *websocket.Conn, wds net.Conn) error { | |
| for { | ||
| msg, err := tc.ReadMessage() | ||
| if utils.IsOKNetworkError(err) { | ||
| errs <- nil | ||
| return | ||
| return err | ||
| } else if err != nil { | ||
| isFatal := tdp.IsFatalErr(err) | ||
| severity := tdp.SeverityError | ||
|
|
@@ -585,58 +677,38 @@ func proxyWebsocketConn(ws *websocket.Conn, wds net.Conn) error { | |
| if sendErr != nil { | ||
| err = sendErr | ||
| } | ||
| errs <- err | ||
| return | ||
| } | ||
| encoded, err := msg.Encode() | ||
| if err != nil { | ||
| errs <- err | ||
| return | ||
| } | ||
| err = ws.WriteMessage(websocket.BinaryMessage, encoded) | ||
| if utils.IsOKNetworkError(err) { | ||
| errs <- nil | ||
| return | ||
| } | ||
| if err != nil { | ||
| errs <- err | ||
| return | ||
| return err | ||
| } | ||
| tdpMessagesToSend <- msg | ||
| } | ||
| }() | ||
| }) | ||
|
|
||
| go func() { | ||
| // run a goroutine to read TDP messages coming from the browser | ||
| // and pass them on to the Windows agent | ||
| errs.Go(func() error { | ||
| defer closeOnce.Do(close) | ||
|
|
||
| var buf bytes.Buffer | ||
| for { | ||
| _, reader, err := ws.NextReader() | ||
| switch { | ||
| case utils.IsOKNetworkError(err): | ||
| errs <- nil | ||
| return | ||
| return err | ||
| case err != nil: | ||
| errs <- err | ||
| return | ||
| return err | ||
| } | ||
| buf.Reset() | ||
| if _, err := io.Copy(&buf, reader); err != nil { | ||
| errs <- err | ||
| return | ||
| return err | ||
| } | ||
|
|
||
| if _, err := wds.Write(buf.Bytes()); err != nil { | ||
| errs <- trace.Wrap(err, "sending TDP message to desktop agent") | ||
| return | ||
| return trace.Wrap(err, "sending TDP message to desktop agent") | ||
| } | ||
| } | ||
| }() | ||
| }) | ||
|
|
||
| var retErrs []error | ||
| for i := 0; i < 2; i++ { | ||
| retErrs = append(retErrs, <-errs) | ||
| } | ||
| return trace.NewAggregate(retErrs...) | ||
| return trace.Wrap(errs.Wait()) | ||
| } | ||
|
|
||
| // handleProxyWebsocketConnErr handles the error returned by proxyWebsocketConn by | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.