Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

named lock performance improvements #517

Merged
merged 1 commit into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 59 additions & 105 deletions pkg/locks/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,163 +29,117 @@ type NamedLocker interface {

type namedLocker struct {
locks map[string]*namedLock
mapLock *sync.Mutex
mapLock namedLock
}

// NewNamedLocker returns a new Named Locker
func NewNamedLocker() NamedLocker {
return &namedLocker{
locks: make(map[string]*namedLock),
mapLock: &sync.Mutex{},
locks: make(map[string]*namedLock),
}
}

// NamedLock defines the interface for implementing Named Locks
type NamedLock interface {
Release() error
RRelease() error
Upgrade() (NamedLock, error)
WriteLockCounter() int
WriteLockMode() bool
Upgrade() bool
}

func newNamedLock(name string, locker *namedLocker) *namedLock {
return &namedLock{
name: name,
RWMutex: &sync.RWMutex{},
locker: locker,
name: name,
locker: locker,
}
}

type namedLock struct {
*sync.RWMutex
name string
queueSize int32
writeLockMode int32
writeLockCount int
locker *namedLocker
sync.RWMutex
name string
queueSize int32
locker *namedLocker
subsequentWriter bool
}

// Release releases the write lock on the subject Named Lock
func (nl *namedLock) Release() error {

if nl.name == "" {
return errInvalidLockName(nl.name)
}

func (nl *namedLock) release(unlockFunc func()) {
qs := atomic.AddInt32(&nl.queueSize, -1)
if qs == 0 {
nl.locker.mapLock.Lock()
delete(nl.locker.locks, nl.name)
// recheck queue size after getting the lock since another client
// might have joined since the map lock was acquired
if nl.queueSize == 0 {
delete(nl.locker.locks, nl.name)
}
nl.locker.mapLock.Unlock()
}
unlockFunc()
}

atomic.AddInt32(&nl.writeLockMode, -1)
nl.Unlock()
// Release releases the write lock on the subject Named Lock
func (nl *namedLock) Release() error {
nl.release(nl.Unlock)
return nil
}

// RRelease releases the read lock on the subject Named Lock
func (nl *namedLock) RRelease() error {

if nl.name == "" {
return errInvalidLockName(nl.name)
}

qs := atomic.AddInt32(&nl.queueSize, -1)
if qs == 0 {
nl.locker.mapLock.Lock()
delete(nl.locker.locks, nl.name)
nl.locker.mapLock.Unlock()
}

nl.RUnlock()
nl.release(nl.RUnlock)
return nil
}

// WriteLockCounter returns the number of write locks acquired by the namedLock
// This function should only be called by a goroutine actively holding a write lock,
// as it is otherwise not atomic
func (nl *namedLock) WriteLockCounter() int {
return nl.writeLockCount
}

// WriteLockMode returns true if a caller is waiting for a write lock
func (nl *namedLock) WriteLockMode() bool {
return atomic.LoadInt32(&nl.writeLockMode) > 0
}

// Upgrade will upgrade the current read-lock to a write lock without losing the reference to the
// underlying sync map, enabling goroutines, after receiving a write lock, to know how many other
// goroutines acquired a write lock (naturally or upgraded) during the time this routine released
// it's read lock and got a write lock. This helps the receiver of the write lock know if any extra
// state checks are required (e.g., re-querying a cache that might have changed) before proceeding.
func (nl *namedLock) Upgrade() (NamedLock, error) {

ch := make(chan bool, 1)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
atomic.AddInt32(&nl.queueSize, 1)
ch <- true
atomic.AddInt32(&nl.writeLockMode, 1)
nl.Lock()
nl.writeLockCount++
wg.Done()
}()

// once we know the write lock queueSize is incremented, we can release our read lock
<-ch
close(ch)
nl.RRelease()

// wait until write mode is set, read lock is released, and write lock is acquired
wg.Wait()

return nl, nil
// Upgrade will upgrade the current read lock to a write lock. This method will
// always succeed unless a read lock did not already exist, which panics like a
// normal mutex. the return value indicates whether the requesting goroutine was
// first to receive a lock, and will be false when multiple goroutines upgraded
// concurrently and the caller was not the first in the queue to receive it.
// This helps the caller know if any extra state checks are required
// (e.g., re-querying a cache that might have changed) before proceeding.
func (nl *namedLock) Upgrade() bool {
nl.RUnlock()
nl.Lock()
if nl.subsequentWriter {
return false
}
nl.subsequentWriter = true
return true
}

// Acquire locks the named lock for writing, and blocks until the wlock is acquired
func (lk *namedLocker) Acquire(lockName string) (NamedLock, error) {
func (lk *namedLocker) acquire(lockName string, isWrite bool) (NamedLock, error) {
if lockName == "" {
return nil, errInvalidLockName(lockName)
}

lk.mapLock.Lock()
lk.mapLock.RLock()
nl, ok := lk.locks[lockName]
mapUnlockFunc := lk.mapLock.RUnlock
if !ok {
nl = newNamedLock(lockName, lk)
mapUnlockFunc = lk.mapLock.Unlock
lk.mapLock.Upgrade()
// check again in case we weren't the first to upgrade
nl, ok = lk.locks[lockName]
if !ok {
nl = newNamedLock(lockName, lk)
}
lk.locks[lockName] = nl
}
atomic.AddInt32(&nl.queueSize, 1)
lk.mapLock.Unlock()
atomic.AddInt32(&nl.writeLockMode, 1)
mapUnlockFunc()

nl.Lock()

nl.writeLockCount++
if isWrite {
nl.Lock()
} else {
nl.RLock()
}
return nl, nil
}

// Acquire locks the named lock for writing, and blocks until the wlock is acquired
func (lk *namedLocker) Acquire(lockName string) (NamedLock, error) {
return lk.acquire(lockName, true)
}

// RAcquire locks the named lock for reading, and blocks until the rlock is acquired
func (lk *namedLocker) RAcquire(lockName string) (NamedLock, error) {
if lockName == "" {
return nil, errInvalidLockName(lockName)
}

lk.mapLock.Lock()
nl, ok := lk.locks[lockName]
if !ok {
nl = newNamedLock(lockName, lk)
lk.locks[lockName] = nl
}

atomic.AddInt32(&nl.queueSize, 1)
lk.mapLock.Unlock()
atomic.StoreInt32(&nl.writeLockMode, 0)

nl.RLock()
return nl, nil
return lk.acquire(lockName, false)
}

func errInvalidLockName(name string) error {
Expand Down
67 changes: 16 additions & 51 deletions pkg/locks/locks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,6 @@ func TestLocks(t *testing.T) {
t.Errorf("got %s expected %s", err.Error(), expected)
}

nl3, _ := lk.Acquire("test1")
nl3.(*namedLock).name = ""
err = nl3.RRelease()
if err.Error() != expected {
t.Errorf("got %s expected %s", err.Error(), expected)
}

nl = &namedLock{}

err = nl.Release()
if err.Error() != expected {
t.Errorf("got %s expected %s", err.Error(), expected)
}
}

func TestLocksConcurrent(t *testing.T) {
Expand All @@ -82,8 +69,9 @@ func TestLocksConcurrent(t *testing.T) {

rand.Seed(time.Now().UnixNano())

wg.Add(size)

for i := 0; i < size; i++ {
wg.Add(1)
go func() {
nl, err := lk.Acquire(testKey)
if err != nil {
Expand Down Expand Up @@ -142,49 +130,26 @@ func TestLockReadAndWrite(t *testing.T) {
t.Error("expected error for invalid lock name")
}

nl, _ = lk.RAcquire("testKeyReadOnly")
nl.(*namedLock).name = ""

err = nl.Release()
if err == nil || !strings.HasPrefix(err.Error(), "invalid lock name:") {
t.Error("expected error for invalid lock name")
}
}

func TestWriteLockCounter(t *testing.T) {

const expected = 50
nl := newNamedLock("testKey", nil)
nl.writeLockCount = expected
v := nl.WriteLockCounter()
if v != expected {
t.Errorf("expected %d got %d", expected, v)
}

}

func TestWriteLockMode(t *testing.T) {

nl := newNamedLock("testKey", nil)
if nl.WriteLockMode() {
t.Error("expected false")
}
nl.writeLockMode = 1
if !nl.WriteLockMode() {
t.Error("expected true")
}

}

func TestUpgrade(t *testing.T) {

locker := NewNamedLocker()
nl, _ := locker.RAcquire("test")
if nl.WriteLockCounter() != 0 {
t.Errorf("expected 0 got %d", nl.WriteLockCounter())

b := nl.Upgrade()
nl.Release()
if !b {
t.Errorf("expected firstWrite to be true")
}
nl, _ = nl.Upgrade()
if nl.WriteLockCounter() != 1 {
t.Errorf("expected 1 got %d", nl.WriteLockCounter())

nl, _ = locker.RAcquire("test2")
nl1 := nl.(*namedLock)
nl1.subsequentWriter = true
b = nl.Upgrade()
nl.Release()
if b {
t.Errorf("expected firstWrite to be false")
}

}
32 changes: 13 additions & 19 deletions pkg/proxy/engines/deltaproxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) {
var elapsed time.Duration

coReq := GetRequestCachingPolicy(r.Header)
checkCache:
if coReq.NoCache {
if span != nil {
span.AddEvent(
Expand Down Expand Up @@ -236,27 +237,20 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) {
// which will have the same cacheStatus, from causing the same or similar HTTP requests
// to be made against the origin, since just one should do.

// to do this we can ask the lock object how many write locks have been acquired. since
// we have a read lock, this number can't be updated again until all reads are released.
cwc := pr.cacheLock.WriteLockCounter()
// acquire a write lock via the Upgrade method, which will swap your read lock for a
// write lock, ensuring that write lock counter state is intact during the upgrade
pr.cacheLock, _ = pr.cacheLock.Upgrade()
// now we have the write lock. so we can check if the write lock counter incremented by 1
// or more. If the difference is just 1, that means this request was the first to acquire
// a write lock following all of the read locks being released. That means it is good to
// proceed with upstream communications and caching. when the difference is > 1, it means
// another requests was first to acquire the mutex, and we have no idea what changed in
// the cache since between when we queried, and the other requests with the lock serviced
// so in that case, we will just send the request back through the DPC from the top in
// case the updated cache might benefit them.

// now check if we were the first request for this url to upgrade from a reader to writer
if pr.cacheLock.WriteLockCounter()-cwc != 1 {
// acquire a write lock via the Upgrade method, which will swap the read lock for a
// write lock, and return true if this client was the only one, or otherwise the first
// client in a concurrent read lock group to request an Upgrade.
wasFirst := pr.cacheLock.Upgrade()

// if this request was first, it is good to proceed with upstream communications and caching.
// when another requests was first to acquire the mutex, we will jump up to checkCache
// to get the refreshed version. after 3 reiterations, we'll proceed anyway to avoid long loops.
if !wasFirst && pr.rerunCount < 3 {
// we weren't first, so quickly drop our write lock, and re-run the request
pr.cacheLock.Release()
DeltaProxyCacheRequest(w, r)
return
pr.cacheLock, _ = locker.RAcquire(key)
pr.rerunCount++
goto checkCache
}
writeLock = pr.cacheLock
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/proxy/engines/objectproxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func handleCachePartialHit(pr *proxyRequest) error {
}

b1, b2 := upgradeLock(pr)
if b1 && !b2 {
if b1 && !b2 && pr.rerunCount < 3 {
rerunRequest(pr)
return nil
}
Expand Down Expand Up @@ -485,14 +485,13 @@ func recordOPCResult(pr *proxyRequest, cacheStatus status.LookupStatus, httpStat

func upgradeLock(pr *proxyRequest) (bool, bool) {
if pr.hasReadLock && !pr.hasWriteLock {
cwc := pr.cacheLock.WriteLockCounter()
pr.cacheLock.Upgrade()
wasFirst := pr.cacheLock.Upgrade()
pr.hasReadLock = false
pr.hasWriteLock = true
if pr.cacheLock.WriteLockCounter()-cwc != 1 {
return true, false
if wasFirst {
return true, true
}
return true, true
return true, false
}
return false, false
}
Expand Down
Loading