diff --git a/pkg/locks/locks.go b/pkg/locks/locks.go index b3330a036..d8d7b8050 100644 --- a/pkg/locks/locks.go +++ b/pkg/locks/locks.go @@ -29,14 +29,13 @@ type NamedLocker interface { type namedLocker struct { locks map[string]*namedLock - mapLock *sync.Mutex + mapLock namedLock } // NewNamedLocker returns a new Named Locker func NewNamedLocker() NamedLocker { return &namedLocker{ - locks: make(map[string]*namedLock), - mapLock: &sync.Mutex{}, + locks: make(map[string]*namedLock), } } @@ -44,148 +43,103 @@ func NewNamedLocker() NamedLocker { type NamedLock interface { Release() error RRelease() error - Upgrade() (NamedLock, error) - WriteLockCounter() int - WriteLockMode() bool + Upgrade() bool } func newNamedLock(name string, locker *namedLocker) *namedLock { return &namedLock{ - name: name, - RWMutex: &sync.RWMutex{}, - locker: locker, + name: name, + locker: locker, } } type namedLock struct { - *sync.RWMutex - name string - queueSize int32 - writeLockMode int32 - writeLockCount int - locker *namedLocker + sync.RWMutex + name string + queueSize int32 + locker *namedLocker + subsequentWriter bool } -// Release releases the write lock on the subject Named Lock -func (nl *namedLock) Release() error { - - if nl.name == "" { - return errInvalidLockName(nl.name) - } - +func (nl *namedLock) release(unlockFunc func()) { qs := atomic.AddInt32(&nl.queueSize, -1) if qs == 0 { nl.locker.mapLock.Lock() - delete(nl.locker.locks, nl.name) + // recheck queue size after getting the lock since another client + // might have joined since the map lock was acquired + if nl.queueSize == 0 { + delete(nl.locker.locks, nl.name) + } nl.locker.mapLock.Unlock() } + unlockFunc() +} - atomic.AddInt32(&nl.writeLockMode, -1) - nl.Unlock() +// Release releases the write lock on the subject Named Lock +func (nl *namedLock) Release() error { + nl.release(nl.Unlock) return nil } // RRelease releases the read lock on the subject Named Lock func (nl *namedLock) RRelease() error { - - if nl.name == "" { - return errInvalidLockName(nl.name) - } - - qs := atomic.AddInt32(&nl.queueSize, -1) - if qs == 0 { - nl.locker.mapLock.Lock() - delete(nl.locker.locks, nl.name) - nl.locker.mapLock.Unlock() - } - - nl.RUnlock() + nl.release(nl.RUnlock) return nil } -// WriteLockCounter returns the number of write locks acquired by the namedLock -// This function should only be called by a goroutine actively holding a write lock, -// as it is otherwise not atomic -func (nl *namedLock) WriteLockCounter() int { - return nl.writeLockCount -} - -// WriteLockMode returns true if a caller is waiting for a write lock -func (nl *namedLock) WriteLockMode() bool { - return atomic.LoadInt32(&nl.writeLockMode) > 0 -} - -// Upgrade will upgrade the current read-lock to a write lock without losing the reference to the -// underlying sync map, enabling goroutines, after receiving a write lock, to know how many other -// goroutines acquired a write lock (naturally or upgraded) during the time this routine released -// it's read lock and got a write lock. This helps the receiver of the write lock know if any extra -// state checks are required (e.g., re-querying a cache that might have changed) before proceeding. -func (nl *namedLock) Upgrade() (NamedLock, error) { - - ch := make(chan bool, 1) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - atomic.AddInt32(&nl.queueSize, 1) - ch <- true - atomic.AddInt32(&nl.writeLockMode, 1) - nl.Lock() - nl.writeLockCount++ - wg.Done() - }() - - // once we know the write lock queueSize is incremented, we can release our read lock - <-ch - close(ch) - nl.RRelease() - - // wait until write mode is set, read lock is released, and write lock is acquired - wg.Wait() - - return nl, nil +// Upgrade will upgrade the current read lock to a write lock. This method will +// always succeed unless a read lock did not already exist, which panics like a +// normal mutex. the return value indicates whether the requesting goroutine was +// first to receive a lock, and will be false when multiple goroutines upgraded +// concurrently and the caller was not the first in the queue to receive it. +// This helps the caller know if any extra state checks are required +// (e.g., re-querying a cache that might have changed) before proceeding. +func (nl *namedLock) Upgrade() bool { + nl.RUnlock() + nl.Lock() + if nl.subsequentWriter { + return false + } + nl.subsequentWriter = true + return true } -// Acquire locks the named lock for writing, and blocks until the wlock is acquired -func (lk *namedLocker) Acquire(lockName string) (NamedLock, error) { +func (lk *namedLocker) acquire(lockName string, isWrite bool) (NamedLock, error) { if lockName == "" { return nil, errInvalidLockName(lockName) } - - lk.mapLock.Lock() + lk.mapLock.RLock() nl, ok := lk.locks[lockName] + mapUnlockFunc := lk.mapLock.RUnlock if !ok { - nl = newNamedLock(lockName, lk) + mapUnlockFunc = lk.mapLock.Unlock + lk.mapLock.Upgrade() + // check again in case we weren't the first to upgrade + nl, ok = lk.locks[lockName] + if !ok { + nl = newNamedLock(lockName, lk) + } lk.locks[lockName] = nl } atomic.AddInt32(&nl.queueSize, 1) - lk.mapLock.Unlock() - atomic.AddInt32(&nl.writeLockMode, 1) + mapUnlockFunc() - nl.Lock() - - nl.writeLockCount++ + if isWrite { + nl.Lock() + } else { + nl.RLock() + } return nl, nil } +// Acquire locks the named lock for writing, and blocks until the wlock is acquired +func (lk *namedLocker) Acquire(lockName string) (NamedLock, error) { + return lk.acquire(lockName, true) +} + // RAcquire locks the named lock for reading, and blocks until the rlock is acquired func (lk *namedLocker) RAcquire(lockName string) (NamedLock, error) { - if lockName == "" { - return nil, errInvalidLockName(lockName) - } - - lk.mapLock.Lock() - nl, ok := lk.locks[lockName] - if !ok { - nl = newNamedLock(lockName, lk) - lk.locks[lockName] = nl - } - - atomic.AddInt32(&nl.queueSize, 1) - lk.mapLock.Unlock() - atomic.StoreInt32(&nl.writeLockMode, 0) - - nl.RLock() - return nl, nil + return lk.acquire(lockName, false) } func errInvalidLockName(name string) error { diff --git a/pkg/locks/locks_test.go b/pkg/locks/locks_test.go index 8e188da38..30526f01c 100644 --- a/pkg/locks/locks_test.go +++ b/pkg/locks/locks_test.go @@ -56,19 +56,6 @@ func TestLocks(t *testing.T) { t.Errorf("got %s expected %s", err.Error(), expected) } - nl3, _ := lk.Acquire("test1") - nl3.(*namedLock).name = "" - err = nl3.RRelease() - if err.Error() != expected { - t.Errorf("got %s expected %s", err.Error(), expected) - } - - nl = &namedLock{} - - err = nl.Release() - if err.Error() != expected { - t.Errorf("got %s expected %s", err.Error(), expected) - } } func TestLocksConcurrent(t *testing.T) { @@ -82,8 +69,9 @@ func TestLocksConcurrent(t *testing.T) { rand.Seed(time.Now().UnixNano()) + wg.Add(size) + for i := 0; i < size; i++ { - wg.Add(1) go func() { nl, err := lk.Acquire(testKey) if err != nil { @@ -142,49 +130,26 @@ func TestLockReadAndWrite(t *testing.T) { t.Error("expected error for invalid lock name") } - nl, _ = lk.RAcquire("testKeyReadOnly") - nl.(*namedLock).name = "" - - err = nl.Release() - if err == nil || !strings.HasPrefix(err.Error(), "invalid lock name:") { - t.Error("expected error for invalid lock name") - } -} - -func TestWriteLockCounter(t *testing.T) { - - const expected = 50 - nl := newNamedLock("testKey", nil) - nl.writeLockCount = expected - v := nl.WriteLockCounter() - if v != expected { - t.Errorf("expected %d got %d", expected, v) - } - -} - -func TestWriteLockMode(t *testing.T) { - - nl := newNamedLock("testKey", nil) - if nl.WriteLockMode() { - t.Error("expected false") - } - nl.writeLockMode = 1 - if !nl.WriteLockMode() { - t.Error("expected true") - } - } func TestUpgrade(t *testing.T) { locker := NewNamedLocker() nl, _ := locker.RAcquire("test") - if nl.WriteLockCounter() != 0 { - t.Errorf("expected 0 got %d", nl.WriteLockCounter()) + + b := nl.Upgrade() + nl.Release() + if !b { + t.Errorf("expected firstWrite to be true") } - nl, _ = nl.Upgrade() - if nl.WriteLockCounter() != 1 { - t.Errorf("expected 1 got %d", nl.WriteLockCounter()) + + nl, _ = locker.RAcquire("test2") + nl1 := nl.(*namedLock) + nl1.subsequentWriter = true + b = nl.Upgrade() + nl.Release() + if b { + t.Errorf("expected firstWrite to be false") } + } diff --git a/pkg/proxy/engines/deltaproxycache.go b/pkg/proxy/engines/deltaproxycache.go index 98a7b0511..589bd056e 100644 --- a/pkg/proxy/engines/deltaproxycache.go +++ b/pkg/proxy/engines/deltaproxycache.go @@ -121,6 +121,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) { var elapsed time.Duration coReq := GetRequestCachingPolicy(r.Header) +checkCache: if coReq.NoCache { if span != nil { span.AddEvent( @@ -236,27 +237,20 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) { // which will have the same cacheStatus, from causing the same or similar HTTP requests // to be made against the origin, since just one should do. - // to do this we can ask the lock object how many write locks have been acquired. since - // we have a read lock, this number can't be updated again until all reads are released. - cwc := pr.cacheLock.WriteLockCounter() - // acquire a write lock via the Upgrade method, which will swap your read lock for a - // write lock, ensuring that write lock counter state is intact during the upgrade - pr.cacheLock, _ = pr.cacheLock.Upgrade() - // now we have the write lock. so we can check if the write lock counter incremented by 1 - // or more. If the difference is just 1, that means this request was the first to acquire - // a write lock following all of the read locks being released. That means it is good to - // proceed with upstream communications and caching. when the difference is > 1, it means - // another requests was first to acquire the mutex, and we have no idea what changed in - // the cache since between when we queried, and the other requests with the lock serviced - // so in that case, we will just send the request back through the DPC from the top in - // case the updated cache might benefit them. - - // now check if we were the first request for this url to upgrade from a reader to writer - if pr.cacheLock.WriteLockCounter()-cwc != 1 { + // acquire a write lock via the Upgrade method, which will swap the read lock for a + // write lock, and return true if this client was the only one, or otherwise the first + // client in a concurrent read lock group to request an Upgrade. + wasFirst := pr.cacheLock.Upgrade() + + // if this request was first, it is good to proceed with upstream communications and caching. + // when another requests was first to acquire the mutex, we will jump up to checkCache + // to get the refreshed version. after 3 reiterations, we'll proceed anyway to avoid long loops. + if !wasFirst && pr.rerunCount < 3 { // we weren't first, so quickly drop our write lock, and re-run the request pr.cacheLock.Release() - DeltaProxyCacheRequest(w, r) - return + pr.cacheLock, _ = locker.RAcquire(key) + pr.rerunCount++ + goto checkCache } writeLock = pr.cacheLock } diff --git a/pkg/proxy/engines/objectproxycache.go b/pkg/proxy/engines/objectproxycache.go index a14fd1f0e..42607ff7d 100644 --- a/pkg/proxy/engines/objectproxycache.go +++ b/pkg/proxy/engines/objectproxycache.go @@ -72,7 +72,7 @@ func handleCachePartialHit(pr *proxyRequest) error { } b1, b2 := upgradeLock(pr) - if b1 && !b2 { + if b1 && !b2 && pr.rerunCount < 3 { rerunRequest(pr) return nil } @@ -485,14 +485,13 @@ func recordOPCResult(pr *proxyRequest, cacheStatus status.LookupStatus, httpStat func upgradeLock(pr *proxyRequest) (bool, bool) { if pr.hasReadLock && !pr.hasWriteLock { - cwc := pr.cacheLock.WriteLockCounter() - pr.cacheLock.Upgrade() + wasFirst := pr.cacheLock.Upgrade() pr.hasReadLock = false pr.hasWriteLock = true - if pr.cacheLock.WriteLockCounter()-cwc != 1 { - return true, false + if wasFirst { + return true, true } - return true, true + return true, false } return false, false } diff --git a/pkg/proxy/engines/proxy_request.go b/pkg/proxy/engines/proxy_request.go index cf6894b19..5eadc3dcc 100644 --- a/pkg/proxy/engines/proxy_request.go +++ b/pkg/proxy/engines/proxy_request.go @@ -57,6 +57,8 @@ type proxyRequest struct { revalidationResponse *http.Response revalidationReader io.ReadCloser + rerunCount int + cacheDocument *HTTPDocument cacheBuffer *bytes.Buffer cacheLock locks.NamedLock