Skip to content

Commit

Permalink
feat(plaxt): mark a session as watched only if http status 200 received
Browse files Browse the repository at this point in the history
  • Loading branch information
RoyXiang committed Mar 10, 2022
1 parent 5d769e3 commit bacb542
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
7 changes: 7 additions & 0 deletions common/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type refCounter struct {

type MultipleLock interface {
TryLock(interface{}, time.Duration) bool
Lock(interface{})
Unlock(interface{})
}

Expand All @@ -31,6 +32,12 @@ func (l *lock) TryLock(key interface{}, timeout time.Duration) bool {
return isLocked
}

func (l *lock) Lock(key interface{}) {
m := l.getLocker(key)
atomic.AddInt64(&m.counter, 1)
m.lock.lock()
}

func (l *lock) Unlock(key interface{}) {
m := l.getLocker(key)
m.lock.unlock()
Expand Down
4 changes: 4 additions & 0 deletions common/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (m *timedMutex) tryLock(timeout time.Duration) bool {
return false
}

func (m *timedMutex) lock() {
m.c <- struct{}{}
}

func (m *timedMutex) unlock() {
<-m.c
}
4 changes: 0 additions & 4 deletions handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"sync"

"github.com/RoyXiang/plexproxy/common"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-redis/redis/v8"
"github.com/gorilla/mux"
Expand All @@ -19,7 +18,6 @@ var (
emptyStruct = struct{}{}

mu sync.RWMutex
ml common.MultipleLock
)

func init() {
Expand All @@ -41,8 +39,6 @@ func init() {
redisClient = redis.NewClient(options)
}
}

ml = common.NewMultipleLock()
}

func NewRouter() http.Handler {
Expand Down
4 changes: 2 additions & 2 deletions handler/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ func trafficMiddleware(next http.Handler) http.Handler {
params.Set(headerRange, rg)
}
lockKey := fmt.Sprintf("%s?%s", r.URL.EscapedPath(), params.Encode())
if !ml.TryLock(lockKey, time.Second) {
if !plexClient.MulLock.TryLock(lockKey, time.Second) {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
defer ml.Unlock(lockKey)
defer plexClient.MulLock.Unlock(lockKey)
next.ServeHTTP(w, r)
})
}
Expand Down
29 changes: 19 additions & 10 deletions handler/plex.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/RoyXiang/plexproxy/common"
"github.com/jrudio/go-plex-client"
Expand Down Expand Up @@ -44,6 +43,8 @@ type PlexClient struct {
friends map[string]plexUser

mu sync.RWMutex

MulLock common.MultipleLock
}

func NewPlexClient(config PlexConfig) *PlexClient {
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewPlexClient(config PlexConfig) *PlexClient {
sections: make(map[string]plex.Directory, 0),
sessions: make(map[string]sessionData),
friends: make(map[string]plexUser),
MulLock: common.NewMultipleLock(),
}
}

Expand Down Expand Up @@ -282,16 +284,13 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) {
return
}
lockKey := fmt.Sprintf("plex:session:%s", sessionKey)
if ml.TryLock(lockKey, time.Second) {
defer ml.Unlock(lockKey)
} else {
return
}
c.MulLock.Lock(lockKey)
defer c.MulLock.Unlock(lockKey)

session := c.sessions[sessionKey]
if session.status == sessionWatched {
return
}

progress := int(math.Round(float64(viewOffset) / float64(session.metadata.Duration) * 100.0))
if progress == 0 {
if session.progress >= watchedThreshold {
Expand Down Expand Up @@ -352,14 +351,15 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) {
session.status = sessionStopped
go clearCachedMetadata(ratingKey, user.Id)
case webhookEventScrobble:
session.status = sessionWatched
go clearCachedMetadata(ratingKey, user.Id)
}
session.lastEvent = event
session.progress = progress
shouldUpdate, shouldScrobble := session.Check(c.sessions[sessionKey])
if shouldUpdate {
c.sessions[sessionKey] = session
defer func() {
c.sessions[sessionKey] = session
}()
}
if !shouldScrobble {
return
Expand Down Expand Up @@ -407,7 +407,16 @@ func (c *PlexClient) syncTimelineWithPlaxt(r *http.Request, user *plexUser) {
},
}
b, _ := json.Marshal(webhook)
_, _ = c.client.HTTPClient.Post(c.plaxtUrl, "application/json", bytes.NewBuffer(b))
resp, err := c.client.HTTPClient.Post(c.plaxtUrl, "application/json", bytes.NewBuffer(b))
if err != nil {
return
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
if event == webhookEventScrobble && resp.StatusCode == http.StatusOK {
session.status = sessionWatched
}
}

func (c *PlexClient) getServerIdentifier() string {
Expand Down

0 comments on commit bacb542

Please sign in to comment.