Skip to content

Commit

Permalink
Added RPC command to disable keepalive messages for a client; #291
Browse files Browse the repository at this point in the history
  • Loading branch information
Xemdo committed Dec 29, 2023
1 parent 90a6af4 commit 2a62486
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 19 deletions.
26 changes: 15 additions & 11 deletions cmd/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ var (

// websocketCmd-specific flags
var (
wsDebug bool
wsStrict bool
wsClient string
wsSubscription string
wsStatus string
wsReason string
wsServerIP string
wsServerPort int
wsSSL bool
wsDebug bool
wsStrict bool
wsClient string
wsSubscription string
wsStatus string
wsReason string
wsServerIP string
wsServerPort int
wsSSL bool
wsFeatureEnabled bool
)

var eventCmd = &cobra.Command{
Expand Down Expand Up @@ -108,7 +109,8 @@ var websocketCmd = &cobra.Command{
Example: fmt.Sprintf(` twitch event websocket start-server
twitch event websocket reconnect
twitch event websocket close --session=e411cc1e_a2613d4e --reason=4006
twitch event websocket subscription --status=user_removed --subscription=82a855-fae8-93bff0`,
twitch event websocket subscription --status=user_removed --subscription=82a855-fae8-93bff0
twitch event websocket keepalive --session=e411cc1e_a2613d4e --enabled=false`,
),
Aliases: []string{
"websockets",
Expand Down Expand Up @@ -205,6 +207,7 @@ func init() {
websocketCmd.Flags().StringVar(&wsSubscription, "subscription", "", `Subscription to target with your server command. Used with "websocket subscription".`)
websocketCmd.Flags().StringVar(&wsStatus, "status", "", `Changes the status of an existing subscription. Used with "websocket subscription".`)
websocketCmd.Flags().StringVar(&wsReason, "reason", "", `Sets the close reason when sending a Close message to the client. Used with "websocket close".`)
websocketCmd.Flags().BoolVar(&wsFeatureEnabled, "enabled", false, "Sets on/off for the specified feature.")

// configure flags
configureEventCmd.Flags().StringVarP(&forwardAddress, "forward-address", "F", "", "Forward address for mock event (webhook only).")
Expand Down Expand Up @@ -366,7 +369,7 @@ https://dev.twitch.tv/docs/eventsub/handling-webhook-events#processing-an-event`
Timestamp: timestamp,
EventID: eventID,
BroadcasterUserID: toUser,
Version: version,
Version: version,
})

if err != nil {
Expand All @@ -393,6 +396,7 @@ func websocketCmdRun(cmd *cobra.Command, args []string) error {
Subscription: wsSubscription,
SubscriptionStatus: wsStatus,
CloseReason: wsReason,
FeatureEnabled: wsFeatureEnabled,
})

return err
Expand Down
1 change: 1 addition & 0 deletions internal/events/websocket/mock_server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Client struct {
mutex sync.Mutex
ConnectedAtTimestamp string // RFC3339Nano timestamp indicating when the client connected to the server
connectionUrl string
KeepAliveEnabled bool

mustSubscribeTimer *time.Timer
keepAliveChanOpen bool
Expand Down
1 change: 1 addition & 0 deletions internal/events/websocket/mock_server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func StartWebsocketServer(enableDebug bool, ip string, port int, enableSSL bool,
rpc.RegisterHandler("EventSubWebSocketForwardEvent", RPCFireEventSubHandler)
rpc.RegisterHandler("EventSubWebSocketCloseClient", RPCCloseHandler)
rpc.RegisterHandler("EventSubWebSocketSubscription", RPCSubscriptionHandler)
rpc.RegisterHandler("EventSubWebSocketKeepalive", RPCKeepaliveHandler)
rpc.StartBackgroundServer()

// TODO: Interactive shell maybe?
Expand Down
80 changes: 72 additions & 8 deletions internal/events/websocket/mock_server/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func ResolveRPCName(cmd string) string {
return "EventSubWebSocketCloseClient"
} else if cmd == "subscription" {
return "EventSubWebSocketSubscription"
} else if cmd == "keepalive" {
return "EventSubWebSocketKeepalive"
} else {
return ""
}
Expand Down Expand Up @@ -151,8 +153,6 @@ func RPCCloseHandler(args rpc.RPCArgs) rpc.RPCResponse {
}
}

clientName := args.Variables["ClientName"]

if serverManager.reconnectTesting {
log.Printf("Error on RPC call (EventSubWebSocketCloseClient): Could not activate while reconnect testing is active.")
return rpc.RPCResponse{
Expand All @@ -170,21 +170,21 @@ func RPCCloseHandler(args rpc.RPCArgs) rpc.RPCResponse {
}
}

cn := clientName
if sessionRegex.MatchString(clientName) {
clientName := args.Variables["ClientName"]
if sessionRegex.MatchString(args.Variables["ClientName"]) {
// Client name given was formatted as <server_id>_<client_name>. We must extract it
sessionRegexExec := sessionRegex.FindAllStringSubmatch(clientName, -1)
cn = sessionRegexExec[0][2]
sessionRegexExec := sessionRegex.FindAllStringSubmatch(args.Variables["ClientName"], -1)
clientName = sessionRegexExec[0][2]
}

server.muClients.Lock()

client, ok := server.Clients.Get(cn)
client, ok := server.Clients.Get(clientName)
if !ok {
server.muClients.Unlock()
return rpc.RPCResponse{
ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER,
DetailedInfo: "Client [" + cn + "] does not exist on WebSocket server.",
DetailedInfo: "Client [" + clientName + "] does not exist on WebSocket server.",
}
}

Expand Down Expand Up @@ -273,3 +273,67 @@ func RPCSubscriptionHandler(args rpc.RPCArgs) rpc.RPCResponse {
ResponseCode: COMMAND_RESPONSE_SUCCESS,
}
}

func RPCKeepaliveHandler(args rpc.RPCArgs) rpc.RPCResponse {
if args.Variables["FeatureEnabled"] == "" || args.Variables["ClientName"] == "" {
return rpc.RPCResponse{
ResponseCode: COMMAND_RESPONSE_MISSING_FLAG,
DetailedInfo: "Command \"keepalive\" requires flags --session and --enabled" +
"\n\nExample: twitch event websocket keepalive --session=e411cc1e_a2613d4e --enabled=false",
}
}

enabled, err := strconv.ParseBool(args.Variables["FeatureEnabled"])
if err != nil {
return rpc.RPCResponse{
ResponseCode: COMMAND_RESPONSE_MISSING_FLAG,
DetailedInfo: "Command \"keepalive\" requires --enabled to be \"true\" or \"false\"" +
"\n\nExample: twitch event websocket keepalive --session=e411cc1e_a2613d4e --enabled=false",
}
}

if serverManager.reconnectTesting {
log.Printf("Error on RPC call (EventSubWebSocketCloseClient): Could not activate while reconnect testing is active.")
return rpc.RPCResponse{
ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER,
DetailedInfo: "Cannot activate this command while reconnect testing is active.",
}
}

server, ok := serverManager.serverList.Get(serverManager.primaryServer)
if !ok {
log.Printf("Error on RPC call (EventSubWebSocketCloseClient): Primary server not in server list.")
return rpc.RPCResponse{
ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER,
DetailedInfo: "Primary server not in server list.",
}
}

clientName := args.Variables["ClientName"]
if sessionRegex.MatchString(args.Variables["ClientName"]) {
// Client name given was formatted as <server_id>_<client_name>. We must extract it
sessionRegexExec := sessionRegex.FindAllStringSubmatch(args.Variables["ClientName"], -1)
clientName = sessionRegexExec[0][2]
}

server.muClients.Lock()

client, ok := server.Clients.Get(clientName)
if !ok {
server.muClients.Unlock()
return rpc.RPCResponse{
ResponseCode: COMMAND_RESPONSE_FAILED_ON_SERVER,
DetailedInfo: "Client [" + clientName + "] does not exist on WebSocket server.",
}
}

client.KeepAliveEnabled = enabled

server.muClients.Unlock()

log.Printf("RPC set status on client feature [KeepAliveEnabled] for client [%v]: %v", clientName, enabled)

return rpc.RPCResponse{
ResponseCode: COMMAND_RESPONSE_SUCCESS,
}
}
9 changes: 9 additions & 0 deletions internal/events/websocket/mock_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (ws *WebSocketServer) WsPageHandler(w http.ResponseWriter, r *http.Request)
conn: conn,
ConnectedAtTimestamp: connectedAtTimestamp,
connectionUrl: fmt.Sprintf("%v://%v/ws", serverManager.protocolHttp, r.Host),
KeepAliveEnabled: true,
keepAliveChanOpen: false,
pingChanOpen: false,
}
Expand Down Expand Up @@ -178,6 +179,14 @@ func (ws *WebSocketServer) WsPageHandler(w http.ResponseWriter, r *http.Request)
return

case <-client.keepAliveTimer.C: // Send KeepAlive message
if !client.KeepAliveEnabled {
// Sending keep alives was disabled manually, so we skip this one.
if ws.DebugEnabled {
log.Printf("Skipped sending session_keepalive to client [%s]", client.clientName)
}
continue
}

keepAliveMsg, _ := json.Marshal(
KeepaliveMessage{
Metadata: MessageMetadata{
Expand Down
3 changes: 3 additions & 0 deletions internal/events/websocket/websocket_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package websocket
import (
"fmt"
"net/rpc"
"strconv"

"github.com/fatih/color"
"github.com/twitchdev/twitch-cli/internal/events/websocket/mock_server"
Expand All @@ -14,6 +15,7 @@ type WebsocketCommandParameters struct {
Subscription string
SubscriptionStatus string
CloseReason string
FeatureEnabled bool
}

func ForwardWebsocketCommand(cmd string, p WebsocketCommandParameters) error {
Expand All @@ -36,6 +38,7 @@ func ForwardWebsocketCommand(cmd string, p WebsocketCommandParameters) error {
variables["SubscriptionID"] = p.Subscription
variables["SubscriptionStatus"] = p.SubscriptionStatus
variables["CloseReason"] = p.CloseReason
variables["FeatureEnabled"] = strconv.FormatBool(p.FeatureEnabled)

args := &rpc_handler.RPCArgs{
RPCName: rpcName,
Expand Down

0 comments on commit 2a62486

Please sign in to comment.