Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions go/vt/srvtopo/keyspace_filtering_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ func (ksf keyspaceFilteringServer) GetSrvKeyspace(
return ksf.server.GetSrvKeyspace(ctx, cell, keyspace)
}

func (ksf keyspaceFilteringServer) WatchSrvKeyspace(
ctx context.Context,
cell, keyspace string,
callback func(*topodatapb.SrvKeyspace, error) bool,
) {
filteringCallback := func(ks *topodatapb.SrvKeyspace, err error) bool {
if ks != nil {
if !ksf.selectKeyspaces[keyspace] {
return callback(nil, topo.NewError(topo.NoNode, keyspace))
}
}
return callback(ks, err)
}

ksf.server.WatchSrvKeyspace(ctx, cell, keyspace, filteringCallback)
}

func (ksf keyspaceFilteringServer) WatchSrvVSchema(
ctx context.Context,
cell string,
Expand Down
2 changes: 2 additions & 0 deletions go/vt/srvtopo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Server interface {
// GetSrvKeyspace returns the SrvKeyspace for a cell/keyspace.
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)

WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool)

// WatchSrvVSchema starts watching the SrvVSchema object for
// the provided cell. It will call the callback when
// a new value or an error occurs.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/srvtopo/srvtopotest/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (srv *PassthroughSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, k
return srv.SrvKeyspace, srv.SrvKeyspaceError
}

func (srv *PassthroughSrvTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) {
callback(srv.SrvKeyspace, srv.SrvKeyspaceError)
}

// WatchSrvVSchema implements srvtopo.Server
func (srv *PassthroughSrvTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) {
callback(srv.WatchedSrvVSchema, srv.WatchedSrvVSchemaError)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtexplain/vtexplain_topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (et *ExplainTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string
return srvKeyspace, nil
}

func (et *ExplainTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) {
ks, err := et.GetSrvKeyspace(ctx, cell, keyspace)
callback(ks, err)
}

// WatchSrvVSchema is part of the srvtopo.Server interface.
func (et *ExplainTopo) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) {
callback(et.getSrvVSchema(), nil)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ func (sct *sandboxTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace strin
return createShardedSrvKeyspace(sand.ShardSpec, sand.KeyspaceServedFrom)
}

func (sct *sandboxTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) {
panic("not supported: WatchSrvKeyspace")
}

// WatchSrvVSchema is part of the srvtopo.Server interface.
//
// If the sandbox was created with a backing topo service, piggy back on it
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/vcursor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (f *fakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace stri
return ks, nil
}

func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) {
ks, err := f.GetSrvKeyspace(ctx, cell, keyspace)
callback(ks, err)
}

// WatchSrvVSchema starts watching the SrvVSchema object for
// the provided cell. It will call the callback when
// a new value or an error occurs.
Expand Down