Skip to content

Commit

Permalink
Track all worker saves to resume old games
Browse files Browse the repository at this point in the history
Move library config to the top level
  • Loading branch information
sergystepanov committed Nov 17, 2024
1 parent 7b57f73 commit 1147aed
Show file tree
Hide file tree
Showing 14 changed files with 226 additions and 76 deletions.
16 changes: 16 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
import (
"encoding/json"
"fmt"
"strings"
)

type (
Expand Down Expand Up @@ -86,6 +87,7 @@ const (
TerminateSession PT = 204
AppVideoChange PT = 150
LibNewGameList PT = 205
PrevSessions PT = 206
)

func (p PT) String() string {
Expand Down Expand Up @@ -128,6 +130,8 @@ func (p PT) String() string {
return "AppVideoChange"
case LibNewGameList:
return "LibNewGameList"
case PrevSessions:
return "PrevSessions"
default:
return "Unknown"
}
Expand Down Expand Up @@ -166,3 +170,15 @@ func UnwrapChecked[T any](bytes []byte, err error) (*T, error) {
}

func Wrap(t any) ([]byte, error) { return json.Marshal(t) }

const separator = "___"

func ExplodeDeepLink(link string) (string, string) {
p := strings.SplitN(link, separator, 2)

if len(p) == 1 {
return p[0], ""
}

return p[0], p[1]
}
8 changes: 6 additions & 2 deletions pkg/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type (
StatefulRoom[T]
Record bool
RecordUser string
Game GameInfo `json:"game"`
PlayerIndex int `json:"player_index"`
Game string `json:"game"`
PlayerIndex int `json:"player_index"`
}
GameInfo struct {
Alias string `json:"alias"`
Expand Down Expand Up @@ -71,4 +71,8 @@ type (
T int
List []GameInfo
}

PrevSessionInfo struct {
List []string
}
)
4 changes: 4 additions & 0 deletions pkg/config/emulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (e Emulator) GetSupportedExtensions() []string {
return extensions
}

func (e Emulator) SessionStoragePath() string {
return e.Storage
}

func (l *LibretroConfig) GetCores() (cores []CoreInfo) {
for k, core := range l.Cores.List {
cores = append(cores, CoreInfo{Id: k, Name: core.Lib, AltRepo: core.AltRepo})
Expand Down
6 changes: 1 addition & 5 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"

"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/games"
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/monitoring"
"github.com/giongto35/cloud-game/v3/pkg/network/httpx"
Expand All @@ -23,10 +22,7 @@ type Coordinator struct {
}

func New(conf config.CoordinatorConfig, log *logger.Logger) (*Coordinator, error) {
coordinator := &Coordinator{}
lib := games.NewLib(conf.Library, conf.Emulator, log)
lib.Scan()
coordinator.hub = NewHub(conf, lib, log)
coordinator := &Coordinator{hub: NewHub(conf, log)}
h, err := NewHTTPServer(conf, log, func(mux *httpx.Mux) *httpx.Mux {
mux.HandleFunc("/ws", coordinator.hub.handleUserConnection())
mux.HandleFunc("/wso", coordinator.hub.handleWorkerConnection())
Expand Down
43 changes: 29 additions & 14 deletions pkg/coordinator/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/giongto35/cloud-game/v3/pkg/api"
"github.com/giongto35/cloud-game/v3/pkg/com"
"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/games"
"github.com/giongto35/cloud-game/v3/pkg/logger"
)

Expand All @@ -24,20 +23,18 @@ type Connection interface {
}

type Hub struct {
conf config.CoordinatorConfig
launcher games.Launcher
log *logger.Logger
users com.NetMap[com.Uid, *User]
workers com.NetMap[com.Uid, *Worker]
conf config.CoordinatorConfig
log *logger.Logger
users com.NetMap[com.Uid, *User]
workers com.NetMap[com.Uid, *Worker]
}

func NewHub(conf config.CoordinatorConfig, lib games.GameLibrary, log *logger.Logger) *Hub {
func NewHub(conf config.CoordinatorConfig, log *logger.Logger) *Hub {
return &Hub{
conf: conf,
users: com.NewNetMap[com.Uid, *User](),
workers: com.NewNetMap[com.Uid, *Worker](),
launcher: games.NewGameLauncher(lib),
log: log,
conf: conf,
users: com.NewNetMap[com.Uid, *User](),
workers: com.NewNetMap[com.Uid, *Worker](),
log: log,
}
}

Expand All @@ -62,8 +59,9 @@ func (h *Hub) handleUserConnection() http.HandlerFunc {

user := NewUser(conn, log)
defer h.users.RemoveDisconnect(user)
done := user.HandleRequests(h, h.launcher, h.conf)
done := user.HandleRequests(h, h.conf)
params := r.URL.Query()

worker := h.findWorkerFor(user, params, h.log.Extend(h.log.With().Str("cid", user.Id().Short())))
if worker == nil {
user.Notify(api.ErrNoFreeSlots, "")
Expand All @@ -72,11 +70,13 @@ func (h *Hub) handleUserConnection() http.HandlerFunc {
}
user.Bind(worker)
h.users.Add(user)
apps := h.launcher.GetAppNames()

apps := worker.AppNames()
list := make([]api.AppMeta, len(apps))
for i := range apps {
list[i] = api.AppMeta{Alias: apps[i].Alias, Title: apps[i].Name, System: apps[i].System}
}

user.InitSession(worker.Id().String(), h.conf.Webrtc.IceServers, list)
log.Info().Str(logger.DirectionField, logger.MarkPlus).Msgf("user %s", user.Id())
<-done
Expand Down Expand Up @@ -175,9 +175,13 @@ func (h *Hub) findWorkerFor(usr *User, q url.Values, log *logger.Logger) *Worker
zone := q.Get(api.ZoneQueryParam)
wid := q.Get(api.WorkerIdParam)

sessionId, _ := api.ExplodeDeepLink(roomId)

var worker *Worker
if worker = h.findWorkerByRoom(roomId, zone); worker != nil {
log.Debug().Str("room", roomId).Msg("An existing worker has been found")
} else if worker = h.findWorkerByPreviousRoom(sessionId); worker != nil {
log.Debug().Msgf("Worker %v with the previous room: %v is found", wid, roomId)
} else if worker = h.findWorkerById(wid, h.conf.Coordinator.Debug); worker != nil {
log.Debug().Msgf("Worker with id: %v has been found", wid)
} else {
Expand All @@ -198,6 +202,17 @@ func (h *Hub) findWorkerFor(usr *User, q url.Values, log *logger.Logger) *Worker
return worker
}

func (h *Hub) findWorkerByPreviousRoom(id string) *Worker {
if id == "" {
return nil
}
w, _ := h.workers.FindBy(func(w *Worker) bool {
// session and room id are the same
return w.HadSession(id) && w.HasSlot()
})
return w
}

func (h *Hub) findWorkerByRoom(id string, region string) *Worker {
if id == "" {
return nil
Expand Down
5 changes: 2 additions & 3 deletions pkg/coordinator/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/giongto35/cloud-game/v3/pkg/api"
"github.com/giongto35/cloud-game/v3/pkg/com"
"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/games"
"github.com/giongto35/cloud-game/v3/pkg/logger"
)

Expand Down Expand Up @@ -42,7 +41,7 @@ func (u *User) Disconnect() {
}
}

func (u *User) HandleRequests(info HasServerInfo, launcher games.Launcher, conf config.CoordinatorConfig) chan struct{} {
func (u *User) HandleRequests(info HasServerInfo, conf config.CoordinatorConfig) chan struct{} {
return u.ProcessPackets(func(x api.In[com.Uid]) error {
payload := x.GetPayload()
switch x.GetType() {
Expand All @@ -67,7 +66,7 @@ func (u *User) HandleRequests(info HasServerInfo, launcher games.Launcher, conf
if rq == nil {
return api.ErrMalformed
}
u.HandleStartGame(*rq, launcher, conf)
u.HandleStartGame(*rq, conf)
case api.QuitGame:
rq := api.Unwrap[api.GameQuitRequest[com.Uid]](payload)
if rq == nil {
Expand Down
31 changes: 9 additions & 22 deletions pkg/coordinator/userhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/giongto35/cloud-game/v3/pkg/api"
"github.com/giongto35/cloud-game/v3/pkg/com"
"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/games"
)

func (u *User) HandleWebrtcInit() {
Expand All @@ -26,27 +25,8 @@ func (u *User) HandleWebrtcIceCandidate(rq api.WebrtcUserIceCandidate) {
u.w.WebrtcIceCandidate(u.Id(), string(rq))
}

func (u *User) HandleStartGame(rq api.GameStartUserRequest, launcher games.Launcher, conf config.CoordinatorConfig) {
// +injects game data into the original game request
// the name of the game either in the `room id` field or
// it's in the initial request
game := rq.GameName
if rq.RoomId != "" {
name := launcher.ExtractAppNameFromUrl(rq.RoomId)
if name == "" {
u.log.Warn().Msg("couldn't decode game name from the room id")
return
}
game = name
}

gameInfo, err := launcher.FindAppByName(game)
if err != nil {
u.log.Error().Err(err).Send()
return
}

startGameResp, err := u.w.StartGame(u.Id(), gameInfo, rq)
func (u *User) HandleStartGame(rq api.GameStartUserRequest, conf config.CoordinatorConfig) {
startGameResp, err := u.w.StartGame(u.Id(), rq)
if err != nil || startGameResp == nil {
u.log.Error().Err(err).Msg("malformed game start response")
return
Expand Down Expand Up @@ -75,6 +55,13 @@ func (u *User) HandleSaveGame() error {
if err != nil {
return err
}

if *resp == api.OK {
if id, _ := api.ExplodeDeepLink(u.w.RoomId); id != "" {
u.w.AddSession(id)
}
}

u.Notify(api.SaveGame, resp)
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/coordinator/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
)

type Worker struct {
AppLibrary
Connection
RegionalClient
Session
slotted

Addr string
Expand All @@ -21,6 +23,9 @@ type Worker struct {
Tag string
Zone string

Lib []api.GameInfo
Sessions map[string]struct{}

log *logger.Logger
}

Expand All @@ -32,6 +37,27 @@ type HasUserRegistry interface {
Find(com.Uid) *User
}

type AppLibrary interface {
SetLib([]api.GameInfo)
AppNames() []api.GameInfo
}

type Session interface {
AddSession(id string)
// HadSession is true when an old session is found
HadSession(id string) bool
SetSessions(map[string]struct{})
}

type AppMeta struct {
Alias string
Base string
Name string
Path string
System string
Type string
}

func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], log *logger.Logger) *Worker {
conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out, *api.Out](sock, handshake.Id, log)
return &Worker{
Expand Down Expand Up @@ -84,13 +110,43 @@ func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} {
w.log.Error().Err(err).Send()
return api.ErrMalformed
}
case api.PrevSessions:
sess := api.Unwrap[api.PrevSessionInfo](payload)
if sess == nil {
return api.ErrMalformed
}
if err := w.HandlePrevSessionList(*sess); err != nil {
w.log.Error().Err(err).Send()
return api.ErrMalformed
}
default:
w.log.Warn().Msgf("Unknown packet: %+v", p)
}
return nil
})
}

func (w *Worker) SetLib(list []api.GameInfo) {
w.Lib = list
}

func (w *Worker) AppNames() []api.GameInfo {
return w.Lib
}

func (w *Worker) AddSession(id string) {
w.Sessions[id] = struct{}{}
}

func (w *Worker) HadSession(id string) bool {
_, ok := w.Sessions[id]
return ok
}

func (w *Worker) SetSessions(sessions map[string]struct{}) {
w.Sessions = sessions
}

// In say whether some worker from this region (zone).
// Empty region always returns true.
func (w *Worker) In(region string) bool { return region == "" || region == w.Zone }
Expand Down
5 changes: 2 additions & 3 deletions pkg/coordinator/workerapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package coordinator
import (
"github.com/giongto35/cloud-game/v3/pkg/api"
"github.com/giongto35/cloud-game/v3/pkg/com"
"github.com/giongto35/cloud-game/v3/pkg/games"
)

func (w *Worker) WebrtcInit(id com.Uid) (*api.WebrtcInitResponse, error) {
Expand All @@ -19,11 +18,11 @@ func (w *Worker) WebrtcIceCandidate(id com.Uid, can string) {
w.Notify(api.WebrtcIce, api.WebrtcIceCandidateRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: id}, Candidate: can})
}

func (w *Worker) StartGame(id com.Uid, app games.AppMeta, req api.GameStartUserRequest) (*api.StartGameResponse, error) {
func (w *Worker) StartGame(id com.Uid, req api.GameStartUserRequest) (*api.StartGameResponse, error) {
return api.UnwrapChecked[api.StartGameResponse](
w.Send(api.StartGame, api.StartGameRequest[com.Uid]{
StatefulRoom: StateRoom(id, req.RoomId),
Game: api.GameInfo(app),
Game: req.GameName,
PlayerIndex: req.PlayerIndex,
Record: req.Record,
RecordUser: req.RecordUser,
Expand Down
Loading

0 comments on commit 1147aed

Please sign in to comment.