Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
create table customers (
id bigint,
name varchar(64),
age SMALLINT,
primary key (id)
) Engine=InnoDB;

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter vschema on customers add vindex hash (id);
10 changes: 10 additions & 0 deletions go/cmd/vttestserver/data/schema/app_customer/vschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
create table test_table (
id bigint,
name varchar(64),
age SMALLINT,
percent DECIMAL(5,2),
datetime_col DATETIME,
timestamp_col TIMESTAMP,
date_col DATE,
time_col TIME,
primary key (id)
) Engine=InnoDB;

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter vschema create vindex my_vdx using hash
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter vschema on test_table add vindex my_vdx (id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
create table test_table1 (
id bigint,
name varchar(64),
age SMALLINT,
percent DECIMAL(5,2),
datetime_col DATETIME,
timestamp_col TIMESTAMP,
date_col DATE,
time_col TIME,
primary key (id)
) Engine=InnoDB;
24 changes: 13 additions & 11 deletions go/cmd/vttestserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,32 +199,34 @@ func parseFlags() (config vttest.Config, env vttest.Environment, err error) {
}

func main() {
cluster := runCluster()
defer cluster.TearDown()

kvconf := cluster.JSONConfig()
if err := json.NewEncoder(os.Stdout).Encode(kvconf); err != nil {
log.Fatal(err)
}

select {}
}

func runCluster() vttest.LocalCluster {
config, env, err := parseFlags()
if err != nil {
log.Fatal(err)
}

log.Infof("Starting local cluster...")
log.Infof("config: %#v", config)

cluster := vttest.LocalCluster{
Config: config,
Env: env,
}

err = cluster.Setup()
defer cluster.TearDown()

if err != nil {
log.Fatal(err)
}

kvconf := cluster.JSONConfig()
if err := json.NewEncoder(os.Stdout).Encode(kvconf); err != nil {
log.Fatal(err)
}

log.Info("Local cluster started.")

select {}
return cluster
}
68 changes: 68 additions & 0 deletions go/cmd/vttestserver/vttestserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"fmt"
"os"
"testing"

"vitess.io/vitess/go/vt/vttest"

"github.com/golang/protobuf/jsonpb"
"vitess.io/vitess/go/vt/proto/logutil"
"vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/vtctl/vtctlclient"
)

type columnVindex struct {
keyspace string
table string
vindex string
vindexType string
column string
}

func TestRunsVschemaMigrations(t *testing.T) {
schemaDirArg := "-schema_dir=data/schema"
webDirArg := "-web_dir=web/vtctld/app"
webDir2Arg := "-web_dir2=web/vtctld2/app"
tabletHostname := "-tablet_hostname=localhost"
keyspaceArg := "-keyspaces=test_keyspace,app_customer"
numShardsArg := "-num_shards=2,2"

os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, webDirArg, webDir2Arg, tabletHostname}...)

cluster := runCluster()
defer cluster.TearDown()

assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})
}

func assertColumnVindex(t *testing.T, cluster vttest.LocalCluster, expected columnVindex) {
server := fmt.Sprintf("localhost:%v", cluster.GrpcPort())
args := []string{"GetVSchema", expected.keyspace}
ctx := context.Background()

err := vtctlclient.RunCommandAndWait(ctx, server, args, func(e *logutil.Event) {
var keyspace vschema.Keyspace
if err := jsonpb.UnmarshalString(e.Value, &keyspace); err != nil {
t.Error(err)
}

columnVindex := keyspace.Tables[expected.table].ColumnVindexes[0]
actualVindex := keyspace.Vindexes[expected.vindex]
assertEqual(t, actualVindex.Type, expected.vindexType, "Actual vindex type different from expected")
assertEqual(t, columnVindex.Name, expected.vindex, "Actual vindex name different from expected")
assertEqual(t, columnVindex.Columns[0], expected.column, "Actual vindex column different from expected")
})
if err != nil {
t.Error(err)
}
}

func assertEqual(t *testing.T, actual string, expected string, message string) {
if actual != expected {
t.Errorf("%s: actual %s, expected %s", message, actual, expected)
}
}
11 changes: 7 additions & 4 deletions go/vt/vtgate/endtoend/deletetest/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
)

