diff --git a/doc/ServerConfiguration.md b/doc/ServerConfiguration.md index 0124e470182..07020310175 100644 --- a/doc/ServerConfiguration.md +++ b/doc/ServerConfiguration.md @@ -552,6 +552,7 @@ Load-balancer in front of vtgate to scale up (not covered by Vitess). Stateless, ### Parameters * **cells_to_watch**: which cell vtgate is in and will monitor tablets from. Cross-cell master access needs multiple cells here. +* **keyspaces_to_watch**: Specifies that a vtgate will only be able to perform queries against or view the topology of these keyspaces * **tablet_types_to_wait**: VTGate waits for at least one serving tablet per tablet type specified here during startup, before listening to the serving port. So VTGate does not serve error. It should match the available tablet types VTGate connects to (master, replica, rdonly). * **discovery_low_replication_lag**: when replication lags of all VTTablet in a particular shard and tablet type are less than or equal the flag (in seconds), VTGate does not filter them by replication lag and uses all to balance traffic. * **degraded_threshold (30s)**: a tablet will publish itself as degraded if replication lag exceeds this threshold. This will cause VTGates to choose more up-to-date servers over this one. If all servers are degraded, VTGate resorts to serving from all of them. diff --git a/go/vt/srvtopo/keyspace_filtering_server.go b/go/vt/srvtopo/keyspace_filtering_server.go new file mode 100644 index 00000000000..32b264001cc --- /dev/null +++ b/go/vt/srvtopo/keyspace_filtering_server.go @@ -0,0 +1,116 @@ +/* +Copyright 2018 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package srvtopo + +import ( + "fmt" + + "golang.org/x/net/context" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/topo" +) + +var ( + // ErrNilUnderlyingServer is returned when attempting to create a new keyspace + // filtering server if a nil underlying server implementation is provided. + ErrNilUnderlyingServer = fmt.Errorf("Unable to construct filtering server without an underlying server") + + // ErrTopoServerNotAvailable is returned if a caller tries to access the + // topo.Server supporting this srvtopo.Server. + ErrTopoServerNotAvailable = fmt.Errorf("Cannot access underlying topology server when keyspace filtering is enabled") +) + +// NewKeyspaceFilteringServer constructs a new server based on the provided +// implementation that prevents the specified keyspaces from being exposed +// to consumers of the new Server. +// +// A filtering server will not allow access to the topo.Server to prevent +// updates that may corrupt the global VSchema keyspace. +func NewKeyspaceFilteringServer(underlying Server, selectedKeyspaces []string) (Server, error) { + if underlying == nil { + return nil, ErrNilUnderlyingServer + } + + keyspaces := map[string]bool{} + for _, ks := range selectedKeyspaces { + keyspaces[ks] = true + } + + return keyspaceFilteringServer{ + server: underlying, + selectKeyspaces: keyspaces, + }, nil +} + +type keyspaceFilteringServer struct { + server Server + selectKeyspaces map[string]bool +} + +// GetTopoServer returns an error; filtering srvtopo.Server consumers may not +// access the underlying topo.Server. +func (ksf keyspaceFilteringServer) GetTopoServer() (*topo.Server, error) { + return nil, ErrTopoServerNotAvailable +} + +func (ksf keyspaceFilteringServer) GetSrvKeyspaceNames( + ctx context.Context, + cell string, +) ([]string, error) { + keyspaces, err := ksf.server.GetSrvKeyspaceNames(ctx, cell) + ret := make([]string, 0, len(keyspaces)) + for _, ks := range keyspaces { + if ksf.selectKeyspaces[ks] { + ret = append(ret, ks) + } + } + return ret, err +} + +func (ksf keyspaceFilteringServer) GetSrvKeyspace( + ctx context.Context, + cell, + keyspace string, +) (*topodatapb.SrvKeyspace, error) { + if !ksf.selectKeyspaces[keyspace] { + return nil, topo.NewError(topo.NoNode, keyspace) + } + + return ksf.server.GetSrvKeyspace(ctx, cell, keyspace) +} + +func (ksf keyspaceFilteringServer) WatchSrvVSchema( + ctx context.Context, + cell string, + callback func(*vschemapb.SrvVSchema, error), +) { + filteringCallback := func(schema *vschemapb.SrvVSchema, err error) { + if schema != nil { + for ks := range schema.Keyspaces { + if !ksf.selectKeyspaces[ks] { + delete(schema.Keyspaces, ks) + } + } + } + + callback(schema, err) + } + + ksf.server.WatchSrvVSchema(ctx, cell, filteringCallback) +} diff --git a/go/vt/srvtopo/keyspace_filtering_server_test.go b/go/vt/srvtopo/keyspace_filtering_server_test.go new file mode 100644 index 00000000000..7ec154dd403 --- /dev/null +++ b/go/vt/srvtopo/keyspace_filtering_server_test.go @@ -0,0 +1,229 @@ +/* +Copyright 2018 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package srvtopo + +import ( + "fmt" + "reflect" + "sync" + "testing" + + "golang.org/x/net/context" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/srvtopo/srvtopotest" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +var ( + stockCell = "some-cell" + stockCtx = context.Background() + stockFilters = []string{"bar", "baz"} + stockKeyspaces = map[string]*topodatapb.SrvKeyspace{ + "foo": &topodatapb.SrvKeyspace{ShardingColumnName: "foo"}, + "bar": &topodatapb.SrvKeyspace{ShardingColumnName: "bar"}, + "baz": &topodatapb.SrvKeyspace{ShardingColumnName: "baz"}, + } + stockVSchema = &vschemapb.SrvVSchema{ + Keyspaces: map[string]*vschemapb.Keyspace{ + "foo": &vschemapb.Keyspace{Sharded: true}, + "bar": &vschemapb.Keyspace{Sharded: true}, + "baz": &vschemapb.Keyspace{Sharded: false}, + }, + } +) + +func newFiltering(filter []string) (*topo.Server, *srvtopotest.PassthroughSrvTopoServer, Server) { + testServer := srvtopotest.NewPassthroughSrvTopoServer() + + testServer.TopoServer = memorytopo.NewServer(stockCell) + testServer.SrvKeyspaceNames = []string{"foo", "bar", "baz"} + testServer.SrvKeyspace = &topodatapb.SrvKeyspace{ShardingColumnName: "test-column"} + testServer.WatchedSrvVSchema = stockVSchema + + filtering, _ := NewKeyspaceFilteringServer(testServer, filter) + return testServer.TopoServer, testServer, filtering +} + +func TestFilteringServerHandlesNilUnderlying(t *testing.T) { + got, gotErr := NewKeyspaceFilteringServer(nil, []string{}) + if got != nil { + t.Errorf("got: %v, wanted: nil server", got) + } + if gotErr != ErrNilUnderlyingServer { + t.Errorf("Bad error returned: got %v wanted %v", gotErr, ErrNilUnderlyingServer) + } +} + +func TestFilteringServerReturnsUnderlyingServer(t *testing.T) { + _, _, f := newFiltering(nil) + got, gotErr := f.GetTopoServer() + if got != nil { + t.Errorf("Got non-nil topo.Server from FilteringServer") + } + if gotErr != ErrTopoServerNotAvailable { + t.Errorf("Unexpected error from GetTopoServer; wanted %v but got %v", ErrTopoServerNotAvailable, gotErr) + } +} + +func doTestGetSrvKeyspaceNames( + t *testing.T, + f Server, + cell string, + want []string, + wantErr error, +) { + got, gotErr := f.GetSrvKeyspaceNames(stockCtx, cell) + + if got == nil { + t.Errorf("GetSrvKeyspaceNames failed: should not return nil") + } + if !reflect.DeepEqual(got, want) { + t.Errorf("GetSrvKeyspaceNames failed: want %v, got %v", want, got) + } + if wantErr != gotErr { + t.Errorf("GetSrvKeyspaceNames returned incorrect error: want %v, got %v", wantErr, gotErr) + } +} + +func TestFilteringServerGetSrvKeyspameNamesFiltersEverythingOut(t *testing.T) { + _, _, f := newFiltering(nil) + doTestGetSrvKeyspaceNames(t, f, stockCell, []string{}, nil) +} + +func TestFilteringServerGetSrvKeyspaceNamesFiltersKeyspaces(t *testing.T) { + _, _, f := newFiltering(stockFilters) + doTestGetSrvKeyspaceNames(t, f, stockCell, stockFilters, nil) +} + +func TestFilteringServerGetSrvKeyspaceNamesPassesThroughErrors(t *testing.T) { + _, mock, f := newFiltering(stockFilters) + wantErr := fmt.Errorf("some badcell error") + mock.SrvKeyspaceNamesError = wantErr + doTestGetSrvKeyspaceNames(t, f, "badcell", stockFilters, wantErr) +} + +func doTestGetSrvKeyspace( + t *testing.T, + f Server, + cell, + ksName string, + want *topodatapb.SrvKeyspace, + wantErr error, +) { + got, gotErr := f.GetSrvKeyspace(stockCtx, cell, ksName) + + gotColumnName := "" + wantColumnName := "" + if got != nil { + gotColumnName = got.ShardingColumnName + } + if want != nil { + wantColumnName = want.ShardingColumnName + } + + // a different pointer comes back so compare the expected return by proxy + // of a field we know the value of + if gotColumnName != wantColumnName { + t.Errorf("keyspace incorrect: got %v, want %v", got, want) + } + + if wantErr != gotErr { + t.Errorf("returned error incorrect: got %v, want %v", gotErr, wantErr) + } +} + +func TestFilteringServerGetSrvKeyspaceReturnsSelectedKeyspaces(t *testing.T) { + _, mock, f := newFiltering(stockFilters) + mock.SrvKeyspace = stockKeyspaces["bar"] + doTestGetSrvKeyspace(t, f, stockCell, "bar", stockKeyspaces["bar"], nil) +} + +func TestFilteringServerGetSrvKeyspaceErrorPassthrough(t *testing.T) { + wantErr := fmt.Errorf("some error") + _, mock, f := newFiltering(stockFilters) + mock.SrvKeyspace = stockKeyspaces["bar"] + mock.SrvKeyspaceError = wantErr + doTestGetSrvKeyspace(t, f, "badcell", "bar", stockKeyspaces["bar"], wantErr) +} + +func TestFilteringServerGetSrvKeyspaceFilters(t *testing.T) { + wantErr := topo.NewError(topo.NoNode, "foo") + _, mock, f := newFiltering(stockFilters) + mock.SrvKeyspaceError = wantErr + doTestGetSrvKeyspace(t, f, stockCell, "foo", nil, wantErr) +} + +func TestFilteringServerWatchSrvVSchemaFiltersPassthroughSrvVSchema(t *testing.T) { + _, mock, f := newFiltering(stockFilters) + + allowed := map[string]bool{} + for _, ks := range stockFilters { + allowed[ks] = true + } + + // we need to verify that the nested callback actually gets called + wg := sync.WaitGroup{} + wg.Add(1) + + cb := func(gotSchema *vschemapb.SrvVSchema, gotErr error) { + // ensure that only selected keyspaces made it into the callback + for name, ks := range gotSchema.Keyspaces { + if !allowed[name] { + t.Errorf("Unexpected keyspace found in callback: %v", ks) + } + wantKS := mock.WatchedSrvVSchema.Keyspaces[name] + if !reflect.DeepEqual(ks, wantKS) { + t.Errorf( + "Expected keyspace to be passed through unmodified: want %#v got %#v", + wantKS, + ks, + ) + } + } + wg.Done() + } + + f.WatchSrvVSchema(stockCtx, stockCell, cb) + wg.Wait() +} + +func TestFilteringServerWatchSrvVSchemaHandlesNilSchema(t *testing.T) { + wantErr := fmt.Errorf("some err") + _, mock, f := newFiltering(stockFilters) + mock.WatchedSrvVSchema = nil + mock.WatchedSrvVSchemaError = wantErr + + // we need to verify that the nested callback actually gets called + wg := sync.WaitGroup{} + wg.Add(1) + + cb := func(gotSchema *vschemapb.SrvVSchema, gotErr error) { + if gotSchema != nil { + t.Errorf("Expected nil gotSchema: got %#v", gotSchema) + } + if gotErr != wantErr { + t.Errorf("Unexpected error: want %v got %v", wantErr, gotErr) + } + wg.Done() + } + + f.WatchSrvVSchema(stockCtx, "other-cell", cb) + wg.Wait() +} diff --git a/go/vt/srvtopo/resilient_server.go b/go/vt/srvtopo/resilient_server.go index adcd2f0751c..6340bfe6551 100644 --- a/go/vt/srvtopo/resilient_server.go +++ b/go/vt/srvtopo/resilient_server.go @@ -223,8 +223,8 @@ func NewResilientServer(base *topo.Server, counterPrefix string) *ResilientServe } // GetTopoServer returns the topo.Server that backs the resilient server. -func (server *ResilientServer) GetTopoServer() *topo.Server { - return server.topoServer +func (server *ResilientServer) GetTopoServer() (*topo.Server, error) { + return server.topoServer, nil } // GetSrvKeyspaceNames returns all keyspace names for the given cell. diff --git a/go/vt/srvtopo/server.go b/go/vt/srvtopo/server.go index bb7b0cc6be1..b783c78d3f8 100644 --- a/go/vt/srvtopo/server.go +++ b/go/vt/srvtopo/server.go @@ -32,8 +32,8 @@ import ( // the serving graph read-only calls used by clients to resolve // serving addresses, and to get VSchema. type Server interface { - // GetTopoServer returns the full topo.Server instance - GetTopoServer() *topo.Server + // GetTopoServer returns the full topo.Server instance. + GetTopoServer() (*topo.Server, error) // GetSrvKeyspaceNames returns the list of keyspaces served in // the provided cell. diff --git a/go/vt/srvtopo/srvtopotest/passthrough.go b/go/vt/srvtopo/srvtopotest/passthrough.go new file mode 100644 index 00000000000..09b0bae2fa7 --- /dev/null +++ b/go/vt/srvtopo/srvtopotest/passthrough.go @@ -0,0 +1,65 @@ +/* +Copyright 2018 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package srvtopotest + +import ( + "golang.org/x/net/context" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/topo" +) + +// PassthroughSrvTopoServer is a bare implementation of srvtopo.Server for use in tests +type PassthroughSrvTopoServer struct { + TopoServer *topo.Server + TopoServerError error + + SrvKeyspaceNames []string + SrvKeyspaceNamesError error + + SrvKeyspace *topodatapb.SrvKeyspace + SrvKeyspaceError error + + WatchedSrvVSchema *vschemapb.SrvVSchema + WatchedSrvVSchemaError error +} + +// NewPassthroughSrvTopoServer returns a new, unconfigured test PassthroughSrvTopoServer +func NewPassthroughSrvTopoServer() *PassthroughSrvTopoServer { + return &PassthroughSrvTopoServer{} +} + +// GetTopoServer implements srvtopo.Server +func (srv *PassthroughSrvTopoServer) GetTopoServer() (*topo.Server, error) { + return srv.TopoServer, srv.TopoServerError +} + +// GetSrvKeyspaceNames implements srvtopo.Server +func (srv *PassthroughSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { + return srv.SrvKeyspaceNames, srv.SrvKeyspaceNamesError +} + +// GetSrvKeyspace implements srvtopo.Server +func (srv *PassthroughSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { + return srv.SrvKeyspace, srv.SrvKeyspaceError +} + +// WatchSrvVSchema implements srvtopo.Server +func (srv *PassthroughSrvTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error)) { + callback(srv.WatchedSrvVSchema, srv.WatchedSrvVSchemaError) +} diff --git a/go/vt/vtexplain/vtexplain_topo.go b/go/vt/vtexplain/vtexplain_topo.go index f168636eb80..e4e0f374969 100644 --- a/go/vt/vtexplain/vtexplain_topo.go +++ b/go/vt/vtexplain/vtexplain_topo.go @@ -55,8 +55,8 @@ func (et *ExplainTopo) getSrvVSchema() *vschemapb.SrvVSchema { } // GetTopoServer is part of the srvtopo.Server interface -func (et *ExplainTopo) GetTopoServer() *topo.Server { - return nil +func (et *ExplainTopo) GetTopoServer() (*topo.Server, error) { + return nil, nil } // GetSrvKeyspaceNames is part of the srvtopo.Server interface. diff --git a/go/vt/vtgate/gateway/discoverygateway.go b/go/vt/vtgate/gateway/discoverygateway.go index 6d5d4ed7522..9b075d2f593 100644 --- a/go/vt/vtgate/gateway/discoverygateway.go +++ b/go/vt/vtgate/gateway/discoverygateway.go @@ -87,8 +87,13 @@ type discoveryGateway struct { func createDiscoveryGateway(hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) Gateway { var topoServer *topo.Server if serv != nil { - topoServer = serv.GetTopoServer() + var err error + topoServer, err = serv.GetTopoServer() + if err != nil { + log.Exitf("Unable to create new discoverygateway: %v", err) + } } + dg := &discoveryGateway{ hc: hc, tsc: discovery.NewTabletStatsCacheDoNotSetListener(topoServer, cell), @@ -111,6 +116,10 @@ func createDiscoveryGateway(hc discovery.HealthCheck, serv srvtopo.Server, cell } var tr discovery.TabletRecorder = dg.hc if len(tabletFilters) > 0 { + if len(KeyspacesToWatch) > 0 { + log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time") + } + fbs, err := discovery.NewFilterByShard(dg.hc, tabletFilters) if err != nil { log.Exitf("Cannot parse tablet_filters parameter: %v", err) diff --git a/go/vt/vtgate/gateway/gateway.go b/go/vt/vtgate/gateway/gateway.go index 35893063b4a..3a6cd395fa9 100644 --- a/go/vt/vtgate/gateway/gateway.go +++ b/go/vt/vtgate/gateway/gateway.go @@ -24,6 +24,7 @@ import ( "time" "golang.org/x/net/context" + "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/discovery" @@ -40,8 +41,17 @@ import ( var ( implementation = flag.String("gateway_implementation", "discoverygateway", "The implementation of gateway") initialTabletTimeout = flag.Duration("gateway_initial_tablet_timeout", 30*time.Second, "At startup, the gateway will wait up to that duration to get one tablet per keyspace/shard/tablettype") + + // KeyspacesToWatch - if provided this specifies which keyspaces should be + // visible to a vtgate. By default the vtgate will allow access to any + // keyspace. + KeyspacesToWatch flagutil.StringListValue ) +func init() { + flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema") +} + // A Gateway is the query processing module for each shard, // which is used by ScatterConn. type Gateway interface { diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index 484464de4f6..24b78580b5a 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -235,8 +235,8 @@ func newSandboxForCells(cells []string) *sandboxTopo { } // GetTopoServer is part of the srvtopo.Server interface -func (sct *sandboxTopo) GetTopoServer() *topo.Server { - return sct.topoServer +func (sct *sandboxTopo) GetTopoServer() (*topo.Server, error) { + return sct.topoServer, nil } // GetSrvKeyspaceNames is part of the srvtopo.Server interface. diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 5823c080c14..d3189f58b2d 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -122,22 +122,25 @@ func (vm *VSchemaManager) watchSrvVSchema(ctx context.Context, cell string) { // the given keyspace is updated in the global topo, and the full SrvVSchema // is updated in all known cells. func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error { - topo := vm.e.serv.GetTopoServer() + topoServer, err := vm.e.serv.GetTopoServer() + if err != nil { + return err + } ks := vschema.Keyspaces[ksName] - err := topo.SaveVSchema(ctx, ksName, ks) + err = topoServer.SaveVSchema(ctx, ksName, ks) if err != nil { return err } - cells, err := vm.e.serv.GetTopoServer().GetKnownCells(ctx) + cells, err := topoServer.GetKnownCells(ctx) if err != nil { return err } // even if one cell fails, continue to try the others for _, cell := range cells { - cellErr := vm.e.serv.GetTopoServer().UpdateSrvVSchema(ctx, cell, vschema) + cellErr := topoServer.UpdateSrvVSchema(ctx, cell, vschema) if cellErr != nil { err = cellErr log.Errorf("error updating vschema in cell %s: %v", cell, cellErr) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 0e85e7af4d7..75305867e18 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -193,6 +193,17 @@ func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, ce log.Fatalf("'-disable_local_gateway' cannot be specified if 'l2vtgate_addrs' is also empty, otherwise this vtgate has no backend") } + // If we want to filter keyspaces replace the srvtopo.Server with a + // filtering server + if len(gateway.KeyspacesToWatch) > 0 { + log.Infof("Keyspace filtering enabled, selecting %v", gateway.KeyspacesToWatch) + var err error + serv, err = srvtopo.NewKeyspaceFilteringServer(serv, gateway.KeyspacesToWatch) + if err != nil { + log.Fatalf("Unable to construct SrvTopo server: %v", err.Error()) + } + } + tc := NewTxConn(gw, getTxMode()) // ScatterConn depends on TxConn to perform forced rollbacks. sc := NewScatterConn("VttabletCall", tc, gw, hc)