Skip to content

Commit

Permalink
Merge pull request #272 from twitchdev/eventsocket-264-265
Browse files Browse the repository at this point in the history
Solving 264 and 265
  • Loading branch information
Xemdo authored Sep 12, 2023
2 parents 100d3b0 + 490f61d commit 98ddbfb
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 21 deletions.
2 changes: 1 addition & 1 deletion internal/events/websocket/mock_server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Client struct {
clientName string // Unique name for the client. Not the Client ID.
conn *websocket.Conn
mutex sync.Mutex
ConnectedAtTimestamp string
ConnectedAtTimestamp string // RFC3339Nano timestamp indicating when the client connected to the server
connectionUrl string

mustSubscribeTimer *time.Timer
Expand Down
9 changes: 7 additions & 2 deletions internal/events/websocket/mock_server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,14 @@ func subscriptionPageHandlerGet(w http.ResponseWriter, r *http.Request) {

for clientName, clientSubscriptions := range server.Subscriptions {
for _, subscription := range clientSubscriptions {
if clientID == "debug" || subscription.ClientID == clientID {
disabledAndExpired := false // Production EventSub only shows disabled WebSocket subscriptions that were disabled under 1 hour ago
if subscription.DisabledAt != nil && subscription.DisabledAt.Add(time.Hour).Before(util.GetTimestamp()) {
disabledAndExpired = true
}

if clientID == "debug" || (subscription.ClientID == clientID && !disabledAndExpired) {
allSubscriptions = append(allSubscriptions, SubscriptionPostSuccessResponseBody{
ID: subscription.ClientID,
ID: subscription.SubscriptionID,
Status: subscription.Status,
Type: subscription.Type,
Version: subscription.Version,
Expand Down
11 changes: 7 additions & 4 deletions internal/events/websocket/mock_server/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ func RPCFireEventSubHandler(args rpc.RPCArgs) rpc.RPCResponse {
}
}

clientName, exists := args.Variables["ClientName"]
if !exists {

}
clientName := args.Variables["ClientName"]
if sessionRegex.MatchString(clientName) {
// Users can include the full session_id given in the response. If they do, subtract it to just the client name
clientName = sessionRegex.FindAllStringSubmatch(clientName, -1)[0][2]
Expand Down Expand Up @@ -253,6 +250,12 @@ func RPCSubscriptionHandler(args rpc.RPCArgs) rpc.RPCResponse {
found = true

server.Subscriptions[client][i].Status = args.Variables["SubscriptionStatus"]
if args.Variables["SubscriptionStatus"] == STATUS_ENABLED {
server.Subscriptions[client][i].DisabledAt = nil
} else {
tNow := util.GetTimestamp()
server.Subscriptions[client][i].DisabledAt = &tNow
}
break
}
}
Expand Down
29 changes: 23 additions & 6 deletions internal/events/websocket/mock_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
const KEEPALIVE_TIMEOUT_SECONDS = 10

type WebSocketServer struct {
ServerId string // Int representing the ID of the server
//ConnectionUrl string // Server's url for people to connect to. Used for messaging in reconnect testing
DebugEnabled bool // Display debug messages; --debug
StrictMode bool // Force stricter production-like qualities; --strict
Upgrader websocket.Upgrader
ServerId string // Int representing the ID of the server
DebugEnabled bool // Display debug messages; --debug
StrictMode bool // Force stricter production-like qualities; --strict

Upgrader websocket.Upgrader

Clients *util.List[Client] // All connected clients
muClients sync.Mutex // Mutex for WebSocketServer.Clients
Expand Down Expand Up @@ -410,6 +410,8 @@ func (ws *WebSocketServer) HandleRPCEventSubForwarding(eventsubBody string, clie
foundClientId = sub.ClientID

ws.Subscriptions[client][i].Status = STATUS_AUTHORIZATION_REVOKED
tNow := util.GetTimestamp()
ws.Subscriptions[client][i].DisabledAt = &tNow
break
}
}
Expand All @@ -426,6 +428,7 @@ func (ws *WebSocketServer) HandleRPCEventSubForwarding(eventsubBody string, clie
}

// Check for subscriptions when running with --require-subscription
subscriptionCreatedAtTimestamp := "" // Used below if in strict mode
if ws.StrictMode {
found := false
for _, clientSubscriptions := range ws.Subscriptions {
Expand All @@ -436,6 +439,7 @@ func (ws *WebSocketServer) HandleRPCEventSubForwarding(eventsubBody string, clie
for _, sub := range clientSubscriptions {
if sub.SessionClientName == client.clientName && sub.Type == eventObj.Subscription.Type && sub.Version == eventObj.Subscription.Version {
found = true
subscriptionCreatedAtTimestamp = sub.CreatedAt
}
}
}
Expand All @@ -448,6 +452,16 @@ func (ws *WebSocketServer) HandleRPCEventSubForwarding(eventsubBody string, clie
// Change payload's subscription.transport.session_id to contain the correct Session ID
eventObj.Subscription.Transport.SessionID = fmt.Sprintf("%v_%v", ws.ServerId, client.clientName)

// Change payload's subscription.created_at to contain the correct timestamp -- https://github.com/twitchdev/twitch-cli/issues/264
if ws.StrictMode {
// When running WITH --require-subscription, created_at will be set to the time the subscription was created using the mock EventSub REST endpoint
eventObj.Subscription.CreatedAt = subscriptionCreatedAtTimestamp
} else {
// When running WITHOUT --require-subscription, created_at will be set to the time the client connected
// This is because without --require-subscription the server "grants" access to all event subscriptions at the moment the client is connected
eventObj.Subscription.CreatedAt = client.ConnectedAtTimestamp
}

// Build notification message
notificationMsg, err := json.Marshal(
NotificationMessage{
Expand Down Expand Up @@ -503,9 +517,12 @@ func (ws *WebSocketServer) handleClientConnectionClose(client *Client, closeReas
subscriptions := ws.Subscriptions[client.clientName]
for i := range subscriptions {
if subscriptions[i].Status == STATUS_ENABLED {
tNow := util.GetTimestamp()

subscriptions[i].Status = getStatusFromCloseMessage(closeReason)
subscriptions[i].ClientConnectedAt = ""
subscriptions[i].ClientDisconnectedAt = time.Now().UTC().Format(time.RFC3339Nano)
subscriptions[i].ClientDisconnectedAt = tNow.Format(time.RFC3339Nano)
subscriptions[i].DisabledAt = &tNow
}
}
ws.Subscriptions[client.clientName] = subscriptions
Expand Down
21 changes: 13 additions & 8 deletions internal/events/websocket/mock_server/subscription.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package mock_server

import "github.com/twitchdev/twitch-cli/internal/models"
import (
"time"

"github.com/twitchdev/twitch-cli/internal/models"
)

type Subscription struct {
SubscriptionID string // Random GUID for the subscription
ClientID string // Client ID included in headers
Type string // EventSub topic
Version string // EventSub topic version
CreatedAt string // Timestamp of when the subscription was created
Status string // Status of the subscription
SessionClientName string // Client name of the session this is associated with.
SubscriptionID string // Random GUID for the subscription
ClientID string // Client ID included in headers
Type string // EventSub topic
Version string // EventSub topic version
CreatedAt string // Timestamp of when the subscription was created
DisabledAt *time.Time // Not public; Timestamp of when the subscription was disabled
Status string // Status of the subscription
SessionClientName string // Client name of the session this is associated with.

ClientConnectedAt string // Time client connected
ClientDisconnectedAt string // Time client disconnected
Expand Down

0 comments on commit 98ddbfb

Please sign in to comment.