Skip to content

Commit

Permalink
Fix Fake Client Watch & Remove Run() (#11)
Browse files Browse the repository at this point in the history
* Add Lock Property Tests with Ops Error

* Fix Watch when Session Expired and then Begin

* Fix Fake Watch when Session Expired & Remove Run()
  • Loading branch information
QuangTung97 authored Apr 10, 2024
1 parent b653f01 commit 0b639ad
Show file tree
Hide file tree
Showing 11 changed files with 522 additions and 608 deletions.
106 changes: 50 additions & 56 deletions concurrency/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,29 @@ func (e *Lock) Start(sess *curator.Session, next func(sess *curator.Session)) {
}

func (e *Lock) initFunc(sess *curator.Session) {
sess.Run(func(client curator.Client) {
client.Children(e.parent, func(resp zk.ChildrenResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
if errors.Is(err, zk.ErrNoNode) {
log.Panicf("ZNode '%s' does NOT exist", e.parent)
}
panic(err)
}

var prevNode string
status := e.computeLockStatus(resp, &prevNode)
if status == lockStatusNeedCreate {
e.createEphemeral(sess)
sess.GetClient().Children(e.parent, func(resp zk.ChildrenResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
if status == lockStatusBlocked {
e.watchPreviousNode(sess, prevNode)
return
if errors.Is(err, zk.ErrNoNode) {
log.Panicf("ZNode '%s' does NOT exist", e.parent)
}
e.onGranted(sess)
})
panic(err)
}

var prevNode string
status := e.computeLockStatus(resp, &prevNode)
if status == lockStatusNeedCreate {
e.createEphemeral(sess)
return
}
if status == lockStatusBlocked {
e.watchPreviousNode(sess, prevNode)
return
}
e.onGranted(sess)
})
}

Expand Down Expand Up @@ -133,43 +131,39 @@ func stringCmp(a, b string) int {
}

func (e *Lock) createEphemeral(sess *curator.Session) {
sess.Run(func(client curator.Client) {
p := e.parent + "/node:" + e.nodeID + "-"
client.Create(p, nil, zk.FlagEphemeral|zk.FlagSequence,
func(resp zk.CreateResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
panic(err)
p := e.parent + "/node:" + e.nodeID + "-"
sess.GetClient().Create(p, nil, zk.FlagEphemeral|zk.FlagSequence,
func(resp zk.CreateResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
e.initFunc(sess)
},
)
})
panic(err)
}
e.initFunc(sess)
},
)
}

func (e *Lock) watchPreviousNode(sess *curator.Session, prevNode string) {
sess.Run(func(client curator.Client) {
client.GetW(prevNode, func(resp zk.GetResponse, err error) {
if err == nil {
return
}
if errors.Is(err, zk.ErrNoNode) {
e.initFunc(sess)
return
}
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
panic(err)
}, func(ev zk.Event) {
if ev.Type == zk.EventNodeDeleted {
e.initFunc(sess)
return
}
})
sess.GetClient().GetW(prevNode, func(resp zk.GetResponse, err error) {
if err == nil {
return
}
if errors.Is(err, zk.ErrNoNode) {
e.initFunc(sess)
return
}
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
panic(err)
}, func(ev zk.Event) {
if ev.Type == zk.EventNodeDeleted {
e.initFunc(sess)
return
}
})
}
47 changes: 22 additions & 25 deletions concurrency/lock_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,24 @@ func newSimpleCounter(client curator.FakeClientID) *simpleCounter {
}

func (l *simpleCounter) isLeader(sess *curator.Session) {
sess.Run(func(client curator.Client) {
client.Get("/counter", func(resp zk.GetResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrNoNode) {
l.increase(sess, 1, 0)
return
}
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(l.isLeader)
return
}
panic(err)
sess.GetClient().Get("/counter", func(resp zk.GetResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrNoNode) {
l.increase(sess, 1, 0)
return
}

num, err := strconv.ParseInt(string(resp.Data), 10, 64)
if err != nil {
panic(err)
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(l.isLeader)
return
}
l.increase(sess, int(num)+1, resp.Stat.Version)
})
panic(err)
}

num, err := strconv.ParseInt(string(resp.Data), 10, 64)
if err != nil {
panic(err)
}
l.increase(sess, int(num)+1, resp.Stat.Version)
})
}

Expand Down Expand Up @@ -73,11 +71,10 @@ func (l *simpleCounter) createCounterResp(sess *curator.Session) func(_ zk.Creat
}

func (l *simpleCounter) increase(sess *curator.Session, nextVal int, version int32) {
sess.Run(func(client curator.Client) {
if nextVal > 1 {
client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess))
} else {
client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess))
}
})
client := sess.GetClient()
if nextVal > 1 {
client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess))
} else {
client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess))
}
}
42 changes: 36 additions & 6 deletions concurrency/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ func initStore(parent string) *curator.FakeZookeeper {

c := curator.NewFakeClientFactory(store, initClient)
c.Start(curator.New(func(sess *curator.Session) {
sess.Run(func(client curator.Client) {
client.Create(parent, nil, 0, func(resp zk.CreateResponse, err error) {
if err != nil {
panic(err)
}
})
sess.GetClient().Create(parent, nil, 0, func(resp zk.CreateResponse, err error) {
if err != nil {
panic(err)
}
})
}))

Expand Down Expand Up @@ -398,3 +396,35 @@ func TestLock_With_Tester__Multi_Times(t *testing.T) {
assert.Equal(t, 2000, steps)
}
}

func TestLock_With_Tester__Multi_Times__With_Ops_Error(t *testing.T) {
for k := 0; k < 1000; k++ {
seed := time.Now().UnixNano()
fmt.Println("SEED:", seed)

l1 := NewLock("/workers", "node01")
l2 := NewLock("/workers", "node02")
l3 := NewLock("/workers", "node03")

store := initStore("/workers")

tester := curator.NewFakeZookeeperTester(store,
[]curator.FakeClientID{client1, client2, client3},
seed,
)

startLock(l1, store, client1, newSimpleCounter(client1).isLeader)
startLock(l2, store, client2, newSimpleCounter(client2).isLeader)
startLock(l3, store, client3, newSimpleCounter(client3).isLeader)

tester.Begin()

steps := tester.RunSessionExpiredAndConnectionError(
10,
10,
2000,
curator.WithRunOperationErrorPercentage(10),
)
assert.Equal(t, 2000, steps)
}
}
26 changes: 3 additions & 23 deletions curator/curator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,9 @@ func (c *Curator) End() {
c.sess = nil
}

type nullClient struct {
valid bool
client Client
}

func (s *Session) getClient() nullClient {
if s.state.sess != s {
return nullClient{}
}
return nullClient{
valid: true,
client: s.state.client,
}
}

// Run allows to access to the Client object for accessing zookeeper.
// The callback fn function is only be called when the session is still active.
func (s *Session) Run(fn func(client Client)) {
sessClient := s.getClient()
if !sessClient.valid {
return
}
fn(sessClient.client)
// GetClient returns Client
func (s *Session) GetClient() Client {
return s.state.client
}

// AddRetry add a callback function that will be called after connection is re-established.
Expand Down
31 changes: 4 additions & 27 deletions curator/curator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ func TestCurator(t *testing.T) {
steps := make([]string, 0)
c := New(func(sess *Session) {
steps = append(steps, "start")
sess.Run(func(client Client) {
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
})

Expand All @@ -41,10 +39,8 @@ func TestCurator(t *testing.T) {
steps := make([]string, 0)
c := New(func(sess *Session) {
steps = append(steps, "start")
sess.Run(func(client Client) {
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
})

Expand All @@ -56,25 +52,6 @@ func TestCurator(t *testing.T) {

assert.Equal(t, []string{"start", "start", "retry"}, steps)
})

t.Run("callback after end", func(t *testing.T) {
steps := make([]string, 0)
var callback func()
c := New(func(sess *Session) {
steps = append(steps, "start")
callback = func() {
sess.Run(func(client Client) {
steps = append(steps, "run-callback")
})
}
})

c.Begin(nil)
c.End()
callback()

assert.Equal(t, []string{"start"}, steps)
})
}

func TestCurator_Chain(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions curator/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,12 @@ func (c *fakeClient) Get(path string, callback func(resp zk.GetResponse, err err
}

func (c *fakeClient) buildWatcher(fn func(ev zk.Event)) func(ev zk.Event) {
sessionID := c.store.States[c.clientID].SessionID
return func(ev zk.Event) {
state := c.store.States[c.clientID]
if state.SessionID != sessionID {
return
}
if state.ConnErr {
state.PendingEvents = append(state.PendingEvents, func() {
fn(ev)
Expand Down
Loading

0 comments on commit 0b639ad

Please sign in to comment.