Skip to content

Commit

Permalink
fix(tests): added tests for Freeze/Reset
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlangzi committed Mar 19, 2024
1 parent 8bbee2d commit 50b8e37
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 9 deletions.
81 changes: 75 additions & 6 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (m *Mutex) Unlock(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

a := async.New[bool]()
a := async.NewA()
req := m.createRequest()

cluster, err := m.connect(ctx)
Expand All @@ -173,20 +173,20 @@ func (m *Mutex) Unlock(ctx context.Context) error {
}

for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
a.Add(func(c *rpc.Client) func(ctx context.Context) error {
return func(ctx context.Context) error {
var t bool
err := c.Call("dlm.ReleaseLock", req, &t)
if err != nil {
return t, err
return err
}

return t, nil
return nil
}
}(c))
}

_, errs, err := a.WaitN(ctx, m.consensus)
errs, err := a.WaitN(ctx, m.consensus)
if err != nil {
Logger.Warn("dlm: renew lock", slog.Any("err", errs))
return m.Error(errs, err)
Expand All @@ -196,6 +196,75 @@ func (m *Mutex) Unlock(ctx context.Context) error {

return nil
}

func (m *Mutex) Freeze(ctx context.Context, topic string) error {
m.mu.Lock()
defer m.mu.Unlock()

a := async.NewA()

ctx, cancel := context.WithTimeout(ctx, m.timeout)
defer cancel()

cluster, err := m.connect(ctx)
if err != nil {
return err
}

for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) error {
return func(ctx context.Context) error {
var ok bool
return c.Call("dlm.Freeze", topic, &ok)
}
}(c))
}

errs, err := a.WaitN(ctx, m.consensus)

if err != nil {
Logger.Warn("dlm: freeze topic", slog.Any("err", errs))
return m.Error(errs, err)

Check warning on line 227 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L226-L227

Added lines #L226 - L227 were not covered by tests
}

return nil

}

func (m *Mutex) Reset(ctx context.Context, topic string) error {
m.mu.Lock()
defer m.mu.Unlock()

a := async.NewA()

ctx, cancel := context.WithTimeout(ctx, m.timeout)
defer cancel()

cluster, err := m.connect(ctx)
if err != nil {
return err
}

for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) error {
return func(ctx context.Context) error {
var ok bool
return c.Call("dlm.Reset", topic, &ok)
}
}(c))
}

errs, err := a.WaitN(ctx, m.consensus)

if err != nil {
Logger.Warn("dlm: freeze reset", slog.Any("err", errs))
return m.Error(errs, err)

Check warning on line 261 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L260-L261

Added lines #L260 - L261 were not covered by tests
}

return nil

}

