Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 57 additions & 23 deletions go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -34,6 +33,7 @@ import (
"golang.org/x/exp/slices"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -308,6 +308,7 @@ func TestMain(m *testing.M) {
"--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
"--vreplication_tablet_type", "primary",
}
Expand Down Expand Up @@ -376,8 +377,39 @@ func tabletTestName(t *testing.T, tablet *cluster.Vttablet) string {
return ""
}

func getTabletPosition(t *testing.T, tablet *cluster.Vttablet) replication.Position {
rs := queryTablet(t, tablet, "select @@gtid_executed as gtid_executed", "")
row := rs.Named().Row()
require.NotNil(t, row)
gtidExecuted := row.AsString("gtid_executed", "")
require.NotEmpty(t, gtidExecuted)
pos, err := replication.DecodePositionDefaultFlavor(gtidExecuted, replication.Mysql56FlavorID)
assert.NoError(t, err)
return pos
}

func waitForReplicaCatchup(t *testing.T) {
cluster.WaitForReplicationPos(t, primary, replica, true, time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
primaryPos := getTabletPosition(t, primary)
for {
replicaPos := getTabletPosition(t, replica)
if replicaPos.GTIDSet.Contains(primaryPos.GTIDSet) {
// success
return
}
if !cluster.ValidateReplicationIsHealthy(t, replica) {
assert.FailNow(t, "replication is broken; not waiting for catchup")
return
}
select {
case <-ctx.Done():
assert.FailNow(t, "timeout waiting for replica to catch up")
return
case <-time.After(time.Second):
//
}
}
}

func validateMetrics(t *testing.T, tcase *testCase) {
Expand Down Expand Up @@ -421,8 +453,8 @@ func TestInitialSetup(t *testing.T) {

if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" {
// This is the place to fine tune the stress parameters if GitHub actions are too slow
maxConcurrency = maxConcurrency * 1
singleConnectionSleepInterval = singleConnectionSleepInterval * 1
maxConcurrency = maxConcurrency / 2
singleConnectionSleepInterval = singleConnectionSleepInterval * 2
}
t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval)
}
Expand All @@ -440,6 +472,7 @@ type testCase struct {
// - Either one of ON UPDATE actions
// - Potentially running an Online DDL on an indicated table (this will not work in Vanilla MySQL, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/)
func ExecuteFKTest(t *testing.T, tcase *testCase) {
t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval)
workloadName := "static data"
if tcase.workload {
workloadName = "workload"
Expand All @@ -449,8 +482,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
testName = fmt.Sprintf("%s/ddl=%s", testName, tcase.onlineDDLTable)
}
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := context.Background()

t.Run("create schema", func(t *testing.T) {
createInitialSchema(t, tcase)
Expand All @@ -460,6 +492,9 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
})
if tcase.workload {
t.Run("workload", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, workloadDuration)
defer cancel()

var wg sync.WaitGroup
for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} {
wg.Add(1)
Expand All @@ -468,7 +503,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
runMultipleConnections(ctx, t, tbl)
}(workloadTable)
}
timer := time.NewTimer(workloadDuration)

if tcase.onlineDDLTable != "" {
t.Run("migrating", func(t *testing.T) {
Expand All @@ -494,9 +528,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
})
})
}

<-timer.C
cancel() // will cause runMultipleConnections() to terminate
wg.Wait()
})
}
Expand Down Expand Up @@ -524,7 +555,9 @@ func TestStressFK(t *testing.T) {
})

runOnlineDDL := false

if val, present := os.LookupEnv("FK_STRESS_ONLINE_DDL"); present && val != "" {
runOnlineDDL = true
}
// Without workload ; with workload
for _, workload := range []bool{false, true} {
// For any type of ON DELETE action
Expand All @@ -542,6 +575,10 @@ func TestStressFK(t *testing.T) {
}

if runOnlineDDL {
// Foreign keys introduce some overhead. We reduce concurrency so that GitHub CI can accommodate.
maxConcurrency = maxConcurrency * 4 / 5
singleConnectionSleepInterval = singleConnectionSleepInterval * 2

// Running Online DDL on all test tables. We don't use all of the combinations
// presented above; we will run with workload, and suffice with same ON DELETE - ON UPDATE actions.
for _, action := range referenceActions {
Expand Down Expand Up @@ -877,7 +914,7 @@ func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T, tableName string, done *int64) {
func runSingleConnection(ctx context.Context, t *testing.T, tableName string) {
log.Infof("Running single connection on %s", tableName)
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -889,10 +926,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do
require.Nil(t, err)

for {
if atomic.LoadInt64(done) == 1 {
log.Infof("Terminating single connection")
return
}
switch rand.Int31n(3) {
case 0:
_ = generateInsert(t, tableName, conn)
Expand All @@ -901,26 +934,27 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do
case 2:
_ = generateDelete(t, tableName, conn)
}
time.Sleep(singleConnectionSleepInterval)
select {
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-time.After(singleConnectionSleepInterval):
}
}
}

func runMultipleConnections(ctx context.Context, t *testing.T, tableName string) {
log.Infof("Running multiple connections")
var done int64
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t, tableName, &done)
runSingleConnection(ctx, t, tableName)
}()
}
<-ctx.Done()
atomic.StoreInt64(&done, 1)
log.Infof("Running multiple connections: done")
wg.Wait()
log.Infof("All connections cancelled")
log.Infof("Running multiple connections: done")
}

func wrapWithNoFKChecks(sql string) string {
Expand Down