Skip to content

Commit a760efe

Browse files
authored
Using Tester for Lock (#7)
1 parent 15046de commit a760efe

File tree

5 files changed

+166
-5
lines changed

5 files changed

+166
-5
lines changed

concurrency/lock_helper_test.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package concurrency
2+
3+
import (
4+
"errors"
5+
"strconv"
6+
7+
"github.com/QuangTung97/zk"
8+
"github.com/QuangTung97/zk/curator"
9+
)
10+
11+
type simpleCounter struct {
12+
client curator.FakeClientID
13+
}
14+
15+
func newSimpleCounter(client curator.FakeClientID) *simpleCounter {
16+
return &simpleCounter{
17+
client: client,
18+
}
19+
}
20+
21+
func (l *simpleCounter) isLeader(sess *curator.Session) {
22+
sess.Run(func(client curator.Client) {
23+
client.Get("/counter", func(resp zk.GetResponse, err error) {
24+
if err != nil {
25+
if errors.Is(err, zk.ErrNoNode) {
26+
l.increase(sess, 1, 0)
27+
return
28+
}
29+
if errors.Is(err, zk.ErrConnectionClosed) {
30+
sess.AddRetry(l.isLeader)
31+
return
32+
}
33+
panic(err)
34+
}
35+
36+
num, err := strconv.ParseInt(string(resp.Data), 10, 64)
37+
if err != nil {
38+
panic(err)
39+
}
40+
l.increase(sess, int(num)+1, resp.Stat.Version)
41+
})
42+
})
43+
}
44+
45+
func numToBytes(val int) []byte {
46+
return []byte(strconv.FormatInt(int64(val), 10))
47+
}
48+
49+
func (l *simpleCounter) setCounterResp(sess *curator.Session) func(_ zk.SetResponse, err error) {
50+
return func(_ zk.SetResponse, err error) {
51+
if err != nil {
52+
if errors.Is(err, zk.ErrConnectionClosed) {
53+
sess.AddRetry(l.isLeader)
54+
return
55+
}
56+
panic(err)
57+
}
58+
l.isLeader(sess)
59+
}
60+
}
61+
62+
func (l *simpleCounter) createCounterResp(sess *curator.Session) func(_ zk.CreateResponse, err error) {
63+
return func(_ zk.CreateResponse, err error) {
64+
if err != nil {
65+
if errors.Is(err, zk.ErrConnectionClosed) {
66+
sess.AddRetry(l.isLeader)
67+
return
68+
}
69+
panic(err)
70+
}
71+
l.isLeader(sess)
72+
}
73+
}
74+
75+
func (l *simpleCounter) increase(sess *curator.Session, nextVal int, version int32) {
76+
sess.Run(func(client curator.Client) {
77+
if nextVal > 1 {
78+
client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess))
79+
} else {
80+
client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess))
81+
}
82+
})
83+
}

concurrency/lock_test.go

+62
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package concurrency
22

