diff --git a/go/vt/srvtopo/keyspace_filtering_server.go b/go/vt/srvtopo/keyspace_filtering_server.go index 3762ed64476..cf3ed3b7438 100644 --- a/go/vt/srvtopo/keyspace_filtering_server.go +++ b/go/vt/srvtopo/keyspace_filtering_server.go @@ -95,6 +95,23 @@ func (ksf keyspaceFilteringServer) GetSrvKeyspace( return ksf.server.GetSrvKeyspace(ctx, cell, keyspace) } +func (ksf keyspaceFilteringServer) WatchSrvKeyspace( + ctx context.Context, + cell, keyspace string, + callback func(*topodatapb.SrvKeyspace, error) bool, +) { + filteringCallback := func(ks *topodatapb.SrvKeyspace, err error) bool { + if ks != nil { + if !ksf.selectKeyspaces[keyspace] { + return callback(nil, topo.NewError(topo.NoNode, keyspace)) + } + } + return callback(ks, err) + } + + ksf.server.WatchSrvKeyspace(ctx, cell, keyspace, filteringCallback) +} + func (ksf keyspaceFilteringServer) WatchSrvVSchema( ctx context.Context, cell string, diff --git a/go/vt/srvtopo/server.go b/go/vt/srvtopo/server.go index 3c5842480a9..bfbde498122 100644 --- a/go/vt/srvtopo/server.go +++ b/go/vt/srvtopo/server.go @@ -42,6 +42,8 @@ type Server interface { // GetSrvKeyspace returns the SrvKeyspace for a cell/keyspace. GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) + WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) + // WatchSrvVSchema starts watching the SrvVSchema object for // the provided cell. It will call the callback when // a new value or an error occurs. diff --git a/go/vt/srvtopo/srvtopotest/passthrough.go b/go/vt/srvtopo/srvtopotest/passthrough.go index 7a49c599e1a..ad528cb0e8c 100644 --- a/go/vt/srvtopo/srvtopotest/passthrough.go +++ b/go/vt/srvtopo/srvtopotest/passthrough.go @@ -59,6 +59,10 @@ func (srv *PassthroughSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, k return srv.SrvKeyspace, srv.SrvKeyspaceError } +func (srv *PassthroughSrvTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) { + callback(srv.SrvKeyspace, srv.SrvKeyspaceError) +} + // WatchSrvVSchema implements srvtopo.Server func (srv *PassthroughSrvTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) { callback(srv.WatchedSrvVSchema, srv.WatchedSrvVSchemaError) diff --git a/go/vt/vtexplain/vtexplain_topo.go b/go/vt/vtexplain/vtexplain_topo.go index 3302b68d5ce..789a7670175 100644 --- a/go/vt/vtexplain/vtexplain_topo.go +++ b/go/vt/vtexplain/vtexplain_topo.go @@ -113,6 +113,11 @@ func (et *ExplainTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string return srvKeyspace, nil } +func (et *ExplainTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) { + ks, err := et.GetSrvKeyspace(ctx, cell, keyspace) + callback(ks, err) +} + // WatchSrvVSchema is part of the srvtopo.Server interface. func (et *ExplainTopo) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) { callback(et.getSrvVSchema(), nil) diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index 96bac8d6f74..4a307480056 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -283,6 +283,10 @@ func (sct *sandboxTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace strin return createShardedSrvKeyspace(sand.ShardSpec, sand.KeyspaceServedFrom) } +func (sct *sandboxTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) { + panic("not supported: WatchSrvKeyspace") +} + // WatchSrvVSchema is part of the srvtopo.Server interface. // // If the sandbox was created with a backing topo service, piggy back on it diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index 0df824f43d3..3a0259aab4a 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -68,6 +68,11 @@ func (f *fakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace stri return ks, nil } +func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) { + ks, err := f.GetSrvKeyspace(ctx, cell, keyspace) + callback(ks, err) +} + // WatchSrvVSchema starts watching the SrvVSchema object for // the provided cell. It will call the callback when // a new value or an error occurs.