Skip to content

Commit

Permalink
fix(tests): added tests for Renew
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlangzi committed Mar 19, 2024
1 parent a433a03 commit 92261fe
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

# Test binary, built with `go test -c`
*.test
.db
*.db
snapshots

# Output of the go coverage tool, specifically when used with LiteIDE
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/mattn/go-sqlite3 v1.14.22
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
github.com/yaitoo/async v1.0.4
github.com/yaitoo/sqle v1.3.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
1 change: 1 addition & 0 deletions lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (l *Lease) IsLive() bool {
return time.Now().Before(time.Unix(l.Since, 0).Add(l.TTL.Duration()))
}

// IsExpired check if lease expires on mutex side
func (l *Lease) IsExpired(start time.Time) bool {
now := time.Now()
l.ExpiresOn = now.Add(l.TTL.Duration() - time.Until(start))
Expand Down
61 changes: 32 additions & 29 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func New(id, topic, key string, options ...MutexOption) *Mutex {
id: id,
topic: strings.ToLower(topic),
key: strings.ToLower(key),
done: make(chan struct{}),
timeout: DefaultTimeout,
ttl: DefaultLeaseTerm,
}
Expand All @@ -28,6 +27,7 @@ func New(id, topic, key string, options ...MutexOption) *Mutex {
o(m)
}

m.Context, m.cancel = context.WithCancelCause(context.Background())
m.consensus = int(math.Floor(float64(len(m.peers))/2)) + 1

return m
Expand All @@ -44,7 +44,9 @@ type Mutex struct {
ttl time.Duration

consensus int
done chan struct{}

context.Context
cancel context.CancelCauseFunc

lease Lease
}
Expand All @@ -69,7 +71,7 @@ func (m *Mutex) connect(ctx context.Context) ([]*rpc.Client, error) {

}

func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc, error) {
func (m *Mutex) Lock(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -81,7 +83,7 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc,

cluster, err := m.connect(ctx)
if err != nil {
return nil, nil, err
return err
}

for _, c := range cluster {
Expand All @@ -99,23 +101,20 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc,

if err != nil {
Logger.Warn("dlm: renew lock", slog.Any("err", errs))
return nil, nil, m.Error(errs, err)
return m.Error(errs, err)
}

t := result[0]

if t.IsExpired(start) {
return nil, nil, ErrExpiredLease
return ErrExpiredLease
}

m.lease = t

statusCtx, statusCancel := context.WithCancel(context.Background())

go m.keepalive(statusCtx, statusCancel)
go m.waitExpires(statusCtx, statusCancel)
go m.waitExpires()

return statusCtx, statusCancel, nil
return nil

}

Expand Down Expand Up @@ -186,14 +185,15 @@ func (m *Mutex) Unlock(ctx context.Context) error {
}
}(c))
}
m.done <- struct{}{}

_, errs, err := a.WaitN(ctx, m.consensus)
if err != nil {
Logger.Warn("dlm: renew lock", slog.Any("err", errs))
return m.Error(errs, err)
}

m.cancel(nil)

return nil
}
func (m *Mutex) createRequest() LockRequest {
Expand All @@ -205,50 +205,53 @@ func (m *Mutex) createRequest() LockRequest {
}
}

