diff --git a/go/test/endtoend/vtgate/lookup_test.go b/go/test/endtoend/vtgate/lookup_test.go index 82cd63e5401..e8521be1932 100644 --- a/go/test/endtoend/vtgate/lookup_test.go +++ b/go/test/endtoend/vtgate/lookup_test.go @@ -131,7 +131,7 @@ func TestConsistentLookup(t *testing.T) { mysqlErr := err.(*sqlerror.SQLError) assert.Equal(t, sqlerror.ERDupEntry, mysqlErr.Num) assert.Equal(t, "23000", mysqlErr.State) - assert.ErrorContains(t, mysqlErr, "reverted partial DML execution") + assert.ErrorContains(t, mysqlErr, "lookup.Create: target: ks.80-.primary: vttablet: (errno 1062) (sqlstate 23000)") // Simple delete. utils.Exec(t, conn, "begin") diff --git a/go/test/endtoend/vtgate/queries/dml/dml_test.go b/go/test/endtoend/vtgate/queries/dml/dml_test.go index 3cbfaef21f6..52e9053fb9d 100644 --- a/go/test/endtoend/vtgate/queries/dml/dml_test.go +++ b/go/test/endtoend/vtgate/queries/dml/dml_test.go @@ -28,6 +28,92 @@ import ( "github.com/stretchr/testify/require" ) +// TestUniqueLookupDuplicateEntries should fail if there is a duplicate in the unique lookup column. 
+func TestUniqueLookupDuplicateEntries(t *testing.T) { + mcmp, closer := start(t) + defer closer() + + // initial row + utils.Exec(t, mcmp.VtConn, "insert into s_tbl(id, num) values (1,10)") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`) + + // insert duplicate row + utils.AssertContainsError(t, mcmp.VtConn, "insert into s_tbl(id, num) values (2,10)", "lookup.Create: target: sks.-80.primary: vttablet: "+ + "Duplicate entry '10' for key 'num_vdx_tbl.PRIMARY'") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`) + + // insert duplicate row in multi-row insert multi shard + utils.AssertContainsError(t, mcmp.VtConn, "insert into s_tbl(id, num) values (3,20), (4,20),(5,30)", + "transaction rolled back to reverse changes of partial DML execution: target: sks.80-.primary: vttablet: "+ + "Duplicate entry '20' for key 'num_vdx_tbl.PRIMARY'") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`) + + // insert duplicate row in multi-row insert - lookup single shard + utils.AssertContainsError(t, mcmp.VtConn, "insert into s_tbl(id, num) values (3,20), (4,20)", + "transaction rolled back to reverse changes of partial DML execution: lookup.Create: target: sks.80-.primary: vttablet: "+ + "Duplicate entry '20' for key 'num_vdx_tbl.PRIMARY'") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, 
hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`) + + // insert second row to test with limit update. + utils.Exec(t, mcmp.VtConn, "insert into s_tbl(id, num) values (10,100)") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)] [INT64(10) INT64(100)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")] [INT64(100) VARCHAR("594764E1A2B2D98E")]]`) + + // update with limit 1 succeed. + utils.Exec(t, mcmp.VtConn, "update s_tbl set num = 30 order by id limit 1") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(30)] [INT64(10) INT64(100)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(30) VARCHAR("166B40B44ABA4BD6")] [INT64(100) VARCHAR("594764E1A2B2D98E")]]`) + + // update to same value on multiple row should fail. + utils.AssertContainsError(t, mcmp.VtConn, "update s_tbl set num = 40 limit 2", + "lookup.Create: transaction rolled back to reverse changes of partial DML execution: target: sks.80-.primary: vttablet: "+ + "rpc error: code = AlreadyExists desc = Duplicate entry '40' for key 'num_vdx_tbl.PRIMARY'") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(30)] [INT64(10) INT64(100)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(30) VARCHAR("166B40B44ABA4BD6")] [INT64(100) VARCHAR("594764E1A2B2D98E")]]`) +} + +// TestUniqueLookupDuplicateIgnore tests the insert ignore on lookup table. 
+func TestUniqueLookupDuplicateIgnore(t *testing.T) { + mcmp, closer := start(t) + defer closer() + + // initial row + utils.Exec(t, mcmp.VtConn, "insert into s_tbl(id, num) values (1,10)") + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`) + + // insert ignore duplicate row + qr := utils.Exec(t, mcmp.VtConn, "insert ignore into s_tbl(id, num) values (2,10)") + assert.EqualValues(t, 0, qr.RowsAffected) + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`) + + // insert duplicate row in multi-row insert - lookup single shard + // Current behavior does not work as expected—one of the rows should be inserted. + // The lookup table is updated, but the main table is not. This is a bug in Vitess. + // The issue occurs because the table has two vindex columns (`num` and `col`), both of which ignore nulls during vindex insertion. + // In the `INSERT IGNORE` case, after the vindex create API call, a verify call checks if the row exists in the lookup table. + // - If the row exists, it is inserted into the main table. + // - If the row does not exist, the main table insertion is skipped. + // Since the `col` column is null, the row is not inserted into the lookup table, causing the main table insertion to be ignored. 
+ qr = utils.Exec(t, mcmp.VtConn, "insert ignore into s_tbl(id, num) values (3,20), (4,20)") + assert.EqualValues(t, 0, qr.RowsAffected) + utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")] [INT64(20) VARCHAR("4EB190C9A2FA169C")]]`) + + // insert duplicate row in multi-row insert - vindex values are not null + qr = utils.Exec(t, mcmp.VtConn, "insert ignore into s_tbl(id, num, col) values (3,20, 30), (4,20, 40)") + assert.EqualValues(t, 1, qr.RowsAffected) + utils.AssertMatches(t, mcmp.VtConn, "select id, num, col from s_tbl order by id", `[[INT64(1) INT64(10) NULL] [INT64(3) INT64(20) INT64(30)]]`) + utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")] [INT64(20) VARCHAR("4EB190C9A2FA169C")]]`) + utils.AssertMatches(t, mcmp.VtConn, "select col, hex(keyspace_id) from col_vdx_tbl order by col", `[[INT64(30) VARCHAR("4EB190C9A2FA169C")]]`) + +} + func TestMultiEqual(t *testing.T) { if clusterInstance.HasPartialKeyspaces { t.Skip("test uses multiple keyspaces, test framework only supports partial keyspace testing for a single keyspace") @@ -81,7 +167,7 @@ func TestDeleteWithLimit(t *testing.T) { defer closer() // initial rows - mcmp.Exec("insert into s_tbl(id, num) values (1,10), (2,10), (3,10), (4,20), (5,5), (6,15), (7,17), (8,80)") + mcmp.Exec("insert into s_tbl(id, num) values (1,10), (4,20), (5,5), (6,15), (7,17), (8,80)") mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)") // delete with limit @@ -93,14 +179,14 @@ func TestDeleteWithLimit(t *testing.T) { // check rows mcmp.AssertMatches(`select id, num from s_tbl order by id`, - `[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`) 
+ `[[INT64(4) INT64(20)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`) // 2 rows matches but limit is 1, so any one of the row can remain in table. mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`, `[[INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`, `[[INT64(1) INT64(1) INT64(4)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`) // delete with limit - qr = mcmp.Exec(`delete from s_tbl where num < 20 limit 2`) + qr = mcmp.Exec(`delete from s_tbl where num < 25 limit 2`) require.EqualValues(t, 2, qr.RowsAffected) qr = mcmp.Exec(`delete from order_tbl limit 5`) @@ -108,10 +194,8 @@ func TestDeleteWithLimit(t *testing.T) { // check rows // 3 rows matches `num < 20` but limit is 2 so any one of them can remain in the table. - mcmp.AssertMatchesAnyNoCompare(`select id, num from s_tbl order by id`, - `[[INT64(4) INT64(20)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`, - `[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(8) INT64(80)]]`, - `[[INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(8) INT64(80)]]`) + mcmp.AssertMatches(`select id, num from s_tbl order by id`, + `[[INT64(8) INT64(80)]]`) mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`, `[]`) @@ -134,18 +218,18 @@ func TestUpdateWithLimit(t *testing.T) { defer closer() // initial rows - mcmp.Exec("insert into s_tbl(id, num) values (1,10), (4,20), (5,5), (6,15), (7,17), (8,80)") + mcmp.Exec("insert into s_tbl(id, col) values (1,10), (4,20), (5,5), (6,15), (7,17), (8,80)") mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)") // update with limit - qr := mcmp.Exec(`update s_tbl set num = 12 order by num, id limit 1`) + qr := mcmp.Exec(`update s_tbl set col = 12 order by col, id limit 1`) require.EqualValues(t, 1, qr.RowsAffected) qr = mcmp.Exec(`update order_tbl set cust_no = 12 where region_id = 1 limit 1`) require.EqualValues(t, 1, 
qr.RowsAffected) // check rows - mcmp.AssertMatches(`select id, num from s_tbl order by id`, + mcmp.AssertMatches(`select id, col from s_tbl order by id`, `[[INT64(1) INT64(10)] [INT64(4) INT64(20)] [INT64(5) INT64(12)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`) // 2 rows matches but limit is 1, so any one of the row can be modified in the table. mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`, @@ -153,15 +237,15 @@ func TestUpdateWithLimit(t *testing.T) { `[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(12)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`) // update with limit - qr = mcmp.Exec(`update s_tbl set num = 32 where num > 17 limit 1`) + qr = mcmp.Exec(`update s_tbl set col = 32 where col > 17 limit 1`) require.EqualValues(t, 1, qr.RowsAffected) qr = mcmp.Exec(`update order_tbl set cust_no = cust_no + 10 limit 5`) require.EqualValues(t, 4, qr.RowsAffected) // check rows - // 2 rows matches `num > 17` but limit is 1 so any one of them will be updated. - mcmp.AssertMatchesAnyNoCompare(`select id, num from s_tbl order by id`, + // 2 rows matches `col > 17` but limit is 1 so any one of them will be updated. + mcmp.AssertMatchesAnyNoCompare(`select id, col from s_tbl order by id`, `[[INT64(1) INT64(10)] [INT64(4) INT64(32)] [INT64(5) INT64(12)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`, `[[INT64(1) INT64(10)] [INT64(4) INT64(20)] [INT64(5) INT64(12)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(32)]]`) mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`, @@ -169,14 +253,14 @@ func TestUpdateWithLimit(t *testing.T) { `[[INT64(1) INT64(1) INT64(14)] [INT64(1) INT64(2) INT64(22)] [INT64(2) INT64(3) INT64(15)] [INT64(2) INT64(4) INT64(65)]]`) // trying with zero limit. 
- qr = mcmp.Exec(`update s_tbl set num = 44 limit 0`) + qr = mcmp.Exec(`update s_tbl set col = 44 limit 0`) require.EqualValues(t, 0, qr.RowsAffected) qr = mcmp.Exec(`update order_tbl set oid = 44 limit 0`) require.EqualValues(t, 0, qr.RowsAffected) // trying with limit with no-matching row. - qr = mcmp.Exec(`update s_tbl set num = 44 where id > 100 limit 2`) + qr = mcmp.Exec(`update s_tbl set col = 44 where id > 100 limit 2`) require.EqualValues(t, 0, qr.RowsAffected) qr = mcmp.Exec(`update order_tbl set oid = 44 where region_id > 100 limit 2`) @@ -219,7 +303,7 @@ func TestDeleteWithSubquery(t *testing.T) { defer closer() // initial rows - mcmp.Exec("insert into s_tbl(id, num) values (1,10), (2,10), (3,10), (4,20), (5,5), (6,15), (7,17), (8,80)") + mcmp.Exec("insert into s_tbl(id, col) values (1,10), (2,10), (3,10), (4,20), (5,5), (6,15), (7,17), (8,80)") mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)") // delete with subquery on s_tbl @@ -227,17 +311,17 @@ func TestDeleteWithSubquery(t *testing.T) { require.EqualValues(t, 4, qr.RowsAffected) // check rows - mcmp.AssertMatches(`select id, num from s_tbl order by id`, + mcmp.AssertMatches(`select id, col from s_tbl order by id`, `[[INT64(5) INT64(5)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`) mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`, `[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`) // delete with subquery on order_tbl - qr = mcmp.Exec(`delete from order_tbl where cust_no > (select num from s_tbl where id = 7)`) + qr = mcmp.Exec(`delete from order_tbl where cust_no > (select col from s_tbl where id = 7)`) require.EqualValues(t, 1, qr.RowsAffected) // check rows - mcmp.AssertMatches(`select id, num from s_tbl order by id`, + mcmp.AssertMatches(`select id, col from s_tbl order by id`, `[[INT64(5) INT64(5)] [INT64(6) INT64(15)] 
[INT64(7) INT64(17)] [INT64(8) INT64(80)]]`) mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`, `[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)]]`) @@ -251,7 +335,7 @@ func TestDeleteWithSubquery(t *testing.T) { require.EqualValues(t, 1, qr.RowsAffected) // check rows - utils.AssertMatches(t, mcmp.VtConn, `select id, num from s_tbl order by id`, + utils.AssertMatches(t, mcmp.VtConn, `select id, col from s_tbl order by id`, `[[INT64(5) INT64(5)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`) utils.AssertMatches(t, mcmp.VtConn, `select region_id, oid, cust_no from order_tbl order by oid`, `[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)]]`) diff --git a/go/test/endtoend/vtgate/queries/dml/main_test.go b/go/test/endtoend/vtgate/queries/dml/main_test.go index 3bdfd492a06..08f28abd80d 100644 --- a/go/test/endtoend/vtgate/queries/dml/main_test.go +++ b/go/test/endtoend/vtgate/queries/dml/main_test.go @@ -134,7 +134,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) { _, _ = utils.ExecAllowError(t, mcmp.VtConn, "set workload = oltp") tables := []string{ - "s_tbl", "num_vdx_tbl", "user_tbl", "order_tbl", "oevent_tbl", "oextra_tbl", + "s_tbl", "num_vdx_tbl", "col_vdx_tbl", "user_tbl", "order_tbl", "oevent_tbl", "oextra_tbl", "auto_tbl", "oid_vdx_tbl", "unq_idx", "nonunq_idx", "u_tbl", "mixed_tbl", "j_tbl", "j_utbl", "t1", "t2", } diff --git a/go/test/endtoend/vtgate/queries/dml/sharded_schema.sql b/go/test/endtoend/vtgate/queries/dml/sharded_schema.sql index 781fcc1f871..9b2e14f5201 100644 --- a/go/test/endtoend/vtgate/queries/dml/sharded_schema.sql +++ b/go/test/endtoend/vtgate/queries/dml/sharded_schema.sql @@ -2,6 +2,8 @@ create table s_tbl ( id bigint, num bigint, + col bigint, + unique key (num), primary key (id) ) Engine = InnoDB; @@ -12,6 +14,14 @@ create table num_vdx_tbl primary key (num) ) Engine = InnoDB; +create table col_vdx_tbl +( + col bigint, + id 
bigint, + keyspace_id varbinary(20), + primary key (col, id) +) Engine = InnoDB; + create table user_tbl ( id bigint, diff --git a/go/test/endtoend/vtgate/queries/dml/vschema.json b/go/test/endtoend/vtgate/queries/dml/vschema.json index c3be318f162..dd44d8989ca 100644 --- a/go/test/endtoend/vtgate/queries/dml/vschema.json +++ b/go/test/endtoend/vtgate/queries/dml/vschema.json @@ -9,7 +9,18 @@ "params": { "table": "num_vdx_tbl", "from": "num", - "to": "keyspace_id" + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "s_tbl" + }, + "col_vdx": { + "type": "consistent_lookup", + "params": { + "table": "col_vdx_tbl", + "from": "col,id", + "to": "keyspace_id", + "ignore_nulls": "true" }, "owner": "s_tbl" }, @@ -63,6 +74,10 @@ { "column": "num", "name": "num_vdx" + }, + { + "columns": ["col", "id"], + "name": "col_vdx" } ] }, @@ -74,6 +89,14 @@ } ] }, + "col_vdx_tbl": { + "column_vindexes": [ + { + "column": "col", + "name": "hash" + } + ] + }, "user_tbl": { "auto_increment": { "column": "id", diff --git a/go/vt/vterrors/constants.go b/go/vt/vterrors/constants.go index 5fa13873ef5..05110a9346a 100644 --- a/go/vt/vterrors/constants.go +++ b/go/vt/vterrors/constants.go @@ -27,11 +27,26 @@ const ( // RxOp regex for operation not allowed error var RxOp = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") -// TxEngineClosed for transaction engine closed error -const TxEngineClosed = "tx engine can't accept new connections in state %v" +// Constants for error messages +const ( + // TxEngineClosed for transaction engine closed error + TxEngineClosed = "tx engine can't accept new connections in state %v" + + // PrimaryVindexNotSet is the error message to be used when there is no primary vindex found on a table + PrimaryVindexNotSet = "table '%s' does not have a primary vindex" + + // WrongTablet for invalid tablet type error + WrongTablet = "wrong tablet type" + + // TxKillerRollback purpose when acquire lock on connection for rolling 
back transaction. + TxKillerRollback = "in use: for tx killer rollback" + + // RevertedPartialExec is the error message to be used when a partial DML execution failure is reverted using savepoint. + RevertedPartialExec = "reverted partial DML execution failure" -// WrongTablet for invalid tablet type error -const WrongTablet = "wrong tablet type" + // TxRollbackOnPartialExec is the error message to be used when a transaction is rolled back to reverse changes of partial DML execution + TxRollbackOnPartialExec = "transaction rolled back to reverse changes of partial DML execution" +) // ConnectionRefused is for gRPC client not being able to connect to a server const ConnectionRefused = "connection refused" @@ -39,14 +54,5 @@ const ConnectionRefused = "connection refused" // RxWrongTablet regex for invalid tablet type error var RxWrongTablet = regexp.MustCompile("(wrong|invalid) tablet type") -// Constants for error messages -const ( - // PrimaryVindexNotSet is the error message to be used when there is no primary vindex found on a table - PrimaryVindexNotSet = "table '%s' does not have a primary vindex" -) - -// TxKillerRollback purpose when acquire lock on connection for rolling back transaction. 
-const TxKillerRollback = "in use: for tx killer rollback" - // TxClosed regex for connection closed var TxClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found|in use: for tx killer rollback)") diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 1078d4203e1..d62055a6bfa 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -25,16 +25,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" - "vitess.io/vitess/go/mysql/config" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/discovery" querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" + econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" _ "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/sandboxconn" ) @@ -3201,3 +3202,66 @@ func TestSessionRowsAffected(t *testing.T) { require.NoError(t, err) require.Zero(t, session.ShardSessions) } + +func TestConsistentLookupInsert(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + // Special setup + cell := "zone1" + ks := "TestExecutor" + hc := discovery.NewFakeHealthCheck(nil) + s := createSandbox(ks) + s.ShardSpec = "-80-" + s.VSchema = executorVSchema + serv := newSandboxForCells(ctx, []string{cell}) + resolver := newTestResolver(ctx, hc, serv, cell) + sbc1 := hc.AddTestTablet(cell, "-80", 1, ks, "-80", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc2 := hc.AddTestTablet(cell, "80-", 1, ks, "80-", topodatapb.TabletType_PRIMARY, true, 1, nil) + + executor := createExecutor(ctx, serv, cell, resolver) + defer executor.Close() + + logChan := executor.queryLogger.Subscribe("Test") + defer 
executor.queryLogger.Unsubscribe(logChan) + + session := econtext.NewAutocommitSession(&vtgatepb.Session{}) + + t.Run("transaction rollback due to partial execution error, no duplicate handling", func(t *testing.T) { + sbc1.EphemeralShardErr = sqlerror.NewSQLError(sqlerror.ERDupEntry, sqlerror.SSConstraintViolation, "Duplicate entry '10' for key 't1_lkp_idx.PRIMARY'") + sbc2.SetResults([]*sqltypes.Result{{RowsAffected: 1}}) + _, err := executorExecSession(ctx, executor, session, "insert into t1(id, unq_col) values (1, 10), (4, 10), (50, 4)", nil) + assert.ErrorContains(t, err, + "lookup.Create: transaction rolled back to reverse changes of partial DML execution: target: TestExecutor.-80.primary: "+ + "Duplicate entry '10' for key 't1_lkp_idx.PRIMARY' (errno 1062) (sqlstate 23000)") + + assert.EqualValues(t, 0, sbc1.ExecCount.Load()) + assert.EqualValues(t, 1, sbc2.ExecCount.Load()) + + testQueryLog(t, executor, logChan, "VindexCreate", "INSERT", "insert into t1_lkp_idx(unq_col, keyspace_id) values (:unq_col_0, :keyspace_id_0), (:unq_col_1, :keyspace_id_1), (:unq_col_2, :keyspace_id_2)", 2) + testQueryLog(t, executor, logChan, "TestExecute", "INSERT", "insert into t1(id, unq_col) values (1, 10), (4, 10), (50, 4)", 0) + }) + + sbc1.ResetCounter() + sbc2.ResetCounter() + session = econtext.NewAutocommitSession(session.Session) + + t.Run("duplicate handling failing on same unique column value", func(t *testing.T) { + sbc1.EphemeralShardErr = sqlerror.NewSQLError(sqlerror.ERDupEntry, sqlerror.SSConstraintViolation, "Duplicate entry '10' for key 't1_lkp_idx.PRIMARY'") + sbc1.SetResults([]*sqltypes.Result{ + sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace_id", "varbinary")), + {RowsAffected: 1}, + }) + _, err := executorExecSession(ctx, executor, session, "insert into t1(id, unq_col) values (1, 10), (4, 10)", nil) + assert.ErrorContains(t, err, + "transaction rolled back to reverse changes of partial DML execution: lookup.Create: target: 
TestExecutor.-80.primary: "+ + "Duplicate entry '10' for key 't1_lkp_idx.PRIMARY' (errno 1062) (sqlstate 23000)") + + assert.EqualValues(t, 2, sbc1.ExecCount.Load()) + assert.EqualValues(t, 0, sbc2.ExecCount.Load()) + + testQueryLog(t, executor, logChan, "VindexCreate", "INSERT", "insert into t1_lkp_idx(unq_col, keyspace_id) values (:unq_col_0, :keyspace_id_0), (:unq_col_1, :keyspace_id_1)", 1) + testQueryLog(t, executor, logChan, "VindexCreate", "SELECT", "select keyspace_id from t1_lkp_idx where unq_col = :unq_col for update", 1) + testQueryLog(t, executor, logChan, "VindexCreate", "INSERT", "insert into t1_lkp_idx(unq_col, keyspace_id) values (:unq_col, :keyspace_id)", 1) + testQueryLog(t, executor, logChan, "TestExecute", "INSERT", "insert into t1(id, unq_col) values (1, 10), (4, 10)", 0) + }) +} diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index e3a80390c56..a66eca0da84 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -805,7 +805,9 @@ func (vc *VCursorImpl) Execute(ctx context.Context, method string, query string, } qr, err := vc.executor.Execute(ctx, nil, method, session, vc.marginComments.Leading+query+vc.marginComments.Trailing, bindVars, false) - vc.setRollbackOnPartialExecIfRequired(err != nil, rollbackOnError) + // If there is no error, it indicates at least one successful execution, + // meaning a rollback should be triggered if a failure occurs later. 
+ vc.setRollbackOnPartialExecIfRequired(err == nil, rollbackOnError) return qr, err } diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 855fb0bfc32..fe35f104709 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -384,7 +384,7 @@ func (e *Executor) rollbackPartialExec(ctx context.Context, safeSession *econtex _, _, err = e.execute(ctx, nil, safeSession, rQuery, bindVars, false, logStats) // If no error, the revert is successful with the savepoint. Notify the reason as error to the client. if err == nil { - errMsg.WriteString("reverted partial DML execution failure") + errMsg.WriteString(vterrors.RevertedPartialExec) return vterrors.New(vtrpcpb.Code_ABORTED, errMsg.String()) } // not able to rollback changes of the failed query, so have to abort the complete transaction. @@ -392,7 +392,8 @@ func (e *Executor) rollbackPartialExec(ctx context.Context, safeSession *econtex // abort the transaction. _ = e.txConn.Rollback(ctx, safeSession) - errMsg.WriteString("transaction rolled back to reverse changes of partial DML execution") + + errMsg.WriteString(vterrors.TxRollbackOnPartialExec) if err != nil { return vterrors.Wrap(err, errMsg.String()) } diff --git a/go/vt/vtgate/vindexes/consistent_lookup.go b/go/vt/vtgate/vindexes/consistent_lookup.go index 637368d2af3..b619963d012 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup.go +++ b/go/vt/vtgate/vindexes/consistent_lookup.go @@ -23,16 +23,17 @@ import ( "fmt" "strings" + "github.com/cespare/xxhash/v2" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/evalengine" - querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + 
"vitess.io/vitess/go/vt/vtgate/evalengine" ) const ( @@ -342,18 +343,36 @@ func (lu *clCommon) Verify(ctx context.Context, vcursor VCursor, ids []sqltypes. // Create reserves the id by inserting it into the vindex table. func (lu *clCommon) Create(ctx context.Context, vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error { + // Attempt to insert values into the lookup vindex table. origErr := lu.lkp.createCustom(ctx, vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE) if origErr == nil { return nil } + + // If the transaction is already rolled back. We should not handle the case for duplicate error. + if strings.Contains(origErr.Error(), vterrors.TxRollbackOnPartialExec) { + return origErr + } + // Try and convert the error to a MySQL error sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(origErr).(*sqlerror.SQLError) - // If it is a MySQL error and its code is of duplicate entry, then we would like to continue - // Otherwise, we return the error + + // If the error is NOT a duplicate entry error, return it immediately. if !(isSQLErr && sqlErr != nil && sqlErr.Number() == sqlerror.ERDupEntry) { return origErr } + + // Map to track unique row hashes and their original index in `rowsColValues`. + rowHashIndex := make(map[uint64]int, len(rowsColValues)) for i, row := range rowsColValues { + rowKey := hashKeyXXH(row) + // If a row with the same hash exists, perform an explicit value check to avoid hash collisions. + if idx, exists := rowHashIndex[rowKey]; exists && sqltypes.RowEqual(row, rowsColValues[idx]) { + return origErr // Exact duplicate found, return the original error. + } + rowHashIndex[rowKey] = i + + // Attempt to handle the duplicate entry. 
if err := lu.handleDup(ctx, vcursor, row, ksids[i], origErr); err != nil { return err } @@ -361,6 +380,16 @@ func (lu *clCommon) Create(ctx context.Context, vcursor VCursor, rowsColValues [ return nil } +// hashKeyXXH generates a fast 64-bit hash for a row using xxHash. +// This is optimized for performance and helps detect duplicates efficiently. +func hashKeyXXH(row []sqltypes.Value) uint64 { + h := xxhash.New() + for _, col := range row { + _, _ = h.Write([]byte(col.String())) // Ignoring error as xxHash Write never fails + } + return h.Sum64() +} + func (lu *clCommon) handleDup(ctx context.Context, vcursor VCursor, values []sqltypes.Value, ksid []byte, dupError error) error { bindVars := make(map[string]*querypb.BindVariable, len(values)) for colnum, val := range values {