func (m *Mutex) createRequest() LockRequest {
return LockRequest{
ID: m.id,
Expand Down
4 changes: 3 additions & 1 deletion mutex_option.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package dlm

import "time"
import (
"time"
)

type MutexOption func(m *Mutex)

Expand Down
147 changes: 146 additions & 1 deletion mutext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,17 @@ func TestUnlock(t *testing.T) {
r.ErrorIs(err, ErrNoLease)
},
},
{
name: "unlock_should_work_even_lease_does_not_exists",
run: func(r *require.Assertions) {
ttl := 10 * time.Second
m := New("unlock", "wallet", "unlock", WithPeers(peers...), WithTTL(ttl))

err = m.Unlock(context.TODO())
r.NoError(err)

},
},
{
name: "unlock_should_not_work_when_lease_is_not_yours",
run: func(r *require.Assertions) {
Expand Down Expand Up @@ -403,7 +414,7 @@ func TestUnlock(t *testing.T) {
},
},
{
name: "renew_should_not_work_when_majority_nodes_are_down",
name: "unlock_should_not_work_when_majority_nodes_are_down",
run: func(r *require.Assertions) {
ttl := 10 * time.Second
m := New("unlock", "wallet", "unlock_majority", WithPeers(peers...), WithTTL(ttl))
Expand Down Expand Up @@ -433,3 +444,137 @@ func TestUnlock(t *testing.T) {
})
}
}

func TestTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
peers, nodes, clean, err := createCluster(ctx, 5)

require.NoError(t, err)

defer func() {
for _, c := range clean {
c()
}
}()

tests := []struct {
name string
run func(*require.Assertions)
}{
{
name: "freeze_should_work",
run: func(r *require.Assertions) {
ttl := 10 * time.Second
m := New("freeze", "freeze", "freeze", WithPeers(peers...), WithTTL(ttl))
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("freeze", m.lease.Lessee)
r.Equal("freeze", m.lease.Topic)
r.Equal("freeze", m.lease.Key)

err = m.Freeze(context.Background(), "freeze")
r.NoError(err)

err = m.Renew(context.TODO())
r.ErrorIs(err, ErrFrozenTopic)

m2 := New("freeze_2", "freeze", "freeze_2", WithPeers(peers...), WithTTL(ttl))
err = m2.Lock(context.TODO())
r.ErrorIs(err, ErrFrozenTopic)

err = m.Reset(context.Background(), "freeze")
r.NoError(err)

err = m.Renew(context.Background())
r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("freeze", m.lease.Topic)
r.Equal("freeze", m.lease.Key)

err = m2.Lock(context.TODO())
r.NoError(err)
r.Equal(ttl, m2.lease.TTL.Duration())
r.Equal("freeze_2", m2.lease.Lessee)
r.Equal("freeze", m2.lease.Topic)
r.Equal("freeze_2", m2.lease.Key)
},
},
{
name: "freeze_should_work_when_minority_nodes_are_down",
run: func(r *require.Assertions) {
ttl := 10 * time.Second
m := New("freeze", "freeze", "freeze", WithPeers(peers...), WithTTL(ttl))
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("freeze", m.lease.Lessee)
r.Equal("freeze", m.lease.Topic)
r.Equal("freeze", m.lease.Key)

nodes[0].Stop()
nodes[1].Stop()

err = m.Freeze(context.Background(), "freeze")
r.NoError(err)

err = m.Renew(context.TODO())
r.ErrorIs(err, ErrFrozenTopic)

m2 := New("freeze_2", "freeze", "freeze_2", WithPeers(peers...), WithTTL(ttl))
err = m2.Lock(context.TODO())
r.ErrorIs(err, ErrFrozenTopic)

err = m.Reset(context.Background(), "freeze")
r.NoError(err)

err = m.Renew(context.Background())
r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("freeze", m.lease.Topic)
r.Equal("freeze", m.lease.Key)

err = m2.Lock(context.TODO())
r.NoError(err)
r.Equal(ttl, m2.lease.TTL.Duration())
r.Equal("freeze_2", m2.lease.Lessee)
r.Equal("freeze", m2.lease.Topic)
r.Equal("freeze_2", m2.lease.Key)
},
},

{
name: "freeze_should_work_when_minority_nodes_are_down",
run: func(r *require.Assertions) {
ttl := 10 * time.Second
m := New("freeze", "freeze", "freeze", WithPeers(peers...), WithTTL(ttl))
err := m.Lock(context.TODO())

r.NoError(err)
r.Equal(ttl, m.lease.TTL.Duration())
r.Equal("freeze", m.lease.Lessee)
r.Equal("freeze", m.lease.Topic)
r.Equal("freeze", m.lease.Key)

nodes[0].Stop()
nodes[1].Stop()
nodes[2].Stop()

err = m.Freeze(context.Background(), "freeze")
r.ErrorIs(err, async.ErrTooLessDone)

err = m.Reset(context.Background(), "freeze")
r.ErrorIs(err, async.ErrTooLessDone)
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
test.run(require.New(t))
})
}
}
3 changes: 3 additions & 0 deletions node_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func (n *Node) ReleaseLock(req LockRequest, ok *bool) error {

lease, err := n.getLease(req.Topic, req.Key)
if err != nil {
if errors.Is(err, ErrNoLease) {
return nil
}
return err
}

Expand Down
2 changes: 1 addition & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestLease(t *testing.T) {
}
}

func TestTopic(t *testing.T) {
func TestNodeTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down

0 comments on commit 50b8e37

Please sign in to comment.