func (m *Mutex) waitExpires(ctx context.Context, cancel context.CancelFunc) {
defer cancel()
func (m *Mutex) waitExpires() {

var expiresOn time.Time
for {
m.mu.RLock()
expiresOn = m.lease.ExpiresOn
m.mu.RUnlock()

select {
case <-m.done:
return
case <-ctx.Done():

case <-m.Context.Done():
return
case <-time.After(time.Until(expiresOn)):
if !expiresOn.Before(expiresOn) {
// get latest ExpiresOn
m.mu.RLock()
expiresOn = m.lease.ExpiresOn
m.mu.RUnlock()

if !time.Now().Before(expiresOn) {
m.cancel(ErrExpiredLease)
return
}
}
}
}

func (m *Mutex) keepalive(ctx context.Context, cancel context.CancelFunc) {
defer cancel()
func (m *Mutex) Keepalive() {

var err error
for {

m.mu.RLock()
expiresOn := m.lease.ExpiresOn
m.mu.RUnlock()

select {
case <-m.done:
return
case <-ctx.Done():
case <-m.Context.Done():
return
case <-time.After(1 * time.Second):
m.mu.RLock()
expiresOn := m.lease.ExpiresOn
m.mu.RUnlock()

// lease already expires
if !expiresOn.Before(expiresOn) {
if !time.Now().Before(expiresOn) {
m.cancel(ErrExpiredLease)
return
}

err = m.Renew(context.Background())
if errors.Is(err, ErrExpiredLease) {
m.cancel(ErrExpiredLease)
return
}
}
Expand Down
127 changes: 102 additions & 25 deletions mutext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,74 @@ func TestLock(t *testing.T) {
name: "lock_should_work",
run: func(r *require.Assertions) {
m := New("lock_should_work", "wallet", "lock_should_work", WithPeers(peers...), WithTTL(10*time.Second))
_, cancel, err := m.Lock(context.TODO())
defer cancel()
err := m.Lock(context.TODO())
r.NoError(err)
r.Equal(10*time.Second, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
r.Equal("lock_should_work", m.lease.Key)

m2 := New("lock_should_work_2", "wallet", "lock_should_work", WithPeers(peers...))
_, _, err = m2.Lock(context.TODO())
},
},
{
name: "lock_should_not_work_if_lease_exists",
run: func(r *require.Assertions) {
m := New("lock", "wallet", "lock_exists", WithPeers(peers...), WithTTL(10*time.Second))
err := m.Lock(context.TODO())
r.NoError(err)
r.Equal(10*time.Second, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
r.Equal("lock_exists", m.lease.Key)

r.Error(err, async.ErrTooLessDone)
m2 := New("lock_2", "wallet", "lock_exists", WithPeers(peers...))
err = m2.Lock(context.TODO())

r.Error(err, ErrLeaseExists)
},
},
{
name: "expires_should_work",
run: func(r *require.Assertions) {
ttl := 3 * time.Second
m := New("lock_should_work", "wallet", "expires_should_work", WithPeers(peers...), WithTTL(ttl))
err := m.Lock(context.TODO())
r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
r.Equal("expires_should_work", m.lease.Key)

time.Sleep(ttl)

<-m.Done()
r.ErrorIs(context.Cause(m), ErrExpiredLease)

},
},
{
name: "lock_should_work_when_old_lease_expires",
run: func(r *require.Assertions) {
ttl := 3 * time.Second
m := New("lock", "wallet", "lock_exists", WithPeers(peers...), WithTTL(ttl))
err := m.Lock(context.TODO())
r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("lock", m.lease.Lessee)
r.Equal("wallet", m.lease.Topic)
r.Equal("lock_exists", m.lease.Key)

ttl2 := 5 * time.Second
m2 := New("lock_2", "wallet", "lock_exists", WithPeers(peers...), WithTTL(ttl2))
err = m2.Lock(context.TODO())

r.Error(err, ErrLeaseExists)

time.Sleep(ttl) // wait for 1st lease expires

err = m2.Lock(context.TODO())
r.NoError(err)
r.Equal(ttl2, m2.lease.TTL.Duration())
r.Equal("lock_2", m2.lease.Lessee)
r.Equal("wallet", m2.lease.Topic)
r.Equal("lock_exists", m2.lease.Key)
},
},
{
Expand All @@ -75,18 +132,17 @@ func TestLock(t *testing.T) {
nodes[0].Stop()
nodes[1].Stop()

_, cancel, err := m.Lock(context.TODO())
defer cancel()
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(10*time.Second, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
r.Equal("minority_nodes_are_down", m.lease.Key)

m2 := New("lock_should_work_2", "wallet", "minority_nodes_are_down", WithPeers(peers...))
_, _, err = m2.Lock(context.TODO())
err = m2.Lock(context.TODO())

r.Error(err, async.ErrTooLessDone)

},
},
{
Expand All @@ -96,13 +152,8 @@ func TestLock(t *testing.T) {

nodes[2].Stop()

_, cancel, err := m.Lock(context.TODO())
if cancel != nil {
defer cancel()
}

err = m.Lock(context.TODO())
r.Error(err, async.ErrTooLessDone)

},
},
}
Expand Down Expand Up @@ -136,8 +187,8 @@ func TestRenew(t *testing.T) {
run: func(r *require.Assertions) {
ttl := 10 * time.Second
m := New("renew", "wallet", "renew", WithPeers(peers...), WithTTL(ttl))
_, cancel, err := m.Lock(context.TODO())
defer cancel()
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
Expand All @@ -154,13 +205,13 @@ func TestRenew(t *testing.T) {
name: "renew_should_not_work_when_lease_is_expired",
run: func(r *require.Assertions) {
ttl := 2 * time.Second
m := New("renew", "wallet", "renew", WithPeers(peers...), WithTTL(ttl))
_, cancel, err := m.Lock(context.TODO())
defer cancel()
m := New("renew", "wallet", "renew_expires", WithPeers(peers...), WithTTL(ttl))
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
r.Equal("renew", m.lease.Key)
r.Equal("renew_expires", m.lease.Key)

time.Sleep(ttl)

Expand All @@ -169,13 +220,39 @@ func TestRenew(t *testing.T) {

},
},
{
name: "keepalive_should_work",
run: func(r *require.Assertions) {
ttl := 2 * time.Second
m := New("renew", "wallet", "renew_keepalive", WithPeers(peers...), WithTTL(ttl))
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("renew", m.lease.Lessee)
r.Equal("wallet", m.lease.Topic)
r.Equal("renew_keepalive", m.lease.Key)

go m.Keepalive()

time.Sleep(ttl)

err = m.Renew(context.TODO())
r.NoError(err)

time.Sleep(1 * time.Second)
err = m.Renew(context.TODO())
r.NoError(err)

},
},
{
name: "renew_should_work_when_minority_nodes_are_down",
run: func(r *require.Assertions) {
m := New("renew", "wallet", "renew_minority", WithPeers(peers...), WithTTL(10*time.Second))

_, cancel, err := m.Lock(context.TODO())
defer cancel()
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(10*time.Second, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
Expand All @@ -193,8 +270,8 @@ func TestRenew(t *testing.T) {
run: func(r *require.Assertions) {
m := New("renew", "wallet", "renew_majority", WithPeers(peers...), WithTTL(10*time.Second))

_, cancel, err := m.Lock(context.TODO())
defer cancel()
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(10*time.Second, m.lease.TTL.Duration())
r.Equal("wallet", m.lease.Topic)
Expand Down

0 comments on commit 92261fe

Please sign in to comment.