From bacb542cdc91cf92ab73a4566e1c062556244160 Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 14:22:12 +0800 Subject: [PATCH 1/8] feat(plaxt): mark a session as watched only if http status 200 received --- common/lock.go | 7 +++++++ common/mutex.go | 4 ++++ handler/main.go | 4 ---- handler/middleware.go | 4 ++-- handler/plex.go | 29 +++++++++++++++++++---------- 5 files changed, 32 insertions(+), 16 deletions(-) diff --git a/common/lock.go b/common/lock.go index b25d267..8016354 100644 --- a/common/lock.go +++ b/common/lock.go @@ -13,6 +13,7 @@ type refCounter struct { type MultipleLock interface { TryLock(interface{}, time.Duration) bool + Lock(interface{}) Unlock(interface{}) } @@ -31,6 +32,12 @@ func (l *lock) TryLock(key interface{}, timeout time.Duration) bool { return isLocked } +func (l *lock) Lock(key interface{}) { + m := l.getLocker(key) + atomic.AddInt64(&m.counter, 1) + m.lock.lock() +} + func (l *lock) Unlock(key interface{}) { m := l.getLocker(key) m.lock.unlock() diff --git a/common/mutex.go b/common/mutex.go index ad63538..dd74df8 100644 --- a/common/mutex.go +++ b/common/mutex.go @@ -24,6 +24,10 @@ func (m *timedMutex) tryLock(timeout time.Duration) bool { return false } +func (m *timedMutex) lock() { + m.c <- struct{}{} +} + func (m *timedMutex) unlock() { <-m.c } diff --git a/handler/main.go b/handler/main.go index 7f4a5ae..ddfff7e 100644 --- a/handler/main.go +++ b/handler/main.go @@ -6,7 +6,6 @@ import ( "os" "sync" - "github.com/RoyXiang/plexproxy/common" "github.com/go-chi/chi/v5/middleware" "github.com/go-redis/redis/v8" "github.com/gorilla/mux" @@ -19,7 +18,6 @@ var ( emptyStruct = struct{}{} mu sync.RWMutex - ml common.MultipleLock ) func init() { @@ -41,8 +39,6 @@ func init() { redisClient = redis.NewClient(options) } } - - ml = common.NewMultipleLock() } func NewRouter() http.Handler { diff --git a/handler/middleware.go b/handler/middleware.go index 9bd42f0..aa0f966 100644 --- a/handler/middleware.go +++ b/handler/middleware.go @@ -84,11 +84,11 @@ func trafficMiddleware(next http.Handler) http.Handler { params.Set(headerRange, rg) } lockKey := fmt.Sprintf("%s?%s", r.URL.EscapedPath(), params.Encode()) - if !ml.TryLock(lockKey, time.Second) { + if !plexClient.MulLock.TryLock(lockKey, time.Second) { w.WriteHeader(http.StatusGatewayTimeout) return } - defer ml.Unlock(lockKey) + defer plexClient.MulLock.Unlock(lockKey) next.ServeHTTP(w, r) }) } diff --git a/handler/plex.go b/handler/plex.go index ff0e2a2..94e5ce7 100644 --- a/handler/plex.go +++ b/handler/plex.go @@ -15,7 +15,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/RoyXiang/plexproxy/common" "github.com/jrudio/go-plex-client" @@ -44,6 +43,8 @@ type PlexClient struct { friends map[string]plexUser mu sync.RWMutex + + MulLock common.MultipleLock } func NewPlexClient(config PlexConfig) *PlexClient { @@ -90,6 +91,7 @@ func NewPlexClient(config PlexConfig) *PlexClient { sections: make(map[string]plex.Directory, 0), sessions: make(map[string]sessionData), friends: make(map[string]plexUser), + MulLock: common.NewMultipleLock(), } } @@ -282,16 +284,13 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) { return } lockKey := fmt.Sprintf("plex:session:%s", sessionKey) - if ml.TryLock(lockKey, time.Second) { - defer ml.Unlock(lockKey) - } else { - return - } + c.MulLock.Lock(lockKey) + defer c.MulLock.Unlock(lockKey) + session := c.sessions[sessionKey] if session.status == sessionWatched { return } - progress := int(math.Round(float64(viewOffset) / float64(session.metadata.Duration) * 100.0)) if progress == 0 { if session.progress >= watchedThreshold { @@ -352,14 +351,15 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) { session.status = sessionStopped go clearCachedMetadata(ratingKey, user.Id) case webhookEventScrobble: - session.status = sessionWatched go clearCachedMetadata(ratingKey, user.Id) } session.lastEvent = event session.progress = progress shouldUpdate, shouldScrobble := session.Check(c.sessions[sessionKey]) if shouldUpdate { - c.sessions[sessionKey] = session + defer func() { + c.sessions[sessionKey] = session + }() } if !shouldScrobble { return @@ -407,7 +407,16 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) { }, } b, _ := json.Marshal(webhook) - _, _ = c.client.HTTPClient.Post(c.plaxtUrl, "application/json", bytes.NewBuffer(b)) + resp, err := c.client.HTTPClient.Post(c.plaxtUrl, "application/json", bytes.NewBuffer(b)) + if err != nil { + return + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + if event == webhookEventScrobble && resp.StatusCode == http.StatusOK { + session.status = sessionWatched + } } func (c *PlexClient) getServerIdentifier() string { From 3c2588ef82d20fedf8b79392e60970ef53a16be4 Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 14:50:01 +0800 Subject: [PATCH 2/8] feat(log): request logs could be turned off --- handler/main.go | 8 +++++--- handler/middleware.go | 10 +++++++--- handler/plex.go | 10 +++++++++- handler/utils.go | 7 +++++++ 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/handler/main.go b/handler/main.go index ddfff7e..ceb046f 100644 --- a/handler/main.go +++ b/handler/main.go @@ -27,6 +27,7 @@ func init() { PlaxtUrl: os.Getenv("PLAXT_URL"), RedirectWebApp: os.Getenv("REDIRECT_WEB_APP"), DisableTranscode: os.Getenv("DISABLE_TRANSCODE"), + NoRequestLogs: os.Getenv("NO_REQUEST_LOGS"), }) if plexClient == nil { log.Fatalln("Please configure PLEX_BASEURL as a valid URL at first") @@ -44,9 +45,10 @@ func init() { func NewRouter() http.Handler { r := mux.NewRouter() r.Use(normalizeMiddleware) - r.Use(middleware.Logger) - r.Use(middleware.Recoverer) - r.Use(trafficMiddleware) + if !plexClient.NoRequestLogs { + r.Use(middleware.Logger) + } + r.Use(wrapMiddleware, middleware.Recoverer, trafficMiddleware) if redisClient != nil { // bypass cache diff --git a/handler/middleware.go b/handler/middleware.go index aa0f966..54e5d05 100644 --- a/handler/middleware.go +++ b/handler/middleware.go @@ -12,8 +12,6 @@ import ( "strconv" "strings" "time" - - "github.com/go-chi/chi/v5/middleware" ) var ( @@ -74,6 +72,12 @@ func normalizeMiddleware(next http.Handler) http.Handler { }) } +func wrapMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + next.ServeHTTP(wrapResponseWriter(w, r.ProtoMajor), r) + }) +} + func trafficMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { params := r.URL.Query() @@ -148,7 +152,7 @@ func cacheMiddleware(next http.Handler) http.Handler { resp, _ = http.ReadResponse(reader, r) } if resp == nil { - nw := middleware.NewWrapResponseWriter(httptest.NewRecorder(), r.ProtoMajor) + nw := wrapResponseWriter(httptest.NewRecorder(), r.ProtoMajor) next.ServeHTTP(nw, r) resp = nw.Unwrap().(*httptest.ResponseRecorder).Result() defer func() { diff --git a/handler/plex.go b/handler/plex.go index 94e5ce7..d8de64f 100644 --- a/handler/plex.go +++ b/handler/plex.go @@ -27,6 +27,7 @@ type PlexConfig struct { PlaxtUrl string RedirectWebApp string DisableTranscode string + NoRequestLogs string } type PlexClient struct { @@ -36,6 +37,7 @@ type PlexClient struct { plaxtUrl string redirectWebApp bool disableTranscode bool + NoRequestLogs bool serverIdentifier *string sections map[string]plex.Directory @@ -70,7 +72,7 @@ func NewPlexClient(config PlexConfig) *PlexClient { plaxtUrl = u.String() } - var redirectWebApp, disableTranscode bool + var redirectWebApp, disableTranscode, noRequestLogs bool if b, err := strconv.ParseBool(config.RedirectWebApp); err == nil { redirectWebApp = b } else { @@ -81,6 +83,11 @@ func NewPlexClient(config PlexConfig) *PlexClient { } else { disableTranscode = true } + if b, err := strconv.ParseBool(config.NoRequestLogs); err == nil { + noRequestLogs = b + } else { + noRequestLogs = false + } return &PlexClient{ proxy: proxy, @@ -88,6 +95,7 @@ func NewPlexClient(config PlexConfig) *PlexClient { plaxtUrl: plaxtUrl, redirectWebApp: redirectWebApp, disableTranscode: disableTranscode, + NoRequestLogs: noRequestLogs, sections: make(map[string]plex.Directory, 0), sessions: make(map[string]sessionData), friends: make(map[string]plexUser), diff --git a/handler/utils.go b/handler/utils.go index cb51b9a..0832ddb 100644 --- a/handler/utils.go +++ b/handler/utils.go @@ -14,6 +14,13 @@ import ( "github.com/go-chi/chi/v5/middleware" ) +func wrapResponseWriter(w http.ResponseWriter, protoMajor int) middleware.WrapResponseWriter { + if nw, ok := w.(middleware.WrapResponseWriter); ok { + return nw + } + return middleware.NewWrapResponseWriter(w, protoMajor) +} + func modifyResponse(resp *http.Response) error { contentType := resp.Header.Get(headerContentType) if contentType == "" { From d71af7314e9048dec66faac5eb29c37a23662118 Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 15:03:47 +0800 Subject: [PATCH 3/8] feat: log more errors --- handler/plex.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/handler/plex.go b/handler/plex.go index d8de64f..123b123 100644 --- a/handler/plex.go +++ b/handler/plex.go @@ -243,6 +243,7 @@ func (c *PlexClient) GetSharedServers() *plex.SharedServersResponse { } response, err := c.client.GetSharedServers(identifier) if err != nil { + common.GetLogger().Printf("Failed to get friend list: %s", err.Error()) return nil } return &response @@ -434,6 +435,7 @@ func (c *PlexClient) getServerIdentifier() string { identity, err := c.client.GetServerIdentity() if err != nil { + common.GetLogger().Printf("Failed to get server identifier: %s", err.Error()) return "" } c.serverIdentifier = &identity.MediaContainer.MachineIdentifier @@ -452,6 +454,7 @@ func (c *PlexClient) getLibrarySection(sectionKey string) (isFound bool) { sections, err := c.client.GetLibraries() if err != nil { + common.GetLogger().Printf("Failed to get library sections: %s", err.Error()) return } @@ -478,6 +481,7 @@ func (c *PlexClient) getPlayerSession(playerIdentifier, ratingKey string) (sessi sessions, err := c.client.GetSessions() if err != nil { + common.GetLogger().Printf("Failed to get playback sessions: %s", err.Error()) return } @@ -534,6 +538,7 @@ func (c *PlexClient) getMetadata(ratingKey string) *plex.MediaMetadata { if resp == nil { resp, err = c.client.HTTPClient.Do(req) if err != nil { + common.GetLogger().Printf("Failed to parse metadata of item %s: %s", ratingKey, err.Error()) return nil } } @@ -542,13 +547,18 @@ func (c *PlexClient) getMetadata(ratingKey string) *plex.MediaMetadata { }(resp.Body) if resp.StatusCode != http.StatusOK { + common.GetLogger().Printf("Failed to get metadata of item %s (status: %d)", ratingKey, resp.StatusCode) return nil } else if !isFromCache { writeToCache(cacheKey, resp, cacheTtlMetadata) } var result plex.MediaMetadata - _ = json.NewDecoder(resp.Body).Decode(&result) + err = json.NewDecoder(resp.Body).Decode(&result) + if err != nil { + common.GetLogger().Printf("Failed to parse metadata of item %s: %s", ratingKey, err.Error()) + return nil + } return &result } From 33ff20d1951ef5ae7a51f795c8f0accd33b0495b Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 15:28:14 +0800 Subject: [PATCH 4/8] fix(plaxt): ensure a status change on scrobble --- handler/plex.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/handler/plex.go b/handler/plex.go index 123b123..ea1bf5e 100644 --- a/handler/plex.go +++ b/handler/plex.go @@ -356,11 +356,9 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) { session.status = sessionPlaying case webhookEventPause: session.status = sessionPaused - case webhookEventStop: + case webhookEventStop, webhookEventScrobble: session.status = sessionStopped go clearCachedMetadata(ratingKey, user.Id) - case webhookEventScrobble: - go clearCachedMetadata(ratingKey, user.Id) } session.lastEvent = event session.progress = progress From 0da8a0cd063b5850610a3fa06327fc39329c405b Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 15:29:47 +0800 Subject: [PATCH 5/8] docs(README): add NO_REQUEST_LOGS --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 7bd7851..3bc02f5 100644 --- a/README.md +++ b/README.md @@ -40,4 +40,5 @@ env CGO_ENABLED=0 go install -trimpath -ldflags="-s -w" github.com/RoyXiang/plex * Or, you can set it to [the official one](https://plaxt.astandke.com/) - `REDIRECT_WEB_APP` (Optional, default: `true`) - `DISABLE_TRANSCODE` (Optional, default: `true`) + - `NO_REQUEST_LOGS` (Optional, default: `false`) 2. Run the program From 4be16b8b3bb892647b37715ebd930cdc5f34f926 Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 15:58:26 +0800 Subject: [PATCH 6/8] fix: cannot unmarshal number into Go struct field optimizedForStreaming of type bool --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ba283a3..2d1099f 100644 --- a/go.mod +++ b/go.mod @@ -17,4 +17,4 @@ require ( github.com/google/uuid v1.3.0 // indirect ) -replace github.com/jrudio/go-plex-client v0.0.0-20220106065909-9e1d590b99aa => github.com/RoyXiang/go-plex-client v0.0.0-20220310030059-ef5991e7e4e2 +replace github.com/jrudio/go-plex-client v0.0.0-20220106065909-9e1d590b99aa => github.com/RoyXiang/go-plex-client v0.0.0-20220310075632-2e318838193f diff --git a/go.sum b/go.sum index fc5bdfc..eb51076 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/RoyXiang/go-plex-client v0.0.0-20220310030059-ef5991e7e4e2 h1:CV52ZCM7Xjtk3524V31RoX3mRvBHa2PZL6YZDOVb87U= -github.com/RoyXiang/go-plex-client v0.0.0-20220310030059-ef5991e7e4e2/go.mod h1:twidbPLE4eUk3CgDno5uCzpnPRboBTElH+iJrQO7S4w= +github.com/RoyXiang/go-plex-client v0.0.0-20220310075632-2e318838193f h1:vUNGSKhuDTgEqQOmx/mpzR00IPXw7pCbtWha3IlsQyA= +github.com/RoyXiang/go-plex-client v0.0.0-20220310075632-2e318838193f/go.mod h1:twidbPLE4eUk3CgDno5uCzpnPRboBTElH+iJrQO7S4w= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= From 2c538a2521dd802d1c6da11acd3aa84561070a3f Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 16:56:14 +0800 Subject: [PATCH 7/8] fix: unmarshal user info from cache correctly --- handler/const.go | 4 ++++ handler/plex.go | 34 +++++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/handler/const.go b/handler/const.go index 29d29cb..e6d8976 100644 --- a/handler/const.go +++ b/handler/const.go @@ -34,6 +34,10 @@ const ( contentTypeAny = "*/*" contentTypeXml = "xml" + lockKeyFriends = "plex:friends" + lockKeySections = "plex:library:sections" + lockKeySessions = "plex:playback:sessions" + watchedThreshold = 90 webhookEventPlay = "media.play" diff --git a/handler/plex.go b/handler/plex.go index ea1bf5e..b8a4a19 100644 --- a/handler/plex.go +++ b/handler/plex.go @@ -181,10 +181,9 @@ func (c *PlexClient) SubscribeToNotifications(events *plex.NotificationEvents, i c.client.SubscribeToNotifications(events, interrupt, fn) } -func (c *PlexClient) GetUser(token string) (user *plexUser) { +func (c *PlexClient) GetUser(token string) *plexUser { if realUser, ok := c.friends[token]; ok { - user = &realUser - return + return &realUser } var err error @@ -193,17 +192,22 @@ func (c *PlexClient) GetUser(token string) (user *plexUser) { isCacheEnabled := redisClient != nil if isCacheEnabled { - err = redisClient.Get(ctx, cacheKey).Scan(user) + var result plexUser + err = redisClient.Get(ctx, cacheKey).Scan(&result) if err == nil { - c.friends[token] = *user - return + c.friends[token] = result + return &result } } + c.MulLock.Lock(lockKeyFriends) + defer c.MulLock.Unlock(lockKeyFriends) + response := c.GetSharedServers() if response == nil { - return + return nil } + var user *plexUser for _, friend := range response.Friends { realUser := plexUser{ Id: friend.UserId, @@ -219,7 +223,7 @@ func (c *PlexClient) GetUser(token string) (user *plexUser) { } } if user != nil { - return + return user } info := c.GetAccountInfo(token) @@ -230,7 +234,7 @@ func (c *PlexClient) GetUser(token string) (user *plexUser) { redisClient.Set(ctx, cacheKey, user, 0) } } - return + return user } func (c *PlexClient) GetSharedServers() *plex.SharedServersResponse { @@ -447,8 +451,12 @@ func (c *PlexClient) getLibrarySection(sectionKey string) (isFound bool) { return } + c.MulLock.Lock(lockKeySections) c.mu.RLock() - defer c.mu.RUnlock() + defer func() { + c.mu.RUnlock() + c.MulLock.Unlock(lockKeySections) + }() sections, err := c.client.GetLibraries() if err != nil { @@ -474,8 +482,12 @@ func (c *PlexClient) getPlayerSession(playerIdentifier, ratingKey string) (sessi } } + c.MulLock.Lock(lockKeySessions) c.mu.RLock() - defer c.mu.RUnlock() + defer func() { + c.mu.RUnlock() + c.MulLock.Unlock(lockKeySessions) + }() sessions, err := c.client.GetSessions() if err != nil { From 2fb175c3eef3bdc0f72ab63199ffa829280a0fa5 Mon Sep 17 00:00:00 2001 From: Roy Xiang Date: Thu, 10 Mar 2022 17:58:05 +0800 Subject: [PATCH 8/8] feat: put user info into the context of requests --- handler/middleware.go | 16 +++++++++------- handler/plex.go | 28 +++++++++++++--------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/handler/middleware.go b/handler/middleware.go index 54e5d05..bf6b378 100644 --- a/handler/middleware.go +++ b/handler/middleware.go @@ -16,6 +16,7 @@ import ( var ( cacheInfoCtxKey = &ctxKeyType{"cacheInfo"} + userCtxKey = &ctxKeyType{"user"} ) func normalizeMiddleware(next http.Handler) http.Handler { @@ -74,6 +75,11 @@ func normalizeMiddleware(next http.Handler) http.Handler { func wrapMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if token := r.Header.Get(headerToken); token != "" { + if user := plexClient.GetUser(token); user != nil { + r = r.WithContext(context.WithValue(r.Context(), userCtxKey, user)) + } + } next.ServeHTTP(wrapResponseWriter(w, r.ProtoMajor), r) }) } @@ -185,16 +191,12 @@ func cacheMiddleware(next http.Handler) http.Handler { case cachePrefixStatic: break case cachePrefixDynamic, cachePrefixMetadata: - token := r.Header.Get(headerToken) - if token == "" { - return - } - user := plexClient.GetUser(token) + user := r.Context().Value(userCtxKey) if user != nil { - params.Set(headerUserId, strconv.Itoa(user.Id)) + params.Set(headerUserId, strconv.Itoa(user.(*plexUser).Id)) params.Set(headerAccept, getAcceptContentType(r)) } else { - params.Set(headerToken, token) + return } default: // invalid prefix diff --git a/handler/plex.go b/handler/plex.go index b8a4a19..0062dd6 100644 --- a/handler/plex.go +++ b/handler/plex.go @@ -137,21 +137,19 @@ func (c *PlexClient) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if token := r.Header.Get(headerToken); token != "" { - // If it is an authorized request - if user := c.GetUser(token); user != nil { - switch path { - case "/:/scrobble", "/:/unscrobble": - ratingKey := r.URL.Query().Get("key") - if ratingKey != "" { - go clearCachedMetadata(ratingKey, user.Id) - } - case "/:/timeline": - go c.syncTimelineWithPlaxt(r, user) - case "/video/:/transcode/universal/decision": - if c.disableTranscode { - r = c.disableTranscoding(r) - } + // If it is an authorized request + if user := r.Context().Value(userCtxKey); user != nil { + switch path { + case "/:/scrobble", "/:/unscrobble": + ratingKey := r.URL.Query().Get("key") + if ratingKey != "" { + go clearCachedMetadata(ratingKey, user.(*plexUser).Id) + } + case "/:/timeline": + go c.syncTimelineWithPlaxt(r, user.(*plexUser)) + case "/video/:/transcode/universal/decision": + if c.disableTranscode { + r = c.disableTranscoding(r) } } }