diff --git a/concurrency/lock.go b/concurrency/lock.go index 168f1ac..c5985bb 100644 --- a/concurrency/lock.go +++ b/concurrency/lock.go @@ -45,31 +45,29 @@ func (e *Lock) Start(sess *curator.Session, next func(sess *curator.Session)) { } func (e *Lock) initFunc(sess *curator.Session) { - sess.Run(func(client curator.Client) { - client.Children(e.parent, func(resp zk.ChildrenResponse, err error) { - if err != nil { - if errors.Is(err, zk.ErrConnectionClosed) { - sess.AddRetry(e.initFunc) - return - } - if errors.Is(err, zk.ErrNoNode) { - log.Panicf("ZNode '%s' does NOT exist", e.parent) - } - panic(err) - } - - var prevNode string - status := e.computeLockStatus(resp, &prevNode) - if status == lockStatusNeedCreate { - e.createEphemeral(sess) + sess.GetClient().Children(e.parent, func(resp zk.ChildrenResponse, err error) { + if err != nil { + if errors.Is(err, zk.ErrConnectionClosed) { + sess.AddRetry(e.initFunc) return } - if status == lockStatusBlocked { - e.watchPreviousNode(sess, prevNode) - return + if errors.Is(err, zk.ErrNoNode) { + log.Panicf("ZNode '%s' does NOT exist", e.parent) } - e.onGranted(sess) - }) + panic(err) + } + + var prevNode string + status := e.computeLockStatus(resp, &prevNode) + if status == lockStatusNeedCreate { + e.createEphemeral(sess) + return + } + if status == lockStatusBlocked { + e.watchPreviousNode(sess, prevNode) + return + } + e.onGranted(sess) }) } @@ -133,43 +131,39 @@ func stringCmp(a, b string) int { } func (e *Lock) createEphemeral(sess *curator.Session) { - sess.Run(func(client curator.Client) { - p := e.parent + "/node:" + e.nodeID + "-" - client.Create(p, nil, zk.FlagEphemeral|zk.FlagSequence, - func(resp zk.CreateResponse, err error) { - if err != nil { - if errors.Is(err, zk.ErrConnectionClosed) { - sess.AddRetry(e.initFunc) - return - } - panic(err) + p := e.parent + "/node:" + e.nodeID + "-" + sess.GetClient().Create(p, nil, zk.FlagEphemeral|zk.FlagSequence, + func(resp zk.CreateResponse, err error) { + if err != nil { + if errors.Is(err, zk.ErrConnectionClosed) { + sess.AddRetry(e.initFunc) + return } - e.initFunc(sess) - }, - ) - }) + panic(err) + } + e.initFunc(sess) + }, + ) } func (e *Lock) watchPreviousNode(sess *curator.Session, prevNode string) { - sess.Run(func(client curator.Client) { - client.GetW(prevNode, func(resp zk.GetResponse, err error) { - if err == nil { - return - } - if errors.Is(err, zk.ErrNoNode) { - e.initFunc(sess) - return - } - if errors.Is(err, zk.ErrConnectionClosed) { - sess.AddRetry(e.initFunc) - return - } - panic(err) - }, func(ev zk.Event) { - if ev.Type == zk.EventNodeDeleted { - e.initFunc(sess) - return - } - }) + sess.GetClient().GetW(prevNode, func(resp zk.GetResponse, err error) { + if err == nil { + return + } + if errors.Is(err, zk.ErrNoNode) { + e.initFunc(sess) + return + } + if errors.Is(err, zk.ErrConnectionClosed) { + sess.AddRetry(e.initFunc) + return + } + panic(err) + }, func(ev zk.Event) { + if ev.Type == zk.EventNodeDeleted { + e.initFunc(sess) + return + } }) } diff --git a/concurrency/lock_helper_test.go b/concurrency/lock_helper_test.go index 7c97629..23dd459 100644 --- a/concurrency/lock_helper_test.go +++ b/concurrency/lock_helper_test.go @@ -19,26 +19,24 @@ func newSimpleCounter(client curator.FakeClientID) *simpleCounter { } func (l *simpleCounter) isLeader(sess *curator.Session) { - sess.Run(func(client curator.Client) { - client.Get("/counter", func(resp zk.GetResponse, err error) { - if err != nil { - if errors.Is(err, zk.ErrNoNode) { - l.increase(sess, 1, 0) - return - } - if errors.Is(err, zk.ErrConnectionClosed) { - sess.AddRetry(l.isLeader) - return - } - panic(err) + sess.GetClient().Get("/counter", func(resp zk.GetResponse, err error) { + if err != nil { + if errors.Is(err, zk.ErrNoNode) { + l.increase(sess, 1, 0) + return } - - num, err := strconv.ParseInt(string(resp.Data), 10, 64) - if err != nil { - panic(err) + if errors.Is(err, zk.ErrConnectionClosed) { + sess.AddRetry(l.isLeader) + return } - l.increase(sess, int(num)+1, resp.Stat.Version) - }) + panic(err) + } + + num, err := strconv.ParseInt(string(resp.Data), 10, 64) + if err != nil { + panic(err) + } + l.increase(sess, int(num)+1, resp.Stat.Version) }) } @@ -73,11 +71,10 @@ func (l *simpleCounter) createCounterResp(sess *curator.Session) func(_ zk.Creat } func (l *simpleCounter) increase(sess *curator.Session, nextVal int, version int32) { - sess.Run(func(client curator.Client) { - if nextVal > 1 { - client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess)) - } else { - client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess)) - } - }) + client := sess.GetClient() + if nextVal > 1 { + client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess)) + } else { + client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess)) + } } diff --git a/concurrency/lock_test.go b/concurrency/lock_test.go index 55702f7..b0c73ce 100644 --- a/concurrency/lock_test.go +++ b/concurrency/lock_test.go @@ -22,12 +22,10 @@ func initStore(parent string) *curator.FakeZookeeper { c := curator.NewFakeClientFactory(store, initClient) c.Start(curator.New(func(sess *curator.Session) { - sess.Run(func(client curator.Client) { - client.Create(parent, nil, 0, func(resp zk.CreateResponse, err error) { - if err != nil { - panic(err) - } - }) + sess.GetClient().Create(parent, nil, 0, func(resp zk.CreateResponse, err error) { + if err != nil { + panic(err) + } }) })) @@ -398,3 +396,35 @@ func TestLock_With_Tester__Multi_Times(t *testing.T) { assert.Equal(t, 2000, steps) } } + +func TestLock_With_Tester__Multi_Times__With_Ops_Error(t *testing.T) { + for k := 0; k < 1000; k++ { + seed := time.Now().UnixNano() + fmt.Println("SEED:", seed) + + l1 := NewLock("/workers", "node01") + l2 := NewLock("/workers", "node02") + l3 := NewLock("/workers", "node03") + + store := initStore("/workers") + + tester := curator.NewFakeZookeeperTester(store, + []curator.FakeClientID{client1, client2, client3}, + seed, + ) + + startLock(l1, store, client1, newSimpleCounter(client1).isLeader) + startLock(l2, store, client2, newSimpleCounter(client2).isLeader) + startLock(l3, store, client3, newSimpleCounter(client3).isLeader) + + tester.Begin() + + steps := tester.RunSessionExpiredAndConnectionError( + 10, + 10, + 2000, + curator.WithRunOperationErrorPercentage(10), + ) + assert.Equal(t, 2000, steps) + } +} diff --git a/curator/curator.go b/curator/curator.go index 5d3a29e..2a2527d 100644 --- a/curator/curator.go +++ b/curator/curator.go @@ -77,29 +77,9 @@ func (c *Curator) End() { c.sess = nil } -type nullClient struct { - valid bool - client Client -} - -func (s *Session) getClient() nullClient { - if s.state.sess != s { - return nullClient{} - } - return nullClient{ - valid: true, - client: s.state.client, - } -} - -// Run allows to access to the Client object for accessing zookeeper. -// The callback fn function is only be called when the session is still active. -func (s *Session) Run(fn func(client Client)) { - sessClient := s.getClient() - if !sessClient.valid { - return - } - fn(sessClient.client) +// GetClient returns Client +func (s *Session) GetClient() Client { + return s.state.client } // AddRetry add a callback function that will be called after connection is re-established. diff --git a/curator/curator_test.go b/curator/curator_test.go index 3434476..fe8a0b9 100644 --- a/curator/curator_test.go +++ b/curator/curator_test.go @@ -24,10 +24,8 @@ func TestCurator(t *testing.T) { steps := make([]string, 0) c := New(func(sess *Session) { steps = append(steps, "start") - sess.Run(func(client Client) { - sess.AddRetry(func(sess *Session) { - steps = append(steps, "retry") - }) + sess.AddRetry(func(sess *Session) { + steps = append(steps, "retry") }) }) @@ -41,10 +39,8 @@ func TestCurator(t *testing.T) { steps := make([]string, 0) c := New(func(sess *Session) { steps = append(steps, "start") - sess.Run(func(client Client) { - sess.AddRetry(func(sess *Session) { - steps = append(steps, "retry") - }) + sess.AddRetry(func(sess *Session) { + steps = append(steps, "retry") }) }) @@ -56,25 +52,6 @@ func TestCurator(t *testing.T) { assert.Equal(t, []string{"start", "start", "retry"}, steps) }) - - t.Run("callback after end", func(t *testing.T) { - steps := make([]string, 0) - var callback func() - c := New(func(sess *Session) { - steps = append(steps, "start") - callback = func() { - sess.Run(func(client Client) { - steps = append(steps, "run-callback") - }) - } - }) - - c.Begin(nil) - c.End() - callback() - - assert.Equal(t, []string{"start"}, steps) - }) } func TestCurator_Chain(t *testing.T) { diff --git a/curator/fake_client.go b/curator/fake_client.go index c857896..7b67748 100644 --- a/curator/fake_client.go +++ b/curator/fake_client.go @@ -581,8 +581,12 @@ func (c *fakeClient) Get(path string, callback func(resp zk.GetResponse, err err } func (c *fakeClient) buildWatcher(fn func(ev zk.Event)) func(ev zk.Event) { + sessionID := c.store.States[c.clientID].SessionID return func(ev zk.Event) { state := c.store.States[c.clientID] + if state.SessionID != sessionID { + return + } if state.ConnErr { state.PendingEvents = append(state.PendingEvents, func() { fn(ev) diff --git a/curator/fake_client_test.go b/curator/fake_client_test.go index 588712d..443cbfc 100644 --- a/curator/fake_client_test.go +++ b/curator/fake_client_test.go @@ -44,15 +44,13 @@ func TestFakeClient_CreateUntilSuccess(t *testing.T) { var createErr error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create( - "/workers", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - createResp = resp - createErr = err - }, - ) - }) + sess.GetClient().Create( + "/workers", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + createResp = resp + createErr = err + }, + ) } c.startCuratorClient1(initFn) @@ -92,17 +90,15 @@ func TestFakeClient_CreateUntilSuccess(t *testing.T) { var initFn func(sess *Session) initFn = func(sess *Session) { - sess.Run(func(client Client) { - client.Create( - "/workers", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - c.addStep("create-resp") - if err != nil { - sess.AddRetry(initFn) - } - }, - ) - }) + sess.GetClient().Create( + "/workers", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + c.addStep("create-resp") + if err != nil { + sess.AddRetry(initFn) + } + }, + ) } c.startCuratorClient1(initFn) @@ -146,18 +142,17 @@ func TestFakeClient_CreateThenListChildren(t *testing.T) { var childrenResp zk.ChildrenResponse initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create( - "/workers01", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) {}, - ) - client.Create( - "/workers02", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) {}, - ) - client.Children("/", func(resp zk.ChildrenResponse, err error) { - childrenResp = resp - }) + client := sess.GetClient() + client.Create( + "/workers01", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) {}, + ) + client.Create( + "/workers02", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) {}, + ) + client.Children("/", func(resp zk.ChildrenResponse, err error) { + childrenResp = resp }) } c.startCuratorClient1(initFn) @@ -188,10 +183,8 @@ func TestFakeClient_ListChildren_Not_Found_Parent(t *testing.T) { var childrenErr error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Children("/workers", func(resp zk.ChildrenResponse, err error) { - childrenErr = err - }) + sess.GetClient().Children("/workers", func(resp zk.ChildrenResponse, err error) { + childrenErr = err }) } c.startCuratorClient1(initFn) @@ -210,13 +203,11 @@ func TestFakeClient_Create_Parent_Not_Found(t *testing.T) { var createErr error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers/node01", nil, zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - createErr = err - }, - ) - }) + sess.GetClient().Create("/workers/node01", nil, zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + createErr = err + }, + ) } c.startCuratorClient1(initFn) @@ -234,18 +225,17 @@ func TestFakeClient_Create_Duplicated(t *testing.T) { var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", nil, zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - client.Create("/workers", nil, zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - }) + client := sess.GetClient() + client.Create("/workers", nil, zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) + client.Create("/workers", nil, zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) } c.startCuratorClient1(initFn) @@ -269,19 +259,18 @@ func TestFakeClient_ListChildren_With_Watch(t *testing.T) { var errors []error var watchEvent zk.Event initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { - errors = append(errors, err) - }, func(ev zk.Event) { - watchEvent = ev - }) - - client.Create("/workers", nil, zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) + client := sess.GetClient() + client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { + errors = append(errors, err) + }, func(ev zk.Event) { + watchEvent = ev }) + + client.Create("/workers", nil, zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) } c.startCuratorClient1(initFn) @@ -309,13 +298,12 @@ func TestFakeClient_GetW_Not_Found(t *testing.T) { var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.GetW("/workers", func(resp zk.GetResponse, err error) { - c.addStep("getw-resp") - errors = append(errors, err) - }, func(ev zk.Event) { - c.addStep("getw-watch") - }) + client := sess.GetClient() + client.GetW("/workers", func(resp zk.GetResponse, err error) { + c.addStep("getw-resp") + errors = append(errors, err) + }, func(ev zk.Event) { + c.addStep("getw-watch") }) } c.startCuratorClient1(initFn) @@ -338,21 +326,20 @@ func TestFakeClient_GetW_Found(t *testing.T) { var errors []error var getResp zk.GetResponse initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - c.addStep("create-resp") - errors = append(errors, err) - }, - ) - - client.GetW("/workers", func(resp zk.GetResponse, err error) { - c.addStep("getw-resp") + client := sess.GetClient() + client.Create("/workers", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + c.addStep("create-resp") errors = append(errors, err) - getResp = resp - }, func(ev zk.Event) { - c.addStep("getw-watch") - }) + }, + ) + + client.GetW("/workers", func(resp zk.GetResponse, err error) { + c.addStep("getw-resp") + errors = append(errors, err) + getResp = resp + }, func(ev zk.Event) { + c.addStep("getw-watch") }) } c.startCuratorClient1(initFn) @@ -382,10 +369,8 @@ func TestFakeClient_Set_Not_Found(t *testing.T) { var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Set("/workers", []byte("data01"), 0, func(resp zk.SetResponse, err error) { - errors = append(errors, err) - }) + sess.GetClient().Set("/workers", []byte("data01"), 0, func(resp zk.SetResponse, err error) { + errors = append(errors, err) }) } c.startCuratorClient1(initFn) @@ -407,21 +392,20 @@ func TestFakeClient_Create_Then_Set(t *testing.T) { var errors []error var respList []zk.SetResponse initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - - client.Set("/workers", []byte("data02"), 0, func(resp zk.SetResponse, err error) { - errors = append(errors, err) - respList = append(respList, resp) - }) - client.Set("/workers", []byte("data03"), 0, func(resp zk.SetResponse, err error) { + client := sess.GetClient() + client.Create("/workers", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { errors = append(errors, err) - respList = append(respList, resp) - }) + }, + ) + + client.Set("/workers", []byte("data02"), 0, func(resp zk.SetResponse, err error) { + errors = append(errors, err) + respList = append(respList, resp) + }) + client.Set("/workers", []byte("data03"), 0, func(resp zk.SetResponse, err error) { + errors = append(errors, err) + respList = append(respList, resp) }) } c.startCuratorClient1(initFn) @@ -459,24 +443,23 @@ func TestFakeClient_Create_Then_Getw_Then_Set(t *testing.T) { var getResp zk.GetResponse var watchEvent zk.Event initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - }, - ) + client := sess.GetClient() + client.Create("/workers", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + }, + ) - client.GetW("/workers", func(resp zk.GetResponse, err error) { - getResp = resp - errors = append(errors, err) - c.addStep("get-resp") - }, func(ev zk.Event) { - watchEvent = ev - c.addStep("get-watch") - }) + client.GetW("/workers", func(resp zk.GetResponse, err error) { + getResp = resp + errors = append(errors, err) + c.addStep("get-resp") + }, func(ev zk.Event) { + watchEvent = ev + c.addStep("get-watch") + }) - client.Set("/workers", []byte("data02"), 0, func(resp zk.SetResponse, err error) { - c.addStep("set-resp") - }) + client.Set("/workers", []byte("data02"), 0, func(resp zk.SetResponse, err error) { + c.addStep("set-resp") }) } c.startCuratorClient1(initFn) @@ -518,18 +501,17 @@ func TestFakeClient_Create_Then_Session_Expired(t *testing.T) { var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers01", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - client.Create("/workers02", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - }) + client := sess.GetClient() + client.Create("/workers01", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) + client.Create("/workers02", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) } c.startCuratorClient1(initFn) @@ -564,13 +546,11 @@ func TestFakeClient_Create_Then_Session_Expired__Then_New_Session_Established(t var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers01", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - }) + sess.GetClient().Create("/workers01", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) } c.startCuratorClient1(initFn) @@ -597,21 +577,20 @@ func TestFakeClient_Create_With_Sequence(t *testing.T) { var errors []error var childrenResp zk.ChildrenResponse initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/node01-", []byte("data01"), zk.FlagEphemeral|zk.FlagSequence, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - client.Create("/node01-", []byte("data02"), zk.FlagEphemeral|zk.FlagSequence, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - client.Children("/", func(resp zk.ChildrenResponse, err error) { - childrenResp = resp + client := sess.GetClient() + client.Create("/node01-", []byte("data01"), zk.FlagEphemeral|zk.FlagSequence, + func(resp zk.CreateResponse, err error) { errors = append(errors, err) - }) + }, + ) + client.Create("/node01-", []byte("data02"), zk.FlagEphemeral|zk.FlagSequence, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) + client.Children("/", func(resp zk.ChildrenResponse, err error) { + childrenResp = resp + errors = append(errors, err) }) } c.startCuratorClient1(initFn) @@ -642,13 +621,11 @@ func TestFakeClient_Create_With_Ephemeral__Then_Session_Expired(t *testing.T) { var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/node01", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - }) + sess.GetClient().Create("/node01", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) } c.startCuratorClient1(initFn) @@ -663,10 +640,8 @@ func TestFakeClient_Create_With_Ephemeral__Then_Session_Expired(t *testing.T) { // Create Client 2 var childrenResp zk.ChildrenResponse NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.Children("/", func(resp zk.ChildrenResponse, err error) { - childrenResp = resp - }) + sess.GetClient().Children("/", func(resp zk.ChildrenResponse, err error) { + childrenResp = resp }) })) c.store.Begin(client2) @@ -686,26 +661,23 @@ func TestFakeClient_Create_With_Ephemeral_On_Two_Clients(t *testing.T) { var childrenResp zk.ChildrenResponse NewFakeClientFactory(c.store, client1).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/node01", nil, zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - }) + sess.GetClient().Create("/node01", nil, zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) })) NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/node02", nil, zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - client.Children("/", func(resp zk.ChildrenResponse, err error) { - childrenResp = resp + client := sess.GetClient() + client.Create("/node02", nil, zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { errors = append(errors, err) - }) + }, + ) + client.Children("/", func(resp zk.ChildrenResponse, err error) { + childrenResp = resp + errors = append(errors, err) }) })) @@ -735,33 +707,30 @@ func TestFakeClient_Session_Expired_Another_Client_Watch_Children_And_Watch_Data var watchEvents []zk.Event NewFakeClientFactory(c.store, client1).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/node01", []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) - }) + sess.GetClient().Create("/node01", []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }, + ) })) NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { - childrenResp = resp - errors = append(errors, err) - c.addStep("children-resp") - }, func(ev zk.Event) { - watchEvents = append(watchEvents, ev) - c.addStep("children-watch") - }) - client.GetW("/node01", func(resp zk.GetResponse, err error) { - errors = append(errors, err) - getResp = resp - c.addStep("get-resp") - }, func(ev zk.Event) { - watchEvents = append(watchEvents, ev) - c.addStep("get-watch") - }) + client := sess.GetClient() + client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { + childrenResp = resp + errors = append(errors, err) + c.addStep("children-resp") + }, func(ev zk.Event) { + watchEvents = append(watchEvents, ev) + c.addStep("children-watch") + }) + client.GetW("/node01", func(resp zk.GetResponse, err error) { + errors = append(errors, err) + getResp = resp + c.addStep("get-resp") + }, func(ev zk.Event) { + watchEvents = append(watchEvents, ev) + c.addStep("get-watch") }) })) @@ -823,22 +792,21 @@ func TestFakeClient_Get_With_Not_Found_And_Found(t *testing.T) { const pathValue = "/node01" initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Get(pathValue, func(resp zk.GetResponse, err error) { - errors = append(errors, err) - getRespList = append(getRespList, resp) - }) - - client.Create(pathValue, []byte("data01"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }, - ) + client := sess.GetClient() + client.Get(pathValue, func(resp zk.GetResponse, err error) { + errors = append(errors, err) + getRespList = append(getRespList, resp) + }) - client.Get(pathValue, func(resp zk.GetResponse, err error) { + client.Create(pathValue, []byte("data01"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) { errors = append(errors, err) - getRespList = append(getRespList, resp) - }) + }, + ) + + client.Get(pathValue, func(resp zk.GetResponse, err error) { + errors = append(errors, err) + getRespList = append(getRespList, resp) }) } c.startCuratorClient1(initFn) @@ -870,9 +838,7 @@ func TestFakeClient_Set_Validate_Error(t *testing.T) { c := newFakeClientTest() initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Set("/sample/", nil, 0, func(resp zk.SetResponse, err error) { - }) + sess.GetClient().Set("/sample/", nil, 0, func(resp zk.SetResponse, err error) { }) } c.startCuratorClient1(initFn) @@ -886,16 +852,15 @@ func TestFakeClient_Print_Data(t *testing.T) { c := newFakeClientTest() initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/node01", nil, 0, func(resp zk.CreateResponse, err error) {}) - client.Create("/node02", nil, 0, func(resp zk.CreateResponse, err error) {}) - client.Create("/node03", []byte("data01"), 0, func(resp zk.CreateResponse, err error) {}) - client.Create("/node01/child01", []byte("data02"), 0, func(resp zk.CreateResponse, err error) {}) - client.Create("/node01/child02", nil, 0, func(resp zk.CreateResponse, err error) {}) - client.Create("/node03/child03", []byte("data03"), zk.FlagEphemeral, - func(resp zk.CreateResponse, err error) {}, - ) - }) + client := sess.GetClient() + client.Create("/node01", nil, 0, func(resp zk.CreateResponse, err error) {}) + client.Create("/node02", nil, 0, func(resp zk.CreateResponse, err error) {}) + client.Create("/node03", []byte("data01"), 0, func(resp zk.CreateResponse, err error) {}) + client.Create("/node01/child01", []byte("data02"), 0, func(resp zk.CreateResponse, err error) {}) + client.Create("/node01/child02", nil, 0, func(resp zk.CreateResponse, err error) {}) + client.Create("/node03/child03", []byte("data03"), zk.FlagEphemeral, + func(resp zk.CreateResponse, err error) {}, + ) } c.startCuratorClient1(initFn) @@ -932,10 +897,8 @@ func TestFakeClient_Delete_Not_Found(t *testing.T) { var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { - errors = append(errors, err) - }) + sess.GetClient().Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { + errors = append(errors, err) }) } c.startCuratorClient1(initFn) @@ -957,17 +920,16 @@ func TestFakeClient_Delete_After_Create(t *testing.T) { var errors []error var deleteResp zk.DeleteResponse initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }) - client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { - deleteResp = resp - errors = append(errors, err) - }) - client.Get("/workers", func(resp zk.GetResponse, err error) { - errors = append(errors, err) - }) + client := sess.GetClient() + client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }) + client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { + deleteResp = resp + errors = append(errors, err) + }) + client.Get("/workers", func(resp zk.GetResponse, err error) { + errors = append(errors, err) }) } c.startCuratorClient1(initFn) @@ -996,19 +958,18 @@ func TestFakeClient_Delete_Conflict_Version(t *testing.T) { var errors []error initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }) - client.Set("/workers", []byte("new-data"), 0, func(resp zk.SetResponse, err error) { - errors = append(errors, err) - }) - client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { - errors = append(errors, err) - }) - client.Get("/workers", func(resp zk.GetResponse, err error) { - errors = append(errors, err) - }) + client := sess.GetClient() + client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }) + client.Set("/workers", []byte("new-data"), 0, func(resp zk.SetResponse, err error) { + errors = append(errors, err) + }) + client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { + errors = append(errors, err) + }) + client.Get("/workers", func(resp zk.GetResponse, err error) { + errors = append(errors, err) }) } c.startCuratorClient1(initFn) @@ -1036,20 +997,19 @@ func TestFakeClient_Delete_Data_Deleted_Watch(t *testing.T) { var errors []error var watchEvent zk.Event initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }) - client.GetW("/workers", func(resp zk.GetResponse, err error) { - errors = append(errors, err) - }, func(ev zk.Event) { - c.addStep("get-watch") - watchEvent = ev - }) - client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { - c.addStep("delete-resp") - errors = append(errors, err) - }) + client := sess.GetClient() + client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }) + client.GetW("/workers", func(resp zk.GetResponse, err error) { + errors = append(errors, err) + }, func(ev zk.Event) { + c.addStep("get-watch") + watchEvent = ev + }) + client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { + c.addStep("delete-resp") + errors = append(errors, err) }) } c.startCuratorClient1(initFn) @@ -1086,20 +1046,19 @@ func TestFakeClient_Delete_Children_Watch(t *testing.T) { var errors []error var watchEvent zk.Event initFn := func(sess *Session) { - sess.Run(func(client Client) { - client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { - errors = append(errors, err) - }) - client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { - errors = append(errors, err) - }, func(ev zk.Event) { - c.addStep("children-watch") - watchEvent = ev - }) - client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { - c.addStep("delete-resp") - errors = append(errors, err) - }) + client := sess.GetClient() + client.Create("/workers", nil, 0, func(resp zk.CreateResponse, err error) { + errors = append(errors, err) + }) + client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { + errors = append(errors, err) + }, func(ev zk.Event) { + c.addStep("children-watch") + watchEvent = ev + }) + client.Delete("/workers", 0, func(resp zk.DeleteResponse, err error) { + c.addStep("delete-resp") + errors = append(errors, err) }) } c.startCuratorClient1(initFn) @@ -1133,20 +1092,16 @@ func TestFakeClient_Delete_Children_Watch(t *testing.T) { func TestFakeClient_Should_Not_Have_Any_Action_After_Conn_Error(t *testing.T) { c := newFakeClientTest() - callback := func(client Client) { - client.Get("/workers", func(resp zk.GetResponse, err error) { + c.startCuratorClient1(func(sess *Session) { + sess.GetClient().Get("/workers", func(resp zk.GetResponse, err error) { if err != nil { if stderrors.Is(err, zk.ErrConnectionClosed) { - client.Get("/another", func(resp zk.GetResponse, err error) {}) + sess.GetClient().Get("/another", func(resp zk.GetResponse, err error) {}) return } panic(err) } }) - } - - c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) }) c.store.Begin(client1) @@ -1160,23 +1115,17 @@ func TestFakeClient_Should_Not_Have_Any_Action_After_Conn_Error(t *testing.T) { func TestFakeClient_Should_Not_Recv_Watch_After_Connection_Error(t *testing.T) { c := newFakeClientTest() - callback := func(client Client) { - client.Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { - }) - } - NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.GetW("/worker", func(resp zk.GetResponse, err error) { - c.addStep("get-resp") - }, func(ev zk.Event) { - c.addStep("get-watch") - }) + sess.GetClient().GetW("/worker", func(resp zk.GetResponse, err error) { + c.addStep("get-resp") + }, func(ev zk.Event) { + c.addStep("get-watch") }) })) c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) + sess.GetClient().Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { + }) }) c.store.Begin(client1) @@ -1204,23 +1153,17 @@ func TestFakeClient_Should_Not_Recv_Watch_After_Connection_Error(t *testing.T) { func TestFakeClient_Should_Not_Recv_Watch_After_Connection_Error_For_ChildrenW(t *testing.T) { c := newFakeClientTest() - callback := func(client Client) { - client.Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { - }) - } - NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { - c.addStep("children-resp") - }, func(ev zk.Event) { - c.addStep("children-watch") - }) + sess.GetClient().ChildrenW("/", func(resp zk.ChildrenResponse, err error) { + c.addStep("children-resp") + }, func(ev zk.Event) { + c.addStep("children-watch") }) })) c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) + sess.GetClient().Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { + }) }) c.store.Begin(client1) @@ -1246,23 +1189,17 @@ func TestFakeClient_Should_Not_Recv_Watch_After_Connection_Error_For_ChildrenW(t func TestFakeClient_Should_Not_Recv_Watch_After_Conn_Error_And_Expired_For_ChildrenW(t *testing.T) { c := newFakeClientTest() - callback := func(client Client) { - client.Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { - }) - } - NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { - c.addStep("children-resp") - }, func(ev zk.Event) { - c.addStep("children-watch") - }) + sess.GetClient().ChildrenW("/", func(resp zk.ChildrenResponse, err error) { + c.addStep("children-resp") + }, func(ev zk.Event) { + c.addStep("children-watch") }) })) c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) + sess.GetClient().Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { + }) }) c.store.Begin(client1) @@ -1291,22 +1228,16 @@ func TestFakeClient_Should_Not_Recv_Watch_After_Conn_Error_And_Expired_For_Child func TestFakeClient_Should_Not_Recv_Watch_After_Expired_For_ChildrenW(t *testing.T) { c := newFakeClientTest() - callback := func(client Client) { - client.Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) {}) - } - NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { - c.addStep("children-resp") - }, func(ev zk.Event) { - c.addStep("children-watch") - }) + sess.GetClient().ChildrenW("/", func(resp zk.ChildrenResponse, err error) { + c.addStep("children-resp") + }, func(ev zk.Event) { + c.addStep("children-watch") }) })) c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) + sess.GetClient().Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) {}) }) c.store.Begin(client1) @@ -1330,42 +1261,34 @@ func TestFakeClient_Should_Not_Recv_Watch_After_Expired_For_ChildrenW(t *testing func TestFakeClient_Retry_Happens_Before_Watch_Handlers(t *testing.T) { c := newFakeClientTest() - callback := func(client Client) { - client.Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { - }) - } - var getFunc func(sess *Session) getFunc = func(sess *Session) { - sess.Run(func(client Client) { - c.addStep("get-req") - client.Get("/hello", func(resp zk.GetResponse, err error) { - c.addStep("get-resp") - if stderrors.Is(err, zk.ErrConnectionClosed) { - sess.AddRetry(getFunc) - return - } - if err != nil { - panic(err) - } - }) + c.addStep("get-req") + sess.GetClient().Get("/hello", func(resp zk.GetResponse, err error) { + c.addStep("get-resp") + if stderrors.Is(err, zk.ErrConnectionClosed) { + sess.AddRetry(getFunc) + return + } + if err != nil { + panic(err) + } }) } NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.ChildrenW("/", func(resp zk.ChildrenResponse, err error) { - c.addStep("children-resp") - }, func(ev zk.Event) { - c.addStep("children-watch") - }) + sess.GetClient().ChildrenW("/", func(resp zk.ChildrenResponse, err error) { + c.addStep("children-resp") + }, func(ev zk.Event) { + c.addStep("children-watch") }) getFunc(sess) })) c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) + sess.GetClient().Create("/worker", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { + }) }) c.store.Begin(client1) @@ -1391,7 +1314,17 @@ func TestFakeClient_Create_With_Error(t *testing.T) { var errors []error - callback := func(client Client) { + var getResp zk.GetResponse + NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { + sess.GetClient().Get("/worker", func(resp zk.GetResponse, err error) { + c.addStep("get-resp02") + getResp = resp + errors = append(errors, err) + }) + })) + + c.startCuratorClient1(func(sess *Session) { + client := sess.GetClient() client.Create("/worker", []byte("data01"), zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { c.addStep("create-resp01") errors = append(errors, err) @@ -1400,21 +1333,6 @@ func TestFakeClient_Create_With_Error(t *testing.T) { c.addStep("create-resp02") errors = append(errors, err) }) - } - - var getResp zk.GetResponse - NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { - sess.Run(func(client Client) { - client.Get("/worker", func(resp zk.GetResponse, err error) { - c.addStep("get-resp02") - getResp = resp - errors = append(errors, err) - }) - }) - })) - - c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) }) c.store.Begin(client1) @@ -1442,13 +1360,9 @@ func TestFakeClient_Create_With_Error(t *testing.T) { func TestFakeClient_Conn_Error_Multi_Times(t *testing.T) { c := newFakeClientTest() - callback := func(client Client) { - client.Create("/worker", []byte("data01"), zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { - }) - } - c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) + sess.GetClient().Create("/worker", []byte("data01"), zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { + }) }) c.store.Begin(client1) @@ -1464,17 +1378,15 @@ func TestFakeClient_Create_Child_of_Ephemeral_Error(t *testing.T) { c := newFakeClientTest() var errors []error - callback := func(client Client) { + + c.startCuratorClient1(func(sess *Session) { + client := sess.GetClient() client.Create("/worker", []byte("data01"), zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { errors = append(errors, err) }) client.Create("/worker/node01", []byte("data02"), zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { errors = append(errors, err) }) - } - - c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) }) c.store.Begin(client1) @@ -1493,7 +1405,9 @@ func TestFakeClient__Set_Error(t *testing.T) { c := newFakeClientTest() var errors []error - callback := func(client Client) { + + c.startCuratorClient1(func(sess *Session) { + client := sess.GetClient() client.Create("/worker", []byte("data01"), zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { c.addStep("create-resp") errors = append(errors, err) @@ -1502,10 +1416,6 @@ func TestFakeClient__Set_Error(t *testing.T) { c.addStep("set-resp") errors = append(errors, err) }) - } - - c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) }) c.store.Begin(client1) @@ -1532,7 +1442,9 @@ func TestFakeClient__Delete_Error(t *testing.T) { c := newFakeClientTest() var errors []error - callback := func(client Client) { + + c.startCuratorClient1(func(sess *Session) { + client := sess.GetClient() client.Create("/worker", []byte("data01"), zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { c.addStep("create-resp") errors = append(errors, err) @@ -1541,10 +1453,6 @@ func TestFakeClient__Delete_Error(t *testing.T) { c.addStep("delete-resp") errors = append(errors, err) }) - } - - c.startCuratorClient1(func(sess *Session) { - sess.Run(callback) }) c.store.Begin(client1) @@ -1562,3 +1470,44 @@ func TestFakeClient__Delete_Error(t *testing.T) { assert.Equal(t, 0, len(c.store.Root.Children)) } + +func TestFakeClient__Session_Expired_And_Then_Begin__Not_Keeping_Old_Watch(t *testing.T) { + c := newFakeClientTest() + + c.startCuratorClient1(func(sess *Session) { + client := sess.GetClient() + client.Create("/worker", []byte("data01"), 0, func(resp zk.CreateResponse, err error) { + c.addStep("create-resp") + }) + client.GetW("/worker", func(resp zk.GetResponse, err error) { + c.addStep("getw-resp") + }, func(ev zk.Event) { + c.addStep("getw-watch") + }) + }) + + NewFakeClientFactory(c.store, client2).Start(New(func(sess *Session) { + sess.GetClient().Delete("/worker", 0, func(resp zk.DeleteResponse, err error) { + c.addStep("delete-resp") + }) + })) + + c.store.Begin(client1) + c.store.CreateApply(client1) + c.store.GetApply(client1) + + c.store.SessionExpired(client1) + c.store.Begin(client1) + + c.store.Begin(client2) + c.store.DeleteApply(client2) + + c.store.PrintData() + c.store.PrintPendingCalls() + + assert.Equal(t, []string{ + "create-resp", + "getw-resp", + "delete-resp", + }, c.steps) +} diff --git a/curator/fake_property_test.go b/curator/fake_property_test.go index 6ad2c1a..b4e4af2 100644 --- a/curator/fake_property_test.go +++ b/curator/fake_property_test.go @@ -29,64 +29,58 @@ func newSimpleLock(store *FakeZookeeper, client FakeClientID) *simpleLock { } func (l *simpleLock) start(sess *Session) { - sess.Run(func(client Client) { - client.Create("/master", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { - if err != nil { - if errors.Is(err, zk.ErrNodeExists) { - l.onFollower(sess) - return - } - if errors.Is(err, zk.ErrConnectionClosed) { - return - } - panic(err) + sess.GetClient().Create("/master", nil, zk.FlagEphemeral, func(resp zk.CreateResponse, err error) { + if err != nil { + if errors.Is(err, zk.ErrNodeExists) { + l.onFollower(sess) + return + } + if errors.Is(err, zk.ErrConnectionClosed) { + return } - l.isLeader(sess) - }) + panic(err) + } + l.isLeader(sess) }) } func (l *simpleLock) onFollower(sess *Session) { - sess.Run(func(client Client) { - client.GetW("/master", func(resp zk.GetResponse, err error) { - if err != nil { - if errors.Is(err, zk.ErrConnectionClosed) { - sess.AddRetry(l.onFollower) - return - } - if errors.Is(err, zk.ErrNoNode) { - l.onFollower(sess) - return - } - panic(err) + sess.GetClient().GetW("/master", func(resp zk.GetResponse, err error) { + if err != nil { + if errors.Is(err, zk.ErrConnectionClosed) { + sess.AddRetry(l.onFollower) + return } - }, func(ev zk.Event) { - l.start(sess) - }) + if errors.Is(err, zk.ErrNoNode) { + l.onFollower(sess) + return + } + panic(err) + } + }, func(ev zk.Event) { + l.start(sess) }) } func (l *simpleLock) isLeader(sess *Session) { - sess.Run(func(client Client) { - client.Get("/counter", func(resp zk.GetResponse, err error) { - if err != nil { - if errors.Is(err, zk.ErrNoNode) { - l.increase(sess, 1, 0) - return - } - if errors.Is(err, zk.ErrConnectionClosed) { - sess.AddRetry(l.isLeader) - return - } - panic(err) + sess.GetClient().Get("/counter", func(resp zk.GetResponse, err error) { + if err != nil { + if errors.Is(err, zk.ErrNoNode) { + l.increase(sess, 1, 0) + return } - - num, err := strconv.ParseInt(string(resp.Data), 10, 64) - if err != nil { - panic(err) + if errors.Is(err, zk.ErrConnectionClosed) { + sess.AddRetry(l.isLeader) + return } - l.increase(sess, int(num)+1, resp.Stat.Version) - }) + panic(err) + } + + num, err := strconv.ParseInt(string(resp.Data), 10, 64) + if err != nil { + panic(err) + } + l.increase(sess, int(num)+1, resp.Stat.Version) }) } @@ -121,13 +115,12 @@ func (l *simpleLock) createCounterResp(sess *Session) func(_ zk.CreateResponse, } func (l *simpleLock) increase(sess *Session, nextVal int, version int32) { - sess.Run(func(client Client) { - if nextVal > 1 { - client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess)) - } else { - client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess)) - } - }) + client := sess.GetClient() + if nextVal > 1 { + client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess)) + } else { + client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess)) + } } func TestFakeZookeeperTester_Master_Lock(t *testing.T) { diff --git a/curator/util_test.go b/curator/util_test.go index 2711046..fc07647 100644 --- a/curator/util_test.go +++ b/curator/util_test.go @@ -12,10 +12,8 @@ func TestParallelRunner(t *testing.T) { r := NewParallelRunner( New(func(sess *Session) { steps = append(steps, "init01") - sess.Run(func(client Client) { - sess.AddRetry(func(sess *Session) { - steps = append(steps, "retry01") - }) + sess.AddRetry(func(sess *Session) { + steps = append(steps, "retry01") }) }), New(func(sess *Session) { diff --git a/test-examples/lock/main.go b/test-examples/lock/main.go index 4da3ee6..36666de 100644 --- a/test-examples/lock/main.go +++ b/test-examples/lock/main.go @@ -7,7 +7,6 @@ import ( "log" "os" "os/signal" - "time" "github.com/QuangTung97/zk/concurrency" "github.com/QuangTung97/zk/curator" @@ -50,13 +49,5 @@ func main() { ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) - - for i := 0; i < 6000; i++ { - time.Sleep(1 * time.Second) - select { - case <-ch: - return - default: - } - } + <-ch } diff --git a/todolist b/todolist index 15927ce..9922327 100644 --- a/todolist +++ b/todolist @@ -1,3 +1,4 @@ *) Batching Read & Write to TCP (Need to do or not?) *) Stress Tests with Race Detector *) Add Multi-Ops Transactions +*) Explain why without session the locking algorithm + increase will be wrong