diff --git a/README.md b/README.md index 7bd7851..3bc02f5 100644 --- a/README.md +++ b/README.md @@ -40,4 +40,5 @@ env CGO_ENABLED=0 go install -trimpath -ldflags="-s -w" github.com/RoyXiang/plex * Or, you can set it to [the official one](https://plaxt.astandke.com/) - `REDIRECT_WEB_APP` (Optional, default: `true`) - `DISABLE_TRANSCODE` (Optional, default: `true`) + - `NO_REQUEST_LOGS` (Optional, default: `false`) 2. Run the program diff --git a/common/lock.go b/common/lock.go index b25d267..8016354 100644 --- a/common/lock.go +++ b/common/lock.go @@ -13,6 +13,7 @@ type refCounter struct { type MultipleLock interface { TryLock(interface{}, time.Duration) bool + Lock(interface{}) Unlock(interface{}) } @@ -31,6 +32,12 @@ func (l *lock) TryLock(key interface{}, timeout time.Duration) bool { return isLocked } +func (l *lock) Lock(key interface{}) { + m := l.getLocker(key) + atomic.AddInt64(&m.counter, 1) + m.lock.lock() +} + func (l *lock) Unlock(key interface{}) { m := l.getLocker(key) m.lock.unlock() diff --git a/common/mutex.go b/common/mutex.go index ad63538..dd74df8 100644 --- a/common/mutex.go +++ b/common/mutex.go @@ -24,6 +24,10 @@ func (m *timedMutex) tryLock(timeout time.Duration) bool { return false } +func (m *timedMutex) lock() { + m.c <- struct{}{} +} + func (m *timedMutex) unlock() { <-m.c } diff --git a/go.mod b/go.mod index ba283a3..2d1099f 100644 --- a/go.mod +++ b/go.mod @@ -17,4 +17,4 @@ require ( github.com/google/uuid v1.3.0 // indirect ) -replace github.com/jrudio/go-plex-client v0.0.0-20220106065909-9e1d590b99aa => github.com/RoyXiang/go-plex-client v0.0.0-20220310030059-ef5991e7e4e2 +replace github.com/jrudio/go-plex-client v0.0.0-20220106065909-9e1d590b99aa => github.com/RoyXiang/go-plex-client v0.0.0-20220310075632-2e318838193f diff --git a/go.sum b/go.sum index fc5bdfc..eb51076 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/RoyXiang/go-plex-client v0.0.0-20220310030059-ef5991e7e4e2 h1:CV52ZCM7Xjtk3524V31RoX3mRvBHa2PZL6YZDOVb87U= -github.com/RoyXiang/go-plex-client v0.0.0-20220310030059-ef5991e7e4e2/go.mod h1:twidbPLE4eUk3CgDno5uCzpnPRboBTElH+iJrQO7S4w= +github.com/RoyXiang/go-plex-client v0.0.0-20220310075632-2e318838193f h1:vUNGSKhuDTgEqQOmx/mpzR00IPXw7pCbtWha3IlsQyA= +github.com/RoyXiang/go-plex-client v0.0.0-20220310075632-2e318838193f/go.mod h1:twidbPLE4eUk3CgDno5uCzpnPRboBTElH+iJrQO7S4w= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= diff --git a/handler/const.go b/handler/const.go index 29d29cb..e6d8976 100644 --- a/handler/const.go +++ b/handler/const.go @@ -34,6 +34,10 @@ const ( contentTypeAny = "*/*" contentTypeXml = "xml" + lockKeyFriends = "plex:friends" + lockKeySections = "plex:library:sections" + lockKeySessions = "plex:playback:sessions" + watchedThreshold = 90 webhookEventPlay = "media.play" diff --git a/handler/main.go b/handler/main.go index 7f4a5ae..ceb046f 100644 --- a/handler/main.go +++ b/handler/main.go @@ -6,7 +6,6 @@ import ( "os" "sync" - "github.com/RoyXiang/plexproxy/common" "github.com/go-chi/chi/v5/middleware" "github.com/go-redis/redis/v8" "github.com/gorilla/mux" @@ -19,7 +18,6 @@ var ( emptyStruct = struct{}{} mu sync.RWMutex - ml common.MultipleLock ) func init() { @@ -29,6 +27,7 @@ func init() { PlaxtUrl: os.Getenv("PLAXT_URL"), RedirectWebApp: os.Getenv("REDIRECT_WEB_APP"), DisableTranscode: os.Getenv("DISABLE_TRANSCODE"), + NoRequestLogs: os.Getenv("NO_REQUEST_LOGS"), }) if plexClient == nil { log.Fatalln("Please configure PLEX_BASEURL as a valid URL at first") @@ -41,16 +40,15 @@ func init() { redisClient = redis.NewClient(options) } } - - ml = common.NewMultipleLock() } func NewRouter() http.Handler { r := mux.NewRouter() r.Use(normalizeMiddleware) - r.Use(middleware.Logger) - r.Use(middleware.Recoverer) - r.Use(trafficMiddleware) + if !plexClient.NoRequestLogs { + r.Use(middleware.Logger) + } + r.Use(wrapMiddleware, middleware.Recoverer, trafficMiddleware) if redisClient != nil { // bypass cache diff --git a/handler/middleware.go b/handler/middleware.go index 9bd42f0..bf6b378 100644 --- a/handler/middleware.go +++ b/handler/middleware.go @@ -12,12 +12,11 @@ import ( "strconv" "strings" "time" - - "github.com/go-chi/chi/v5/middleware" ) var ( cacheInfoCtxKey = &ctxKeyType{"cacheInfo"} + userCtxKey = &ctxKeyType{"user"} ) func normalizeMiddleware(next http.Handler) http.Handler { @@ -74,6 +73,17 @@ func normalizeMiddleware(next http.Handler) http.Handler { }) } +func wrapMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if token := r.Header.Get(headerToken); token != "" { + if user := plexClient.GetUser(token); user != nil { + r = r.WithContext(context.WithValue(r.Context(), userCtxKey, user)) + } + } + next.ServeHTTP(wrapResponseWriter(w, r.ProtoMajor), r) + }) +} + func trafficMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { params := r.URL.Query() @@ -84,11 +94,11 @@ func trafficMiddleware(next http.Handler) http.Handler { params.Set(headerRange, rg) } lockKey := fmt.Sprintf("%s?%s", r.URL.EscapedPath(), params.Encode()) - if !ml.TryLock(lockKey, time.Second) { + if !plexClient.MulLock.TryLock(lockKey, time.Second) { w.WriteHeader(http.StatusGatewayTimeout) return } - defer ml.Unlock(lockKey) + defer plexClient.MulLock.Unlock(lockKey) next.ServeHTTP(w, r) }) } @@ -148,7 +158,7 @@ func cacheMiddleware(next http.Handler) http.Handler { resp, _ = http.ReadResponse(reader, r) } if resp == nil { - nw := middleware.NewWrapResponseWriter(httptest.NewRecorder(), r.ProtoMajor) + nw := wrapResponseWriter(httptest.NewRecorder(), r.ProtoMajor) next.ServeHTTP(nw, r) resp = nw.Unwrap().(*httptest.ResponseRecorder).Result() defer func() { @@ -181,16 +191,12 @@ func cacheMiddleware(next http.Handler) http.Handler { case cachePrefixStatic: break case cachePrefixDynamic, cachePrefixMetadata: - token := r.Header.Get(headerToken) - if token == "" { - return - } - user := plexClient.GetUser(token) + user := r.Context().Value(userCtxKey) if user != nil { - params.Set(headerUserId, strconv.Itoa(user.Id)) + params.Set(headerUserId, strconv.Itoa(user.(*plexUser).Id)) params.Set(headerAccept, getAcceptContentType(r)) } else { - params.Set(headerToken, token) + return } default: // invalid prefix diff --git a/handler/plex.go b/handler/plex.go index ff0e2a2..0062dd6 100644 --- a/handler/plex.go +++ b/handler/plex.go @@ -15,7 +15,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/RoyXiang/plexproxy/common" "github.com/jrudio/go-plex-client" @@ -28,6 +27,7 @@ type PlexConfig struct { PlaxtUrl string RedirectWebApp string DisableTranscode string + NoRequestLogs string } type PlexClient struct { @@ -37,6 +37,7 @@ type PlexClient struct { plaxtUrl string redirectWebApp bool disableTranscode bool + NoRequestLogs bool serverIdentifier *string sections map[string]plex.Directory @@ -44,6 +45,8 @@ type PlexClient struct { friends map[string]plexUser mu sync.RWMutex + + MulLock common.MultipleLock } func NewPlexClient(config PlexConfig) *PlexClient { @@ -69,7 +72,7 @@ func NewPlexClient(config PlexConfig) *PlexClient { plaxtUrl = u.String() } - var redirectWebApp, disableTranscode bool + var redirectWebApp, disableTranscode, noRequestLogs bool if b, err := strconv.ParseBool(config.RedirectWebApp); err == nil { redirectWebApp = b } else { @@ -80,6 +83,11 @@ func NewPlexClient(config PlexConfig) *PlexClient { } else { disableTranscode = true } + if b, err := strconv.ParseBool(config.NoRequestLogs); err == nil { + noRequestLogs = b + } else { + noRequestLogs = false + } return &PlexClient{ proxy: proxy, @@ -87,9 +95,11 @@ func NewPlexClient(config PlexConfig) *PlexClient { plaxtUrl: plaxtUrl, redirectWebApp: redirectWebApp, disableTranscode: disableTranscode, + NoRequestLogs: noRequestLogs, sections: make(map[string]plex.Directory, 0), sessions: make(map[string]sessionData), friends: make(map[string]plexUser), + MulLock: common.NewMultipleLock(), } } @@ -127,21 +137,19 @@ func (c *PlexClient) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if token := r.Header.Get(headerToken); token != "" { - // If it is an authorized request - if user := c.GetUser(token); user != nil { - switch path { - case "/:/scrobble", "/:/unscrobble": - ratingKey := r.URL.Query().Get("key") - if ratingKey != "" { - go clearCachedMetadata(ratingKey, user.Id) - } - case "/:/timeline": - go c.syncTimelineWithPlaxt(r, user) - case "/video/:/transcode/universal/decision": - if c.disableTranscode { - r = c.disableTranscoding(r) - } + // If it is an authorized request + if user := r.Context().Value(userCtxKey); user != nil { + switch path { + case "/:/scrobble", "/:/unscrobble": + ratingKey := r.URL.Query().Get("key") + if ratingKey != "" { + go clearCachedMetadata(ratingKey, user.(*plexUser).Id) + } + case "/:/timeline": + go c.syncTimelineWithPlaxt(r, user.(*plexUser)) + case "/video/:/transcode/universal/decision": + if c.disableTranscode { + r = c.disableTranscoding(r) } } } @@ -171,10 +179,9 @@ func (c *PlexClient) SubscribeToNotifications(events *plex.NotificationEvents, i c.client.SubscribeToNotifications(events, interrupt, fn) } -func (c *PlexClient) GetUser(token string) (user *plexUser) { +func (c *PlexClient) GetUser(token string) *plexUser { if realUser, ok := c.friends[token]; ok { - user = &realUser - return + return &realUser } var err error @@ -183,17 +190,22 @@ func (c *PlexClient) GetUser(token string) (user *plexUser) { isCacheEnabled := redisClient != nil if isCacheEnabled { - err = redisClient.Get(ctx, cacheKey).Scan(user) + var result plexUser + err = redisClient.Get(ctx, cacheKey).Scan(&result) if err == nil { - c.friends[token] = *user - return + c.friends[token] = result + return &result } } + c.MulLock.Lock(lockKeyFriends) + defer c.MulLock.Unlock(lockKeyFriends) + response := c.GetSharedServers() if response == nil { - return + return nil } + var user *plexUser for _, friend := range response.Friends { realUser := plexUser{ Id: friend.UserId, @@ -209,7 +221,7 @@ func (c *PlexClient) GetUser(token string) (user *plexUser) { } } if user != nil { - return + return user } info := c.GetAccountInfo(token) @@ -220,7 +232,7 @@ func (c *PlexClient) GetUser(token string) (user *plexUser) { redisClient.Set(ctx, cacheKey, user, 0) } } - return + return user } func (c *PlexClient) GetSharedServers() *plex.SharedServersResponse { @@ -233,6 +245,7 @@ func (c *PlexClient) GetSharedServers() *plex.SharedServersResponse { } response, err := c.client.GetSharedServers(identifier) if err != nil { + common.GetLogger().Printf("Failed to get friend list: %s", err.Error()) return nil } return &response @@ -282,16 +295,13 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) { return } lockKey := fmt.Sprintf("plex:session:%s", sessionKey) - if ml.TryLock(lockKey, time.Second) { - defer ml.Unlock(lockKey) - } else { - return - } + c.MulLock.Lock(lockKey) + defer c.MulLock.Unlock(lockKey) + session := c.sessions[sessionKey] if session.status == sessionWatched { return } - progress := int(math.Round(float64(viewOffset) / float64(session.metadata.Duration) * 100.0)) if progress == 0 { if session.progress >= watchedThreshold { @@ -348,18 +358,17 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) { session.status = sessionPlaying case webhookEventPause: session.status = sessionPaused - case webhookEventStop: + case webhookEventStop, webhookEventScrobble: session.status = sessionStopped go clearCachedMetadata(ratingKey, user.Id) - case webhookEventScrobble: - session.status = sessionWatched - go clearCachedMetadata(ratingKey, user.Id) } session.lastEvent = event session.progress = progress shouldUpdate, shouldScrobble := session.Check(c.sessions[sessionKey]) if shouldUpdate { - c.sessions[sessionKey] = session + defer func() { + c.sessions[sessionKey] = session + }() } if !shouldScrobble { return @@ -407,7 +416,16 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) { }, } b, _ := json.Marshal(webhook) - _, _ = c.client.HTTPClient.Post(c.plaxtUrl, "application/json", bytes.NewBuffer(b)) + resp, err := c.client.HTTPClient.Post(c.plaxtUrl, "application/json", bytes.NewBuffer(b)) + if err != nil { + return + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + if event == webhookEventScrobble && resp.StatusCode == http.StatusOK { + session.status = sessionWatched + } } func (c *PlexClient) getServerIdentifier() string { @@ -417,6 +435,7 @@ func (c *PlexClient) getServerIdentifier() string { identity, err := c.client.GetServerIdentity() if err != nil { + common.GetLogger().Printf("Failed to get server identifier: %s", err.Error()) return "" } c.serverIdentifier = &identity.MediaContainer.MachineIdentifier @@ -430,11 +449,16 @@ func (c *PlexClient) getLibrarySection(sectionKey string) (isFound bool) { return } + c.MulLock.Lock(lockKeySections) c.mu.RLock() - defer c.mu.RUnlock() + defer func() { + c.mu.RUnlock() + c.MulLock.Unlock(lockKeySections) + }() sections, err := c.client.GetLibraries() if err != nil { + common.GetLogger().Printf("Failed to get library sections: %s", err.Error()) return } @@ -456,11 +480,16 @@ func (c *PlexClient) getPlayerSession(playerIdentifier, ratingKey string) (sessi } } + c.MulLock.Lock(lockKeySessions) c.mu.RLock() - defer c.mu.RUnlock() + defer func() { + c.mu.RUnlock() + c.MulLock.Unlock(lockKeySessions) + }() sessions, err := c.client.GetSessions() if err != nil { + common.GetLogger().Printf("Failed to get playback sessions: %s", err.Error()) return } @@ -517,6 +546,7 @@ func (c *PlexClient) getMetadata(ratingKey string) *plex.MediaMetadata { if resp == nil { resp, err = c.client.HTTPClient.Do(req) if err != nil { + common.GetLogger().Printf("Failed to parse metadata of item %s: %s", ratingKey, err.Error()) return nil } } @@ -525,13 +555,18 @@ func (c *PlexClient) getMetadata(ratingKey string) *plex.MediaMetadata { }(resp.Body) if resp.StatusCode != http.StatusOK { + common.GetLogger().Printf("Failed to get metadata of item %s (status: %d)", ratingKey, resp.StatusCode) return nil } else if !isFromCache { writeToCache(cacheKey, resp, cacheTtlMetadata) } var result plex.MediaMetadata - _ = json.NewDecoder(resp.Body).Decode(&result) + err = json.NewDecoder(resp.Body).Decode(&result) + if err != nil { + common.GetLogger().Printf("Failed to parse metadata of item %s: %s", ratingKey, err.Error()) + return nil + } return &result } diff --git a/handler/utils.go b/handler/utils.go index cb51b9a..0832ddb 100644 --- a/handler/utils.go +++ b/handler/utils.go @@ -14,6 +14,13 @@ import ( "github.com/go-chi/chi/v5/middleware" ) +func wrapResponseWriter(w http.ResponseWriter, protoMajor int) middleware.WrapResponseWriter { + if nw, ok := w.(middleware.WrapResponseWriter); ok { + return nw + } + return middleware.NewWrapResponseWriter(w, protoMajor) +} + func modifyResponse(resp *http.Response) error { contentType := resp.Header.Get(headerContentType) if contentType == "" {