fix(tests): added tests for Mutex #2

Merged · 6 commits · Mar 19, 2024
Changes from 1 commit
7 changes: 7 additions & 0 deletions lease.go
@@ -19,6 +19,13 @@ type Lease struct {
ExpiresOn time.Time `json:"-"`
}

// IsLive reports whether the lease is still live from the node's point of view.
func (l *Lease) IsLive() bool {
return time.Now().Before(time.Unix(l.Since, 0).Add(l.TTL.Duration()))
}

// IsExpired reports whether the lease has expired, updating ExpiresOn from
// the lease TTL adjusted by the time elapsed since start.
func (l *Lease) IsExpired(start time.Time) bool {
now := time.Now()
l.ExpiresOn = now.Add(l.TTL.Duration() - time.Until(start))
return !now.Before(l.ExpiresOn)
}
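Since this PR is about tests, here is roughly how the new lease helpers could be exercised. A sketch only: the NewDuration constructor for the TTL wrapper is an assumption inferred from this diff, not the package's real API.

```go
// Sketch of a test for Lease.IsLive; NewDuration is a hypothetical helper
// standing in for however the package builds its TTL duration wrapper.
func TestLeaseIsLive(t *testing.T) {
	live := Lease{Since: time.Now().Unix(), TTL: NewDuration(time.Minute)}
	if !live.IsLive() {
		t.Error("fresh lease should be live")
	}

	stale := Lease{Since: time.Now().Add(-2 * time.Minute).Unix(), TTL: NewDuration(time.Minute)}
	if stale.IsLive() {
		t.Error("lease past its TTL should not be live")
	}
}
```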
194 changes: 126 additions & 68 deletions mutex.go
@@ -3,6 +3,7 @@
import (
"context"
"errors"
"log/slog"
"math"
"net/rpc"
"strings"
@@ -27,7 +28,7 @@
o(m)
}

- m.consensus = int(math.Ceil(float64(len(m.peers)) / 2))
+ m.consensus = int(math.Floor(float64(len(m.peers))/2)) + 1

return m
}
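The quorum change is the important fix here: math.Ceil(n/2) is not a strict majority for even cluster sizes, while math.Floor(n/2)+1 always is. A quick standalone check:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	for n := 1; n <= 6; n++ {
		old := int(math.Ceil(float64(n) / 2))      // 4 peers -> 2: two disjoint "majorities" possible
		fixed := int(math.Floor(float64(n)/2)) + 1 // 4 peers -> 3: a strict majority
		fmt.Printf("peers=%d old=%d fixed=%d\n", n, old, fixed)
	}
}
```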
@@ -43,33 +44,29 @@
ttl time.Duration

consensus int
- cluster []*rpc.Client
done chan struct{}

lease Lease
}

- func (m *Mutex) connect(ctx context.Context) error {
- if m.cluster == nil {
- a := async.New[*rpc.Client]()
- for _, d := range m.peers {
- a.Add(func(addr string) func(context.Context) (*rpc.Client, error) {
- return func(ctx context.Context) (*rpc.Client, error) {
- return connect(ctx, addr, m.timeout)
- }
- }(d))
- }
-
- cluster, _, err := a.Wait(ctx)
- if len(cluster) >= m.consensus {
- m.cluster = cluster
- return nil
- }
-
- return err
- }
-
- return nil
+ func (m *Mutex) connect(ctx context.Context) ([]*rpc.Client, error) {
+ a := async.New[*rpc.Client]()
+ for _, d := range m.peers {
+ a.Add(func(addr string) func(context.Context) (*rpc.Client, error) {
+ return func(ctx context.Context) (*rpc.Client, error) {
+ return connect(ctx, addr, m.timeout)
+ }
+ }(d))
+ }
+
+ cluster, _, err := a.Wait(ctx)
+ if len(cluster) >= m.consensus {
+ return cluster, nil
+ }
+
+ return nil, err
}
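For intuition, the fan-out that connect performs — dialing every peer and succeeding once a majority answers — can be sketched with only the standard library; the real code delegates this to the project's async helper.

```go
// Standard-library sketch of connect's dial-until-quorum behaviour; the
// function name and shape are illustrative, not part of this package.
func dialQuorum(ctx context.Context, peers []string, quorum int) ([]*rpc.Client, error) {
	type result struct {
		c   *rpc.Client
		err error
	}
	ch := make(chan result, len(peers))
	for _, addr := range peers {
		go func(addr string) {
			var d net.Dialer
			conn, err := d.DialContext(ctx, "tcp", addr)
			if err != nil {
				ch <- result{err: err}
				return
			}
			ch <- result{c: rpc.NewClient(conn)}
		}(addr)
	}

	var clients []*rpc.Client
	var firstErr error
	for range peers {
		r := <-ch
		if r.err != nil {
			if firstErr == nil {
				firstErr = r.err
			}
			continue
		}
		clients = append(clients, r.c)
		if len(clients) >= quorum {
			return clients, nil // majority reached; remaining dials are abandoned
		}
	}
	return nil, firstErr
}
```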

func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc, error) {
@@ -82,12 +79,12 @@
ctx, cancel := context.WithTimeout(ctx, m.timeout)
defer cancel()

- err := m.connect(ctx)
+ cluster, err := m.connect(ctx)
if err != nil {
return nil, nil, err
}

- for _, c := range m.cluster {
+ for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) {
return func(ctx context.Context) (Lease, error) {
var t Lease
@@ -98,17 +95,16 @@
}

start := time.Now()
- result, _, err := a.WaitN(ctx, m.consensus)
+ result, errs, err := a.WaitN(ctx, m.consensus)

if err != nil {
- return nil, nil, err
+ Logger.Warn("dlm: acquire lock", slog.Any("err", errs))
+ return nil, nil, m.Error(errs, err)
}

t := result[0]
- now := time.Now()
- t.ExpiresOn = now.Add(t.TTL.Duration() - time.Until(start))

- if !now.Before(t.ExpiresOn) {
+ if t.IsExpired(start) {
return nil, nil, ErrExpiredLease
}

@@ -123,49 +119,17 @@

}

- func (m *Mutex) Unlock(ctx context.Context) error {
+ func (m *Mutex) Renew(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

- a := async.New[bool]()
+ a := async.New[Lease]()
req := m.createRequest()
- for _, c := range m.cluster {
- a.Add(func(c *rpc.Client) func(ctx context.Context) (bool, error) {
- return func(ctx context.Context) (bool, error) {
- var t bool
- err := c.Call("dlm.ReleaseLock", req, &t)
- if err != nil {
- return t, err
- }
-
- return t, nil
- }
- }(c))
- }
- m.done <- struct{}{}
-
- _, _, err := a.WaitN(ctx, m.consensus)
+ cluster, err := m.connect(ctx)
if err != nil {
return err
}

- return nil
- }
- func (m *Mutex) createRequest() LockRequest {
- return LockRequest{
- ID: m.id,
- Topic: m.topic,
- Key: m.key,
- TTL: m.ttl,
- }
- }
-
- func (m *Mutex) Renew(ctx context.Context) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- a := async.New[Lease]()
- req := m.createRequest()
- for _, c := range m.cluster {
+ for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) {
return func(ctx context.Context) (Lease, error) {
var t Lease
@@ -182,23 +146,65 @@
defer cancel()

start := time.Now()
- result, _, err := a.WaitN(ctx, m.consensus)
+ result, errs, err := a.WaitN(ctx, m.consensus)
if err != nil {
- return err
+ Logger.Warn("dlm: renew lock", slog.Any("err", errs))
+ return m.Error(errs, err)
}

- now := time.Now()
t := result[0]
- t.ExpiresOn = now.Add(t.TTL.Duration() - time.Until(start))

- if !now.After(t.ExpiresOn) {
+ if t.IsExpired(start) {
return ErrExpiredLease
}

m.lease = t
return nil
}
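A holder normally calls Renew on a timer comfortably inside the TTL; one possible loop (the half-TTL interval is a conventional choice, not something this diff prescribes):

```go
// Possible renewal loop: re-extend the lease at half the TTL and stop once
// the lock context is cancelled or a renewal fails.
func keepAlive(ctx context.Context, m *Mutex, ttl time.Duration) {
	t := time.NewTicker(ttl / 2)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := m.Renew(ctx); err != nil {
				return // lease expired or quorum unreachable
			}
		}
	}
}
```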

func (m *Mutex) Unlock(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

a := async.New[bool]()
req := m.createRequest()

cluster, err := m.connect(ctx)
if err != nil {
return err
}

for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
var t bool
err := c.Call("dlm.ReleaseLock", req, &t)
if err != nil {
return t, err
}

return t, nil
}
}(c))
}
m.done <- struct{}{}

_, errs, err := a.WaitN(ctx, m.consensus)
if err != nil {
Logger.Warn("dlm: renew lock", slog.Any("err", errs))
return m.Error(errs, err)

Check warning on line 194 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L191-L194

Added lines #L191 - L194 were not covered by tests
}

return nil
}
func (m *Mutex) createRequest() LockRequest {
return LockRequest{
ID: m.id,
Topic: m.topic,
Key: m.key,
TTL: m.ttl,
}
}
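Putting the three methods together, a rough caller-side sketch; the constructor and its options are not shown in this diff, so they are omitted here.

```go
// Hypothetical caller; only Lock, Renew and Unlock appear in this PR.
func withLock(ctx context.Context, m *Mutex) error {
	lockCtx, cancel, err := m.Lock(ctx) // granted once a majority of peers accept the lease
	if err != nil {
		return err
	}
	defer cancel()
	defer m.Unlock(context.Background()) // release on a fresh context; ctx may already be done

	if err := m.Renew(ctx); err != nil {
		return err // e.g. ErrExpiredLease once a quorum considers the lease lapsed
	}

	select {
	case <-lockCtx.Done(): // waitExpires cancels this when the lease runs out
		return lockCtx.Err()
	default:
	}
	return nil
}
```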

func (m *Mutex) waitExpires(ctx context.Context, cancel context.CancelFunc) {
defer cancel()
var expiresOn time.Time
@@ -248,3 +254,55 @@
}
}
}

// Error tries to map a consensus of known dlm errors reported by peers back to the matching sentinel error.
func (m *Mutex) Error(errs []error, err error) error {
consensus := make(map[rpc.ServerError]int)

for _, err := range errs {
if s, ok := err.(rpc.ServerError); ok {
consensus[s]++
}
}

max := 0
var msg string

for k, v := range consensus {
if v > max {
max = v
msg = string(k)
}
}

if max < m.consensus {
return err
}

if !strings.HasPrefix(msg, "dlm:") {
return err
}

switch msg {
case ErrExpiredLease.Error():
return ErrExpiredLease
case ErrNoLease.Error():
return ErrNoLease
case ErrNotYourLease.Error():
return ErrNotYourLease
case ErrLeaseExists.Error():
return ErrLeaseExists
case ErrFrozenTopic.Error():
return ErrFrozenTopic
case ErrBadDatabase.Error():
return ErrBadDatabase
}

return err
}
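The string matching above exists because net/rpc flattens every server-side error into rpc.ServerError, a plain string type, so sentinel identity is lost in transit. A standalone illustration:

```go
package main

import (
	"errors"
	"fmt"
	"net/rpc"
)

var errNoLease = errors.New("dlm: no lease")

func main() {
	// What a peer's error looks like after crossing net/rpc: only the message survives.
	remote := error(rpc.ServerError(errNoLease.Error()))

	fmt.Println(errors.Is(remote, errNoLease)) // false: identity was lost on the wire

	// Comparing messages recovers the sentinel, which is what Mutex.Error does.
	if remote.Error() == errNoLease.Error() {
		fmt.Println("mapped back to errNoLease")
	}
}
```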