Skip to content

Commit

Permalink
Merge pull request #149 from palcalde/pablo/add-set-when-extending
Browse files Browse the repository at this point in the history
Improve extending
  • Loading branch information
hjr265 authored Feb 17, 2024
2 parents ab6a63c + 9dbb9f3 commit c541100
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 7 deletions.
37 changes: 31 additions & 6 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ type Mutex struct {

quorum int

genValueFunc func() (string, error)
value string
until time.Time
shuffle bool
failFast bool
genValueFunc func() (string, error)
value string
until time.Time
shuffle bool
failFast bool
setNXOnExtend bool

pools []redis.Pool
}
Expand Down Expand Up @@ -117,7 +118,7 @@ func (m *Mutex) lockContext(ctx context.Context, tries int) error {
m.until = until
return nil
}
func() (int, error) {
_, _ = func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
Expand Down Expand Up @@ -257,6 +258,16 @@ func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (boo
return status != int64(0), nil
}

var touchWithSetNXScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
elseif redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX") then
return 1
else
return 0
end
`)

var touchScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
Expand All @@ -271,8 +282,22 @@ func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry
return false, err
}
defer conn.Close()

touchScript := touchScript
if m.setNXOnExtend {
touchScript = touchWithSetNXScript
}

status, err := conn.Eval(touchScript, m.name, value, expiry)
if err != nil {
// extend failed: clean up locks
_, _ = func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
}()
return false, err
}
return status != int64(0), nil
Expand Down
57 changes: 56 additions & 1 deletion mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,62 @@ func TestMutexExtendExpired(t *testing.T) {

time.Sleep(1 * time.Second)

ok, err := mutex.Extend()
_, err = mutex.Extend()
if err == nil {
t.Fatalf("mutex extend didn't fail")
}
})
}
}

func TestSetNXOnExtendAcquiresLockWhenKeyIsExpired(t *testing.T) {
for k, v := range makeCases(8) {
t.Run(k, func(t *testing.T) {
mutexes := newTestMutexes(v.pools, k+"-test-mutex-extend", 1)
mutex := mutexes[0]
mutex.setNXOnExtend = true
mutex.expiry = 500 * time.Millisecond

err := mutex.Lock()
if err != nil {
t.Fatalf("mutex lock failed: %s", err)
}
defer mutex.Unlock()

time.Sleep(1 * time.Second)

_, err = mutex.Extend()
if err != nil {
t.Fatalf("mutex didn't extend")
}
})
}
}

func TestSetNXOnExtendFailsToAcquireLockWhenKeyIsTaken(t *testing.T) {
for k, v := range makeCases(8) {
t.Run(k, func(t *testing.T) {
firstMutex := newTestMutexes(v.pools, k+"-test-mutex-extend", 1)[0]
firstMutex.expiry = 500 * time.Millisecond

err := firstMutex.Lock()
if err != nil {
t.Fatalf("mutex lock failed: %s", err)
}
defer firstMutex.Unlock()

time.Sleep(1 * time.Second)

secondMutex := newTestMutexes(v.pools, k+"-test-mutex-extend", 1)[0]
firstMutex.expiry = 500 * time.Millisecond
err = secondMutex.Lock()
defer secondMutex.Unlock()
if err != nil {
t.Fatalf("second mutex couldn't lock")
}

ok, err := firstMutex.Extend()
firstMutex.setNXOnExtend = true
if err == nil {
t.Fatalf("mutex extend didn't fail")
}
Expand Down
10 changes: 10 additions & 0 deletions redsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ func WithRetryDelay(delay time.Duration) Option {
})
}

// WithSetNXOnExtend improves extending logic to extend the key if exist
// and if not, tries to set a new key in redis
// Useful if your redises restart often and you want to reduce the chances of losing the lock
// Read this MR for more info: https://github.com/go-redsync/redsync/pull/149
func WithSetNXOnExtend() Option {
return OptionFunc(func(m *Mutex) {
m.setNXOnExtend = true
})
}

// WithRetryDelayFunc can be used to override default delay behavior.
func WithRetryDelayFunc(delayFunc DelayFunc) Option {
return OptionFunc(func(m *Mutex) {
Expand Down

0 comments on commit c541100

Please sign in to comment.