From 10077a5c7c0f30c400d44f5e90ff3efbe6ac760c Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 14 Sep 2023 12:19:09 +0530 Subject: [PATCH 01/10] feat: add preliminary fuzzer test that only runs inserts Signed-off-by: Manan Gupta --- .../vtgate/foreignkey/fk_fuzz_test.go | 323 ++++++++++++++++++ go/test/endtoend/vtgate/foreignkey/fk_test.go | 23 +- .../endtoend/vtgate/foreignkey/main_test.go | 11 +- .../endtoend/vtgate/foreignkey/utils_test.go | 50 +++ 4 files changed, 380 insertions(+), 27 deletions(-) create mode 100644 go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go create mode 100644 go/test/endtoend/vtgate/foreignkey/utils_test.go diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go new file mode 100644 index 00000000000..04a3f58b56d --- /dev/null +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -0,0 +1,323 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package foreignkey + +import ( + "fmt" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/utils" +) + +// fuzzer runs threads that runs queries against the databases. +// It has parameters that define the way the queries are constructed. +type fuzzer struct { + maxValForId int + maxValForCol int + insertShare int + deleteShare int + updateShare int + concurrency int + + // shouldStop is an internal state variable, that tells the fuzzer + // whether it should stop or not. + shouldStop atomic.Bool + // wg is an internal state variable, that used to know whether the fuzzer threads are running or not. + wg sync.WaitGroup + // idCollisionMap is used to make sure that we don't end up running two queries from two threads that are running on the same table + // on the same primary column. This is going to be a challenge, because lets say we run two inserts on the same primary col value, + // In this case, the two queries will race with each other, and Vitess and MySQL might end up breaking ties in differnt ways, causing + // the data to be different in them. mu is the set of mutexes, protecting access to the map. + mu []sync.Mutex + idCollisionMap []map[int]bool +} + +// newFuzzer creates a new fuzzer struct. +func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare int, deleteShare int, updateShare int) *fuzzer { + fz := &fuzzer{ + concurrency: concurrency, + maxValForId: maxValForId, + maxValForCol: maxValForCol, + insertShare: insertShare, + deleteShare: deleteShare, + updateShare: updateShare, + wg: sync.WaitGroup{}, + } + // Initially the fuzzer thread is stopped. + fz.shouldStop.Store(true) + // Initialize the map and mutexes. + for range fkTables { + fz.mu = append(fz.mu, sync.Mutex{}) + fz.idCollisionMap = append(fz.idCollisionMap, map[int]bool{}) + } + return fz +} + +// generateQuery generates a query from the parameters for the fuzzer. +func (fz *fuzzer) generateQuery(tableId, idValue, colValue int) string { + val := rand.Intn(fz.insertShare + fz.updateShare + fz.deleteShare) + if val < fz.insertShare { + return fz.generateInsertQuery(tableId, idValue, colValue) + } + if val < fz.insertShare+fz.updateShare { + return fz.generateUpdateQuery(tableId, idValue, colValue) + } + return fz.generateDeleteQuery(tableId, idValue) +} + +// generateInsertQuery generates an INSERT query from the parameters for the fuzzer. +func (fz *fuzzer) generateInsertQuery(tableId, idValue, colValue int) string { + query := fmt.Sprintf("insert into %v (id, col) values (%v, %v)", fkTables[tableId], idValue, colValue) + if colValue == 0 { + query = fmt.Sprintf("insert into %v (id, col) values (%v, NULL)", fkTables[tableId], idValue) + } + return query +} + +// generateUpdateQuery generates an UPDATE query from the parameters for the fuzzer. +func (fz *fuzzer) generateUpdateQuery(tableId, idValue, colValue int) string { + query := fmt.Sprintf("update %v set col = %v where id = %v", fkTables[tableId], colValue, idValue) + if colValue == 0 { + query = fmt.Sprintf("update %v set col = NULL where id = %v", fkTables[tableId], idValue) + } + return query +} + +// generateDeleteQuery generates a DELETE query from the parameters for the fuzzer. +func (fz *fuzzer) generateDeleteQuery(tableId, idValue int) string { + query := fmt.Sprintf("delete from %v where id = %v", fkTables[tableId], idValue) + return query +} + +// start starts running the fuzzer. +func (fz *fuzzer) start(t *testing.T, sharded bool) { + // We mark the fuzzer thread to be running now. + fz.shouldStop.Store(false) + fz.wg.Add(fz.concurrency) + for i := 0; i < fz.concurrency; i++ { + fuzzerThreadId := i + go func() { + fz.runFuzzerThread(t, sharded, fuzzerThreadId) + }() + } +} + +// runFuzzerThread is used to run a thread of the fuzzer. +func (fz *fuzzer) runFuzzerThread(t *testing.T, sharded bool, fuzzerThreadId int) { + // Whenever we finish running this thread, we should mark the thread has stopped. + defer func() { + fz.wg.Done() + }() + mcmp, err := utils.NewMySQLCompare(t, vtParams, mysqlParams) + require.NoError(t, err) + // Set the correct keyspace to use from VtGates. + if sharded { + _ = utils.Exec(t, mcmp.VtConn, "use `ks`") + } else { + _ = utils.Exec(t, mcmp.VtConn, "use `uks`") + } + for { + // If fuzzer thread is marked to be stopped, then we should exit this go routine. + if fz.shouldStop.Load() == true { + return + } + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + colValue := rand.Intn(1 + fz.maxValForCol) + safeToUse := fz.checkAndMarkValue(tableId, idValue) + if !safeToUse { + continue + } + // Get a query and execute it. + query := fz.generateQuery(tableId, idValue, colValue) + _, _ = mcmp.ExecAllowAndCompareError(query) + fz.freeValue(tableId, idValue) + } +} + +// stop stops the fuzzer and waits for it to finish execution. +func (fz *fuzzer) stop() { + // Mark the thread to be stopped. + fz.shouldStop.Store(true) + // Wait for the fuzzer thread to stop. + fz.wg.Wait() +} + +// checkAndMarkValue checks if a concurrent query is currently running on the given tableId and idValue. +// If not, then it marks it in the map and returns that it is safe to continue with this query. +func (fz *fuzzer) checkAndMarkValue(tableId int, idValue int) bool { + fz.mu[tableId].Lock() + defer fz.mu[tableId].Unlock() + + if !fz.idCollisionMap[tableId][idValue] { + fz.idCollisionMap[tableId][idValue] = true + return true + } + return false +} + +// freeValue frees the value for the given table and id pair. This can now be used by other running threads. +func (fz *fuzzer) freeValue(tableId int, idValue int) { + fz.mu[tableId].Lock() + defer fz.mu[tableId].Unlock() + + fz.idCollisionMap[tableId][idValue] = false +} + +// TestFkFuzzTest is a fuzzer test that works by querying the database concurrently. +// We have a pre-written set of query templates that we will use, but the data in the queries will +// be randomly generated. The intent is that we hammer the database as a real-world application would +// and check the correctness of data with MySQL. +// We are using the same schema for this test as we do for TestFkScenarios. +/* + * fk_t1 + * │ + * │ On Delete Restrict + * │ On Update Restrict + * ▼ + * ┌────────────────fk_t2────────────────┐ + * │ │ + * │On Delete Set Null │ On Delete Set Null + * │On Update Set Null │ On Update Set Null + * ▼ ▼ + * fk_t7 fk_t3───────────────────┐ + * │ │ + * │ │ On Delete Set Null + * On Delete Set Null │ │ On Update Set Null + * On Update Set Null │ │ + * ▼ ▼ + * fk_t4 fk_t6 + * │ + * │ + * On Delete Restrict │ + * On Update Restrict │ + * │ + * ▼ + * fk_t5 + */ +/* + * fk_t10 + * │ + * On Delete Cascade │ + * On Update Cascade │ + * │ + * ▼ + * fk_t11──────────────────┐ + * │ │ + * │ │ On Delete Restrict + * On Delete Cascade │ │ On Update Restrict + * On Update Cascade │ │ + * │ │ + * ▼ ▼ + * fk_t12 fk_t13 + */ +/* + * fk_t15 + * │ + * │ + * On Delete Cascade │ + * On Update Cascade │ + * │ + * ▼ + * fk_t16 + * │ + * On Delete Set Null │ + * On Update Set Null │ + * │ + * ▼ + * fk_t17──────────────────┐ + * │ │ + * │ │ On Delete Set Null + * On Delete Cascade │ │ On Update Set Null + * On Update Cascade │ │ + * │ │ + * ▼ ▼ + * fk_t18 fk_t19 + */ +/* + Self referenced foreign key from col2 to col in fk_t20 +*/ +func TestFkFuzzTest(t *testing.T) { + // Wait for schema-tracking to be complete. + waitForSchemaTrackingForFkTables(t) + + testcases := []struct { + name string + concurrency int + timeForTesting time.Duration + maxValForId int + maxValForCol int + insertShare int + deleteShare int + updateShare int + }{ + { + name: "Single Thread - Only Inserts", + concurrency: 1, + timeForTesting: 5 * time.Second, + maxValForCol: 5, + maxValForId: 10, + insertShare: 100, + deleteShare: 0, + updateShare: 0, + }, + } + + for _, tt := range testcases { + for _, testSharded := range []bool{false, true} { + t.Run(getTestName(tt.name, testSharded), func(t *testing.T) { + mcmp, closer := start(t) + defer closer() + // Set the correct keyspace to use from VtGates. + if testSharded { + t.Skip("Skip test since we don't have sharded foreign key support yet") + _ = utils.Exec(t, mcmp.VtConn, "use `ks`") + } else { + _ = utils.Exec(t, mcmp.VtConn, "use `uks`") + } + + // Create the fuzzer. + fz := newFuzzer(tt.concurrency, tt.maxValForId, tt.maxValForCol, tt.insertShare, tt.deleteShare, tt.updateShare) + + // Start the fuzzer. + fz.start(t, testSharded) + + // Wait for the timeForTesting so that the threads continue to run. + time.Sleep(tt.timeForTesting) + + fz.stop() + + // Verify that the data in the MySQL database and Vitess database matches exactly. + verifyDataIsCorrect(mcmp) + }) + } + } +} + +// verifyDataIsCorrect verifies that the data in MySQL database matches the data in the Vitess database. +func verifyDataIsCorrect(mcmp utils.MySQLCompare) { + for _, table := range fkTables { + query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table) + mcmp.Exec(query) + } +} diff --git a/go/test/endtoend/vtgate/foreignkey/fk_test.go b/go/test/endtoend/vtgate/foreignkey/fk_test.go index 3f59a247273..f35fd20d986 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_test.go @@ -236,18 +236,7 @@ func TestUpdateWithFK(t *testing.T) { */ func TestFkScenarios(t *testing.T) { // Wait for schema-tracking to be complete. - err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t1", "col") - require.NoError(t, err) - err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t18", "col") - require.NoError(t, err) - err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t11", "col") - require.NoError(t, err) - err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t1", "col") - require.NoError(t, err) - err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t18", "col") - require.NoError(t, err) - err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t11", "col") - require.NoError(t, err) + waitForSchemaTrackingForFkTables(t) testcases := []struct { name string @@ -619,7 +608,7 @@ func TestFkScenarios(t *testing.T) { mcmp.Exec("SELECT * FROM fk_t13 ORDER BY id") // Update that fails - _, err = mcmp.ExecAllowAndCompareError("UPDATE fk_t10 SET col = 15 WHERE id = 1") + _, err := mcmp.ExecAllowAndCompareError("UPDATE fk_t10 SET col = 15 WHERE id = 1") require.Error(t, err) // Verify the results @@ -659,11 +648,3 @@ func TestFkScenarios(t *testing.T) { }) } } - -// getTestName prepends whether the test is for a sharded keyspace or not to the test name. -func getTestName(testName string, testSharded bool) string { - if testSharded { - return "Sharded - " + testName - } - return "Unsharded - " + testName -} diff --git a/go/test/endtoend/vtgate/foreignkey/main_test.go b/go/test/endtoend/vtgate/foreignkey/main_test.go index cf0c76b5404..c882bbd7be0 100644 --- a/go/test/endtoend/vtgate/foreignkey/main_test.go +++ b/go/test/endtoend/vtgate/foreignkey/main_test.go @@ -49,6 +49,9 @@ var ( //go:embed unsharded_vschema.json unshardedVSchema string + + fkTables = []string{"fk_t1", "fk_t2", "fk_t3", "fk_t4", "fk_t5", "fk_t6", "fk_t7", + "fk_t10", "fk_t11", "fk_t12", "fk_t13", "fk_t15", "fk_t16", "fk_t17", "fk_t18", "fk_t19"} ) func TestMain(m *testing.M) { @@ -121,9 +124,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) { deleteAll := func() { _ = utils.Exec(t, mcmp.VtConn, "use `ks/-80`") tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"} - for i := 20; i > 0; i-- { - tables = append(tables, fmt.Sprintf("fk_t%v", i)) - } + tables = append(tables, fkTables...) for _, table := range tables { _, _ = mcmp.ExecAndIgnore("delete from " + table) } @@ -133,9 +134,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) { } _ = utils.Exec(t, mcmp.VtConn, "use `uks`") tables = []string{"u_t1", "u_t2", "u_t3"} - for i := 20; i > 0; i-- { - tables = append(tables, fmt.Sprintf("fk_t%v", i)) - } + tables = append(tables, fkTables...) for _, table := range tables { _, _ = mcmp.ExecAndIgnore("delete from " + table) } diff --git a/go/test/endtoend/vtgate/foreignkey/utils_test.go b/go/test/endtoend/vtgate/foreignkey/utils_test.go new file mode 100644 index 00000000000..cd6c13b9f68 --- /dev/null +++ b/go/test/endtoend/vtgate/foreignkey/utils_test.go @@ -0,0 +1,50 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package foreignkey + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/utils" +) + +// getTestName prepends whether the test is for a sharded keyspace or not to the test name. +func getTestName(testName string, testSharded bool) string { + if testSharded { + return "Sharded - " + testName + } + return "Unsharded - " + testName +} + +// waitForSchemaTrackingForFkTables waits for schema tracking to have run and seen the tables used +// for foreign key tests. +func waitForSchemaTrackingForFkTables(t *testing.T) { + err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t1", "col") + require.NoError(t, err) + err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t18", "col") + require.NoError(t, err) + err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t11", "col") + require.NoError(t, err) + err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t1", "col") + require.NoError(t, err) + err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t18", "col") + require.NoError(t, err) + err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t11", "col") + require.NoError(t, err) +} From 61b34d3ad6f39bfce91a1c9e92a258578ab039f1 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 14 Sep 2023 13:38:35 +0530 Subject: [PATCH 02/10] feat: remove collision map and add code to store the debug information in single concurrency case Signed-off-by: Manan Gupta --- .../vtgate/foreignkey/fk_fuzz_test.go | 147 +++++++++++------- 1 file changed, 94 insertions(+), 53 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index 04a3f58b56d..c34654d7fc6 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -26,7 +26,10 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" ) // fuzzer runs threads that runs queries against the databases. @@ -44,12 +47,15 @@ type fuzzer struct { shouldStop atomic.Bool // wg is an internal state variable, that used to know whether the fuzzer threads are running or not. wg sync.WaitGroup - // idCollisionMap is used to make sure that we don't end up running two queries from two threads that are running on the same table - // on the same primary column. This is going to be a challenge, because lets say we run two inserts on the same primary col value, - // In this case, the two queries will race with each other, and Vitess and MySQL might end up breaking ties in differnt ways, causing - // the data to be different in them. mu is the set of mutexes, protecting access to the map. - mu []sync.Mutex - idCollisionMap []map[int]bool + // firstFailureInfo stores the information about the database state after the first failure occurs. + firstFailureInfo *debugInfo +} + +// debugInfo stores the debugging information we can collect after a failure happens. +type debugInfo struct { + queryToFail string + vitessState []*sqltypes.Result + mysqlState []*sqltypes.Result } // newFuzzer creates a new fuzzer struct. @@ -65,28 +71,26 @@ func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare i } // Initially the fuzzer thread is stopped. fz.shouldStop.Store(true) - // Initialize the map and mutexes. - for range fkTables { - fz.mu = append(fz.mu, sync.Mutex{}) - fz.idCollisionMap = append(fz.idCollisionMap, map[int]bool{}) - } return fz } // generateQuery generates a query from the parameters for the fuzzer. -func (fz *fuzzer) generateQuery(tableId, idValue, colValue int) string { +func (fz *fuzzer) generateQuery() string { val := rand.Intn(fz.insertShare + fz.updateShare + fz.deleteShare) if val < fz.insertShare { - return fz.generateInsertQuery(tableId, idValue, colValue) + return fz.generateInsertQuery() } if val < fz.insertShare+fz.updateShare { - return fz.generateUpdateQuery(tableId, idValue, colValue) + return fz.generateUpdateQuery() } - return fz.generateDeleteQuery(tableId, idValue) + return fz.generateDeleteQuery() } // generateInsertQuery generates an INSERT query from the parameters for the fuzzer. -func (fz *fuzzer) generateInsertQuery(tableId, idValue, colValue int) string { +func (fz *fuzzer) generateInsertQuery() string { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + colValue := rand.Intn(1 + fz.maxValForCol) query := fmt.Sprintf("insert into %v (id, col) values (%v, %v)", fkTables[tableId], idValue, colValue) if colValue == 0 { query = fmt.Sprintf("insert into %v (id, col) values (%v, NULL)", fkTables[tableId], idValue) @@ -95,7 +99,10 @@ func (fz *fuzzer) generateInsertQuery(tableId, idValue, colValue int) string { } // generateUpdateQuery generates an UPDATE query from the parameters for the fuzzer. -func (fz *fuzzer) generateUpdateQuery(tableId, idValue, colValue int) string { +func (fz *fuzzer) generateUpdateQuery() string { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + colValue := rand.Intn(1 + fz.maxValForCol) query := fmt.Sprintf("update %v set col = %v where id = %v", fkTables[tableId], colValue, idValue) if colValue == 0 { query = fmt.Sprintf("update %v set col = NULL where id = %v", fkTables[tableId], idValue) @@ -104,7 +111,9 @@ func (fz *fuzzer) generateUpdateQuery(tableId, idValue, colValue int) string { } // generateDeleteQuery generates a DELETE query from the parameters for the fuzzer. -func (fz *fuzzer) generateDeleteQuery(tableId, idValue int) string { +func (fz *fuzzer) generateDeleteQuery() string { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) query := fmt.Sprintf("delete from %v where id = %v", fkTables[tableId], idValue) return query } @@ -141,17 +150,25 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T, sharded bool, fuzzerThreadId int if fz.shouldStop.Load() == true { return } - tableId := rand.Intn(len(fkTables)) - idValue := 1 + rand.Intn(fz.maxValForId) - colValue := rand.Intn(1 + fz.maxValForCol) - safeToUse := fz.checkAndMarkValue(tableId, idValue) - if !safeToUse { - continue - } // Get a query and execute it. - query := fz.generateQuery(tableId, idValue, colValue) - _, _ = mcmp.ExecAllowAndCompareError(query) - fz.freeValue(tableId, idValue) + query := fz.generateQuery() + // When the concurrency is 1, then we run the query both on MySQL and Vitess. + if fz.concurrency == 1 { + _, _ = mcmp.ExecAllowAndCompareError(query) + // If t is marked failed, we have encountered our first failure. + // Lets collect the required information and finish execution. + if t.Failed() { + fz.firstFailureInfo = &debugInfo{ + queryToFail: query, + mysqlState: collectFkTablesState(mcmp.MySQLConn), + vitessState: collectFkTablesState(mcmp.VtConn), + } + return + } + } else { + // When we are running concurrent threads, then we run all the queries on Vitess. + _ = utils.Exec(t, mcmp.VtConn, query) + } } } @@ -163,27 +180,6 @@ func (fz *fuzzer) stop() { fz.wg.Wait() } -// checkAndMarkValue checks if a concurrent query is currently running on the given tableId and idValue. -// If not, then it marks it in the map and returns that it is safe to continue with this query. -func (fz *fuzzer) checkAndMarkValue(tableId int, idValue int) bool { - fz.mu[tableId].Lock() - defer fz.mu[tableId].Unlock() - - if !fz.idCollisionMap[tableId][idValue] { - fz.idCollisionMap[tableId][idValue] = true - return true - } - return false -} - -// freeValue frees the value for the given table and id pair. This can now be used by other running threads. -func (fz *fuzzer) freeValue(tableId int, idValue int) { - fz.mu[tableId].Lock() - defer fz.mu[tableId].Unlock() - - fz.idCollisionMap[tableId][idValue] = false -} - // TestFkFuzzTest is a fuzzer test that works by querying the database concurrently. // We have a pre-written set of query templates that we will use, but the data in the queries will // be randomly generated. The intent is that we hammer the database as a real-world application would @@ -280,6 +276,24 @@ func TestFkFuzzTest(t *testing.T) { insertShare: 100, deleteShare: 0, updateShare: 0, + }, { + name: "Single Thread - Balanced Inserts and Deletes", + concurrency: 1, + timeForTesting: 5 * time.Second, + maxValForCol: 5, + maxValForId: 10, + insertShare: 50, + deleteShare: 50, + updateShare: 0, + }, { + name: "Single Thread - Balanced Inserts and Updates", + concurrency: 1, + timeForTesting: 5 * time.Second, + maxValForCol: 5, + maxValForId: 10, + insertShare: 50, + deleteShare: 0, + updateShare: 50, }, } @@ -307,17 +321,44 @@ func TestFkFuzzTest(t *testing.T) { fz.stop() - // Verify that the data in the MySQL database and Vitess database matches exactly. - verifyDataIsCorrect(mcmp) + // We encountered an error while running the fuzzer. Let's print out the information! + if fz.firstFailureInfo != nil { + log.Errorf("Failing query - %v", fz.firstFailureInfo.queryToFail) + for idx, table := range fkTables { + log.Errorf("MySQL data for %v -\n%v", table, fz.firstFailureInfo.mysqlState[idx].Rows) + log.Errorf("Vitess data for %v -\n%v", table, fz.firstFailureInfo.vitessState[idx].Rows) + } + } + + // Verify the consistency of the data. + verifyDataIsCorrect(mcmp, tt.concurrency) }) } } } // verifyDataIsCorrect verifies that the data in MySQL database matches the data in the Vitess database. -func verifyDataIsCorrect(mcmp utils.MySQLCompare) { +func verifyDataIsCorrect(mcmp utils.MySQLCompare, concurrency int) { + // For single concurrent thread, we run all the queries on both MySQL and Vitess, so we can verify correctness + // by just checking if the data in MySQL and Vitess match. + if concurrency == 1 { + for _, table := range fkTables { + query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table) + mcmp.Exec(query) + } + } + // For higher concurrency, we don't have MySQL data to verify everything is fine, + // so we'll have to do something different. + // TODO: Do something different. +} + +// collectFkTablesState collects the data stored in the foreign key tables for the given connection. +func collectFkTablesState(conn *mysql.Conn) []*sqltypes.Result { + var tablesData []*sqltypes.Result for _, table := range fkTables { query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table) - mcmp.Exec(query) + res, _ := conn.ExecuteFetch(query, 10000, true) + tablesData = append(tablesData, res) } + return tablesData } From 748f683f25d7bbd8ec57ff5616d15b677a2c9af8 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 14 Sep 2023 16:34:07 +0530 Subject: [PATCH 03/10] feat: augment tests to verify that a replica without any foreign key constraints has the same data Signed-off-by: Manan Gupta --- go/test/endtoend/utils/utils.go | 59 +++++++++++++ .../vtgate/foreignkey/fk_fuzz_test.go | 87 +++++++++++++++++-- .../endtoend/vtgate/foreignkey/main_test.go | 25 +++++- .../endtoend/vtgate/foreignkey/utils_test.go | 38 ++++++++ 4 files changed, 199 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index c0137b27066..8396ea9b613 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -17,7 +17,10 @@ limitations under the License. package utils import ( + "context" "fmt" + "os" + "path" "strings" "testing" "time" @@ -340,3 +343,59 @@ func TimeoutAction(t *testing.T, timeout time.Duration, errMsg string, action fu } } } + +// RunSQLs is used to run a list of SQL statements on the given tablet +func RunSQLs(t *testing.T, sqls []string, tablet *cluster.Vttablet, db string) error { + // Get Connection + tabletParams := getMysqlConnParam(tablet, db) + var timeoutDuration = time.Duration(5 * len(sqls)) + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second) + defer cancel() + conn, err := mysql.Connect(ctx, &tabletParams) + require.Nil(t, err) + defer conn.Close() + + // Run SQLs + for _, sql := range sqls { + if _, err := execute(t, conn, sql); err != nil { + return err + } + } + return nil +} + +// RunSQL is used to run a SQL statement on the given tablet +func RunSQL(t *testing.T, sql string, tablet *cluster.Vttablet, db string) (*sqltypes.Result, error) { + // Get Connection + tabletParams := getMysqlConnParam(tablet, db) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + conn, err := mysql.Connect(ctx, &tabletParams) + require.Nil(t, err) + defer conn.Close() + + // RunSQL + return execute(t, conn, sql) +} + +// GetMySQLConn gets a MySQL connection for the given tablet +func GetMySQLConn(tablet *cluster.Vttablet, db string) (*mysql.Conn, error) { + tabletParams := getMysqlConnParam(tablet, db) + return mysql.Connect(context.Background(), &tabletParams) +} + +func execute(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { + t.Helper() + return conn.ExecuteFetch(query, 1000, true) +} + +func getMysqlConnParam(tablet *cluster.Vttablet, db string) mysql.ConnParams { + connParams := mysql.ConnParams{ + Uname: "vt_dba", + UnixSocket: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.sock", tablet.TabletUID)), + } + if db != "" { + connParams.DbName = db + } + return connParams +} diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index c34654d7fc6..b6b6413cb29 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/log" ) @@ -167,7 +168,7 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T, sharded bool, fuzzerThreadId int } } else { // When we are running concurrent threads, then we run all the queries on Vitess. - _ = utils.Exec(t, mcmp.VtConn, query) + _, _ = utils.ExecAllowError(t, mcmp.VtConn, query) } } } @@ -256,6 +257,15 @@ func (fz *fuzzer) stop() { func TestFkFuzzTest(t *testing.T) { // Wait for schema-tracking to be complete. waitForSchemaTrackingForFkTables(t) + // Remove all the foreign key constraints for all the replicas. + // We can then verify that the replica, and the primary have the same data, to ensure + // that none of the queries ever lead to cascades/updates on MySQL level. + for _, ks := range []string{shardedKs, unshardedKs} { + replicas := getReplicaTablets(ks) + for _, replica := range replicas { + removeAllForeignKeyConstraints(t, replica, ks) + } + } testcases := []struct { name string @@ -276,7 +286,8 @@ func TestFkFuzzTest(t *testing.T) { insertShare: 100, deleteShare: 0, updateShare: 0, - }, { + }, + { name: "Single Thread - Balanced Inserts and Deletes", concurrency: 1, timeForTesting: 5 * time.Second, @@ -285,7 +296,8 @@ func TestFkFuzzTest(t *testing.T) { insertShare: 50, deleteShare: 50, updateShare: 0, - }, { + }, + { name: "Single Thread - Balanced Inserts and Updates", concurrency: 1, timeForTesting: 5 * time.Second, @@ -295,6 +307,26 @@ func TestFkFuzzTest(t *testing.T) { deleteShare: 0, updateShare: 50, }, + { + name: "Single Thread - Balanced Inserts, Updates and Deletes", + concurrency: 1, + timeForTesting: 5 * time.Second, + maxValForCol: 5, + maxValForId: 10, + insertShare: 50, + deleteShare: 50, + updateShare: 50, + }, + { + name: "Multi Thread - Balanced Inserts, Updates and Deletes", + concurrency: 30, + timeForTesting: 5 * time.Second, + maxValForCol: 5, + maxValForId: 30, + insertShare: 50, + deleteShare: 50, + updateShare: 50, + }, } for _, tt := range testcases { @@ -331,14 +363,14 @@ func TestFkFuzzTest(t *testing.T) { } // Verify the consistency of the data. - verifyDataIsCorrect(mcmp, tt.concurrency) + verifyDataIsCorrect(t, mcmp, tt.concurrency) }) } } } // verifyDataIsCorrect verifies that the data in MySQL database matches the data in the Vitess database. -func verifyDataIsCorrect(mcmp utils.MySQLCompare, concurrency int) { +func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int) { // For single concurrent thread, we run all the queries on both MySQL and Vitess, so we can verify correctness // by just checking if the data in MySQL and Vitess match. if concurrency == 1 { @@ -346,10 +378,49 @@ func verifyDataIsCorrect(mcmp utils.MySQLCompare, concurrency int) { query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table) mcmp.Exec(query) } + } else { + // For higher concurrency, we don't have MySQL data to verify everything is fine, + // so we'll have to do something different. + // We run LEFT JOIN queries on all the parent and child tables linked by foreign keys + // to make sure that nothing is broken in the database. + for _, reference := range fkReferences { + query := fmt.Sprintf("select %v.id from %v left join %v on (%v.col = %v.col) where %v.col is null and %v.col is not null", reference.childTable, reference.childTable, reference.parentTable, reference.parentTable, reference.childTable, reference.parentTable, reference.childTable) + res, err := mcmp.VtConn.ExecuteFetch(query, 1000, false) + require.NoError(t, err) + require.Zerof(t, len(res.Rows), "Query %v gave non-empty results", query) + } + } + // We also verify that the results in Primary and Replica table match as is. + for _, keyspace := range clusterInstance.Keyspaces { + for _, shard := range keyspace.Shards { + var primaryTab, replicaTab *cluster.Vttablet + for _, vttablet := range shard.Vttablets { + if vttablet.Type == "primary" { + primaryTab = vttablet + } else { + replicaTab = vttablet + } + } + require.NotNil(t, primaryTab) + require.NotNil(t, replicaTab) + primaryConn, err := utils.GetMySQLConn(primaryTab, fmt.Sprintf("vt_%v", keyspace.Name)) + require.NoError(t, err) + replicaConn, err := utils.GetMySQLConn(replicaTab, fmt.Sprintf("vt_%v", keyspace.Name)) + require.NoError(t, err) + primaryRes := collectFkTablesState(primaryConn) + replicaRes := collectFkTablesState(replicaConn) + verifyDataMatches(t, primaryRes, replicaRes) + } + } +} + +// verifyDataMatches verifies that the two list of results are the same. +func verifyDataMatches(t *testing.T, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) { + require.EqualValues(t, len(resTwo), len(resOne), "Res 1 - %v, Res 2 - %v", resOne, resTwo) + for idx, resultOne := range resOne { + resultTwo := resTwo[idx] + require.True(t, resultOne.Equal(resultTwo), "Rows 1 - %v, Rows 2 - %v", resultOne.Rows, resultTwo.Rows) } - // For higher concurrency, we don't have MySQL data to verify everything is fine, - // so we'll have to do something different. - // TODO: Do something different. } // collectFkTablesState collects the data stored in the foreign key tables for the given connection. diff --git a/go/test/endtoend/vtgate/foreignkey/main_test.go b/go/test/endtoend/vtgate/foreignkey/main_test.go index c882bbd7be0..2c7eb3f486b 100644 --- a/go/test/endtoend/vtgate/foreignkey/main_test.go +++ b/go/test/endtoend/vtgate/foreignkey/main_test.go @@ -52,8 +52,29 @@ var ( fkTables = []string{"fk_t1", "fk_t2", "fk_t3", "fk_t4", "fk_t5", "fk_t6", "fk_t7", "fk_t10", "fk_t11", "fk_t12", "fk_t13", "fk_t15", "fk_t16", "fk_t17", "fk_t18", "fk_t19"} + fkReferences = []fkReference{ + {parentTable: "fk_t1", childTable: "fk_t2"}, + {parentTable: "fk_t2", childTable: "fk_t7"}, + {parentTable: "fk_t2", childTable: "fk_t3"}, + {parentTable: "fk_t3", childTable: "fk_t4"}, + {parentTable: "fk_t3", childTable: "fk_t6"}, + {parentTable: "fk_t4", childTable: "fk_t5"}, + {parentTable: "fk_t10", childTable: "fk_t11"}, + {parentTable: "fk_t11", childTable: "fk_t12"}, + {parentTable: "fk_t11", childTable: "fk_t13"}, + {parentTable: "fk_t15", childTable: "fk_t16"}, + {parentTable: "fk_t16", childTable: "fk_t17"}, + {parentTable: "fk_t17", childTable: "fk_t18"}, + {parentTable: "fk_t17", childTable: "fk_t19"}, + } ) +// fkReference stores a foreign key reference from one table to another. +type fkReference struct { + parentTable string + childTable string +} + func TestMain(m *testing.M) { defer cluster.PanicHandler(nil) flag.Parse() @@ -75,7 +96,7 @@ func TestMain(m *testing.M) { VSchema: shardedVSchema, } - err = clusterInstance.StartKeyspace(*sKs, []string{"-80", "80-"}, 0, false) + err = clusterInstance.StartKeyspace(*sKs, []string{"-80", "80-"}, 1, false) if err != nil { return 1 } @@ -85,7 +106,7 @@ func TestMain(m *testing.M) { SchemaSQL: unshardedSchemaSQL, VSchema: unshardedVSchema, } - err = clusterInstance.StartUnshardedKeyspace(*uKs, 0, false) + err = clusterInstance.StartUnshardedKeyspace(*uKs, 1, false) if err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/foreignkey/utils_test.go b/go/test/endtoend/vtgate/foreignkey/utils_test.go index cd6c13b9f68..2178bb54f06 100644 --- a/go/test/endtoend/vtgate/foreignkey/utils_test.go +++ b/go/test/endtoend/vtgate/foreignkey/utils_test.go @@ -17,10 +17,12 @@ limitations under the License. package foreignkey import ( + "fmt" "testing" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" ) @@ -48,3 +50,39 @@ func waitForSchemaTrackingForFkTables(t *testing.T) { err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t11", "col") require.NoError(t, err) } + +// getReplicaTablets gets all the replica tablets. +func getReplicaTablets(keyspace string) []*cluster.Vttablet { + var replicaTablets []*cluster.Vttablet + for _, ks := range clusterInstance.Keyspaces { + if ks.Name != keyspace { + continue + } + for _, shard := range ks.Shards { + for _, vttablet := range shard.Vttablets { + if vttablet.Type != "primary" { + replicaTablets = append(replicaTablets, vttablet) + } + } + } + } + return replicaTablets +} + +// removeAllForeignKeyConstraints removes all the foreign key constraints from the given tablet. +func removeAllForeignKeyConstraints(t *testing.T, vttablet *cluster.Vttablet, keyspace string) { + getAllFksQuery := `SELECT RefCons.table_name, RefCons.constraint_name FROM information_schema.referential_constraints RefCons;` + res, err := utils.RunSQL(t, getAllFksQuery, vttablet, "") + require.NoError(t, err) + var queries []string + queries = append(queries, "set global super_read_only=0") + for _, row := range res.Rows { + tableName := row[0].ToString() + constraintName := row[1].ToString() + removeFkQuery := fmt.Sprintf("ALTER TABLE %v DROP CONSTRAINT %v", tableName, constraintName) + queries = append(queries, removeFkQuery) + } + queries = append(queries, "set global super_read_only=1") + err = utils.RunSQLs(t, queries, vttablet, fmt.Sprintf("vt_%v", keyspace)) + require.NoError(t, err) +} From 32575982a89c72613eefd83e4d181eb8b21c71d6 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 15 Sep 2023 08:29:32 +0530 Subject: [PATCH 04/10] feat: add tests for multicol foreign keys as well Signed-off-by: Manan Gupta --- .../vtgate/foreignkey/fk_fuzz_test.go | 47 +++- .../endtoend/vtgate/foreignkey/main_test.go | 17 +- .../vtgate/foreignkey/sharded_schema.sql | 226 ++++++++++++++++- .../vtgate/foreignkey/sharded_vschema.json | 136 +++++++++++ .../vtgate/foreignkey/unsharded_schema.sql | 227 +++++++++++++++++- .../vtgate/foreignkey/unsharded_vschema.json | 19 +- .../endtoend/vtgate/foreignkey/utils_test.go | 6 + 7 files changed, 663 insertions(+), 15 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index b6b6413cb29..241353b47f9 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -91,24 +91,46 @@ func (fz *fuzzer) generateQuery() string { func (fz *fuzzer) generateInsertQuery() string { tableId := rand.Intn(len(fkTables)) idValue := 1 + rand.Intn(fz.maxValForId) - colValue := rand.Intn(1 + fz.maxValForCol) - query := fmt.Sprintf("insert into %v (id, col) values (%v, %v)", fkTables[tableId], idValue, colValue) - if colValue == 0 { - query = fmt.Sprintf("insert into %v (id, col) values (%v, NULL)", fkTables[tableId], idValue) + tableName := fkTables[tableId] + if tableName == "fk_t20" { + colValue := rand.Intn(1 + fz.maxValForCol) + col2Value := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("insert into %v (id, col, col2) values (%v, %v, %v)", tableName, idValue, convertColValueToString(colValue), convertColValueToString(col2Value)) + } else if isMultiColFkTable(tableName) { + colaValue := rand.Intn(1 + fz.maxValForCol) + colbValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("insert into %v (id, cola, colb) values (%v, %v, %v)", tableName, idValue, convertColValueToString(colaValue), convertColValueToString(colbValue)) + } else { + colValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("insert into %v (id, col) values (%v, %v)", tableName, idValue, convertColValueToString(colValue)) } - return query +} + +// convertColValueToString converts the given value to a string +func convertColValueToString(value int) string { + if value == 0 { + return "NULL" + } + return fmt.Sprintf("%d", value) } // generateUpdateQuery generates an UPDATE query from the parameters for the fuzzer. func (fz *fuzzer) generateUpdateQuery() string { tableId := rand.Intn(len(fkTables)) idValue := 1 + rand.Intn(fz.maxValForId) - colValue := rand.Intn(1 + fz.maxValForCol) - query := fmt.Sprintf("update %v set col = %v where id = %v", fkTables[tableId], colValue, idValue) - if colValue == 0 { - query = fmt.Sprintf("update %v set col = NULL where id = %v", fkTables[tableId], idValue) + tableName := fkTables[tableId] + if tableName == "fk_t20" { + colValue := rand.Intn(1 + fz.maxValForCol) + col2Value := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("update %v set col = %v, col2 = %v where id = %v", tableName, convertColValueToString(colValue), convertColValueToString(col2Value), idValue) + } else if isMultiColFkTable(tableName) { + colaValue := rand.Intn(1 + fz.maxValForCol) + colbValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("update %v set cola = %v, colb = %v where id = %v", tableName, convertColValueToString(colaValue), convertColValueToString(colbValue), idValue) + } else { + colValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("update %v set col = %v where id = %v", tableName, convertColValueToString(colValue), idValue) } - return query } // generateDeleteQuery generates a DELETE query from the parameters for the fuzzer. @@ -385,6 +407,9 @@ func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int) // to make sure that nothing is broken in the database. for _, reference := range fkReferences { query := fmt.Sprintf("select %v.id from %v left join %v on (%v.col = %v.col) where %v.col is null and %v.col is not null", reference.childTable, reference.childTable, reference.parentTable, reference.parentTable, reference.childTable, reference.parentTable, reference.childTable) + if isMultiColFkTable(reference.childTable) { + query = fmt.Sprintf("select %v.id from %v left join %v on (%v.cola = %v.cola and %v.colb = %v.colb) where %v.cola is null and %v.cola is not null and %v.colb is not null", reference.childTable, reference.childTable, reference.parentTable, reference.parentTable, reference.childTable, reference.parentTable, reference.childTable, reference.parentTable, reference.childTable, reference.childTable) + } res, err := mcmp.VtConn.ExecuteFetch(query, 1000, false) require.NoError(t, err) require.Zerof(t, len(res.Rows), "Query %v gave non-empty results", query) @@ -419,7 +444,7 @@ func verifyDataMatches(t *testing.T, resOne []*sqltypes.Result, resTwo []*sqltyp require.EqualValues(t, len(resTwo), len(resOne), "Res 1 - %v, Res 2 - %v", resOne, resTwo) for idx, resultOne := range resOne { resultTwo := resTwo[idx] - require.True(t, resultOne.Equal(resultTwo), "Rows 1 - %v, Rows 2 - %v", resultOne.Rows, resultTwo.Rows) + require.True(t, resultOne.Equal(resultTwo), "Data for %v doesn't match\nRows 1\n%v\nRows 2\n%v", fkTables[idx], resultOne.Rows, resultTwo.Rows) } } diff --git a/go/test/endtoend/vtgate/foreignkey/main_test.go b/go/test/endtoend/vtgate/foreignkey/main_test.go index 2c7eb3f486b..6cfc53f441f 100644 --- a/go/test/endtoend/vtgate/foreignkey/main_test.go +++ b/go/test/endtoend/vtgate/foreignkey/main_test.go @@ -51,7 +51,9 @@ var ( unshardedVSchema string fkTables = []string{"fk_t1", "fk_t2", "fk_t3", "fk_t4", "fk_t5", "fk_t6", "fk_t7", - "fk_t10", "fk_t11", "fk_t12", "fk_t13", "fk_t15", "fk_t16", "fk_t17", "fk_t18", "fk_t19"} + "fk_t10", "fk_t11", "fk_t12", "fk_t13", "fk_t15", "fk_t16", "fk_t17", "fk_t18", "fk_t19", "fk_t20", + "fk_multicol_t1", "fk_multicol_t2", "fk_multicol_t3", "fk_multicol_t4", "fk_multicol_t5", "fk_multicol_t6", "fk_multicol_t7", + "fk_multicol_t10", "fk_multicol_t11", "fk_multicol_t12", "fk_multicol_t13", "fk_multicol_t15", "fk_multicol_t16", "fk_multicol_t17", "fk_multicol_t18", "fk_multicol_t19"} fkReferences = []fkReference{ {parentTable: "fk_t1", childTable: "fk_t2"}, {parentTable: "fk_t2", childTable: "fk_t7"}, @@ -66,6 +68,19 @@ var ( {parentTable: "fk_t16", childTable: "fk_t17"}, {parentTable: "fk_t17", childTable: "fk_t18"}, {parentTable: "fk_t17", childTable: "fk_t19"}, + {parentTable: "fk_multicol_t1", childTable: "fk_multicol_t2"}, + {parentTable: "fk_multicol_t2", childTable: "fk_multicol_t7"}, + {parentTable: "fk_multicol_t2", childTable: "fk_multicol_t3"}, + {parentTable: "fk_multicol_t3", childTable: "fk_multicol_t4"}, + {parentTable: "fk_multicol_t3", childTable: "fk_multicol_t6"}, + {parentTable: "fk_multicol_t4", childTable: "fk_multicol_t5"}, + {parentTable: "fk_multicol_t10", childTable: "fk_multicol_t11"}, + {parentTable: "fk_multicol_t11", childTable: "fk_multicol_t12"}, + {parentTable: "fk_multicol_t11", childTable: "fk_multicol_t13"}, + {parentTable: "fk_multicol_t15", childTable: "fk_multicol_t16"}, + {parentTable: "fk_multicol_t16", childTable: "fk_multicol_t17"}, + {parentTable: "fk_multicol_t17", childTable: "fk_multicol_t18"}, + {parentTable: "fk_multicol_t17", childTable: "fk_multicol_t19"}, } ) diff --git a/go/test/endtoend/vtgate/foreignkey/sharded_schema.sql b/go/test/endtoend/vtgate/foreignkey/sharded_schema.sql index b530c982904..c1f511350f2 100644 --- a/go/test/endtoend/vtgate/foreignkey/sharded_schema.sql +++ b/go/test/endtoend/vtgate/foreignkey/sharded_schema.sql @@ -294,4 +294,228 @@ create table fk_t20 primary key (id), index(col), foreign key (col2) references fk_t20(col) on delete restrict on update restrict -) Engine = InnoDB; \ No newline at end of file +) Engine = InnoDB; + +/* + * fk_multicol_t1 + * │ + * │ On Delete Restrict + * │ On Update Restrict + * ▼ + * ┌────────fk_multicol_t2───────────────┐ + * │ │ + * │On Delete Set Null │ On Delete Set Null + * │On Update Set Null │ On Update Set Null + * ▼ ▼ + * fk_multicol_t7 fk_multicol_t3───────────────────┐ + * │ │ + * │ │ On Delete Set Null + * On Delete Set Null │ │ On Update Set Null + * On Update Set Null │ │ + * ▼ ▼ + * fk_multicol_t4 fk_multicol_t6 + * │ + * │ + * On Delete Restrict │ + * On Update Restrict │ + * │ + * ▼ + * fk_multicol_t5 + */ +create table fk_multicol_t1 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb) +) Engine = InnoDB; + +create table fk_multicol_t2 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t1(cola, colb) on delete restrict on update restrict +) Engine = InnoDB; + +create table fk_multicol_t3 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t2(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t4 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t3(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t5 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t4(cola, colb) on delete restrict on update restrict +) Engine = InnoDB; + +create table fk_multicol_t6 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t3(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t7 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t2(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +/* + * fk_multicol_t10 + * │ + * On Delete Cascade │ + * On Update Cascade │ + * │ + * ▼ + * fk_multicol_t11──────────────────┐ + * │ │ + * │ │ On Delete Restrict + * On Delete Cascade │ │ On Update Restrict + * On Update Cascade │ │ + * │ │ + * ▼ ▼ + * fk_multicol_t12 fk_multicol_t13 + */ + +create table fk_multicol_t10 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb) +) Engine = InnoDB; + +create table fk_multicol_t11 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t10(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t12 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t11(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t13 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t11(cola, colb) on delete restrict on update restrict +) Engine = InnoDB; + +/* + * fk_multicol_t15 + * │ + * │ + * On Delete Cascade │ + * On Update Cascade │ + * │ + * ▼ + * fk_multicol_t16 + * │ + * On Delete Set Null │ + * On Update Set Null │ + * │ + * ▼ + * fk_multicol_t17──────────────────┐ + * │ │ + * │ │ On Delete Set Null + * On Delete Cascade │ │ On Update Set Null + * On Update Cascade │ │ + * │ │ + * ▼ ▼ + * fk_multicol_t18 fk_multicol_t19 + */ + +create table fk_multicol_t15 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb) +) Engine = InnoDB; + +create table fk_multicol_t16 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t15(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t17 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t16(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t18 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t17(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t19 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t17(cola, colb) on delete set null on update set null +) Engine = InnoDB; diff --git a/go/test/endtoend/vtgate/foreignkey/sharded_vschema.json b/go/test/endtoend/vtgate/foreignkey/sharded_vschema.json index 074f08ce848..b40b10b88e0 100644 --- a/go/test/endtoend/vtgate/foreignkey/sharded_vschema.json +++ b/go/test/endtoend/vtgate/foreignkey/sharded_vschema.json @@ -214,6 +214,142 @@ "name": "xxhash" } ] + }, + "fk_t20": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t2": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t3": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t4": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t5": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t6": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t7": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t10": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t11": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t12": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t13": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t15": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t16": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t17": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t18": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "fk_multicol_t19": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] } } } \ No newline at end of file diff --git a/go/test/endtoend/vtgate/foreignkey/unsharded_schema.sql b/go/test/endtoend/vtgate/foreignkey/unsharded_schema.sql index dc6cba7bb08..3b4496d47fb 100644 --- a/go/test/endtoend/vtgate/foreignkey/unsharded_schema.sql +++ b/go/test/endtoend/vtgate/foreignkey/unsharded_schema.sql @@ -244,4 +244,229 @@ create table fk_t20 primary key (id), index(col), foreign key (col2) references fk_t20(col) on delete restrict on update restrict -) Engine = InnoDB; \ No newline at end of file +) Engine = InnoDB; + + +/* + * fk_multicol_t1 + * │ + * │ On Delete Restrict + * │ On Update Restrict + * ▼ + * ┌────────fk_multicol_t2───────────────┐ + * │ │ + * │On Delete Set Null │ On Delete Set Null + * │On Update Set Null │ On Update Set Null + * ▼ ▼ + * fk_multicol_t7 fk_multicol_t3───────────────────┐ + * │ │ + * │ │ On Delete Set Null + * On Delete Set Null │ │ On Update Set Null + * On Update Set Null │ │ + * ▼ ▼ + * fk_multicol_t4 fk_multicol_t6 + * │ + * │ + * On Delete Restrict │ + * On Update Restrict │ + * │ + * ▼ + * fk_multicol_t5 + */ +create table fk_multicol_t1 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb) +) Engine = InnoDB; + +create table fk_multicol_t2 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t1(cola, colb) on delete restrict on update restrict +) Engine = InnoDB; + +create table fk_multicol_t3 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t2(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t4 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t3(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t5 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t4(cola, colb) on delete restrict on update restrict +) Engine = InnoDB; + +create table fk_multicol_t6 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t3(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t7 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t2(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +/* + * fk_multicol_t10 + * │ + * On Delete Cascade │ + * On Update Cascade │ + * │ + * ▼ + * fk_multicol_t11──────────────────┐ + * │ │ + * │ │ On Delete Restrict + * On Delete Cascade │ │ On Update Restrict + * On Update Cascade │ │ + * │ │ + * ▼ ▼ + * fk_multicol_t12 fk_multicol_t13 + */ + +create table fk_multicol_t10 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb) +) Engine = InnoDB; + +create table fk_multicol_t11 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t10(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t12 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t11(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t13 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t11(cola, colb) on delete restrict on update restrict +) Engine = InnoDB; + +/* + * fk_multicol_t15 + * │ + * │ + * On Delete Cascade │ + * On Update Cascade │ + * │ + * ▼ + * fk_multicol_t16 + * │ + * On Delete Set Null │ + * On Update Set Null │ + * │ + * ▼ + * fk_multicol_t17──────────────────┐ + * │ │ + * │ │ On Delete Set Null + * On Delete Cascade │ │ On Update Set Null + * On Update Cascade │ │ + * │ │ + * ▼ ▼ + * fk_multicol_t18 fk_multicol_t19 + */ + +create table fk_multicol_t15 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb) +) Engine = InnoDB; + +create table fk_multicol_t16 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t15(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t17 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t16(cola, colb) on delete set null on update set null +) Engine = InnoDB; + +create table fk_multicol_t18 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t17(cola, colb) on delete cascade on update cascade +) Engine = InnoDB; + +create table fk_multicol_t19 +( + id bigint, + colb varchar(10), + cola varchar(10), + primary key (id), + index(cola, colb), + foreign key (cola, colb) references fk_multicol_t17(cola, colb) on delete set null on update set null +) Engine = InnoDB; diff --git a/go/test/endtoend/vtgate/foreignkey/unsharded_vschema.json b/go/test/endtoend/vtgate/foreignkey/unsharded_vschema.json index c0d2368849f..fbdc3dd7c04 100644 --- a/go/test/endtoend/vtgate/foreignkey/unsharded_vschema.json +++ b/go/test/endtoend/vtgate/foreignkey/unsharded_vschema.json @@ -19,6 +19,23 @@ "fk_t16": {}, "fk_t17": {}, "fk_t18": {}, - "fk_t19": {} + "fk_t19": {}, + "fk_t20": {}, + "fk_multicol_t1": {}, + "fk_multicol_t2": {}, + "fk_multicol_t3": {}, + "fk_multicol_t4": {}, + "fk_multicol_t5": {}, + "fk_multicol_t6": {}, + "fk_multicol_t7": {}, + "fk_multicol_t10": {}, + "fk_multicol_t11": {}, + "fk_multicol_t12": {}, + "fk_multicol_t13": {}, + "fk_multicol_t15": {}, + "fk_multicol_t16": {}, + "fk_multicol_t17": {}, + "fk_multicol_t18": {}, + "fk_multicol_t19": {} } } \ No newline at end of file diff --git a/go/test/endtoend/vtgate/foreignkey/utils_test.go b/go/test/endtoend/vtgate/foreignkey/utils_test.go index 2178bb54f06..b2297ff188b 100644 --- a/go/test/endtoend/vtgate/foreignkey/utils_test.go +++ b/go/test/endtoend/vtgate/foreignkey/utils_test.go @@ -18,6 +18,7 @@ package foreignkey import ( "fmt" + "strings" "testing" "github.com/stretchr/testify/require" @@ -34,6 +35,11 @@ func getTestName(testName string, testSharded bool) string { return "Unsharded - " + testName } +// isMultiColFkTable tells if the table is a multicol table or not. +func isMultiColFkTable(tableName string) bool { + return strings.Contains(tableName, "multicol") +} + // waitForSchemaTrackingForFkTables waits for schema tracking to have run and seen the tables used // for foreign key tests. func waitForSchemaTrackingForFkTables(t *testing.T) { From 055765bb5f2cc26c9a64c00b1e16f034b1c7f603 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 15 Sep 2023 12:42:33 +0530 Subject: [PATCH 05/10] feat: wait for replica to be caught up before verifying that data matches Signed-off-by: Manan Gupta --- .../endtoend/vtgate/foreignkey/fk_fuzz_test.go | 2 ++ .../endtoend/vtgate/foreignkey/utils_test.go | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index 241353b47f9..fc27aa221cb 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -428,6 +428,8 @@ func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int) } require.NotNil(t, primaryTab) require.NotNil(t, replicaTab) + checkReplicationHealthy(t, replicaTab) + cluster.WaitForReplicationPos(t, primaryTab, replicaTab, "localhost", 60.0) primaryConn, err := utils.GetMySQLConn(primaryTab, fmt.Sprintf("vt_%v", keyspace.Name)) require.NoError(t, err) replicaConn, err := utils.GetMySQLConn(replicaTab, fmt.Sprintf("vt_%v", keyspace.Name)) diff --git a/go/test/endtoend/vtgate/foreignkey/utils_test.go b/go/test/endtoend/vtgate/foreignkey/utils_test.go index b2297ff188b..e8e66df5c45 100644 --- a/go/test/endtoend/vtgate/foreignkey/utils_test.go +++ b/go/test/endtoend/vtgate/foreignkey/utils_test.go @@ -92,3 +92,21 @@ func removeAllForeignKeyConstraints(t *testing.T, vttablet *cluster.Vttablet, ke err = utils.RunSQLs(t, queries, vttablet, fmt.Sprintf("vt_%v", keyspace)) require.NoError(t, err) } + +// checkReplicationHealthy verifies that the replication on the given vttablet is working as expected. +func checkReplicationHealthy(t *testing.T, vttablet *cluster.Vttablet) { + rs, err := utils.RunSQL(t, "show replica status", vttablet, "") + require.NoError(t, err) + var ioThreadRunning, sqlThreadRunning string + for idx, value := range rs.Rows[0] { + fieldName := rs.Fields[idx].Name + if fieldName == "Replica_IO_Running" { + ioThreadRunning = value.ToString() + } + if fieldName == "Replica_SQL_Running" { + sqlThreadRunning = value.ToString() + } + } + require.Equal(t, "Yes", sqlThreadRunning, "SQL Thread isn't happy on %v, Replica status - %v", vttablet.Alias, rs.Rows) + require.Equal(t, "Yes", ioThreadRunning, "IO Thread isn't happy on %v, Replica status - %v", vttablet.Alias, rs.Rows) +} From f518818506db7cf5c5a9a434af394d46c6f0a921 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 18 Sep 2023 17:02:14 +0530 Subject: [PATCH 06/10] feat: augment the test with prepared statements too Signed-off-by: Manan Gupta --- go/test/endtoend/utils/mysql.go | 8 +- .../vtgate/foreignkey/fk_fuzz_test.go | 253 +++++++++++++----- .../endtoend/vtgate/foreignkey/main_test.go | 6 +- 3 files changed, 195 insertions(+), 72 deletions(-) diff --git a/go/test/endtoend/utils/mysql.go b/go/test/endtoend/utils/mysql.go index 6e85ec6bdf7..de8ce40f992 100644 --- a/go/test/endtoend/utils/mysql.go +++ b/go/test/endtoend/utils/mysql.go @@ -208,13 +208,17 @@ func compareVitessAndMySQLResults(t *testing.T, query string, vtConn *mysql.Conn for _, row := range vtQr.Rows { errStr += fmt.Sprintf("%s\n", row) } + errStr += fmt.Sprintf("Vitess RowsAffected: %v\n", vtQr.RowsAffected) errStr += "MySQL Results:\n" for _, row := range mysqlQr.Rows { errStr += fmt.Sprintf("%s\n", row) } + errStr += fmt.Sprintf("MySQL RowsAffected: %v\n", mysqlQr.RowsAffected) if vtConn != nil { - qr := Exec(t, vtConn, fmt.Sprintf("vexplain plan %s", query)) - errStr += fmt.Sprintf("query plan: \n%s\n", qr.Rows[0][0].ToString()) + qr, _ := ExecAllowError(t, vtConn, fmt.Sprintf("vexplain plan %s", query)) + if qr != nil && len(qr.Rows) > 0 { + errStr += fmt.Sprintf("query plan: \n%s\n", qr.Rows[0][0].ToString()) + } } t.Error(errStr) return errors.New(errStr) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index fc27aa221cb..ba86104a295 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -36,12 +36,13 @@ import ( // fuzzer runs threads that runs queries against the databases. // It has parameters that define the way the queries are constructed. type fuzzer struct { - maxValForId int - maxValForCol int - insertShare int - deleteShare int - updateShare int - concurrency int + maxValForId int + maxValForCol int + insertShare int + deleteShare int + updateShare int + concurrency int + preparedStmts bool // shouldStop is an internal state variable, that tells the fuzzer // whether it should stop or not. @@ -54,21 +55,23 @@ type fuzzer struct { // debugInfo stores the debugging information we can collect after a failure happens. type debugInfo struct { - queryToFail string + // This can be a list of queries for prepared statements. + queryToFail []string vitessState []*sqltypes.Result mysqlState []*sqltypes.Result } // newFuzzer creates a new fuzzer struct. -func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare int, deleteShare int, updateShare int) *fuzzer { +func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare int, deleteShare int, updateShare int, preparedStmts bool) *fuzzer { fz := &fuzzer{ - concurrency: concurrency, - maxValForId: maxValForId, - maxValForCol: maxValForCol, - insertShare: insertShare, - deleteShare: deleteShare, - updateShare: updateShare, - wg: sync.WaitGroup{}, + concurrency: concurrency, + maxValForId: maxValForId, + maxValForCol: maxValForCol, + insertShare: insertShare, + deleteShare: deleteShare, + updateShare: updateShare, + preparedStmts: preparedStmts, + wg: sync.WaitGroup{}, } // Initially the fuzzer thread is stopped. fz.shouldStop.Store(true) @@ -76,19 +79,30 @@ func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare i } // generateQuery generates a query from the parameters for the fuzzer. -func (fz *fuzzer) generateQuery() string { +// The returned set is a list of strings, because for prepared statements, we have to run +// set queries first and then the final query eventually. +func (fz *fuzzer) generateQuery() []string { val := rand.Intn(fz.insertShare + fz.updateShare + fz.deleteShare) if val < fz.insertShare { - return fz.generateInsertQuery() + if fz.preparedStmts { + return fz.getPreparedInsertQueries() + } + return []string{fz.generateInsertDMLQuery()} } if val < fz.insertShare+fz.updateShare { - return fz.generateUpdateQuery() + if fz.preparedStmts { + return fz.getPreparedUpdateQueries() + } + return []string{fz.generateUpdateDMLQuery()} } - return fz.generateDeleteQuery() + if fz.preparedStmts { + return fz.getPreparedDeleteQueries() + } + return []string{fz.generateDeleteDMLQuery()} } -// generateInsertQuery generates an INSERT query from the parameters for the fuzzer. -func (fz *fuzzer) generateInsertQuery() string { +// generateInsertDMLQuery generates an INSERT query from the parameters for the fuzzer. +func (fz *fuzzer) generateInsertDMLQuery() string { tableId := rand.Intn(len(fkTables)) idValue := 1 + rand.Intn(fz.maxValForId) tableName := fkTables[tableId] @@ -114,8 +128,8 @@ func convertColValueToString(value int) string { return fmt.Sprintf("%d", value) } -// generateUpdateQuery generates an UPDATE query from the parameters for the fuzzer. -func (fz *fuzzer) generateUpdateQuery() string { +// generateUpdateDMLQuery generates an UPDATE query from the parameters for the fuzzer. +func (fz *fuzzer) generateUpdateDMLQuery() string { tableId := rand.Intn(len(fkTables)) idValue := 1 + rand.Intn(fz.maxValForId) tableName := fkTables[tableId] @@ -133,8 +147,8 @@ func (fz *fuzzer) generateUpdateQuery() string { } } -// generateDeleteQuery generates a DELETE query from the parameters for the fuzzer. -func (fz *fuzzer) generateDeleteQuery() string { +// generateDeleteDMLQuery generates a DELETE query from the parameters for the fuzzer. +func (fz *fuzzer) generateDeleteDMLQuery() string { tableId := rand.Intn(len(fkTables)) idValue := 1 + rand.Intn(fz.maxValForId) query := fmt.Sprintf("delete from %v where id = %v", fkTables[tableId], idValue) @@ -174,23 +188,26 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T, sharded bool, fuzzerThreadId int return } // Get a query and execute it. - query := fz.generateQuery() - // When the concurrency is 1, then we run the query both on MySQL and Vitess. - if fz.concurrency == 1 { - _, _ = mcmp.ExecAllowAndCompareError(query) - // If t is marked failed, we have encountered our first failure. - // Lets collect the required information and finish execution. - if t.Failed() { - fz.firstFailureInfo = &debugInfo{ - queryToFail: query, - mysqlState: collectFkTablesState(mcmp.MySQLConn), - vitessState: collectFkTablesState(mcmp.VtConn), + queries := fz.generateQuery() + // We get a set of queries only when we are using prepared statements, which require running `SET` queries before running the actual DML query. + for _, query := range queries { + // When the concurrency is 1, then we run the query both on MySQL and Vitess. + if fz.concurrency == 1 { + _, _ = mcmp.ExecAllowAndCompareError(query) + // If t is marked failed, we have encountered our first failure. + // Let's collect the required information and finish execution. + if t.Failed() { + fz.firstFailureInfo = &debugInfo{ + queryToFail: queries, + mysqlState: collectFkTablesState(mcmp.MySQLConn), + vitessState: collectFkTablesState(mcmp.VtConn), + } + return } - return + } else { + // When we are running concurrent threads, then we run all the queries on Vitess. + _, _ = utils.ExecAllowError(t, mcmp.VtConn, query) } - } else { - // When we are running concurrent threads, then we run all the queries on Vitess. - _, _ = utils.ExecAllowError(t, mcmp.VtConn, query) } } } @@ -203,6 +220,89 @@ func (fz *fuzzer) stop() { fz.wg.Wait() } +// getPreparedDeleteQueries gets the list of queries to run for executing an DELETE using prepared statements. +func (fz *fuzzer) getPreparedDeleteQueries() []string { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + return []string{ + fmt.Sprintf("prepare stmt_del from 'delete from %v where id = ?'", fkTables[tableId]), + fmt.Sprintf("SET @id = %v", idValue), + "execute stmt_del using @id", + } +} + +// getPreparedInsertQueries gets the list of queries to run for executing an INSERT using prepared statements. +func (fz *fuzzer) getPreparedInsertQueries() []string { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + tableName := fkTables[tableId] + if tableName == "fk_t20" { + colValue := rand.Intn(1 + fz.maxValForCol) + col2Value := rand.Intn(1 + fz.maxValForCol) + return []string{ + "prepare stmt_insert from 'insert into fk_t20 (id, col, col2) values (?, ?, ?)'", + fmt.Sprintf("SET @id = %v", idValue), + fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)), + fmt.Sprintf("SET @col2 = %v", convertColValueToString(col2Value)), + "execute stmt_insert using @id, @col, @col2", + } + } else if isMultiColFkTable(tableName) { + colaValue := rand.Intn(1 + fz.maxValForCol) + colbValue := rand.Intn(1 + fz.maxValForCol) + return []string{ + fmt.Sprintf("prepare stmt_insert from 'insert into %v (id, cola, colb) values (?, ?, ?)'", tableName), + fmt.Sprintf("SET @id = %v", idValue), + fmt.Sprintf("SET @cola = %v", convertColValueToString(colaValue)), + fmt.Sprintf("SET @colb = %v", convertColValueToString(colbValue)), + "execute stmt_insert using @id, @cola, @colb", + } + } else { + colValue := rand.Intn(1 + fz.maxValForCol) + return []string{ + fmt.Sprintf("prepare stmt_insert from 'insert into %v (id, col) values (?, ?)'", tableName), + fmt.Sprintf("SET @id = %v", idValue), + fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)), + "execute stmt_insert using @id, @col", + } + } +} + +// getPreparedUpdateQueries gets the list of queries to run for executing an UPDATE using prepared statements. +func (fz *fuzzer) getPreparedUpdateQueries() []string { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + tableName := fkTables[tableId] + if tableName == "fk_t20" { + colValue := rand.Intn(1 + fz.maxValForCol) + col2Value := rand.Intn(1 + fz.maxValForCol) + return []string{ + "prepare stmt_update from 'update fk_t20 set col = ?, col2 = ? where id = ?'", + fmt.Sprintf("SET @id = %v", idValue), + fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)), + fmt.Sprintf("SET @col2 = %v", convertColValueToString(col2Value)), + "execute stmt_update using @col, @col2, @id", + } + } else if isMultiColFkTable(tableName) { + colaValue := rand.Intn(1 + fz.maxValForCol) + colbValue := rand.Intn(1 + fz.maxValForCol) + return []string{ + fmt.Sprintf("prepare stmt_update from 'update %v set cola = ?, colb = ? where id = ?'", tableName), + fmt.Sprintf("SET @id = %v", idValue), + fmt.Sprintf("SET @cola = %v", convertColValueToString(colaValue)), + fmt.Sprintf("SET @colb = %v", convertColValueToString(colbValue)), + "execute stmt_update using @cola, @colb, @id", + } + } else { + colValue := rand.Intn(1 + fz.maxValForCol) + return []string{ + fmt.Sprintf("prepare stmt_update from 'update %v set col = ? where id = ?'", tableName), + fmt.Sprintf("SET @id = %v", idValue), + fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)), + "execute stmt_update using @col, @id", + } + } +} + // TestFkFuzzTest is a fuzzer test that works by querying the database concurrently. // We have a pre-written set of query templates that we will use, but the data in the queries will // be randomly generated. The intent is that we hammer the database as a real-world application would @@ -353,42 +453,61 @@ func TestFkFuzzTest(t *testing.T) { for _, tt := range testcases { for _, testSharded := range []bool{false, true} { - t.Run(getTestName(tt.name, testSharded), func(t *testing.T) { - mcmp, closer := start(t) - defer closer() - // Set the correct keyspace to use from VtGates. - if testSharded { - t.Skip("Skip test since we don't have sharded foreign key support yet") - _ = utils.Exec(t, mcmp.VtConn, "use `ks`") - } else { - _ = utils.Exec(t, mcmp.VtConn, "use `uks`") - } + for _, preparedStmts := range []bool{false, true} { + t.Run(getTestName(tt.name, testSharded)+fmt.Sprintf(" Prepared - %v", preparedStmts), func(t *testing.T) { + mcmp, closer := start(t) + defer closer() + // Set the correct keyspace to use from VtGates. + if testSharded { + t.Skip("Skip test since we don't have sharded foreign key support yet") + _ = utils.Exec(t, mcmp.VtConn, "use `ks`") + } else { + _ = utils.Exec(t, mcmp.VtConn, "use `uks`") + } + // Ensure that the Vitess database is originally empty + ensureDatabaseState(t, mcmp.VtConn, true) + ensureDatabaseState(t, mcmp.MySQLConn, true) - // Create the fuzzer. - fz := newFuzzer(tt.concurrency, tt.maxValForId, tt.maxValForCol, tt.insertShare, tt.deleteShare, tt.updateShare) + // Create the fuzzer. + fz := newFuzzer(tt.concurrency, tt.maxValForId, tt.maxValForCol, tt.insertShare, tt.deleteShare, tt.updateShare, preparedStmts) - // Start the fuzzer. - fz.start(t, testSharded) + // Start the fuzzer. + fz.start(t, testSharded) - // Wait for the timeForTesting so that the threads continue to run. - time.Sleep(tt.timeForTesting) + // Wait for the timeForTesting so that the threads continue to run. + time.Sleep(tt.timeForTesting) - fz.stop() + fz.stop() - // We encountered an error while running the fuzzer. Let's print out the information! - if fz.firstFailureInfo != nil { - log.Errorf("Failing query - %v", fz.firstFailureInfo.queryToFail) - for idx, table := range fkTables { - log.Errorf("MySQL data for %v -\n%v", table, fz.firstFailureInfo.mysqlState[idx].Rows) - log.Errorf("Vitess data for %v -\n%v", table, fz.firstFailureInfo.vitessState[idx].Rows) + // We encountered an error while running the fuzzer. Let's print out the information! + if fz.firstFailureInfo != nil { + log.Errorf("Failing query - %v", fz.firstFailureInfo.queryToFail) + for idx, table := range fkTables { + log.Errorf("MySQL data for %v -\n%v", table, fz.firstFailureInfo.mysqlState[idx].Rows) + log.Errorf("Vitess data for %v -\n%v", table, fz.firstFailureInfo.vitessState[idx].Rows) + } } - } - // Verify the consistency of the data. - verifyDataIsCorrect(t, mcmp, tt.concurrency) - }) + // ensure Vitess database has some data. This ensures not all the commands failed. + ensureDatabaseState(t, mcmp.VtConn, false) + // Verify the consistency of the data. + verifyDataIsCorrect(t, mcmp, tt.concurrency) + }) + } + } + } +} + +// ensureDatabaseState ensures that the database is either empty or not. +func ensureDatabaseState(t *testing.T, vtconn *mysql.Conn, empty bool) { + results := collectFkTablesState(vtconn) + isEmpty := true + for _, res := range results { + if len(res.Rows) > 0 { + isEmpty = false } } + require.Equal(t, isEmpty, empty) } // verifyDataIsCorrect verifies that the data in MySQL database matches the data in the Vitess database. diff --git a/go/test/endtoend/vtgate/foreignkey/main_test.go b/go/test/endtoend/vtgate/foreignkey/main_test.go index 6cfc53f441f..d083e12536b 100644 --- a/go/test/endtoend/vtgate/foreignkey/main_test.go +++ b/go/test/endtoend/vtgate/foreignkey/main_test.go @@ -162,17 +162,17 @@ func start(t *testing.T) (utils.MySQLCompare, func()) { tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"} tables = append(tables, fkTables...) for _, table := range tables { - _, _ = mcmp.ExecAndIgnore("delete from " + table) + _, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table) } _ = utils.Exec(t, mcmp.VtConn, "use `ks/80-`") for _, table := range tables { - _, _ = mcmp.ExecAndIgnore("delete from " + table) + _, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table) } _ = utils.Exec(t, mcmp.VtConn, "use `uks`") tables = []string{"u_t1", "u_t2", "u_t3"} tables = append(tables, fkTables...) for _, table := range tables { - _, _ = mcmp.ExecAndIgnore("delete from " + table) + _, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table) } _ = utils.Exec(t, mcmp.VtConn, "use `ks`") } From d767319783cd67bc4e997f106e9b8103f12e6d38 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 22 Sep 2023 13:17:05 +0530 Subject: [PATCH 07/10] feat: fix build issue in test Signed-off-by: Manan Gupta --- go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index ba86104a295..a071a05ed08 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -548,7 +548,7 @@ func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int) require.NotNil(t, primaryTab) require.NotNil(t, replicaTab) checkReplicationHealthy(t, replicaTab) - cluster.WaitForReplicationPos(t, primaryTab, replicaTab, "localhost", 60.0) + cluster.WaitForReplicationPos(t, primaryTab, replicaTab, true, 60.0) primaryConn, err := utils.GetMySQLConn(primaryTab, fmt.Sprintf("vt_%v", keyspace.Name)) require.NoError(t, err) replicaConn, err := utils.GetMySQLConn(replicaTab, fmt.Sprintf("vt_%v", keyspace.Name)) From 429ae963b7f37e7a9b0b4e1fe557ca86713539a6 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 22 Sep 2023 13:26:57 +0530 Subject: [PATCH 08/10] test: queryFormat as a type instead of a preparedstaement boolean Signed-off-by: Manan Gupta --- .../vtgate/foreignkey/fk_fuzz_test.go | 70 ++++++++++++------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index a071a05ed08..2b4d59fd404 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -33,16 +33,24 @@ import ( "vitess.io/vitess/go/vt/log" ) +type QueryFormat string + +const ( + SQLQueries QueryFormat = "SQLQueries" + PreparedStatmentQueries QueryFormat = "PreparedStatmentQueries" + PreparedStatementPacket QueryFormat = "PreparedStatementPacket" +) + // fuzzer runs threads that runs queries against the databases. // It has parameters that define the way the queries are constructed. type fuzzer struct { - maxValForId int - maxValForCol int - insertShare int - deleteShare int - updateShare int - concurrency int - preparedStmts bool + maxValForId int + maxValForCol int + insertShare int + deleteShare int + updateShare int + concurrency int + queryFormat QueryFormat // shouldStop is an internal state variable, that tells the fuzzer // whether it should stop or not. @@ -62,16 +70,16 @@ type debugInfo struct { } // newFuzzer creates a new fuzzer struct. -func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare int, deleteShare int, updateShare int, preparedStmts bool) *fuzzer { +func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare int, deleteShare int, updateShare int, queryFormat QueryFormat) *fuzzer { fz := &fuzzer{ - concurrency: concurrency, - maxValForId: maxValForId, - maxValForCol: maxValForCol, - insertShare: insertShare, - deleteShare: deleteShare, - updateShare: updateShare, - preparedStmts: preparedStmts, - wg: sync.WaitGroup{}, + concurrency: concurrency, + maxValForId: maxValForId, + maxValForCol: maxValForCol, + insertShare: insertShare, + deleteShare: deleteShare, + updateShare: updateShare, + queryFormat: queryFormat, + wg: sync.WaitGroup{}, } // Initially the fuzzer thread is stopped. fz.shouldStop.Store(true) @@ -84,21 +92,33 @@ func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare i func (fz *fuzzer) generateQuery() []string { val := rand.Intn(fz.insertShare + fz.updateShare + fz.deleteShare) if val < fz.insertShare { - if fz.preparedStmts { + switch fz.queryFormat { + case SQLQueries: + return []string{fz.generateInsertDMLQuery()} + case PreparedStatmentQueries: return fz.getPreparedInsertQueries() + default: + panic("Unknown query type") } - return []string{fz.generateInsertDMLQuery()} } if val < fz.insertShare+fz.updateShare { - if fz.preparedStmts { + switch fz.queryFormat { + case SQLQueries: + return []string{fz.generateUpdateDMLQuery()} + case PreparedStatmentQueries: return fz.getPreparedUpdateQueries() + default: + panic("Unknown query type") } - return []string{fz.generateUpdateDMLQuery()} } - if fz.preparedStmts { + switch fz.queryFormat { + case SQLQueries: + return []string{fz.generateDeleteDMLQuery()} + case PreparedStatmentQueries: return fz.getPreparedDeleteQueries() + default: + panic("Unknown query type") } - return []string{fz.generateDeleteDMLQuery()} } // generateInsertDMLQuery generates an INSERT query from the parameters for the fuzzer. @@ -453,8 +473,8 @@ func TestFkFuzzTest(t *testing.T) { for _, tt := range testcases { for _, testSharded := range []bool{false, true} { - for _, preparedStmts := range []bool{false, true} { - t.Run(getTestName(tt.name, testSharded)+fmt.Sprintf(" Prepared - %v", preparedStmts), func(t *testing.T) { + for _, queryFormat := range []QueryFormat{SQLQueries, PreparedStatmentQueries, PreparedStatementPacket} { + t.Run(getTestName(tt.name, testSharded)+fmt.Sprintf(" QueryFormat - %v", queryFormat), func(t *testing.T) { mcmp, closer := start(t) defer closer() // Set the correct keyspace to use from VtGates. @@ -469,7 +489,7 @@ func TestFkFuzzTest(t *testing.T) { ensureDatabaseState(t, mcmp.MySQLConn, true) // Create the fuzzer. - fz := newFuzzer(tt.concurrency, tt.maxValForId, tt.maxValForCol, tt.insertShare, tt.deleteShare, tt.updateShare, preparedStmts) + fz := newFuzzer(tt.concurrency, tt.maxValForId, tt.maxValForCol, tt.insertShare, tt.deleteShare, tt.updateShare, queryFormat) // Start the fuzzer. fz.start(t, testSharded) From 27a2c066115c61b9bf99bda18ffc8ba1c64ca533 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Sat, 23 Sep 2023 17:19:25 +0530 Subject: [PATCH 09/10] feat: add prepared statements using COM_PREPARED as packets as a new query format Signed-off-by: Manan Gupta --- .../vtgate/foreignkey/fk_fuzz_test.go | 171 ++++++++++++++++-- .../endtoend/vtgate/foreignkey/utils_test.go | 32 ++++ 2 files changed, 184 insertions(+), 19 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go index 2b4d59fd404..134b9cfa180 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go @@ -17,6 +17,7 @@ limitations under the License. package foreignkey import ( + "database/sql" "fmt" "math/rand" "sync" @@ -24,6 +25,7 @@ import ( "testing" "time" + _ "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -194,42 +196,116 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T, sharded bool, fuzzerThreadId int defer func() { fz.wg.Done() }() + // Create a MySQL Compare that connects to both Vitess and MySQL and runs the queries against both. mcmp, err := utils.NewMySQLCompare(t, vtParams, mysqlParams) require.NoError(t, err) + var vitessDb, mysqlDb *sql.DB + if fz.queryFormat == PreparedStatementPacket { + // Open another connection to Vitess using the go-sql-driver so that we can send prepared statements as COM_STMT_PREPARE packets. + vitessDb, err = sql.Open("mysql", fmt.Sprintf("@tcp(%s:%v)/%s", vtParams.Host, vtParams.Port, vtParams.DbName)) + require.NoError(t, err) + defer vitessDb.Close() + // Open a similar connection to MySQL + mysqlDb, err = sql.Open("mysql", fmt.Sprintf("%v:%v@unix(%s)/%s", mysqlParams.Uname, mysqlParams.Pass, mysqlParams.UnixSocket, mysqlParams.DbName)) + require.NoError(t, err) + defer mysqlDb.Close() + } // Set the correct keyspace to use from VtGates. if sharded { _ = utils.Exec(t, mcmp.VtConn, "use `ks`") + if vitessDb != nil { + _, _ = vitessDb.Exec("use `ks`") + } } else { _ = utils.Exec(t, mcmp.VtConn, "use `uks`") + if vitessDb != nil { + _, _ = vitessDb.Exec("use `uks`") + } } for { // If fuzzer thread is marked to be stopped, then we should exit this go routine. if fz.shouldStop.Load() == true { return } - // Get a query and execute it. - queries := fz.generateQuery() - // We get a set of queries only when we are using prepared statements, which require running `SET` queries before running the actual DML query. - for _, query := range queries { - // When the concurrency is 1, then we run the query both on MySQL and Vitess. - if fz.concurrency == 1 { - _, _ = mcmp.ExecAllowAndCompareError(query) - // If t is marked failed, we have encountered our first failure. - // Let's collect the required information and finish execution. - if t.Failed() { - fz.firstFailureInfo = &debugInfo{ - queryToFail: queries, - mysqlState: collectFkTablesState(mcmp.MySQLConn), - vitessState: collectFkTablesState(mcmp.VtConn), - } - return + switch fz.queryFormat { + case SQLQueries, PreparedStatmentQueries: + if fz.generateAndExecuteStatementQuery(t, mcmp) { + return + } + case PreparedStatementPacket: + if fz.generateAndExecutePreparedPacketQuery(t, mysqlDb, vitessDb, mcmp) { + return + } + default: + panic("Unknown query format") + } + + } +} + +// generateAndExecuteStatementQuery generates a query and runs it on Vitess (and possibly MySQL). +// In this function we send the queries to Vitess always using COM_QUERY packets. +// We handle 2 query formats in this function: +// 1. SQLQueries: DML queries are run as a single SQL query. +// 2. PreparedStatmentQueries: We execute a prepared statement as a SQL query, SET user defined variables and then Execute the DML. +func (fz *fuzzer) generateAndExecuteStatementQuery(t *testing.T, mcmp utils.MySQLCompare) (exit bool) { + // Get a query and execute it. + queries := fz.generateQuery() + // We get a set of queries only when we are using prepared statements, which require running `SET` queries before running the actual DML query. + for _, query := range queries { + // When the concurrency is 1, then we run the query both on MySQL and Vitess. + if fz.concurrency == 1 { + _, _ = mcmp.ExecAllowAndCompareError(query) + // If t is marked failed, we have encountered our first failure. + // Let's collect the required information and finish execution. + if t.Failed() { + fz.firstFailureInfo = &debugInfo{ + queryToFail: queries, + mysqlState: collectFkTablesState(mcmp.MySQLConn), + vitessState: collectFkTablesState(mcmp.VtConn), } - } else { - // When we are running concurrent threads, then we run all the queries on Vitess. - _, _ = utils.ExecAllowError(t, mcmp.VtConn, query) + return true + } + } else { + // When we are running concurrent threads, then we run all the queries on Vitess. + _, _ = utils.ExecAllowError(t, mcmp.VtConn, query) + } + } + return false +} + +// generateAndExecutePreparedPacketQuery generates a query and runs it on Vitess (and possibly MySQL). +// This function handles the query format PreparedStatementPacket. Here we send the prepared statement as a COM_STMT_PREPARE packet. +// Following which we execute it. To this end, we use the go-sql-driver. +func (fz *fuzzer) generateAndExecutePreparedPacketQuery(t *testing.T, mysqlDB *sql.DB, vitessDb *sql.DB, mcmp utils.MySQLCompare) bool { + query, params := fz.generateParameterizedQuery() + // When the concurrency is 1, then we run the query both on MySQL and Vitess. + if fz.concurrency == 1 { + // When the concurrency is 1, then we run the query both on MySQL and Vitess. + fz.execAndCompareMySQlAndVitess(t, mysqlDB, vitessDb, query, params) + // If t is marked failed, we have encountered our first failure. + // Let's collect the required information and finish execution. + if t.Failed() { + fz.firstFailureInfo = &debugInfo{ + queryToFail: []string{query}, + mysqlState: collectFkTablesState(mcmp.MySQLConn), + vitessState: collectFkTablesState(mcmp.VtConn), } + return true } + } else { + // When we are running concurrent threads, then we run all the queries on Vitess. + _, _ = vitessDb.Exec(query, params...) } + return false +} + +// execAndCompareMySQlAndVitess executes the given query with the parameters on MySQL and Vitess and compares their results. +func (fz *fuzzer) execAndCompareMySQlAndVitess(t *testing.T, mysqlDB *sql.DB, vitessDb *sql.DB, query string, params []any) { + mysqlRes, mysqlErr := mysqlDB.Exec(query, params...) + vtRes, vtErr := vitessDb.Exec(query, params...) + compareVitessAndMySQLErrors(t, vtErr, mysqlErr) + compareVitessAndMySQLResults(t, vtRes, mysqlRes) } // stop stops the fuzzer and waits for it to finish execution. @@ -323,6 +399,63 @@ func (fz *fuzzer) getPreparedUpdateQueries() []string { } } +// generateParameterizedQuery generates a parameterized query for the query format PreparedStatementPacket. +func (fz *fuzzer) generateParameterizedQuery() (query string, params []any) { + val := rand.Intn(fz.insertShare + fz.updateShare + fz.deleteShare) + if val < fz.insertShare { + return fz.generateParameterizedInsertQuery() + } + if val < fz.insertShare+fz.updateShare { + return fz.generateParameterizedUpdateQuery() + } + return fz.generateParameterizedDeleteQuery() +} + +// generateParameterizedInsertQuery generates a parameterized INSERT query for the query format PreparedStatementPacket. +func (fz *fuzzer) generateParameterizedInsertQuery() (query string, params []any) { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + tableName := fkTables[tableId] + if tableName == "fk_t20" { + colValue := rand.Intn(1 + fz.maxValForCol) + col2Value := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("insert into %v (id, col, col2) values (?, ?, ?)", tableName), []any{idValue, convertColValueToString(colValue), convertColValueToString(col2Value)} + } else if isMultiColFkTable(tableName) { + colaValue := rand.Intn(1 + fz.maxValForCol) + colbValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("insert into %v (id, cola, colb) values (?, ?, ?)", tableName), []any{idValue, convertColValueToString(colaValue), convertColValueToString(colbValue)} + } else { + colValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("insert into %v (id, col) values (?, ?)", tableName), []any{idValue, convertColValueToString(colValue)} + } +} + +// generateParameterizedUpdateQuery generates a parameterized UPDATE query for the query format PreparedStatementPacket. +func (fz *fuzzer) generateParameterizedUpdateQuery() (query string, params []any) { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + tableName := fkTables[tableId] + if tableName == "fk_t20" { + colValue := rand.Intn(1 + fz.maxValForCol) + col2Value := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("update %v set col = ?, col2 = ? where id = ?", tableName), []any{convertColValueToString(colValue), convertColValueToString(col2Value), idValue} + } else if isMultiColFkTable(tableName) { + colaValue := rand.Intn(1 + fz.maxValForCol) + colbValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("update %v set cola = ?, colb = ? where id = ?", tableName), []any{convertColValueToString(colaValue), convertColValueToString(colbValue), idValue} + } else { + colValue := rand.Intn(1 + fz.maxValForCol) + return fmt.Sprintf("update %v set col = ? where id = ?", tableName), []any{convertColValueToString(colValue), idValue} + } +} + +// generateParameterizedDeleteQuery generates a parameterized DELETE query for the query format PreparedStatementPacket. +func (fz *fuzzer) generateParameterizedDeleteQuery() (query string, params []any) { + tableId := rand.Intn(len(fkTables)) + idValue := 1 + rand.Intn(fz.maxValForId) + return fmt.Sprintf("delete from %v where id = ?", fkTables[tableId]), []any{idValue} +} + // TestFkFuzzTest is a fuzzer test that works by querying the database concurrently. // We have a pre-written set of query templates that we will use, but the data in the queries will // be randomly generated. The intent is that we hammer the database as a real-world application would diff --git a/go/test/endtoend/vtgate/foreignkey/utils_test.go b/go/test/endtoend/vtgate/foreignkey/utils_test.go index e8e66df5c45..5e0b4a8a3cc 100644 --- a/go/test/endtoend/vtgate/foreignkey/utils_test.go +++ b/go/test/endtoend/vtgate/foreignkey/utils_test.go @@ -17,6 +17,7 @@ limitations under the License. package foreignkey import ( + "database/sql" "fmt" "strings" "testing" @@ -110,3 +111,34 @@ func checkReplicationHealthy(t *testing.T, vttablet *cluster.Vttablet) { require.Equal(t, "Yes", sqlThreadRunning, "SQL Thread isn't happy on %v, Replica status - %v", vttablet.Alias, rs.Rows) require.Equal(t, "Yes", ioThreadRunning, "IO Thread isn't happy on %v, Replica status - %v", vttablet.Alias, rs.Rows) } + +// compareVitessAndMySQLResults compares Vitess and MySQL results and reports if they don't report the same number of rows affected. +func compareVitessAndMySQLResults(t *testing.T, vtRes sql.Result, mysqlRes sql.Result) { + if vtRes == nil && mysqlRes == nil { + return + } + if vtRes == nil { + t.Error("Vitess result is 'nil' while MySQL's is not.") + return + } + if mysqlRes == nil { + t.Error("MySQL result is 'nil' while Vitess' is not.") + return + } + vtRa, err := vtRes.RowsAffected() + require.NoError(t, err) + mysqlRa, err := mysqlRes.RowsAffected() + require.NoError(t, err) + if mysqlRa != vtRa { + t.Errorf("Vitess and MySQL don't agree on the rows affected. Vitess rows affected - %v, MySQL rows affected - %v", vtRa, mysqlRa) + } +} + +// compareVitessAndMySQLErrors compares Vitess and MySQL errors and reports if one errors and the other doesn't. +func compareVitessAndMySQLErrors(t *testing.T, vtErr, mysqlErr error) { + if vtErr != nil && mysqlErr != nil || vtErr == nil && mysqlErr == nil { + return + } + out := fmt.Sprintf("Vitess and MySQL are not erroring the same way.\nVitess error: %v\nMySQL error: %v", vtErr, mysqlErr) + t.Error(out) +} From aa150c33db0b33db89c93a8e5932e6d8e1f0588a Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 25 Sep 2023 13:53:19 +0530 Subject: [PATCH 10/10] feat: move foreign key tests into a single package Signed-off-by: Manan Gupta --- .../workflows/cluster_endtoend_vtgate_foreignkey_stress.yml | 2 +- test/config.json | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml b/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml index d44389d1497..5a186d11086 100644 --- a/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml +++ b/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml @@ -71,7 +71,7 @@ jobs: if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' uses: actions/setup-go@v4 with: - go-version: 1.21.0 + go-version: 1.21.1 - name: Set up python if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' diff --git a/test/config.json b/test/config.json index 15907216cd7..595b47c1cda 100644 --- a/test/config.json +++ b/test/config.json @@ -856,8 +856,8 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/foreignkey"], "Command": [], "Manual": false, - "Shard": "vtgate_gen4", - "RetryMax": 2, + "Shard": "vtgate_foreignkey_stress", + "RetryMax": 1, "Tags": [] }, "vtgate_foreignkey_stress": {