Skip to content

Commit

Permalink
Merge pull request #9281 from yudai/fix_unrestored_watchers
Browse files Browse the repository at this point in the history
mvcc: restore unsynced watchers
  • Loading branch information
gyuho authored Feb 6, 2018
2 parents 88d623f + fcab10b commit 63183f8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
2 changes: 1 addition & 1 deletion internal/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
}

for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa)
s.unsynced.add(wa)
}
s.synced = newWatcherGroup()
return nil
Expand Down
57 changes: 33 additions & 24 deletions internal/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,36 +297,45 @@ func TestWatchFutureRev(t *testing.T) {
}

func TestWatchRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)
test := func(delay time.Duration) func(t *testing.T) {
return func(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)
testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)

newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
defer cleanup(newStore, newBackend, newPath)
newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(0, testKey, nil, rev-1)
w := newStore.NewWatchStream()
w.Watch(0, testKey, nil, rev-1)

newStore.Restore(b)
select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
time.Sleep(delay)

newStore.Restore(b)
select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}

t.Run("Normal", test(0))
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
Expand Down

0 comments on commit 63183f8

Please sign in to comment.