From 639f1e3e69d2d903d97bc4df99ca5d22571c1f95 Mon Sep 17 00:00:00 2001 From: deepthi Date: Fri, 11 Oct 2019 21:23:31 -0700 Subject: [PATCH 1/2] unit tests for shard watch Signed-off-by: deepthi --- go/vt/topo/topotests/shard_watch_test.go | 261 +++++++++++++++++++++++ 1 file changed, 261 insertions(+) create mode 100644 go/vt/topo/topotests/shard_watch_test.go diff --git a/go/vt/topo/topotests/shard_watch_test.go b/go/vt/topo/topotests/shard_watch_test.go new file mode 100644 index 00000000000..61d52d293f3 --- /dev/null +++ b/go/vt/topo/topotests/shard_watch_test.go @@ -0,0 +1,261 @@ +/* +Copyright 2019 The Vitess Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topotests + +import ( + "strings" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +// waitForInitialShard waits for the initial Shard to +// appear, and match the provided srvKeyspace. +func waitForInitialShard(t *testing.T, ts *topo.Server, keyspace, shard string) (current *topo.WatchShardData, changes <-chan *topo.WatchShardData, cancel topo.CancelFunc) { + ctx := context.Background() + start := time.Now() + for { + current, changes, cancel = ts.WatchShard(ctx, keyspace, shard) + switch { + case topo.IsErrType(current.Err, topo.NoNode): + // hasn't appeared yet + if time.Since(start) > 10*time.Second { + t.Fatalf("time out waiting for file to appear") + } + time.Sleep(10 * time.Millisecond) + continue + case current.Err == nil: + return + default: + t.Fatalf("watch failed: %v", current.Err) + } + } +} + +func TestWatchShardNoNode(t *testing.T) { + keyspace := "ks1" + shard := "0" + ctx := context.Background() + ts := memorytopo.NewServer("cell1") + + // No Shard -> ErrNoNode + current, _, _ := ts.WatchShard(ctx, keyspace, shard) + if !topo.IsErrType(current.Err, topo.NoNode) { + t.Errorf("Got invalid result from WatchShard(not there): %v", current.Err) + } +} + +func TestWatchShard(t *testing.T) { + cell := "cell1" + keyspace := "ks1" + shard := "0" + ctx := context.Background() + ts := memorytopo.NewServer(cell) + + // Create keyspace + if err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}); err != nil { + t.Fatalf("CreateKeyspace %v failed: %v", keyspace, err) + } + + // Create initial value + if err := ts.CreateShard(ctx, keyspace, shard); err != nil { + t.Fatalf("Create(/keyspaces/ks1/shards/0/Shard) failed: %v", err) + } + + // Starting the watch should now work, and return an empty + // Shard. + // Shards are always created with IsMasterServing true + wanted := &topodatapb.Shard{IsMasterServing: true} + current, changes, cancel := waitForInitialShard(t, ts, keyspace, shard) + if !proto.Equal(current.Value, wanted) { + t.Fatalf("got bad data: %v expected: %v", current.Value, wanted) + } + + // Update the value with good data, wait until we see it + wanted.IsMasterServing = false + if _, err := ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.IsMasterServing = false + return nil + }); err != nil { + t.Fatalf("Update(/keyspaces/ks1/shards/0/Shard) failed: %v", err) + } + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if wd.Err != nil { + t.Fatalf("watch channel unexpectedly got error: %v", wd.Err) + } + if proto.Equal(wd.Value, wanted) { + break + } + if proto.Equal(wd.Value, &topodatapb.Shard{}) { + t.Log("got duplicate empty value, skipping.") + } + t.Fatalf("got bad data: %v expected: %v", wd.Value, wanted) + } + + conn, err := ts.ConnForCell(ctx, "global") + if err != nil { + t.Fatalf("ConnForCell failed: %v", err) + } + // Update the value with bad data, wait until error. + if _, err := conn.Update(ctx, "/keyspaces/"+keyspace+"/shards/"+shard+"/Shard", []byte("BAD PROTO DATA"), nil); err != nil { + t.Fatalf("Update(/keyspaces/ks1/shards/0/Shard) failed: %v", err) + } + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if wd.Err != nil { + if strings.Contains(wd.Err.Error(), "error unpacking Shard object") { + break + } + t.Fatalf("watch channel unexpectedly got unknown error: %v", wd.Err) + } + if !proto.Equal(wd.Value, wanted) { + t.Fatalf("got bad data: %v expected: %v", wd.Value, wanted) + } + t.Log("got duplicate right value, skipping.") + } + + // Cancel should still work here, although it does nothing. + cancel() + + // Bad data in topo, setting the watch should now fail. + current, _, _ = ts.WatchShard(ctx, keyspace, shard) + if current.Err == nil || !strings.Contains(current.Err.Error(), "error unpacking initial Shard object") { + t.Fatalf("expected an initial error setting watch on bad content, but got: %v", current.Err) + } + + data, err := proto.Marshal(wanted) + if err != nil { + t.Fatalf("error marshalling proto data: %v", err) + } + // Update content, wait until Watch works again + if _, err := conn.Update(ctx, "/keyspaces/"+keyspace+"/shards/"+shard+"/Shard", data, nil); err != nil { + t.Fatalf("Update(/keyspaces/ks1/shards/0/Shard) failed: %v", err) + } + start := time.Now() + for { + current, changes, _ = ts.WatchShard(ctx, keyspace, shard) + if current.Err != nil { + if strings.Contains(current.Err.Error(), "error unpacking initial Shard object") { + // hasn't changed yet + if time.Since(start) > 10*time.Second { + t.Fatalf("time out waiting for file to appear") + } + time.Sleep(10 * time.Millisecond) + continue + } + t.Fatalf("got unexpected error while setting watch: %v", err) + } + if !proto.Equal(current.Value, wanted) { + t.Fatalf("got bad data: %v expected: %v", current.Value, wanted) + } + break + } + + // Delete node, wait for error (skip any duplicate). + if err := ts.DeleteShard(ctx, keyspace, shard); err != nil { + t.Fatalf("DeleteShard() failed: %v", err) + } + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if topo.IsErrType(wd.Err, topo.NoNode) { + break + } + if wd.Err != nil { + t.Fatalf("watch channel unexpectedly got unknown error: %v", wd.Err) + } + if !proto.Equal(wd.Value, wanted) { + t.Fatalf("got bad data: %v expected: %v", wd.Value, wanted) + } + t.Log("got duplicate right value, skipping.") + } +} + +func TestWatchShardCancel(t *testing.T) { + cell := "cell1" + keyspace := "ks1" + shard := "0" + ctx := context.Background() + ts := memorytopo.NewServer(cell) + + // No Shard -> ErrNoNode + current, _, _ := ts.WatchShard(ctx, keyspace, shard) + if !topo.IsErrType(current.Err, topo.NoNode) { + t.Errorf("Got invalid result from WatchShard(not there): %v", current.Err) + } + + // Create keyspace + if err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}); err != nil { + t.Fatalf("CreateKeyspace %v failed: %v", keyspace, err) + } + + // Create initial value + if err := ts.CreateShard(ctx, keyspace, shard); err != nil { + t.Fatalf("Create(/keyspaces/ks1/shards/0/Shard) failed: %v", err) + } + wanted := &topodatapb.Shard{ + IsMasterServing: false, + } + if _, err := ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.IsMasterServing = false + return nil + }); err != nil { + t.Fatalf("UpdateShardFields() failed: %v", err) + } + + // Starting the watch should now work. + current, changes, cancel := waitForInitialShard(t, ts, keyspace, shard) + if !proto.Equal(current.Value, wanted) { + t.Fatalf("got bad data: %v expected: %v", current.Value, wanted) + } + + // Cancel watch, wait for error. + cancel() + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if topo.IsErrType(wd.Err, topo.Interrupted) { + break + } + if wd.Err != nil { + t.Fatalf("watch channel unexpectedly got unknown error: %v", wd.Err) + } + if !proto.Equal(wd.Value, wanted) { + t.Fatalf("got bad data: %v expected: %v", wd.Value, wanted) + } + t.Log("got duplicate right value, skipping.") + } + + // Cancel should still work here, although it does nothing. + cancel() +} From 352d31dff48b1e5d877babd0ac89f2339e5aef6d Mon Sep 17 00:00:00 2001 From: deepthi Date: Mon, 14 Oct 2019 10:50:19 -0700 Subject: [PATCH 2/2] fix doc comment Signed-off-by: deepthi --- go/vt/topo/topotests/shard_watch_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/vt/topo/topotests/shard_watch_test.go b/go/vt/topo/topotests/shard_watch_test.go index 61d52d293f3..bdc2eab1db9 100644 --- a/go/vt/topo/topotests/shard_watch_test.go +++ b/go/vt/topo/topotests/shard_watch_test.go @@ -28,8 +28,7 @@ import ( "vitess.io/vitess/go/vt/topo/memorytopo" ) -// waitForInitialShard waits for the initial Shard to -// appear, and match the provided srvKeyspace. +// waitForInitialShard waits for the initial Shard to appear. func waitForInitialShard(t *testing.T, ts *topo.Server, keyspace, shard string) (current *topo.WatchShardData, changes <-chan *topo.WatchShardData, cancel topo.CancelFunc) { ctx := context.Background() start := time.Now()