From 2a62486abac1fa81758a0c85804ee34e45028cac Mon Sep 17 00:00:00 2001 From: Marc Szanto <11840265+Xemdo@users.noreply.github.com> Date: Fri, 29 Dec 2023 00:49:13 -0800 Subject: [PATCH] Added RPC command to disable keepalive messages for a client; #291 --- cmd/events.go | 26 +++--- .../events/websocket/mock_server/client.go | 1 + .../events/websocket/mock_server/manager.go | 1 + .../websocket/mock_server/rpc_handler.go | 80 +++++++++++++++++-- .../events/websocket/mock_server/server.go | 9 +++ internal/events/websocket/websocket_cmd.go | 3 + 6 files changed, 101 insertions(+), 19 deletions(-) diff --git a/cmd/events.go b/cmd/events.go index 19924ab5..09bd8614 100644 --- a/cmd/events.go +++ b/cmd/events.go @@ -53,15 +53,16 @@ var ( // websocketCmd-specific flags var ( - wsDebug bool - wsStrict bool - wsClient string - wsSubscription string - wsStatus string - wsReason string - wsServerIP string - wsServerPort int - wsSSL bool + wsDebug bool + wsStrict bool + wsClient string + wsSubscription string + wsStatus string + wsReason string + wsServerIP string + wsServerPort int + wsSSL bool + wsFeatureEnabled bool ) var eventCmd = &cobra.Command{ @@ -108,7 +109,8 @@ var websocketCmd = &cobra.Command{ Example: fmt.Sprintf(` twitch event websocket start-server twitch event websocket reconnect twitch event websocket close --session=e411cc1e_a2613d4e --reason=4006 - twitch event websocket subscription --status=user_removed --subscription=82a855-fae8-93bff0`, + twitch event websocket subscription --status=user_removed --subscription=82a855-fae8-93bff0 + twitch event websocket keepalive --session=e411cc1e_a2613d4e --enabled=false`, ), Aliases: []string{ "websockets", @@ -205,6 +207,7 @@ func init() { websocketCmd.Flags().StringVar(&wsSubscription, "subscription", "", `Subscription to target with your server command. Used with "websocket subscription".`) websocketCmd.Flags().StringVar(&wsStatus, "status", "", `Changes the status of an existing subscription. Used with "websocket subscription".`) websocketCmd.Flags().StringVar(&wsReason, "reason", "", `Sets the close reason when sending a Close message to the client. Used with "websocket close".`) + websocketCmd.Flags().BoolVar(&wsFeatureEnabled, "enabled", false, "Sets on/off for the specified feature.") // configure flags configureEventCmd.Flags().StringVarP(&forwardAddress, "forward-address", "F", "", "Forward address for mock event (webhook only).") @@ -366,7 +369,7 @@ https://dev.twitch.tv/docs/eventsub/handling-webhook-events#processing-an-event` Timestamp: timestamp, EventID: eventID, BroadcasterUserID: toUser, - Version: version, + Version: version, }) if err != nil { @@ -393,6 +396,7 @@ func websocketCmdRun(cmd *cobra.Command, args []string) error { Subscription: wsSubscription, SubscriptionStatus: wsStatus, CloseReason: wsReason, + FeatureEnabled: wsFeatureEnabled, }) return err diff --git a/internal/events/websocket/mock_server/client.go b/internal/events/websocket/mock_server/client.go index 6ac29f38..495d2b66 100644 --- a/internal/events/websocket/mock_server/client.go +++ b/internal/events/websocket/mock_server/client.go @@ -13,6 +13,7 @@ type Client struct { mutex sync.Mutex ConnectedAtTimestamp string // RFC3339Nano timestamp indicating when the client connected to the server connectionUrl string + KeepAliveEnabled bool mustSubscribeTimer *time.Timer keepAliveChanOpen bool diff --git a/internal/events/websocket/mock_server/manager.go b/internal/events/websocket/mock_server/manager.go index 3c1cb2e9..cd923a5d 100644 --- a/internal/events/websocket/mock_server/manager.go +++ b/internal/events/websocket/mock_server/manager.go @@ -154,6 +154,7 @@ func StartWebsocketServer(enableDebug bool, ip string, port int, enableSSL bool, rpc.RegisterHandler("EventSubWebSocketForwardEvent", RPCFireEventSubHandler) rpc.RegisterHandler("EventSubWebSocketCloseClient", RPCCloseHandler) rpc.RegisterHandler("EventSubWebSocketSubscription", RPCSubscriptionHandler) + rpc.RegisterHandler("EventSubWebSocketKeepalive", RPCKeepaliveHandler) rpc.StartBackgroundServer() // TODO: Interactive shell maybe? diff --git a/internal/events/websocket/mock_server/rpc_handler.go b/internal/events/websocket/mock_server/rpc_handler.go index df1254ad..c46d19c1 100644 --- a/internal/events/websocket/mock_server/rpc_handler.go +++ b/internal/events/websocket/mock_server/rpc_handler.go @@ -28,6 +28,8 @@ func ResolveRPCName(cmd string) string { return "EventSubWebSocketCloseClient" } else if cmd == "subscription" { return "EventSubWebSocketSubscription" + } else if cmd == "keepalive" { + return "EventSubWebSocketKeepalive" } else { return "" } @@ -151,8 +153,6 @@ func RPCCloseHandler(args rpc.RPCArgs) rpc.RPCResponse { } } - clientName := args.Variables["ClientName"] - if serverManager.reconnectTesting { log.Printf("Error on RPC call (EventSubWebSocketCloseClient): Could not activate while reconnect testing is active.") return rpc.RPCResponse{ @@ -170,21 +170,21 @@ func RPCCloseHandler(args rpc.RPCArgs) rpc.RPCResponse { } } - cn := clientName - if sessionRegex.MatchString(clientName) { + clientName := args.Variables["ClientName"] + if sessionRegex.MatchString(args.Variables["ClientName"]) { // Client name given was formatted as _. We must extract it - sessionRegexExec := sessionRegex.FindAllStringSubmatch(clientName, -1) - cn = sessionRegexExec[0][2] + sessionRegexExec := sessionRegex.FindAllStringSubmatch(args.Variables["ClientName"], -1) + clientName = sessionRegexExec[0][2] } server.muClients.Lock() - client, ok := server.Clients.Get(cn) + client, ok := server.Clients.Get(clientName) if !ok { server.muClients.Unlock() return rpc.RPCResponse{ ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER, - DetailedInfo: "Client [" + cn + "] does not exist on WebSocket server.", + DetailedInfo: "Client [" + clientName + "] does not exist on WebSocket server.", } } @@ -273,3 +273,67 @@ func RPCSubscriptionHandler(args rpc.RPCArgs) rpc.RPCResponse { ResponseCode: COMMAND_RESPONSE_SUCCESS, } } + +func RPCKeepaliveHandler(args rpc.RPCArgs) rpc.RPCResponse { + if args.Variables["FeatureEnabled"] == "" || args.Variables["ClientName"] == "" { + return rpc.RPCResponse{ + ResponseCode: COMMAND_RESPONSE_MISSING_FLAG, + DetailedInfo: "Command \"keepalive\" requires flags --session and --enabled" + + "\n\nExample: twitch event websocket keepalive --session=e411cc1e_a2613d4e --enabled=false", + } + } + + enabled, err := strconv.ParseBool(args.Variables["FeatureEnabled"]) + if err != nil { + return rpc.RPCResponse{ + ResponseCode: COMMAND_RESPONSE_MISSING_FLAG, + DetailedInfo: "Command \"keepalive\" requires --enabled to be \"true\" or \"false\"" + + "\n\nExample: twitch event websocket keepalive --session=e411cc1e_a2613d4e --enabled=false", + } + } + + if serverManager.reconnectTesting { + log.Printf("Error on RPC call (EventSubWebSocketCloseClient): Could not activate while reconnect testing is active.") + return rpc.RPCResponse{ + ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER, + DetailedInfo: "Cannot activate this command while reconnect testing is active.", + } + } + + server, ok := serverManager.serverList.Get(serverManager.primaryServer) + if !ok { + log.Printf("Error on RPC call (EventSubWebSocketCloseClient): Primary server not in server list.") + return rpc.RPCResponse{ + ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER, + DetailedInfo: "Primary server not in server list.", + } + } + + clientName := args.Variables["ClientName"] + if sessionRegex.MatchString(args.Variables["ClientName"]) { + // Client name given was formatted as _. We must extract it + sessionRegexExec := sessionRegex.FindAllStringSubmatch(args.Variables["ClientName"], -1) + clientName = sessionRegexExec[0][2] + } + + server.muClients.Lock() + + client, ok := server.Clients.Get(clientName) + if !ok { + server.muClients.Unlock() + return rpc.RPCResponse{ + ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER, + DetailedInfo: "Client [" + clientName + "] does not exist on WebSocket server.", + } + } + + client.KeepAliveEnabled = enabled + + server.muClients.Unlock() + + log.Printf("RPC set status on client feature [KeepAliveEnabled] for client [%v]: %v", clientName, enabled) + + return rpc.RPCResponse{ + ResponseCode: COMMAND_RESPONSE_SUCCESS, + } +} diff --git a/internal/events/websocket/mock_server/server.go b/internal/events/websocket/mock_server/server.go index 484358fd..b776f383 100644 --- a/internal/events/websocket/mock_server/server.go +++ b/internal/events/websocket/mock_server/server.go @@ -60,6 +60,7 @@ func (ws *WebSocketServer) WsPageHandler(w http.ResponseWriter, r *http.Request) conn: conn, ConnectedAtTimestamp: connectedAtTimestamp, connectionUrl: fmt.Sprintf("%v://%v/ws", serverManager.protocolHttp, r.Host), + KeepAliveEnabled: true, keepAliveChanOpen: false, pingChanOpen: false, } @@ -178,6 +179,14 @@ func (ws *WebSocketServer) WsPageHandler(w http.ResponseWriter, r *http.Request) return case <-client.keepAliveTimer.C: // Send KeepAlive message + if !client.KeepAliveEnabled { + // Sending keep alives was disabled manually, so we skip this one. + if ws.DebugEnabled { + log.Printf("Skipped sending session_keepalive to client [%s]", client.clientName) + } + continue + } + keepAliveMsg, _ := json.Marshal( KeepaliveMessage{ Metadata: MessageMetadata{ diff --git a/internal/events/websocket/websocket_cmd.go b/internal/events/websocket/websocket_cmd.go index c4a16882..ced3a28a 100644 --- a/internal/events/websocket/websocket_cmd.go +++ b/internal/events/websocket/websocket_cmd.go @@ -3,6 +3,7 @@ package websocket import ( "fmt" "net/rpc" + "strconv" "github.com/fatih/color" "github.com/twitchdev/twitch-cli/internal/events/websocket/mock_server" @@ -14,6 +15,7 @@ type WebsocketCommandParameters struct { Subscription string SubscriptionStatus string CloseReason string + FeatureEnabled bool } func ForwardWebsocketCommand(cmd string, p WebsocketCommandParameters) error { @@ -36,6 +38,7 @@ func ForwardWebsocketCommand(cmd string, p WebsocketCommandParameters) error { variables["SubscriptionID"] = p.Subscription variables["SubscriptionStatus"] = p.SubscriptionStatus variables["CloseReason"] = p.CloseReason + variables["FeatureEnabled"] = strconv.FormatBool(p.FeatureEnabled) args := &rpc_handler.RPCArgs{ RPCName: rpcName,