Skip to content

Commit

Permalink
Merge pull request #9065 from gyuho/watch-id-2
Browse files Browse the repository at this point in the history
*: allow user-provided watch ID to mvcc
  • Loading branch information
gyuho authored Jan 5, 2018
2 parents 2fb9728 + 10522f8 commit 6546483
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 198 deletions.
1 change: 1 addition & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| progress_notify | progress_notify is set so that the etcd server will periodically send a WatchResponse with no events to the new watcher if there are no recent events. It is useful when clients wish to recover a disconnected watcher starting from a recent known revision. The etcd server may decide how often it will send notifications based on current load. | bool |
| filters | filters filter the events on the server side before they are sent back to the watcher. | (slice of) FilterType |
| prev_kv | If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned. | bool |
| watch_id | If watch_id is provided and non-zero, it will be assigned to this watcher. Since creating a watcher in etcd is not a synchronous operation, this can be used to ensure that ordering is correct when creating multiple watchers on the same stream. Creating a watcher with an ID already in use on the stream will cause an error to be returned. | int64 |



Expand Down
5 changes: 5 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2283,6 +2283,11 @@
"description": "start_revision is an optional revision to watch from (inclusive). No start_revision is \"now\".",
"type": "string",
"format": "int64"
},
"watch_id": {
"description": "If watch_id is provided and non-zero, it will be assigned to this watcher.\nSince creating a watcher in etcd is not a synchronous operation,\nthis can be used to ensure that ordering is correct when creating multiple\nwatchers on the same stream. Creating a watcher with an ID already in\nuse on the stream will cause an error to be returned.",
"type": "string",
"format": "int64"
}
}
},
Expand Down
5 changes: 4 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
}

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
if resp.WatchId == -1 {
// check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
// failed; no channel
close(ws.recvc)
return
Expand Down Expand Up @@ -453,6 +454,7 @@ func (w *watchGrpcStream) run() {
// Watch() requested
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
Expand Down Expand Up @@ -553,6 +555,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
// TODO: return watch ID?
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
Expand Down
11 changes: 7 additions & 4 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (sws *serverWatchStream) recvLoop() error {
if !sws.isWatchPermitted(creq) {
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1,
WatchId: creq.WatchId,
Canceled: true,
Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
Expand All @@ -225,8 +225,8 @@ func (sws *serverWatchStream) recvLoop() error {
if rev == 0 {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
if id != -1 {
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
if err == nil {
sws.mu.Lock()
if creq.ProgressNotify {
sws.progress[id] = true
Expand All @@ -240,7 +240,10 @@ func (sws *serverWatchStream) recvLoop() error {
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: id == -1,
Canceled: err != nil,
}
if err != nil {
wr.CancelReason = err.Error()
}
select {
case sws.ctrlStream <- wr:
Expand Down
338 changes: 189 additions & 149 deletions etcdserver/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions etcdserver/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,13 @@ message WatchCreateRequest {
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;

// If watch_id is provided and non-zero, it will be assigned to this watcher.
// Since creating a watcher in etcd is not a synchronous operation,
// this can be used to ensure that ordering is correct when creating multiple
// watchers on the same stream. Creating a watcher with an ID already in
// use on the stream will cause an error to be returned.
int64 watch_id = 7;
}

message WatchCancelRequest {
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestWatchableKVWatch(t *testing.T) {
w := s.NewWatchStream()
defer w.Close()

wid := w.Watch([]byte("foo"), []byte("fop"), 0)
wid, _ := w.Watch(0, []byte("foo"), []byte("fop"), 0)

wev := []mvccpb.Event{
{Type: mvccpb.PUT,
Expand Down Expand Up @@ -783,7 +783,7 @@ func TestWatchableKVWatch(t *testing.T) {
}

w = s.NewWatchStream()
wid = w.Watch([]byte("foo1"), []byte("foo2"), 3)
wid, _ = w.Watch(0, []byte("foo1"), []byte("foo2"), 3)

select {
case resp := <-w.Chan():
Expand Down
6 changes: 3 additions & 3 deletions mvcc/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) {
watchIDs := make([]WatchID, b.N)
for i := range watchIDs {
// non-0 value to keep watchers in unsynced
watchIDs[i] = w.Watch(k, nil, 1)
watchIDs[i], _ = w.Watch(0, k, nil, 1)
}

b.ResetTimer()
Expand Down Expand Up @@ -142,7 +142,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// non-0 value to keep watchers in unsynced
watchIDs[i] = w.Watch(testKey, nil, 1)
watchIDs[i], _ = w.Watch(0, testKey, nil, 1)
}

// random-cancel N watchers to make it not biased towards
Expand Down Expand Up @@ -182,7 +182,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// 0 for startRev to keep watchers in synced
watchIDs[i] = w.Watch(testKey, nil, 0)
watchIDs[i], _ = w.Watch(0, testKey, nil, 0)
}

// randomly cancel watchers to make it not biased towards
Expand Down
20 changes: 10 additions & 10 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestWatch(t *testing.T) {
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
w.Watch(testKey, nil, 0)
w.Watch(0, testKey, nil, 0)

if !s.synced.contains(string(testKey)) {
// the key must have had an entry in synced
Expand All @@ -63,7 +63,7 @@ func TestNewWatcherCancel(t *testing.T) {
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
wt := w.Watch(testKey, nil, 0)
wt, _ := w.Watch(0, testKey, nil, 0)

if err := w.Cancel(wt); err != nil {
t.Error(err)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestCancelUnsynced(t *testing.T) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// use 1 to keep watchers in unsynced
watchIDs[i] = w.Watch(testKey, nil, 1)
watchIDs[i], _ = w.Watch(0, testKey, nil, 1)
}

for _, idx := range watchIDs {
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSyncWatchers(t *testing.T) {

for i := 0; i < watcherN; i++ {
// specify rev as 1 to keep watchers in unsynced
w.Watch(testKey, nil, 1)
w.Watch(0, testKey, nil, 1)
}

// Before running s.syncWatchers() synced should be empty because we manually
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestWatchCompacted(t *testing.T) {
}

w := s.NewWatchStream()
wt := w.Watch(testKey, nil, compactRev-1)
wt, _ := w.Watch(0, testKey, nil, compactRev-1)

select {
case resp := <-w.Chan():
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestWatchFutureRev(t *testing.T) {

w := s.NewWatchStream()
wrev := int64(10)
w.Watch(testKey, nil, wrev)
w.Watch(0, testKey, nil, wrev)

for i := 0; i < 10; i++ {
rev := s.Put(testKey, testValue, lease.NoLease)
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestWatchRestore(t *testing.T) {
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(testKey, nil, rev-1)
w.Watch(0, testKey, nil, rev-1)

newStore.Restore(b)
select {
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestWatchBatchUnsynced(t *testing.T) {
}

w := s.NewWatchStream()
w.Watch(v, nil, 1)
w.Watch(0, v, nil, 1)
for i := 0; i < batches; i++ {
if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestWatchVictims(t *testing.T) {
for i := 0; i < numWatches; i++ {
go func() {
w := s.NewWatchStream()
w.Watch(testKey, nil, 1)
w.Watch(0, testKey, nil, 1)
defer func() {
w.Close()
wg.Done()
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestStressWatchCancelClose(t *testing.T) {
w := s.NewWatchStream()
ids := make([]WatchID, 10)
for i := range ids {
ids[i] = w.Watch(testKey, nil, 0)
ids[i], _ = w.Watch(0, testKey, nil, 0)
}
<-readyc
wg.Add(1 + len(ids)/2)
Expand Down
37 changes: 25 additions & 12 deletions mvcc/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
)

// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If passed, an ID will automatically be assigned.
const AutoWatchID WatchID = 0

var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
)

type WatchID int64
Expand All @@ -36,12 +42,13 @@ type WatchStream interface {
// happened on the given key or range [key, end) from the given startRev.
//
// The whole event history can be watched unless compacted.
// If `startRev` <=0, watch observes events after currentRev.
// If "startRev" <=0, watch observes events after currentRev.
//
// The returned `id` is the ID of this watcher. It appears as WatchID
// The returned "id" is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
//
Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID
// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
// an auto-generated watch ID is returned.
Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)

// Chan returns a chan. All watch responses will be sent to the returned chan.
Chan() <-chan WatchResponse
Expand Down Expand Up @@ -98,28 +105,34 @@ type watchStream struct {
}

// Watch creates a new watcher in the stream and returns its WatchID.
// TODO: return error if ws is closed?
func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1
return -1, ErrEmptyWatcherRange
}

ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return -1
return -1, ErrEmptyWatcherRange
}

id := ws.nextID
ws.nextID++
if id == AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}
id = ws.nextID
ws.nextID++
} else if _, ok := ws.watchers[id]; ok {
return -1, ErrWatcherDuplicateID
}

w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

ws.cancels[id] = c
ws.watchers[id] = w
return id
return id, nil
}

func (ws *watchStream) Chan() <-chan WatchResponse {
Expand Down
2 changes: 1 addition & 1 deletion mvcc/watcher_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
b.ReportAllocs()
b.StartTimer()
for i := 0; i < b.N; i++ {
w.Watch([]byte(fmt.Sprint("foo", i)), nil, 0)
w.Watch(0, []byte(fmt.Sprint("foo", i)), nil, 0)
}
}
Loading

0 comments on commit 6546483

Please sign in to comment.