33
import (
4+
"fmt"
45
"slices"
56
"testing"
7+
"time"
68

79
"github.com/stretchr/testify/assert"
810

@@ -12,6 +14,7 @@ import (
1214

1315
const client1 curator.FakeClientID = "client1"
1416
const client2 curator.FakeClientID = "client2"
17+
const client3 curator.FakeClientID = "client3"
1518
const initClient curator.FakeClientID = "init"
1619

1720
func initStore(parent string) *curator.FakeZookeeper {
@@ -336,3 +339,62 @@ func TestSortString(t *testing.T) {
336339
"A", "B", "D", "EE", "IA", "IY", "M", "Z",
337340
}, s)
338341
}
342+
343+
func TestLock_With_Tester(t *testing.T) {
344+
l1 := NewLock("/workers", "node01")
345+
l2 := NewLock("/workers", "node02")
346+
l3 := NewLock("/workers", "node03")
347+
348+
store := initStore("/workers")
349+
350+
tester := curator.NewFakeZookeeperTester(store,
351+
[]curator.FakeClientID{client1, client2, client3},
352+
123,
353+
)
354+
355+
startLock(l1, store, client1, newSimpleCounter(client1).isLeader)
356+
startLock(l2, store, client2, newSimpleCounter(client2).isLeader)
357+
startLock(l3, store, client3, newSimpleCounter(client3).isLeader)
358+
359+
tester.Begin()
360+
361+
steps := tester.RunSessionExpiredAndConnectionError(
362+
10,
363+
10,
364+
2000,
365+
)
366+
assert.Equal(t, 2000, steps)
367+
368+
store.PrintData()
369+
}
370+
371+
func TestLock_With_Tester__Multi_Times(t *testing.T) {
372+
for k := 0; k < 1000; k++ {
373+
seed := time.Now().UnixNano()
374+
fmt.Println("SEED:", seed)
375+
376+
l1 := NewLock("/workers", "node01")
377+
l2 := NewLock("/workers", "node02")
378+
l3 := NewLock("/workers", "node03")
379+
380+
store := initStore("/workers")
381+
382+
tester := curator.NewFakeZookeeperTester(store,
383+
[]curator.FakeClientID{client1, client2, client3},
384+
seed,
385+
)
386+
387+
startLock(l1, store, client1, newSimpleCounter(client1).isLeader)
388+
startLock(l2, store, client2, newSimpleCounter(client2).isLeader)
389+
startLock(l3, store, client3, newSimpleCounter(client3).isLeader)
390+
391+
tester.Begin()
392+
393+
steps := tester.RunSessionExpiredAndConnectionError(
394+
20,
395+
20,
396+
2000,
397+
)
398+
assert.Equal(t, 2000, steps)
399+
}
400+
}

curator/fake_client.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,11 @@ func (s *FakeZookeeper) notifyChildrenWatches(parent *ZNode, path string) {
384384

385385
// ConnError ...
386386
func (s *FakeZookeeper) ConnError(clientID FakeClientID) {
387-
s.States[clientID].ConnErr = true
387+
state := s.States[clientID]
388+
if state.ConnErr {
389+
panic("Can NOT call ConnError multiple times in a row")
390+
}
391+
state.ConnErr = true
388392
s.runAllCallbacksWithConnectionError(clientID, true)
389393
}
390394

@@ -513,6 +517,7 @@ func (s *FakeZookeeper) Retry(clientID FakeClientID) {
513517
getActionWithType[RetryInput](s, clientID, "Retry")
514518

515519
state := s.States[clientID]
520+
state.ConnErr = false
516521
for _, fn := range state.PendingEvents {
517522
fn()
518523
}

curator/fake_property.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ RetrySelect:
4141
for _, c := range f.clients {
4242
if !f.store.States[c].HasSession {
4343
sessionExpiredClients = append(sessionExpiredClients, c)
44+
continue
4445
}
4546

4647
if len(f.store.Pending[c]) == 0 {
@@ -74,6 +75,9 @@ func (f *FakeZookeeperTester) doSessionExpired(client FakeClientID) {
7475
}
7576

7677
func (f *FakeZookeeperTester) doConnectionError(client FakeClientID) {
78+
if f.store.States[client].ConnErr {
79+
return
80+
}
7781
cmds := f.store.Pending[client]
7882
if len(cmds) == 0 {
7983
f.store.ConnError(client)
@@ -91,7 +95,7 @@ func (f *FakeZookeeperTester) RunSessionExpiredAndConnectionError(
9195
sessionExpiredPercentage float64,
9296
connectionErrorPercentage float64,
9397
numSteps int,
94-
) {
98+
) int {
9599
sessionExpiredEnd := int(sessionExpiredPercentage / 100.0 * randMax)
96100
connectionErrorEnd := int(connectionErrorPercentage / 100.0 * randMax)
97101

@@ -114,14 +118,18 @@ func (f *FakeZookeeperTester) RunSessionExpiredAndConnectionError(
114118

115119
client, ok := f.getActionableRandomClient()
116120
if !ok {
117-
return
121+
return i + 1
118122
}
119123
genericCmd := f.store.Pending[client][0]
120124
switch genericCmd.(type) {
121125
case CreateInput:
122126
f.store.CreateApply(client)
123127
case GetInput:
124128
f.store.GetApply(client)
129+
case ChildrenInput:
130+
f.store.ChildrenApply(client)
131+
case DeleteInput:
132+
f.store.DeleteApply(client)
125133
case SetInput:
126134
f.store.SetApply(client)
127135
case RetryInput:
@@ -130,4 +138,6 @@ func (f *FakeZookeeperTester) RunSessionExpiredAndConnectionError(
130138
panic(fmt.Sprintf("unknown command: %s%+v", reflect.TypeOf(genericCmd).String(), genericCmd))
131139
}
132140
}
141+
142+
return numSteps
133143
}

curator/fake_property_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,12 @@ func TestFakeZookeeperTester_Master_Lock(t *testing.T) {
143143

144144
tester.Begin()
145145

146-
tester.RunSessionExpiredAndConnectionError(
146+
steps := tester.RunSessionExpiredAndConnectionError(
147147
10,
148148
10,
149149
1000,
150150
)
151+
assert.Equal(t, 1000, steps)
151152

152153
store.PrintData()
153154
store.PrintPendingCalls()
@@ -159,7 +160,7 @@ func TestFakeZookeeperTester_Master_Lock(t *testing.T) {
159160
break
160161
}
161162
}
162-
assert.Equal(t, "283", string(node.Data))
163+
assert.Equal(t, "274", string(node.Data))
163164
}
164165

165166
func TestFakeZookeeperTester_Master_Lock__Multi_Times(t *testing.T) {

0 commit comments

Comments
 (0)