var (
cluster *vttest.LocalCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
grpcAddress string
cluster *vttest.LocalCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
grpcAddress string
tabletHostName = flag.String("tablet_hostname", "", "the tablet hostname")
Comment thread
tirsen marked this conversation as resolved.
Outdated

schema = `
create table t1(
Expand Down Expand Up @@ -134,6 +135,8 @@ func TestMain(m *testing.M) {
}
defer os.RemoveAll(cfg.SchemaDir)

cfg.TabletHostName = *tabletHostName

cluster = &vttest.LocalCluster{
Config: cfg,
}
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
)

var (
cluster *vttest.LocalCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
grpcAddress string
cluster *vttest.LocalCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
grpcAddress string
tabletHostName = flag.String("tablet_hostname", "", "the tablet hostname")

schema = `
create table t1(
Expand Down Expand Up @@ -178,6 +179,8 @@ func TestMain(m *testing.M) {
}
defer os.RemoveAll(cfg.SchemaDir)

cfg.TabletHostName = *tabletHostName

cluster = &vttest.LocalCluster{
Config: cfg,
}
Expand Down
68 changes: 59 additions & 9 deletions go/vt/vttest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"strings"
"unicode"

"vitess.io/vitess/go/vt/proto/logutil"
// we need to import the grpcvtctlclient library so the gRPC
// vtctl client is registered and can be used.
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
"vitess.io/vitess/go/vt/vtctl/vtctlclient"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -238,6 +244,16 @@ func (db *LocalCluster) Setup() error {
return err
}

if !db.OnlyMySQL {
log.Infof("Starting vtcombo...")
db.vt = VtcomboProcess(db.Env, &db.Config, db.mysql)
if err := db.vt.WaitStart(); err != nil {
return err
}
log.Infof("vtcombo up: %s", db.vt.Address())
}

// Load schema will apply db and vschema migrations. Running after vtcombo starts to be able to apply vschema migrations
if err := db.loadSchema(); err != nil {
return err
}
Expand All @@ -248,15 +264,6 @@ func (db *LocalCluster) Setup() error {
}
}

if !db.OnlyMySQL {
log.Infof("Starting vtcombo...")
db.vt = VtcomboProcess(db.Env, &db.Config, db.mysql)
if err := db.vt.WaitStart(); err != nil {
return err
}
log.Infof("vtcombo up: %s", db.vt.Address())
}

return nil
}

Expand Down Expand Up @@ -310,6 +317,7 @@ func isDir(path string) bool {
return err == nil && info.IsDir()
}

// loadSchema applies sql and vschema migrations respectively for each keyspace in the topology
func (db *LocalCluster) loadSchema() error {
if db.SchemaDir == "" {
return nil
Expand Down Expand Up @@ -345,12 +353,26 @@ func (db *LocalCluster) loadSchema() error {
return err
}

// One single vschema migration per file
if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") {
if err = db.applyVschema(keyspace, cmds[0]); err != nil {
return err
}
continue
}

for _, dbname := range db.shardNames(kpb) {
if err := db.Execute(cmds, dbname); err != nil {
return err
}
}
}

if !db.OnlyMySQL {
if err := db.reloadSchemaKeyspace(keyspace); err != nil {
return err
}
}
}

return nil
Expand Down Expand Up @@ -435,6 +457,34 @@ func (db *LocalCluster) JSONConfig() interface{} {
return config
}

// GrpcPort returns the grpc port used by vtcombo
func (db *LocalCluster) GrpcPort() int {
return db.vt.PortGrpc
}

func (db *LocalCluster) applyVschema(keyspace string, migration string) error {
server := fmt.Sprintf("localhost:%v", db.vt.PortGrpc)
args := []string{"ApplyVSchema", "-sql", migration, keyspace}
fmt.Printf("Applying vschema %v", args)
err := vtctlclient.RunCommandAndWait(context.Background(), server, args, func(e *logutil.Event) {
log.Info(e)
})

return err
}

func (db *LocalCluster) reloadSchemaKeyspace(keyspace string) error {
server := fmt.Sprintf("localhost:%v", db.vt.PortGrpc)
args := []string{"ReloadSchemaKeyspace", "-include_master=true", keyspace}
fmt.Printf("Reloading keyspace schema %v", args)

err := vtctlclient.RunCommandAndWait(context.Background(), server, args, func(e *logutil.Event) {
log.Info(e)
})

return err
}

// LoadSQLFile loads a parses a .sql file from disk, removing all the
// different comments that mysql/mysqldump inserts in these, and returning
// each individual SQL statement as its own string.
Expand Down