From 13494ed63a2b160d6e06ee4079877ec2c8ce2b19 Mon Sep 17 00:00:00 2001 From: deepthi Date: Thu, 26 Sep 2019 13:54:49 -0700 Subject: [PATCH] Reparent: add ability to watch shard data Signed-off-by: deepthi --- go/vt/topo/shard.go | 71 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 4 deletions(-) diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 2dcf4448ba4..8a6952047c8 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -182,7 +182,7 @@ func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardI span.Annotate("shard", shard) defer span.Finish() - shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile) + shardPath := shardFilePath(keyspace, shard) data, version, err := ts.globalCell.Get(ctx, shardPath) if err != nil { return nil, err @@ -212,7 +212,7 @@ func (ts *Server) updateShard(ctx context.Context, si *ShardInfo) error { if err != nil { return err } - shardPath := path.Join(KeyspacesPath, si.keyspace, ShardsPath, si.shardName, ShardFile) + shardPath := shardFilePath(si.keyspace, si.shardName) newVersion, err := ts.globalCell.Update(ctx, shardPath, data, si.version) if err != nil { return err @@ -302,7 +302,7 @@ func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err if err != nil { return err } - shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile) + shardPath := shardFilePath(keyspace, shard) if _, err := ts.globalCell.Create(ctx, shardPath, data); err != nil { // Return error as is, we need to propagate // ErrNodeExists for instance. @@ -349,7 +349,7 @@ func (ts *Server) GetOrCreateShard(ctx context.Context, keyspace, shard string) // DeleteShard wraps the underlying conn.Delete // and dispatches the event. func (ts *Server) DeleteShard(ctx context.Context, keyspace, shard string) error { - shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile) + shardPath := shardFilePath(keyspace, shard) if err := ts.globalCell.Delete(ctx, shardPath, nil); err != nil { return err } @@ -578,3 +578,66 @@ func (ts *Server) GetTabletMapForShardByCell(ctx context.Context, keyspace, shar } return result, gerr } + +func shardFilePath(keyspace, shard string) string { + return path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile) +} + +// WatchShardData wraps the data we receive on the watch channel +// The WatchShard API guarantees exactly one of Value or Err will be set. +type WatchShardData struct { + Value *topodatapb.Shard + Err error +} + +// WatchShard will set a watch on the Shard object. +// It has the same contract as conn.Watch, but it also unpacks the +// contents into a Shard object +func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, CancelFunc) { + shardPath := shardFilePath(keyspace, shard) + current, wdChannel, cancel := ts.globalCell.Watch(ctx, shardPath) + if current.Err != nil { + return &WatchShardData{Err: current.Err}, nil, nil + } + value := &topodatapb.Shard{} + if err := proto.Unmarshal(current.Contents, value); err != nil { + // Cancel the watch, drain channel. + cancel() + for range wdChannel { + } + return &WatchShardData{Err: vterrors.Wrapf(err, "error unpacking initial Shard object")}, nil, nil + } + + changes := make(chan *WatchShardData, 10) + // The background routine reads any event from the watch channel, + // translates it, and sends it to the caller. + // If cancel() is called, the underlying Watch() code will + // send an ErrInterrupted and then close the channel. We'll + // just propagate that back to our caller. + go func() { + defer close(changes) + + for wd := range wdChannel { + if wd.Err != nil { + // Last error value, we're done. + // wdChannel will be closed right after + // this, no need to do anything. + changes <- &WatchShardData{Err: wd.Err} + return + } + + value := &topodatapb.Shard{} + if err := proto.Unmarshal(wd.Contents, value); err != nil { + cancel() + for range wdChannel { + } + changes <- &WatchShardData{Err: vterrors.Wrapf(err, "error unpacking Shard object")} + return + } + + changes <- &WatchShardData{Value: value} + } + }() + + return &WatchShardData{Value: value}, changes, cancel +}