diff --git a/go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go b/go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go new file mode 100644 index 00000000000..9fc3c9d59ea --- /dev/null +++ b/go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go @@ -0,0 +1,136 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Test the vtgate's ability to route while watching a subset of keyspaces. +*/ + +package keyspacewatches + +import ( + "database/sql" + "fmt" + "math/rand" + "os" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + vtParams mysql.ConnParams + keyspaceUnshardedName = "ks1" + cell = "zone1" + hostname = "localhost" + mysqlAuthServerStatic = "mysql_auth_server_static.json" + sqlSchema = ` + create table keyspaces_to_watch_test( + id BIGINT NOT NULL, + msg VARCHAR(64) NOT NULL, + PRIMARY KEY (id) + ) Engine=InnoDB;` +) + +// createConfig creates a config file in TmpDir in vtdataroot and writes the given data. +func createConfig(clusterInstance *cluster.LocalProcessCluster, name, data string) error { + // creating new file + f, err := os.Create(clusterInstance.TmpDirectory + "/" + name) + if err != nil { + return err + } + + if data == "" { + return nil + } + + // write the given data + _, err = fmt.Fprint(f, data) + return err +} + +func createCluster() (*cluster.LocalProcessCluster, int) { + clusterInstance := cluster.NewCluster(cell, hostname) + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return nil, 1 + } + + // create auth server config + SQLConfig := `{ + "testuser1": { + "Password": "testpassword1", + "UserData": "vtgate client 1" + } + }` + if err := createConfig(clusterInstance, mysqlAuthServerStatic, SQLConfig); err != nil { + return nil, 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceUnshardedName, + SchemaSQL: sqlSchema, + } + if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { + return nil, 1 + } + + clusterInstance.VtGateExtraArgs = []string{ + "-mysql_auth_server_static_file", clusterInstance.TmpDirectory + "/" + mysqlAuthServerStatic, + "-keyspaces_to_watch", "ks1", + } + + // Start vtgate + if err := clusterInstance.StartVtgate(); err != nil { + return nil, 1 + } + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + rand.Seed(time.Now().UnixNano()) + return clusterInstance, 0 +} + +func TestRoutingWithKeyspacesToWatch(t *testing.T) { + defer cluster.PanicHandler(t) + + clusterInstance, exitCode := createCluster() + defer clusterInstance.Teardown() + + if exitCode != 0 { + os.Exit(exitCode) + } + + dsn := fmt.Sprintf( + "testuser1:testpassword1@tcp(%s:%v)/", + clusterInstance.Hostname, + clusterInstance.VtgateMySQLPort, + ) + db, err := sql.Open("mysql", dsn) + require.Nil(t, err) + defer db.Close() + + // if this returns w/o failing the test we're good to go + _, err = db.Exec("select * from keyspaces_to_watch_test") + require.Nil(t, err) +} diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 4af9dfd7202..6d2416cd754 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -148,6 +148,11 @@ func init() { flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema") } +// FilteringKeyspaces returns true if any keyspaces have been configured to be filtered. +func FilteringKeyspaces() bool { + return len(KeyspacesToWatch) > 0 +} + // TabletRecorder is a sub interface of HealthCheck. // It is separated out to enable unit testing. type TabletRecorder interface { diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index e324e998efd..f0c2fed433a 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -126,7 +126,7 @@ func NewDiscoveryGateway(ctx context.Context, hc discovery.LegacyHealthCheck, se } var recorder discovery.LegacyTabletRecorder = dg.hc if len(discovery.TabletFilters) > 0 { - if len(discovery.KeyspacesToWatch) > 0 { + if discovery.FilteringKeyspaces() { log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time") } @@ -135,7 +135,7 @@ func NewDiscoveryGateway(ctx context.Context, hc discovery.LegacyHealthCheck, se log.Exitf("Cannot parse tablet_filters parameter: %v", err) } recorder = fbs - } else if len(discovery.KeyspacesToWatch) > 0 { + } else if discovery.FilteringKeyspaces() { recorder = discovery.NewLegacyFilterByKeyspace(recorder, discovery.KeyspacesToWatch) } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index f60af6dca2b..0e5bc215689 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -42,6 +42,7 @@ import ( "context" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" @@ -163,7 +164,8 @@ func newVCursorImpl( vschema *vindexes.VSchema, resolver *srvtopo.Resolver, serv srvtopo.Server, - warnShardedOnly bool) (*vcursorImpl, error) { + warnShardedOnly bool, +) (*vcursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) if err != nil { return nil, err @@ -174,7 +176,9 @@ func newVCursorImpl( return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "transaction is supported only for master tablet type, current type: %v", tabletType) } var ts *topo.Server - if serv != nil { + // We don't have access to the underlying TopoServer if this vtgate is + // filtering keyspaces because we don't have an accurate view of the topo. + if serv != nil && !discovery.FilteringKeyspaces() { ts, err = serv.GetTopoServer() if err != nil { return nil, err @@ -588,6 +592,9 @@ func (vc *vcursorImpl) TabletType() topodatapb.TabletType { // SubmitOnlineDDL implements the VCursor interface func (vc *vcursorImpl) SubmitOnlineDDL(onlineDDl *schema.OnlineDDL) error { + if vc.topoServer == nil { + return vterrors.New(vtrpcpb.Code_INTERNAL, "Unable to apply DDL toposerver unavailable, ensure this vtgate is not using filtered keyspaces") + } conn, err := vc.topoServer.ConnForCell(vc.ctx, topo.GlobalCell) if err != nil { return err diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 33e18832d6b..428c418fb22 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -172,7 +172,7 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa // If we want to filter keyspaces replace the srvtopo.Server with a // filtering server - if len(discovery.KeyspacesToWatch) > 0 { + if discovery.FilteringKeyspaces() { log.Infof("Keyspace filtering enabled, selecting %v", discovery.KeyspacesToWatch) var err error serv, err = srvtopo.NewKeyspaceFilteringServer(serv, discovery.KeyspacesToWatch) @@ -505,7 +505,7 @@ func LegacyInit(ctx context.Context, hc discovery.LegacyHealthCheck, serv srvtop // If we want to filter keyspaces replace the srvtopo.Server with a // filtering server - if len(discovery.KeyspacesToWatch) > 0 { + if discovery.FilteringKeyspaces() { log.Infof("Keyspace filtering enabled, selecting %v", discovery.KeyspacesToWatch) var err error serv, err = srvtopo.NewKeyspaceFilteringServer(serv, discovery.KeyspacesToWatch)