Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 97 additions & 19 deletions go/test/endtoend/vtgate/concurrentdml/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -60,9 +62,11 @@ CREATE TABLE t1 (
c1 BIGINT NOT NULL,
c2 BIGINT NOT NULL,
c3 BIGINT,
c4 varchar(100),
PRIMARY KEY (c1),
UNIQUE KEY (c2),
UNIQUE KEY (c3)
UNIQUE KEY (c3),
UNIQUE KEY (c4)
) ENGINE=Innodb;

CREATE TABLE lookup_t1 (
Expand All @@ -76,6 +80,12 @@ CREATE TABLE lookup_t2 (
keyspace_id BINARY(8),
primary key (c3)
);

CREATE TABLE lookup_t3 (
c4 varchar(100) NOT NULL,
keyspace_id BINARY(8),
primary key (c4)
);
`

sVSchema = `
Expand Down Expand Up @@ -104,6 +114,16 @@ CREATE TABLE lookup_t2 (
"ignore_nulls": "true"
},
"owner": "t1"
},
"lookup_c4": {
"type": "consistent_lookup_unique",
"params": {
"table": "lookup_t3",
"from": "c4",
"to": "keyspace_id",
"ignore_nulls": "true"
},
"owner": "t1"
}
},
"tables": {
Expand All @@ -120,6 +140,10 @@ CREATE TABLE lookup_t2 (
{
"column": "c3",
"name": "lookup_c3"
},
{
"column": "c4",
"name": "lookup_c4"
}
],
"columns": [
Expand All @@ -134,6 +158,10 @@ CREATE TABLE lookup_t2 (
{
"name": "c3",
"type": "INT64"
},
{
"name": "c4",
"type": "VARCHAR"
}
],
"autoIncrement": {
Expand All @@ -157,6 +185,14 @@ CREATE TABLE lookup_t2 (
"name": "xxhash"
}
]
},
"lookup_t3": {
"columnVindexes": [
{
"column": "c4",
"name": "xxhash"
}
]
}
}
}
Expand All @@ -170,7 +206,6 @@ func TestMain(m *testing.M) {
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
//defer time.Sleep(10 * time.Minute)

// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
Expand All @@ -183,7 +218,7 @@ func TestMain(m *testing.M) {
SchemaSQL: unsSchema,
VSchema: unsVSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*uKeyspace, 1, false); err != nil {
if err := clusterInstance.StartUnshardedKeyspace(*uKeyspace, 0, false); err != nil {
return 1
}

Expand All @@ -192,7 +227,7 @@ func TestMain(m *testing.M) {
SchemaSQL: sSchema,
VSchema: sVSchema,
}
if err := clusterInstance.StartKeyspace(*sKeyspace, []string{"-80", "80-"}, 1, false); err != nil {
if err := clusterInstance.StartKeyspace(*sKeyspace, []string{"-80", "80-"}, 0, false); err != nil {
return 1
}

Expand All @@ -217,14 +252,14 @@ func TestInsertIgnoreOnLookupUniqueVindex(t *testing.T) {
require.Nil(t, err)
defer conn.Close()

exec(t, conn, `delete from t1 where c1 = 300`)
exec(t, conn, `insert into t1 values (300,100,300)`)
defer exec(t, conn, `delete from t1`)
exec(t, conn, `insert into t1(c1, c2, c3) values (300,100,300)`)
qr1 := exec(t, conn, `select c2.keyspace_id, c3.keyspace_id from lookup_t1 c2, lookup_t2 c3`)

qr := exec(t, conn, `insert ignore into t1 values (200,100,200)`)
qr := exec(t, conn, `insert ignore into t1(c1, c2, c3) values (200,100,200)`)
assert.Zero(t, qr.RowsAffected)

qr = exec(t, conn, `select * from t1 order by c1`)
qr = exec(t, conn, `select c1, c2, c3 from t1 order by c1`)
assert.Equal(t, fmt.Sprintf("%v", qr.Rows), `[[INT64(300) INT64(100) INT64(300)]]`)

qr2 := exec(t, conn, `select c2.keyspace_id, c3.keyspace_id from lookup_t1 c2, lookup_t2 c3`)
Expand All @@ -233,6 +268,7 @@ func TestInsertIgnoreOnLookupUniqueVindex(t *testing.T) {
}

func TestOpenTxBlocksInSerial(t *testing.T) {
t.Skip("Update and Insert in same transaction does not work with the unique consistent lookup having same value.")
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Expand All @@ -247,23 +283,24 @@ func TestOpenTxBlocksInSerial(t *testing.T) {
require.Nil(t, err)
defer conn2.Close()

exec(t, conn1, `delete from t1 where c1 = 300`)
exec(t, conn1, `insert into t1 values (300,100,300)`)
defer exec(t, conn1, `delete from t1`)
exec(t, conn1, `insert into t1(c1, c2, c3) values (300,100,300)`)
exec(t, conn1, `begin`)
exec(t, conn1, `UPDATE t1 SET c3 = 400 WHERE c2 = 100`)

// This will wait for innodb_lock_wait_timeout timeout pf 20 seconds to kick in.
execAssertError(t, conn2, `insert into t1 values (400,100,400)`, `Lock wait timeout exceeded`)
execAssertError(t, conn2, `insert into t1(c1, c2, c3) values (400,100,400)`, `Lock wait timeout exceeded`)

qr := exec(t, conn1, `insert ignore into t1 values (200,100,200)`)
qr := exec(t, conn1, `insert ignore into t1(c1, c2, c3) values (200,100,200)`)
assert.Zero(t, qr.RowsAffected)
exec(t, conn1, `commit`)

qr = exec(t, conn1, `select * from t1 order by c1`)
qr = exec(t, conn1, `select c1, c2, c3 from t1 order by c1`)
assert.Equal(t, fmt.Sprintf("%v", qr.Rows), `[[INT64(300) INT64(100) INT64(400)]]`)
}

func TestOpenTxBlocksInConcurrent(t *testing.T) {
t.Skip("Update and Insert in same transaction does not work with the unique consistent lookup having same value.")
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Expand All @@ -278,29 +315,60 @@ func TestOpenTxBlocksInConcurrent(t *testing.T) {
require.Nil(t, err)
defer conn2.Close()

exec(t, conn1, `delete from t1 where c1 = 300`)
exec(t, conn1, `insert into t1 values (300,100,300)`)
defer exec(t, conn1, `delete from t1`)
exec(t, conn1, `insert into t1(c1, c2, c3) values (300,100,300)`)
exec(t, conn1, `begin`)
exec(t, conn1, `UPDATE t1 SET c3 = 400 WHERE c2 = 100`)

var wg sync.WaitGroup
wg.Add(1)
go func() {
// This will wait for other transaction to complete to through the duplicate key error.
execAssertError(t, conn2, `insert into t1 values (400,100,400)`, `Duplicate entry '100' for key`)
// This will wait for other transaction to complete before throwing the duplicate key error.
execAssertError(t, conn2, `insert into t1(c1, c2, c3) values (400,100,400)`, `Duplicate entry '100' for key`)
wg.Done()
}()

time.Sleep(3 * time.Second)
qr := exec(t, conn1, `insert ignore into t1 values (200,100,200)`)
qr := exec(t, conn1, `insert ignore into t1(c1, c2, c3) values (200,100,200)`)
assert.Zero(t, qr.RowsAffected)
exec(t, conn1, `commit`)

qr = exec(t, conn1, `select * from t1 order by c1`)
qr = exec(t, conn1, `select c1, c2, c3 from t1 order by c1`)
assert.Equal(t, fmt.Sprintf("%v", qr.Rows), `[[INT64(300) INT64(100) INT64(400)]]`)
wg.Wait()
}

func TestUpdateLookupUniqueVindex(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()

defer exec(t, conn, `delete from t1`)
exec(t, conn, `insert into t1(c1, c2, c3) values (999,100,300)`)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(999) INT64(100) INT64(300)]]`)
assertMatches(t, conn, `select c2 from lookup_t1`, `[[INT64(100)]]`)
assertMatches(t, conn, `select c3 from lookup_t2`, `[[INT64(300)]]`)
// not changed - same vindex
exec(t, conn, `update t1 set c2 = 100 where c2 = 100`)
// changed - same vindex
exec(t, conn, `update t1 set c2 = 200 where c2 = 100`)
// not changed - different vindex
exec(t, conn, `update t1 set c3 = 300 where c2 = 200`)
// changed - different vindex
exec(t, conn, `update t1 set c3 = 400 where c2 = 200`)
// changed - same vindex
exec(t, conn, `update t1 set c4 = 'abc' where c1 = 999`)
// not changed - same vindex - not yet supported bcoz of varchar field
// exec(t, conn, `update t1 set c4 = 'abc' where c4 = 'abc'`)

}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand All @@ -314,3 +382,13 @@ func execAssertError(t *testing.T, conn *mysql.Conn, query string, errorString s
require.Error(t, err)
assert.Contains(t, err.Error(), errorString)
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
got := fmt.Sprintf("%v", qr.Rows)
diff := cmp.Diff(expected, got)
if diff != "" {
t.Errorf("Query: %s (-want +got):\n%s", query, diff)
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (vc *vcursorImpl) InTransactionAndIsDML() bool {

func (vc *vcursorImpl) LookupRowLockShardSession() vtgatepb.CommitOrder {
switch vc.logStats.StmtType {
case "DELETE":
case "DELETE", "UPDATE":
return vtgatepb.CommitOrder_POST
}
return vtgatepb.CommitOrder_PRE
Expand Down