From 9061604d6d462457613c8292a0bae218ba5b8720 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Wed, 5 Jul 2023 19:11:03 +0200 Subject: [PATCH 1/3] [Assist] Clear the refresh websocket timeout when closing Assist (#28610) * Clear the refresh websocket timeout when closing Assist * Missing semicolon to please prettier * Add comment to remove once the new session implementation is done --- web/packages/teleport/src/Assist/context/AssistContext.tsx | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/web/packages/teleport/src/Assist/context/AssistContext.tsx b/web/packages/teleport/src/Assist/context/AssistContext.tsx index f1676eba8f421..1084d4b49f136 100644 --- a/web/packages/teleport/src/Assist/context/AssistContext.tsx +++ b/web/packages/teleport/src/Assist/context/AssistContext.tsx @@ -78,6 +78,7 @@ const TEN_MINUTES = 10 * 60 * 1000; export function AssistContextProvider(props: PropsWithChildren) { const activeWebSocket = useRef(null); + // TODO(ryan): this should be removed once https://github.com/gravitational/teleport.e/pull/1609 is implemented const executeCommandWebSocket = useRef(null); const refreshWebSocketTimeout = useRef(null); @@ -549,6 +550,10 @@ export function AssistContextProvider(props: PropsWithChildren) { useEffect(() => { loadConversations(); + + return () => { + window.clearTimeout(refreshWebSocketTimeout.current); + }; }, []); const selectedConversationMessages = useMemo( From 2893ee9a74e592e07d9b34f882df81ab0bb9ef86 Mon Sep 17 00:00:00 2001 From: Alan Parra Date: Tue, 20 Jun 2023 11:01:43 -0300 Subject: [PATCH 2/3] fix: Decode JSON directly from stream in `waitForCommandOutput` (#28040) --- lib/web/command_test.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/lib/web/command_test.go b/lib/web/command_test.go index 99b018c898505..7a73d715099ba 100644 --- a/lib/web/command_test.go +++ b/lib/web/command_test.go @@ -292,29 +292,21 @@ func waitForCommandOutput(stream io.Reader, substr string) error { default: } - out := make([]byte, 100) - n, err := stream.Read(out) - if err != nil { - return trace.Wrap(err) - } - var env Envelope - err = json.Unmarshal(out[:n], &env) - if err != nil { - return trace.Wrap(err) + dec := json.NewDecoder(stream) + if err := dec.Decode(&env); err != nil { + return trace.Wrap(err, "decoding envelope JSON from stream") } d, err := base64.StdEncoding.DecodeString(env.Payload) if err != nil { - return trace.Wrap(err) + return trace.Wrap(err, "decoding b64 payload") } + data := removeSpace(string(d)) - if n > 0 && strings.Contains(data, substr) { + if strings.Contains(data, substr) { return nil } - if err != nil { - return trace.Wrap(err) - } } } From 2d92b33abcdea50940cd5d2c941c2bb80236d4bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Justinas=20Stankevi=C4=8Dius?= Date: Fri, 7 Jul 2023 19:24:55 +0300 Subject: [PATCH 3/3] Assist: mark individual nodes as finished (#28477) * Mark nodes as done when command finishes in Assist * Split Close/CloseWithPayload * Expect a close message before the summary in test * Remediate confusing type usage in test `Envelope` is the outer layer used (in protobuf format) for execution / terminal sessions. Meanwhile `outEnvelope` is the inner layer used (in JSON) spefically with assist (execution), when outer envelope is of "raw" type Using `Envelope` where decoding `outEnvelope` in tests previously worked "by accident" due to matching field names * Assert that server ID is set in the close message * Refactor command execution logic and adjust WebSocket handling This commit changes how command execution and WebSocket handling are performed in the code. Instead of manually managing session close signals and command execution notifications via WebSocket, we have wrapped it into more easily manageable form. Changed parts: assist and websocket libraries, test and several components of 'teleport'. Why: This allows cleaner command management and error handling, leading to more reliable and maintainable application. Also made code more readable and easy to understand. Made WebSocket more precise with its handling to prevent cases where stale or incorrect data might interrupt the session. Details: Instead of sending a close signal after all commands have been executed, we now send individual session end updates for each command. Sessions no longer remain active unnecessarily. Message handling in command execution has been refactored for better error propagation. All these changes were also adjusted and reflected in the associated test cases. Also, names and types of various data structures are edited to reflect their actual usage. * Change ServerID to NodeID to improve code consistency This commit replaces occurrences of `ServerID` with `NodeID` across several files. `ServerID` was misleading and causing confusion as it was serving as an identifier for nodes, so to improve comprehension and consistency in the codebase, all instances of `ServerID` have been replaced with `NodeID`. Tests have also been updated to reflect this change. --------- Co-authored-by: Jakub Nyckowski --- lib/web/command.go | 10 ++- lib/web/command_test.go | 24 +++--- lib/web/terminal.go | 10 ++- .../src/Assist/context/AssistContext.tsx | 81 ++++++++++--------- web/packages/teleport/src/Assist/types.ts | 10 +++ 5 files changed, 81 insertions(+), 54 deletions(-) diff --git a/lib/web/command.go b/lib/web/command.go index afe31dc373db6..0d72d2427e8a2 100644 --- a/lib/web/command.go +++ b/lib/web/command.go @@ -87,6 +87,12 @@ type commandExecResult struct { SessionID string `json:"session_id"` } +// sessionEndEvent is an event that is sent when a session ends. +type sessionEndEvent struct { + // NodeID is the ID of the server where the session was created. + NodeID string `json:"node_id"` +} + // Check checks if the request is valid. func (c *CommandRequest) Check() error { if c.Command == "" { @@ -342,7 +348,7 @@ func (h *Handler) computeAndSendSummary( return trace.Wrap(err) } - // Add the summary message to the backend so it is persisted on chat + // Add the summary message to the backend, so it is persisted on chat // reload. messagePayload, err := json.Marshal(&assistlib.CommandExecSummary{ ExecutionID: req.executionID, @@ -674,7 +680,7 @@ func (t *commandHandler) streamOutput(ctx context.Context, tc *client.TeleportCl return } - if err := t.stream.SendCloseMessage(); err != nil { + if err := t.stream.SendCloseMessage(sessionEndEvent{NodeID: t.sessionData.ServerID}); err != nil { t.log.WithError(err).Error("Unable to send close event to web client.") return } diff --git a/lib/web/command_test.go b/lib/web/command_test.go index 7a73d715099ba..37a242f550b4b 100644 --- a/lib/web/command_test.go +++ b/lib/web/command_test.go @@ -19,7 +19,6 @@ package web import ( "context" "crypto/tls" - "encoding/base64" "encoding/json" "fmt" "io" @@ -180,12 +179,20 @@ func TestExecuteCommandSummary(t *testing.T) { // Wait for command execution to complete require.NoError(t, waitForCommandOutput(stream, "teleport")) - var env Envelope dec := json.NewDecoder(stream) + + // Consume the close message + var sessionMetadata sessionEndEvent + err = dec.Decode(&sessionMetadata) + require.NoError(t, err) + require.Equal(t, "node", sessionMetadata.NodeID) + + // Consume the summary message + var env outEnvelope err = dec.Decode(&env) require.NoError(t, err) - require.Equal(t, envelopeTypeSummary, env.GetType()) - require.NotEmpty(t, env.GetPayload()) + require.Equal(t, envelopeTypeSummary, env.Type) + require.NotEmpty(t, env.Payload) // Wait for the command execution history to be saved var messages *assist.GetAssistantMessagesResponse @@ -292,18 +299,13 @@ func waitForCommandOutput(stream io.Reader, substr string) error { default: } - var env Envelope + var env outEnvelope dec := json.NewDecoder(stream) if err := dec.Decode(&env); err != nil { return trace.Wrap(err, "decoding envelope JSON from stream") } - d, err := base64.StdEncoding.DecodeString(env.Payload) - if err != nil { - return trace.Wrap(err, "decoding b64 payload") - } - - data := removeSpace(string(d)) + data := removeSpace(string(env.Payload)) if strings.Contains(data, substr) { return nil } diff --git a/lib/web/terminal.go b/lib/web/terminal.go index b2235271b2ad0..befa5fa4b7fb9 100644 --- a/lib/web/terminal.go +++ b/lib/web/terminal.go @@ -745,7 +745,7 @@ func (t *TerminalHandler) streamTerminal(ctx context.Context, tc *client.Telepor } // Send close envelope to web terminal upon exit without an error. - if err := t.stream.SendCloseMessage(); err != nil { + if err := t.stream.SendCloseMessage(sessionEndEvent{NodeID: t.sessionData.ServerID}); err != nil { t.log.WithError(err).Error("Unable to send close event to web client.") } @@ -1297,10 +1297,16 @@ func (t *WSStream) Read(out []byte) (int, error) { } // SendCloseMessage sends a close message on the web socket. -func (t *WSStream) SendCloseMessage() error { +func (t *WSStream) SendCloseMessage(event sessionEndEvent) error { + sessionMetadataPayload, err := json.Marshal(&event) + if err != nil { + return trace.Wrap(err) + } + envelope := &Envelope{ Version: defaults.WebsocketVersion, Type: defaults.WebsocketClose, + Payload: string(sessionMetadataPayload), } envelopeBytes, err := proto.Marshal(envelope) if err != nil { diff --git a/web/packages/teleport/src/Assist/context/AssistContext.tsx b/web/packages/teleport/src/Assist/context/AssistContext.tsx index 1084d4b49f136..92d07f3b283ee 100644 --- a/web/packages/teleport/src/Assist/context/AssistContext.tsx +++ b/web/packages/teleport/src/Assist/context/AssistContext.tsx @@ -33,8 +33,10 @@ import { getAccessToken, getHostName } from 'teleport/services/api'; import { ExecutionEnvelopeType, + ExecutionTeleportErrorType, RawPayload, ServerMessageType, + SessionEndData, } from 'teleport/Assist/types'; import { MessageTypeEnum, Protobuf } from 'teleport/lib/term/protobuf'; @@ -429,39 +431,51 @@ export function AssistContextProvider(props: PropsWithChildren) { ); const proto = new Protobuf(); - executeCommandWebSocket.current = new WebSocket(url); executeCommandWebSocket.current.binaryType = 'arraybuffer'; - let sessionsEnded = 0; - executeCommandWebSocket.current.onmessage = event => { const uintArray = new Uint8Array(event.data); const msg = proto.decode(uintArray); switch (msg.type) { - case MessageTypeEnum.RAW: + case MessageTypeEnum.RAW: { const data = JSON.parse(msg.payload) as RawPayload; const payload = atob(data.payload); - if (data.type === ExecutionEnvelopeType) { - dispatch({ - type: AssistStateActionType.AddCommandResultSummary, - conversationId: state.conversations.selectedId, - summary: payload, - executionId: execParams.execution_id, - command: execParams.command, - }); - } else { - dispatch({ - type: AssistStateActionType.UpdateCommandResult, - conversationId: state.conversations.selectedId, - commandResultId: nodeIdToResultId.get(data.node_id), - output: payload, - }); + switch (data.type) { + case ExecutionTeleportErrorType: + dispatch({ + type: AssistStateActionType.FinishCommandResult, + conversationId: state.conversations.selectedId, + commandResultId: nodeIdToResultId.get(data.node_id), + }); + + nodeIdToResultId.delete(data.node_id); + break; + + case ExecutionEnvelopeType: + dispatch({ + type: AssistStateActionType.AddCommandResultSummary, + conversationId: state.conversations.selectedId, + summary: payload, + executionId: execParams.execution_id, + command: execParams.command, + }); + break; + + default: + dispatch({ + type: AssistStateActionType.UpdateCommandResult, + conversationId: state.conversations.selectedId, + commandResultId: nodeIdToResultId.get(data.node_id), + output: payload, + }); } + break; + } case MessageTypeEnum.WEBAUTHN_CHALLENGE: const challenge = JSON.parse(msg.payload); @@ -481,30 +495,19 @@ export function AssistContextProvider(props: PropsWithChildren) { break; - case MessageTypeEnum.SESSION_END: - // we don't know the nodeId of the session that ended, so we have to - // count the finished sessions and then mark them all as done once - // they've all finished - sessionsEnded += 1; - - if (sessionsEnded === nodeIdToResultId.size) { - const message = proto.encodeCloseMessage(); - const bytearray = new Uint8Array(message); + case MessageTypeEnum.SESSION_END: { + const data = JSON.parse(msg.payload) as SessionEndData; - for (const nodeId of nodeIdToResultId.keys()) { - dispatch({ - type: AssistStateActionType.FinishCommandResult, - conversationId: state.conversations.selectedId, - commandResultId: nodeIdToResultId.get(nodeId), - }); - - executeCommandWebSocket.current.send(bytearray.buffer); - } + dispatch({ + type: AssistStateActionType.FinishCommandResult, + conversationId: state.conversations.selectedId, + commandResultId: nodeIdToResultId.get(data.node_id), + }); - nodeIdToResultId.clear(); - } + nodeIdToResultId.delete(data.node_id); break; + } } }; diff --git a/web/packages/teleport/src/Assist/types.ts b/web/packages/teleport/src/Assist/types.ts index 5e205d3a0c5e7..ade6da6cd08b2 100644 --- a/web/packages/teleport/src/Assist/types.ts +++ b/web/packages/teleport/src/Assist/types.ts @@ -28,8 +28,14 @@ export enum ServerMessageType { AssistThought = 'CHAT_MESSAGE_PROGRESS_UPDATE', } +// ExecutionEnvelopeType is the type of message that is returned when +// the command summary is returned. export const ExecutionEnvelopeType = 'summary'; +// ExecutionTeleportErrorType is the type of error that is returned when +// Teleport returns an error (failed to execute command, failed to connect, etc.) +export const ExecutionTeleportErrorType = 'teleport-error'; + export interface Conversation { id: string; title?: string; @@ -192,6 +198,10 @@ export interface SessionData { session: { server_id: string }; } +export interface SessionEndData { + node_id: string; +} + export interface ExecuteRemoteCommandPayload { command: string; login?: string;