Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 67 additions & 4 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardI
span.Annotate("shard", shard)
defer span.Finish()

shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile)
shardPath := shardFilePath(keyspace, shard)
data, version, err := ts.globalCell.Get(ctx, shardPath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -212,7 +212,7 @@ func (ts *Server) updateShard(ctx context.Context, si *ShardInfo) error {
if err != nil {
return err
}
shardPath := path.Join(KeyspacesPath, si.keyspace, ShardsPath, si.shardName, ShardFile)
shardPath := shardFilePath(si.keyspace, si.shardName)
newVersion, err := ts.globalCell.Update(ctx, shardPath, data, si.version)
if err != nil {
return err
Expand Down Expand Up @@ -302,7 +302,7 @@ func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err
if err != nil {
return err
}
shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile)
shardPath := shardFilePath(keyspace, shard)
if _, err := ts.globalCell.Create(ctx, shardPath, data); err != nil {
// Return error as is, we need to propagate
// ErrNodeExists for instance.
Expand Down Expand Up @@ -349,7 +349,7 @@ func (ts *Server) GetOrCreateShard(ctx context.Context, keyspace, shard string)
// DeleteShard wraps the underlying conn.Delete
// and dispatches the event.
func (ts *Server) DeleteShard(ctx context.Context, keyspace, shard string) error {
shardPath := path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile)
shardPath := shardFilePath(keyspace, shard)
if err := ts.globalCell.Delete(ctx, shardPath, nil); err != nil {
return err
}
Expand Down Expand Up @@ -578,3 +578,66 @@ func (ts *Server) GetTabletMapForShardByCell(ctx context.Context, keyspace, shar
}
return result, gerr
}

func shardFilePath(keyspace, shard string) string {
return path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile)
}

// WatchShardData wraps the data we receive on the watch channel
// The WatchShard API guarantees exactly one of Value or Err will be set.
type WatchShardData struct {
Value *topodatapb.Shard
Err error
}

// WatchShard will set a watch on the Shard object.
// It has the same contract as conn.Watch, but it also unpacks the
// contents into a Shard object
func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, CancelFunc) {
shardPath := shardFilePath(keyspace, shard)
current, wdChannel, cancel := ts.globalCell.Watch(ctx, shardPath)
if current.Err != nil {
return &WatchShardData{Err: current.Err}, nil, nil
}
value := &topodatapb.Shard{}
if err := proto.Unmarshal(current.Contents, value); err != nil {
// Cancel the watch, drain channel.
cancel()
for range wdChannel {
}
return &WatchShardData{Err: vterrors.Wrapf(err, "error unpacking initial Shard object")}, nil, nil
}

changes := make(chan *WatchShardData, 10)
// The background routine reads any event from the watch channel,
// translates it, and sends it to the caller.
// If cancel() is called, the underlying Watch() code will
// send an ErrInterrupted and then close the channel. We'll
// just propagate that back to our caller.
go func() {
defer close(changes)

for wd := range wdChannel {
if wd.Err != nil {
// Last error value, we're done.
// wdChannel will be closed right after
// this, no need to do anything.
changes <- &WatchShardData{Err: wd.Err}
return
}

value := &topodatapb.Shard{}
if err := proto.Unmarshal(wd.Contents, value); err != nil {
cancel()
for range wdChannel {
}
changes <- &WatchShardData{Err: vterrors.Wrapf(err, "error unpacking Shard object")}
return
}

changes <- &WatchShardData{Value: value}
}
}()

return &WatchShardData{Value: value}, changes, cancel
}