From 1147aeda144dea9ba46e62a8f65088b482c738df Mon Sep 17 00:00:00 2001 From: Sergey Stepanov Date: Sun, 17 Nov 2024 00:51:53 +0300 Subject: [PATCH] Track all worker saves to resume old games Move library config to the top level --- pkg/api/api.go | 16 +++++++++ pkg/api/worker.go | 8 +++-- pkg/config/emulator.go | 4 +++ pkg/coordinator/coordinator.go | 6 +--- pkg/coordinator/hub.go | 43 ++++++++++++++++-------- pkg/coordinator/user.go | 5 ++- pkg/coordinator/userhandlers.go | 31 +++++------------ pkg/coordinator/worker.go | 56 +++++++++++++++++++++++++++++++ pkg/coordinator/workerapi.go | 5 ++- pkg/coordinator/workerhandlers.go | 15 ++++++++- pkg/games/library.go | 53 +++++++++++++++++++++-------- pkg/worker/coordinator.go | 20 +++++++++-- pkg/worker/coordinatorhandlers.go | 27 ++++++++++++--- pkg/worker/worker.go | 13 ++++--- 14 files changed, 226 insertions(+), 76 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index 6700ed04c..3c33e1e72 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -19,6 +19,7 @@ package api import ( "encoding/json" "fmt" + "strings" ) type ( @@ -86,6 +87,7 @@ const ( TerminateSession PT = 204 AppVideoChange PT = 150 LibNewGameList PT = 205 + PrevSessions PT = 206 ) func (p PT) String() string { @@ -128,6 +130,8 @@ func (p PT) String() string { return "AppVideoChange" case LibNewGameList: return "LibNewGameList" + case PrevSessions: + return "PrevSessions" default: return "Unknown" } @@ -166,3 +170,15 @@ func UnwrapChecked[T any](bytes []byte, err error) (*T, error) { } func Wrap(t any) ([]byte, error) { return json.Marshal(t) } + +const separator = "___" + +func ExplodeDeepLink(link string) (string, string) { + p := strings.SplitN(link, separator, 2) + + if len(p) == 1 { + return p[0], "" + } + + return p[0], p[1] +} diff --git a/pkg/api/worker.go b/pkg/api/worker.go index 2bf963e3b..5031afab6 100644 --- a/pkg/api/worker.go +++ b/pkg/api/worker.go @@ -21,8 +21,8 @@ type ( StatefulRoom[T] Record bool RecordUser string - Game GameInfo `json:"game"` - PlayerIndex int `json:"player_index"` + Game string `json:"game"` + PlayerIndex int `json:"player_index"` } GameInfo struct { Alias string `json:"alias"` @@ -71,4 +71,8 @@ type ( T int List []GameInfo } + + PrevSessionInfo struct { + List []string + } ) diff --git a/pkg/config/emulator.go b/pkg/config/emulator.go index 4db9b825e..2fc71fe2d 100644 --- a/pkg/config/emulator.go +++ b/pkg/config/emulator.go @@ -103,6 +103,10 @@ func (e Emulator) GetSupportedExtensions() []string { return extensions } +func (e Emulator) SessionStoragePath() string { + return e.Storage +} + func (l *LibretroConfig) GetCores() (cores []CoreInfo) { for k, core := range l.Cores.List { cores = append(cores, CoreInfo{Id: k, Name: core.Lib, AltRepo: core.AltRepo}) diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go index 61dcf43c9..45f1ec446 100644 --- a/pkg/coordinator/coordinator.go +++ b/pkg/coordinator/coordinator.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/giongto35/cloud-game/v3/pkg/config" - "github.com/giongto35/cloud-game/v3/pkg/games" "github.com/giongto35/cloud-game/v3/pkg/logger" "github.com/giongto35/cloud-game/v3/pkg/monitoring" "github.com/giongto35/cloud-game/v3/pkg/network/httpx" @@ -23,10 +22,7 @@ type Coordinator struct { } func New(conf config.CoordinatorConfig, log *logger.Logger) (*Coordinator, error) { - coordinator := &Coordinator{} - lib := games.NewLib(conf.Library, conf.Emulator, log) - lib.Scan() - coordinator.hub = NewHub(conf, lib, log) + coordinator := &Coordinator{hub: NewHub(conf, log)} h, err := NewHTTPServer(conf, log, func(mux *httpx.Mux) *httpx.Mux { mux.HandleFunc("/ws", coordinator.hub.handleUserConnection()) mux.HandleFunc("/wso", coordinator.hub.handleWorkerConnection()) diff --git a/pkg/coordinator/hub.go b/pkg/coordinator/hub.go index c3c33b918..4b8215919 100644 --- a/pkg/coordinator/hub.go +++ b/pkg/coordinator/hub.go @@ -10,7 +10,6 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/api" "github.com/giongto35/cloud-game/v3/pkg/com" "github.com/giongto35/cloud-game/v3/pkg/config" - "github.com/giongto35/cloud-game/v3/pkg/games" "github.com/giongto35/cloud-game/v3/pkg/logger" ) @@ -24,20 +23,18 @@ type Connection interface { } type Hub struct { - conf config.CoordinatorConfig - launcher games.Launcher - log *logger.Logger - users com.NetMap[com.Uid, *User] - workers com.NetMap[com.Uid, *Worker] + conf config.CoordinatorConfig + log *logger.Logger + users com.NetMap[com.Uid, *User] + workers com.NetMap[com.Uid, *Worker] } -func NewHub(conf config.CoordinatorConfig, lib games.GameLibrary, log *logger.Logger) *Hub { +func NewHub(conf config.CoordinatorConfig, log *logger.Logger) *Hub { return &Hub{ - conf: conf, - users: com.NewNetMap[com.Uid, *User](), - workers: com.NewNetMap[com.Uid, *Worker](), - launcher: games.NewGameLauncher(lib), - log: log, + conf: conf, + users: com.NewNetMap[com.Uid, *User](), + workers: com.NewNetMap[com.Uid, *Worker](), + log: log, } } @@ -62,8 +59,9 @@ func (h *Hub) handleUserConnection() http.HandlerFunc { user := NewUser(conn, log) defer h.users.RemoveDisconnect(user) - done := user.HandleRequests(h, h.launcher, h.conf) + done := user.HandleRequests(h, h.conf) params := r.URL.Query() + worker := h.findWorkerFor(user, params, h.log.Extend(h.log.With().Str("cid", user.Id().Short()))) if worker == nil { user.Notify(api.ErrNoFreeSlots, "") @@ -72,11 +70,13 @@ func (h *Hub) handleUserConnection() http.HandlerFunc { } user.Bind(worker) h.users.Add(user) - apps := h.launcher.GetAppNames() + + apps := worker.AppNames() list := make([]api.AppMeta, len(apps)) for i := range apps { list[i] = api.AppMeta{Alias: apps[i].Alias, Title: apps[i].Name, System: apps[i].System} } + user.InitSession(worker.Id().String(), h.conf.Webrtc.IceServers, list) log.Info().Str(logger.DirectionField, logger.MarkPlus).Msgf("user %s", user.Id()) <-done @@ -175,9 +175,13 @@ func (h *Hub) findWorkerFor(usr *User, q url.Values, log *logger.Logger) *Worker zone := q.Get(api.ZoneQueryParam) wid := q.Get(api.WorkerIdParam) + sessionId, _ := api.ExplodeDeepLink(roomId) + var worker *Worker if worker = h.findWorkerByRoom(roomId, zone); worker != nil { log.Debug().Str("room", roomId).Msg("An existing worker has been found") + } else if worker = h.findWorkerByPreviousRoom(sessionId); worker != nil { + log.Debug().Msgf("Worker %v with the previous room: %v is found", wid, roomId) } else if worker = h.findWorkerById(wid, h.conf.Coordinator.Debug); worker != nil { log.Debug().Msgf("Worker with id: %v has been found", wid) } else { @@ -198,6 +202,17 @@ func (h *Hub) findWorkerFor(usr *User, q url.Values, log *logger.Logger) *Worker return worker } +func (h *Hub) findWorkerByPreviousRoom(id string) *Worker { + if id == "" { + return nil + } + w, _ := h.workers.FindBy(func(w *Worker) bool { + // session and room id are the same + return w.HadSession(id) && w.HasSlot() + }) + return w +} + func (h *Hub) findWorkerByRoom(id string, region string) *Worker { if id == "" { return nil diff --git a/pkg/coordinator/user.go b/pkg/coordinator/user.go index 00479fda2..471561716 100644 --- a/pkg/coordinator/user.go +++ b/pkg/coordinator/user.go @@ -4,7 +4,6 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/api" "github.com/giongto35/cloud-game/v3/pkg/com" "github.com/giongto35/cloud-game/v3/pkg/config" - "github.com/giongto35/cloud-game/v3/pkg/games" "github.com/giongto35/cloud-game/v3/pkg/logger" ) @@ -42,7 +41,7 @@ func (u *User) Disconnect() { } } -func (u *User) HandleRequests(info HasServerInfo, launcher games.Launcher, conf config.CoordinatorConfig) chan struct{} { +func (u *User) HandleRequests(info HasServerInfo, conf config.CoordinatorConfig) chan struct{} { return u.ProcessPackets(func(x api.In[com.Uid]) error { payload := x.GetPayload() switch x.GetType() { @@ -67,7 +66,7 @@ func (u *User) HandleRequests(info HasServerInfo, launcher games.Launcher, conf if rq == nil { return api.ErrMalformed } - u.HandleStartGame(*rq, launcher, conf) + u.HandleStartGame(*rq, conf) case api.QuitGame: rq := api.Unwrap[api.GameQuitRequest[com.Uid]](payload) if rq == nil { diff --git a/pkg/coordinator/userhandlers.go b/pkg/coordinator/userhandlers.go index cf62d65ba..61c3f494e 100644 --- a/pkg/coordinator/userhandlers.go +++ b/pkg/coordinator/userhandlers.go @@ -6,7 +6,6 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/api" "github.com/giongto35/cloud-game/v3/pkg/com" "github.com/giongto35/cloud-game/v3/pkg/config" - "github.com/giongto35/cloud-game/v3/pkg/games" ) func (u *User) HandleWebrtcInit() { @@ -26,27 +25,8 @@ func (u *User) HandleWebrtcIceCandidate(rq api.WebrtcUserIceCandidate) { u.w.WebrtcIceCandidate(u.Id(), string(rq)) } -func (u *User) HandleStartGame(rq api.GameStartUserRequest, launcher games.Launcher, conf config.CoordinatorConfig) { - // +injects game data into the original game request - // the name of the game either in the `room id` field or - // it's in the initial request - game := rq.GameName - if rq.RoomId != "" { - name := launcher.ExtractAppNameFromUrl(rq.RoomId) - if name == "" { - u.log.Warn().Msg("couldn't decode game name from the room id") - return - } - game = name - } - - gameInfo, err := launcher.FindAppByName(game) - if err != nil { - u.log.Error().Err(err).Send() - return - } - - startGameResp, err := u.w.StartGame(u.Id(), gameInfo, rq) +func (u *User) HandleStartGame(rq api.GameStartUserRequest, conf config.CoordinatorConfig) { + startGameResp, err := u.w.StartGame(u.Id(), rq) if err != nil || startGameResp == nil { u.log.Error().Err(err).Msg("malformed game start response") return @@ -75,6 +55,13 @@ func (u *User) HandleSaveGame() error { if err != nil { return err } + + if *resp == api.OK { + if id, _ := api.ExplodeDeepLink(u.w.RoomId); id != "" { + u.w.AddSession(id) + } + } + u.Notify(api.SaveGame, resp) return nil } diff --git a/pkg/coordinator/worker.go b/pkg/coordinator/worker.go index 3ce1087ff..a00ad2264 100644 --- a/pkg/coordinator/worker.go +++ b/pkg/coordinator/worker.go @@ -10,8 +10,10 @@ import ( ) type Worker struct { + AppLibrary Connection RegionalClient + Session slotted Addr string @@ -21,6 +23,9 @@ type Worker struct { Tag string Zone string + Lib []api.GameInfo + Sessions map[string]struct{} + log *logger.Logger } @@ -32,6 +37,27 @@ type HasUserRegistry interface { Find(com.Uid) *User } +type AppLibrary interface { + SetLib([]api.GameInfo) + AppNames() []api.GameInfo +} + +type Session interface { + AddSession(id string) + // HadSession is true when an old session is found + HadSession(id string) bool + SetSessions(map[string]struct{}) +} + +type AppMeta struct { + Alias string + Base string + Name string + Path string + System string + Type string +} + func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], log *logger.Logger) *Worker { conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out, *api.Out](sock, handshake.Id, log) return &Worker{ @@ -84,6 +110,15 @@ func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} { w.log.Error().Err(err).Send() return api.ErrMalformed } + case api.PrevSessions: + sess := api.Unwrap[api.PrevSessionInfo](payload) + if sess == nil { + return api.ErrMalformed + } + if err := w.HandlePrevSessionList(*sess); err != nil { + w.log.Error().Err(err).Send() + return api.ErrMalformed + } default: w.log.Warn().Msgf("Unknown packet: %+v", p) } @@ -91,6 +126,27 @@ func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} { }) } +func (w *Worker) SetLib(list []api.GameInfo) { + w.Lib = list +} + +func (w *Worker) AppNames() []api.GameInfo { + return w.Lib +} + +func (w *Worker) AddSession(id string) { + w.Sessions[id] = struct{}{} +} + +func (w *Worker) HadSession(id string) bool { + _, ok := w.Sessions[id] + return ok +} + +func (w *Worker) SetSessions(sessions map[string]struct{}) { + w.Sessions = sessions +} + // In say whether some worker from this region (zone). // Empty region always returns true. func (w *Worker) In(region string) bool { return region == "" || region == w.Zone } diff --git a/pkg/coordinator/workerapi.go b/pkg/coordinator/workerapi.go index 000059060..05ead1d98 100644 --- a/pkg/coordinator/workerapi.go +++ b/pkg/coordinator/workerapi.go @@ -3,7 +3,6 @@ package coordinator import ( "github.com/giongto35/cloud-game/v3/pkg/api" "github.com/giongto35/cloud-game/v3/pkg/com" - "github.com/giongto35/cloud-game/v3/pkg/games" ) func (w *Worker) WebrtcInit(id com.Uid) (*api.WebrtcInitResponse, error) { @@ -19,11 +18,11 @@ func (w *Worker) WebrtcIceCandidate(id com.Uid, can string) { w.Notify(api.WebrtcIce, api.WebrtcIceCandidateRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: id}, Candidate: can}) } -func (w *Worker) StartGame(id com.Uid, app games.AppMeta, req api.GameStartUserRequest) (*api.StartGameResponse, error) { +func (w *Worker) StartGame(id com.Uid, req api.GameStartUserRequest) (*api.StartGameResponse, error) { return api.UnwrapChecked[api.StartGameResponse]( w.Send(api.StartGame, api.StartGameRequest[com.Uid]{ StatefulRoom: StateRoom(id, req.RoomId), - Game: api.GameInfo(app), + Game: req.GameName, PlayerIndex: req.PlayerIndex, Record: req.Record, RecordUser: req.RecordUser, diff --git a/pkg/coordinator/workerhandlers.go b/pkg/coordinator/workerhandlers.go index 0a1571e6e..5716b6215 100644 --- a/pkg/coordinator/workerhandlers.go +++ b/pkg/coordinator/workerhandlers.go @@ -23,6 +23,19 @@ func (w *Worker) HandleIceCandidate(rq api.WebrtcIceCandidateRequest[com.Uid], u } func (w *Worker) HandleLibGameList(inf api.LibGameListInfo) error { - w.log.Info().Msgf("Oh, lib: %v", inf) + w.SetLib(inf.List) + return nil +} + +func (w *Worker) HandlePrevSessionList(sess api.PrevSessionInfo) error { + if len(sess.List) == 0 { + return nil + } + + m := make(map[string]struct{}) + for _, v := range sess.List { + m[v] = struct{}{} + } + w.SetSessions(m) return nil } diff --git a/pkg/games/library.go b/pkg/games/library.go index d4ef214e5..80572a988 100644 --- a/pkg/games/library.go +++ b/pkg/games/library.go @@ -18,12 +18,13 @@ import ( // libConf is an optimized internal library configuration type libConf struct { - aliasFile string - path string - supported map[string]struct{} - ignored map[string]struct{} - verbose bool - watchMode bool + aliasFile string + path string + supported map[string]struct{} + ignored map[string]struct{} + verbose bool + watchMode bool + sessionPath string } type library struct { @@ -39,6 +40,9 @@ type library struct { games map[string]GameMetadata log *logger.Logger + // ids of saved games to find closed sessions + sessions []string + emuConf WithEmulatorInfo // to restrict parallel execution or throttling @@ -51,12 +55,14 @@ type library struct { type GameLibrary interface { GetAll() []GameMetadata FindGameByName(name string) GameMetadata + Sessions() []string Scan() } type WithEmulatorInfo interface { GetSupportedExtensions() []string GetEmulator(rom string, path string) string + SessionStoragePath() string } type GameMetadata struct { @@ -89,12 +95,13 @@ func NewLib(conf config.Library, emu WithEmulatorInfo, log *logger.Logger) GameL library := &library{ config: libConf{ - aliasFile: conf.AliasFile, - path: dir, - supported: toMap(conf.Supported), - ignored: toMap(conf.Ignored), - verbose: conf.Verbose, - watchMode: conf.WatchMode, + aliasFile: conf.AliasFile, + path: dir, + supported: toMap(conf.Supported), + ignored: toMap(conf.Ignored), + verbose: conf.Verbose, + watchMode: conf.WatchMode, + sessionPath: emu.SessionStoragePath(), }, mu: sync.Mutex{}, games: map[string]GameMetadata{}, @@ -110,6 +117,10 @@ func NewLib(conf config.Library, emu WithEmulatorInfo, log *logger.Logger) GameL return library } +func (lib *library) Sessions() []string { + return lib.sessions +} + func (lib *library) GetAll() []GameMetadata { var res []GameMetadata for _, value := range lib.games { @@ -224,6 +235,20 @@ func (lib *library) Scan() { lib.set(games) } + var sessions []string + dir = lib.config.sessionPath + err = filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error { + if err != nil { + return err + } + + if info != nil && !info.IsDir() { + sessions = append(sessions, info.Name()) + } + return nil + }) + lib.sessions = sessions + lib.lastScanDuration = time.Since(start) if lib.config.verbose { lib.dumpLibrary() @@ -336,9 +361,9 @@ func (lib *library) dumpLibrary() { "--------------------------------------------\n"+ "%v"+ "--------------------------------------------\n"+ - "--- ROMs: %03d %26s ---\n"+ + "--- ROMs: %03d --- Saves: %04d %10s ---\n"+ "--------------------------------------------", - gameList.String(), len(lib.games), lib.lastScanDuration) + gameList.String(), len(lib.games), len(lib.sessions), lib.lastScanDuration) } func toMap(list []string) map[string]struct{} { diff --git a/pkg/worker/coordinator.go b/pkg/worker/coordinator.go index 734363874..1dd4c7ef0 100644 --- a/pkg/worker/coordinator.go +++ b/pkg/worker/coordinator.go @@ -151,12 +151,26 @@ func (c *coordinator) IceCandidate(candidate string, sessionId com.Uid) { } func (c *coordinator) SendLibrary(w *Worker) { - games := w.lib.GetAll() + g := w.lib.GetAll() - var gg = make([]api.GameInfo, len(games)) - for i, g := range games { + var gg = make([]api.GameInfo, len(g)) + for i, g := range g { gg[i] = api.GameInfo(g) } c.Notify(api.LibNewGameList, api.LibGameListInfo{T: 1, List: gg}) } + +func (c *coordinator) SendPrevSessions(w *Worker) { + sessions := w.lib.Sessions() + + // extract ids from save states, i.e. sessions + var ids []string + + for _, id := range sessions { + x, _ := api.ExplodeDeepLink(id) + ids = append(ids, x) + } + + c.Notify(api.PrevSessions, api.PrevSessionInfo{List: ids}) +} diff --git a/pkg/worker/coordinatorhandlers.go b/pkg/worker/coordinatorhandlers.go index 120318da0..db0b5ad01 100644 --- a/pkg/worker/coordinatorhandlers.go +++ b/pkg/worker/coordinatorhandlers.go @@ -81,12 +81,31 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke r := w.router.FindRoom(rq.Room.Rid) + // +injects game data into the original game request + // the name of the game either in the `room id` field or + // it's in the initial request + gameName := rq.Game + if rq.Room.Rid != "" { + name := w.launcher.ExtractAppNameFromUrl(rq.Room.Rid) + if name == "" { + c.log.Warn().Msg("couldn't decode game name from the room id") + return api.EmptyPacket + } + gameName = name + } + + gameInfo, err := w.launcher.FindAppByName(gameName) + if err != nil { + c.log.Error().Err(err).Send() + return api.EmptyPacket + } + if r == nil { // new room uid := rq.Room.Rid if uid == "" { - uid = games.GenerateRoomID(rq.Game.Name) + uid = games.GenerateRoomID(gameName) } - game := games.GameMetadata(rq.Game) + game := games.GameMetadata(gameInfo) r = room.NewRoom[*room.GameSession](uid, nil, w.router.Users(), nil) r.HandleClose = func() { @@ -108,7 +127,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke app.SetSessionId(uid) app.SetSaveOnClose(true) app.EnableCloudStorage(uid, w.storage) - app.EnableRecording(rq.Record, rq.RecordUser, rq.Game.Name) + app.EnableRecording(rq.Record, rq.RecordUser, gameName) r.SetApp(app) @@ -140,7 +159,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke r.Send(data) }) - w.log.Info().Msgf("Starting the game: %v", rq.Game.Name) + w.log.Info().Msgf("Starting the game: %v", gameName) if err := app.Load(game, w.conf.Library.BasePath); err != nil { c.log.Error().Err(err).Msgf("couldn't load the game %v", game) r.Close() diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index f4333d29d..505a55d2d 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -21,6 +21,7 @@ type Worker struct { conf config.WorkerConfig cord *coordinator lib games.GameLibrary + launcher games.Launcher log *logger.Logger mana *caged.Manager router *room.GameRouter @@ -41,11 +42,12 @@ func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) { library.Scan() worker := &Worker{ - conf: conf, - lib: library, - log: log, - mana: manager, - router: room.NewGameRouter(), + conf: conf, + lib: library, + launcher: games.NewGameLauncher(library), + log: log, + mana: manager, + router: room.NewGameRouter(), } h, err := httpx.NewServer( @@ -122,6 +124,7 @@ func (w *Worker) Start(done chan struct{}) { w.cord.log.Info().Msgf("Connected to the coordinator %v", remoteAddr) wait := w.cord.HandleRequests(w) w.cord.SendLibrary(w) + w.cord.SendPrevSessions(w) <-wait retry.SuccessCheck() }