Skip to content
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has actual bug fixes in it.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/sqlescape"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
Expand All @@ -50,7 +51,7 @@ var (
}

baseOptions = struct {
// This is where the lookup table and VReplicaiton workflow
// This is where the lookup table and VReplication workflow
// will be created.
TableKeyspace string
// This will be the name of the Lookup Vindex and the name
Expand Down Expand Up @@ -133,12 +134,20 @@ var (
if !strings.Contains(createOptions.Type, "lookup") {
return fmt.Errorf("vindex type must be a lookup vindex")
}
escapedTableKeyspace, err := sqlescape.EnsureEscaped(baseOptions.TableKeyspace)
if err != nil {
return fmt.Errorf("invalid table keyspace (%s): %v", baseOptions.TableKeyspace, err)
}
escapedTableName, err := sqlescape.EnsureEscaped(createOptions.TableName)
if err != nil {
return fmt.Errorf("invalid table name (%s): %v", createOptions.TableName, err)
}
baseOptions.Vschema = &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
baseOptions.Name: {
Type: createOptions.Type,
Params: map[string]string{
"table": baseOptions.TableKeyspace + "." + createOptions.TableName,
"table": escapedTableKeyspace + "." + escapedTableName,
"from": strings.Join(createOptions.TableOwnerColumns, ","),
"to": "keyspace_id",
"ignore_nulls": fmt.Sprintf("%t", createOptions.IgnoreNulls),
Expand Down Expand Up @@ -204,10 +213,18 @@ var (
return fmt.Errorf("%s is not a lookup vindex type", vindex.LookupVindexType)
}

escapedTableKeyspace, err := sqlescape.EnsureEscaped(baseOptions.TableKeyspace)
if err != nil {
return fmt.Errorf("invalid table keyspace (%s): %v", baseOptions.TableKeyspace, err)
}
escapedTableName, err := sqlescape.EnsureEscaped(createOptions.TableName)
if err != nil {
return fmt.Errorf("invalid table name (%s): %v", vindex.TableName, err)
}
vindexes[vindexName] = &vschemapb.Vindex{
Type: vindex.LookupVindexType,
Params: map[string]string{
"table": baseOptions.TableKeyspace + "." + vindex.TableName,
"table": escapedTableKeyspace + "." + escapedTableName,
"from": strings.Join(vindex.TableOwnerColumns, ","),
"to": "keyspace_id",
"ignore_nulls": fmt.Sprintf("%t", vindex.IgnoreNulls),
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func getClusterOptions(opts *clusterOptions) *clusterOptions {
opts = &clusterOptions{}
}
if opts.cells == nil {
opts.cells = []string{"zone1"}
opts.cells = []string{defaultCellName}
}
if opts.clusterConfig == nil {
opts.clusterConfig = mainClusterConfig
Expand Down
52 changes: 26 additions & 26 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,44 +431,44 @@ create table ukTable (id1 int not null, id2 int not null, name varchar(20), uniq
}
}
`
materializeProductSpec = `
materializeProductSpec = fmt.Sprintf(`
{
"workflow": "cproduct",
"source_keyspace": "product",
"target_keyspace": "customer",
"source_keyspace": "%s",
"target_keyspace": "%s",
"table_settings": [{
"target_table": "cproduct",
"source_expression": "select * from product",
"create_ddl": "create table cproduct(pid bigint, description varchar(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4"
}]
}
`
`, defaultSourceKs, defaultTargetKs)

materializeCustomerNameSpec = `
materializeCustomerNameSpec = fmt.Sprintf(`
{
"workflow": "customer_name",
"source_keyspace": "customer",
"target_keyspace": "customer",
"source_keyspace": "%s",
"target_keyspace": "%s",
"table_settings": [{
"target_table": "customer_name",
"source_expression": "select cid, name from customer",
"create_ddl": "create table if not exists customer_name (cid bigint not null, name varchar(128), primary key(cid), key(name))"
}]
}
`
`, defaultTargetKs, defaultTargetKs)

materializeCustomerTypeSpec = `
materializeCustomerTypeSpec = fmt.Sprintf(`
{
"workflow": "enterprise_customer",
"source_keyspace": "customer",
"target_keyspace": "customer",
"source_keyspace": "%s",
"target_keyspace": "%s",
"table_settings": [{
"target_table": "enterprise_customer",
"source_expression": "select cid, name, typ from customer where typ = 'enterprise'",
"create_ddl": "create table if not exists enterprise_customer (cid bigint not null, name varchar(128), typ varchar(64), primary key(cid), key(typ))"
}]
}
`
`, defaultTargetKs, defaultTargetKs)

merchantOrdersVSchema = `
{
Expand Down Expand Up @@ -512,31 +512,31 @@ create table ukTable (id1 int not null, id2 int not null, name varchar(20), uniq
`

// the merchant-type keyspace allows us to test keyspace names with special characters in them (dash)
materializeMerchantOrdersSpec = `
materializeMerchantOrdersSpec = fmt.Sprintf(`
{
"workflow": "morders",
"source_keyspace": "customer",
"source_keyspace": "%s",
"target_keyspace": "merchant-type",
"table_settings": [{
"target_table": "morders",
"source_expression": "select oid, cid, mname, pid, price, qty, total from orders",
"create_ddl": "create table morders(oid int, cid int, mname varchar(128), pid int, price int, qty int, total int, total2 int as (10 * total), primary key(oid)) CHARSET=utf8"
}]
}
`
`, defaultTargetKs)

materializeMerchantSalesSpec = `
materializeMerchantSalesSpec = fmt.Sprintf(`
{
"workflow": "msales",
"source_keyspace": "customer",
"source_keyspace": "%s",
"target_keyspace": "merchant-type",
"table_settings": [{
"target_table": "msales",
"source_expression": "select mname as merchant_name, count(*) as kount, sum(price) as amount from orders group by merchant_name",
"create_ddl": "create table msales(merchant_name varchar(128), kount int, amount int, primary key(merchant_name)) CHARSET=utf8"
}]
}
`
`, defaultTargetKs)

materializeSalesVSchema = `
{
Expand All @@ -552,30 +552,30 @@ create table ukTable (id1 int not null, id2 int not null, name varchar(20), uniq
}
}
`
materializeSalesSpec = `
materializeSalesSpec = fmt.Sprintf(`
{
"workflow": "sales",
"source_keyspace": "customer",
"target_keyspace": "product",
"source_keyspace": "%s",
"target_keyspace": "%s",
"table_settings": [{
"target_Table": "sales",
"source_expression": "select pid, count(*) as kount, sum(price) as amount from orders group by pid",
"create_ddl": "create table sales(pid int, kount int, amount int, primary key(pid)) CHARSET=utf8"
}]
}
`
materializeRollupSpec = `
`, defaultTargetKs, defaultSourceKs)
materializeRollupSpec = fmt.Sprintf(`
{
"workflow": "rollup",
"source_keyspace": "product",
"target_keyspace": "product",
"source_keyspace": "%s",
"target_keyspace": "%s",
"table_settings": [{
"target_table": "rollup",
"source_expression": "select 'total' as rollupname, count(*) as kount from product group by rollupname",
"create_ddl": "create table rollup(rollupname varchar(100), kount int, primary key (rollupname)) CHARSET=utf8mb4"
}]
}
`
`, defaultSourceKs, defaultSourceKs)
initialExternalSchema = `
create table review(rid int, pid int, review varbinary(128), primary key(rid));
create table rating(gid int, pid int, rating int, primary key(gid));
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestFKWorkflow(t *testing.T) {
defer vc.TearDown()

cell := vc.Cells[cellName]
vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, shardName, initialFKSourceVSchema, initialFKSchema, 0, 0, 100, sourceKsOpts)
vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, shardName, initialFKSourceVSchema, initialFKSchema, 0, 0, 100, defaultSourceKsOpts)

verifyClusterHealth(t, vc)
insertInitialFKData(t)
Expand All @@ -82,7 +82,7 @@ func TestFKWorkflow(t *testing.T) {

targetKeyspace := "fktarget"
targetTabletId := 200
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialFKTargetVSchema, "", 0, 0, targetTabletId, sourceKsOpts)
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialFKTargetVSchema, "", 0, 0, targetTabletId, defaultSourceKsOpts)

testFKCancel(t, vc)

Expand Down
14 changes: 9 additions & 5 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,11 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
// Note: you specify the number of values that you want to reserve
// and you get back the max value reserved.
func waitForSequenceValue(t *testing.T, conn *mysql.Conn, database, sequence string, numVals int) int64 {
query := fmt.Sprintf("select next %d values from %s.%s", numVals, database, sequence)
escapedDB, err := sqlescape.EnsureEscaped(database)
require.NoError(t, err)
escapedSeq, err := sqlescape.EnsureEscaped(sequence)
require.NoError(t, err)
query := fmt.Sprintf("select next %d values from %s.%s", numVals, escapedDB, escapedSeq)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
Expand Down Expand Up @@ -545,7 +549,7 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
}
if !match {
fail = true
require.Fail(t, "invlaid dry run results", "want %s, got %s\n", w, gotDryRun[i])
require.Fail(t, "invalid dry run results", "want %s, got %s\n", w, gotDryRun[i])
}
}
if fail {
Expand Down Expand Up @@ -646,11 +650,11 @@ func getDebugVar(t *testing.T, port int, varPath []string) (string, error) {
return string(val), nil
}

func confirmWorkflowHasCopiedNoData(t *testing.T, targetKS, workflow string) {
func confirmWorkflowHasCopiedNoData(t *testing.T, defaultTargetKs, workflow string) {
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow, "--compact", "--include-logs=false")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", defaultTargetKs, "show", "--workflow", workflow, "--compact", "--include-logs=false")
require.NoError(t, err, output)
streams := gjson.Get(output, "workflows.0.shard_streams.*.streams")
streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream
Expand All @@ -662,7 +666,7 @@ func confirmWorkflowHasCopiedNoData(t *testing.T, targetKS, workflow string) {
(pos.Exists() && pos.String() != "") {
require.FailNowf(t, "Unexpected data copied in workflow",
"The MoveTables workflow %q copied data in less than %s when it should have been waiting. Show output: %s",
ksWorkflow, defaultTimeout, output)
defaultKsWorkflow, defaultTimeout, output)
}
return true
})
Expand Down
40 changes: 20 additions & 20 deletions go/test/endtoend/vreplication/initial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ func insertInitialData(t *testing.T) {
defer closeConn()
log.Infof("Inserting initial data")
lines, _ := os.ReadFile("unsharded_init_data.sql")
execMultipleQueries(t, vtgateConn, "product:0", string(lines))
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
execMultipleQueries(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), string(lines))
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into customer_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into order_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
log.Infof("Done inserting initial data")

waitForRowCount(t, vtgateConn, "product:0", "product", 2)
waitForRowCount(t, vtgateConn, "product:0", "customer", 3)
waitForQueryResult(t, vtgateConn, "product:0", "select * from merchant",
waitForRowCount(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "product", 2)
waitForRowCount(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "customer", 3)
waitForQueryResult(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "select * from merchant",
`[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`)

insertJSONValues(t)
Expand All @@ -52,12 +52,12 @@ func insertJSONValues(t *testing.T) {
// insert null value combinations
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j3) values(1, \"{}\")")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j3) values(6, '{}')")

id := 8 // 6 inserted above and one after copy phase is done

Expand All @@ -68,7 +68,7 @@ func insertJSONValues(t *testing.T) {
j1 := rand.IntN(numJsonValues)
j2 := rand.IntN(numJsonValues)
query := fmt.Sprintf(q, id, jsonValues[j1], jsonValues[j2])
execVtgateQuery(t, vtgateConn, "product:0", query)
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), query)
}
}

Expand All @@ -82,7 +82,7 @@ func insertMoreCustomers(t *testing.T, numCustomers int) {
// that we reserved.
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
maxID := waitForSequenceValue(t, vtgateConn, "product", "customer_seq", numCustomers)
maxID := waitForSequenceValue(t, vtgateConn, defaultSourceKs, "customer_seq", numCustomers)
// So we need to calculate the first value we reserved
// from the max.
cid := maxID - int64(numCustomers)
Expand All @@ -97,28 +97,28 @@ func insertMoreCustomers(t *testing.T, numCustomers int) {
}
cid++
}
execVtgateQuery(t, vtgateConn, "customer", sql)
execVtgateQuery(t, vtgateConn, defaultTargetKs, sql)
}

func insertMoreProducts(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
sql := "insert into product(pid, description) values(3, 'cpu'),(4, 'camera'),(5, 'mouse');"
execVtgateQuery(t, vtgateConn, "product", sql)
execVtgateQuery(t, vtgateConn, defaultSourceKs, sql)
}

func insertMoreProductsForSourceThrottler(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');"
execVtgateQuery(t, vtgateConn, "product", sql)
execVtgateQuery(t, vtgateConn, defaultSourceKs, sql)
}

func insertMoreProductsForTargetThrottler(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
sql := "insert into product(pid, description) values(203, 'new-cpu'),(204, 'new-camera'),(205, 'new-mouse');"
execVtgateQuery(t, vtgateConn, "product", sql)
execVtgateQuery(t, vtgateConn, defaultSourceKs, sql)
}

var blobTableQueries = []string{
Expand All @@ -137,6 +137,6 @@ func insertIntoBlobTable(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for _, query := range blobTableQueries {
execVtgateQuery(t, vtgateConn, "product:0", query)
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), query)
}
}
Loading
Loading