diff --git a/go/test/utils/noleak.go b/go/test/utils/noleak.go index 41e1a42b960..ea8dc10513d 100644 --- a/go/test/utils/noleak.go +++ b/go/test/utils/noleak.go @@ -72,6 +72,8 @@ func ensureNoLeaks() error { func ensureNoGoroutines() error { var ignored = []goleak.Option{ + goleak.IgnoreTopFunction("internal/synctest.Run"), + goleak.IgnoreTopFunction("testing/synctest.testingSynctestTest"), goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), diff --git a/go/vt/vtgate/executor_scatter_stats_test.go b/go/vt/vtgate/executor_scatter_stats_test.go index 487b2ea4df6..115b35aa36c 100644 --- a/go/vt/vtgate/executor_scatter_stats_test.go +++ b/go/vt/vtgate/executor_scatter_stats_test.go @@ -19,7 +19,7 @@ package vtgate import ( "net/http/httptest" "testing" - "time" + "testing/synctest" "github.com/stretchr/testify/require" @@ -53,29 +53,31 @@ func TestScatterStatsWithSingleScatterQuery(t *testing.T) { } func TestScatterStatsHttpWriting(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@primary"}) + synctest.Test(t, func(t *testing.T) { + executor, _, _, _, ctx := createExecutorEnv(t) + session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@primary"}) - _, err := executorExecSession(ctx, executor, session, "select * from user", nil) - require.NoError(t, err) + _, err := executorExecSession(ctx, executor, session, "select * from user", nil) + require.NoError(t, err) - _, err = executorExecSession(ctx, executor, session, "select * from user where Id = 15", nil) - require.NoError(t, err) + _, err = executorExecSession(ctx, executor, session, "select * from user where Id = 15", nil) + require.NoError(t, err) - _, err = executorExecSession(ctx, executor, session, "select * from user where Id > 15", nil) - 
require.NoError(t, err) + _, err = executorExecSession(ctx, executor, session, "select * from user where Id > 15", nil) + require.NoError(t, err) - query4 := "select * from user as u1 join user as u2 on u1.Id = u2.Id" - _, err = executorExecSession(ctx, executor, session, query4, nil) - require.NoError(t, err) + query4 := "select * from user as u1 join user as u2 on u1.Id = u2.Id" + _, err = executorExecSession(ctx, executor, session, query4, nil) + require.NoError(t, err) - time.Sleep(500 * time.Millisecond) + synctest.Wait() - recorder := httptest.NewRecorder() - executor.WriteScatterStats(recorder) + recorder := httptest.NewRecorder() + executor.WriteScatterStats(recorder) - // Here we are checking that the template was executed correctly. - // If it wasn't, instead of html, we'll get an error message - require.Contains(t, recorder.Body.String(), "select * from `user` as u1 join `user` as u2 on u1.Id = u2.Id") - require.NoError(t, err) + // Here we are checking that the template was executed correctly. + // If it wasn't, instead of html, we'll get an error message + require.Contains(t, recorder.Body.String(), "select * from `user` as u1 join `user` as u2 on u1.Id = u2.Id") + require.NoError(t, err) + }) } diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 1a51f7c3587..87bd2759aaa 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "testing" + "testing/synctest" "time" "github.com/google/go-cmp/cmp" @@ -3484,46 +3485,50 @@ func TestSelectFromInformationSchema(t *testing.T) { } func TestStreamOrderByWithMultipleResults(t *testing.T) { - ctx := utils.LeakCheckContext(t) - - // Special setup: Don't use createExecutorEnv. 
- cell := "aa" - hc := discovery.NewFakeHealthCheck(nil) - u := createSandbox(KsTestUnsharded) - s := createSandbox(KsTestSharded) - s.VSchema = executorVSchema - u.VSchema = unshardedVSchema - serv := newSandboxForCells(ctx, []string{cell}) - resolver := newTestResolver(ctx, hc, serv, cell) - shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"} - count := 1 - for _, shard := range shards { - sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil) - sbc.SetResults([]*sqltypes.Result{ - sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count, count)), - sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count+10, count)), - }) - count++ - } - queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) - plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) - executor.SetQueryLogger(queryLogger) - defer executor.Close() - // some sleep for all goroutines to start - time.Sleep(100 * time.Millisecond) - before := runtime.NumGoroutine() + synctest.Test(t, func(t *testing.T) { + ctx := utils.LeakCheckContext(t) - query := "select id, col from user order by id" - gotResult, err := executorStream(ctx, executor, query) - require.NoError(t, err) - - wantResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int32|int32"), - "1|1", "2|2", "3|3", "4|4", "5|5", "6|6", "7|7", "8|8", "11|1", "12|2", "13|3", "14|4", "15|5", "16|6", "17|7", "18|8") - assert.Equal(t, fmt.Sprintf("%v", wantResult.Rows), fmt.Sprintf("%v", gotResult.Rows)) - // some sleep to close all goroutines. 
- time.Sleep(100 * time.Millisecond) - assert.GreaterOrEqual(t, before, runtime.NumGoroutine(), "left open goroutines lingering") + // Special setup: Don't use createExecutorEnv. + cell := "aa" + hc := discovery.NewFakeHealthCheck(nil) + u := createSandbox(KsTestUnsharded) + s := createSandbox(KsTestSharded) + s.VSchema = executorVSchema + u.VSchema = unshardedVSchema + serv := newSandboxForCells(ctx, []string{cell}) + resolver := newTestResolver(ctx, hc, serv, cell) + shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"} + count := 1 + for _, shard := range shards { + sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc.SetResults([]*sqltypes.Result{ + sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count, count)), + sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count+10, count)), + }) + count++ + } + queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) + plans := DefaultPlanCache() + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) + executor.SetQueryLogger(queryLogger) + defer executor.Close() + + // block until every goroutine started above is durably blocked + synctest.Wait() + before := runtime.NumGoroutine() + + query := "select id, col from user order by id" + gotResult, err := executorStream(ctx, executor, query) + require.NoError(t, err) + + wantResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int32|int32"), + "1|1", "2|2", "3|3", "4|4", "5|5", "6|6", "7|7", "8|8", "11|1", "12|2", "13|3", "14|4", "15|5", "16|6", "17|7", "18|8") + assert.Equal(t, fmt.Sprintf("%v", wantResult.Rows), fmt.Sprintf("%v", gotResult.Rows)) + + // block until the bubble has no other runnable
goroutines. + synctest.Wait() + assert.GreaterOrEqual(t, before, runtime.NumGoroutine(), "left open goroutines lingering") + }) } func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index f27257344d0..0c7c1e46b79 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -28,6 +28,7 @@ import ( "sort" "strings" "testing" + "testing/synctest" "time" "unsafe" @@ -1578,17 +1579,19 @@ func TestExecutorUnrecognized(t *testing.T) { } func TestExecutorDeniedErrorNoBuffer(t *testing.T) { - executor, sbc1, _, _, ctx := createExecutorEnv(t) - sbc1.EphemeralShardErr = errors.New("enforce denied tables") - - vschemaWaitTimeout = 500 * time.Millisecond - - session := econtext.NewAutocommitSession(&vtgatepb.Session{TargetString: "@primary"}) - startExec := time.Now() - _, err := executorExecSession(ctx, executor, session, "select * from user", nil) - require.NoError(t, err, "enforce denied tables not buffered") - endExec := time.Now() - require.GreaterOrEqual(t, endExec.Sub(startExec).Milliseconds(), int64(500)) + synctest.Test(t, func(t *testing.T) { + executor, sbc1, _, _, ctx := createExecutorEnv(t) + sbc1.EphemeralShardErr = errors.New("enforce denied tables") + + vschemaWaitTimeout = 500 * time.Millisecond + + session := econtext.NewAutocommitSession(&vtgatepb.Session{TargetString: "@primary"}) + startExec := time.Now() + _, err := executorExecSession(ctx, executor, session, "select * from user", nil) + require.NoError(t, err, "enforce denied tables not buffered") + endExec := time.Now() + require.GreaterOrEqual(t, endExec.Sub(startExec).Milliseconds(), int64(500)) + }) } // TestVSchemaStats makes sure the building and displaying of the diff --git a/go/vt/vtgate/plugin_mysql_server_test.go b/go/vt/vtgate/plugin_mysql_server_test.go index aaeca651882..4891fc3461f 100644 --- a/go/vt/vtgate/plugin_mysql_server_test.go +++ b/go/vt/vtgate/plugin_mysql_server_test.go @@ 
-25,7 +25,7 @@ import ( "strings" "syscall" "testing" - "time" + "testing/synctest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -298,35 +298,39 @@ func TestInitTLSConfigWithServerCA(t *testing.T) { } func testInitTLSConfig(t *testing.T, serverCA bool) { - // Create the certs. - ctx := utils.LeakCheckContext(t) - - root := t.TempDir() - tlstest.CreateCA(root) - tlstest.CreateCRL(root, tlstest.CA) - tlstest.CreateSignedCert(root, tlstest.CA, "01", "server", "server.example.com") + synctest.Test(t, func(t *testing.T) { + // Create the certs. + ctx := utils.LeakCheckContext(t) + + root := t.TempDir() + tlstest.CreateCA(root) + tlstest.CreateCRL(root, tlstest.CA) + tlstest.CreateSignedCert(root, tlstest.CA, "01", "server", "server.example.com") + + serverCACert := "" + if serverCA { + serverCACert = path.Join(root, "ca-cert.pem") + } - serverCACert := "" - if serverCA { - serverCACert = path.Join(root, "ca-cert.pem") - } + srv := &mysqlServer{tcpListener: &mysql.Listener{}} + if err := initTLSConfig(ctx, srv, path.Join(root, "server-cert.pem"), path.Join(root, "server-key.pem"), path.Join(root, "ca-cert.pem"), path.Join(root, "ca-crl.pem"), serverCACert, true, tls.VersionTLS12); err != nil { + t.Fatalf("init tls config failure due to: +%v", err) + } - srv := &mysqlServer{tcpListener: &mysql.Listener{}} - if err := initTLSConfig(ctx, srv, path.Join(root, "server-cert.pem"), path.Join(root, "server-key.pem"), path.Join(root, "ca-cert.pem"), path.Join(root, "ca-crl.pem"), serverCACert, true, tls.VersionTLS12); err != nil { - t.Fatalf("init tls config failure due to: +%v", err) - } + serverConfig := srv.tcpListener.TLSConfig.Load() + if serverConfig == nil { + t.Fatalf("init tls config shouldn't create nil server config") + } - serverConfig := srv.tcpListener.TLSConfig.Load() - if serverConfig == nil { - t.Fatalf("init tls config shouldn't create nil server config") - } + srv.sigChan <- syscall.SIGHUP - srv.sigChan <- syscall.SIGHUP - 
time.Sleep(100 * time.Millisecond) // wait for signal handler + // wait for signal handler + synctest.Wait() - if srv.tcpListener.TLSConfig.Load() == serverConfig { - t.Fatalf("init tls config should have been recreated after SIGHUP") - } + if srv.tcpListener.TLSConfig.Load() == serverConfig { + t.Fatalf("init tls config should have been recreated after SIGHUP") + } + }) } // TestKillMethods test the mysql plugin for kill method calls. diff --git a/go/vt/vtgate/tabletgateway_flaky_test.go b/go/vt/vtgate/tabletgateway_flaky_test.go index 760c0dba9da..ca4c8449101 100644 --- a/go/vt/vtgate/tabletgateway_flaky_test.go +++ b/go/vt/vtgate/tabletgateway_flaky_test.go @@ -18,6 +18,7 @@ package vtgate import ( "testing" + "testing/synctest" "time" econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" @@ -135,133 +136,135 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) { } } // TestGatewayBufferingWhileReparenting is used to test that the buffering mechanism buffers the queries when a PRS happens // the healthchecks that happen during a PRS are simulated in this test func TestGatewayBufferingWhileReparenting(t *testing.T) { - ctx := utils.LeakCheckContext(t) - - buffer.SetBufferingModeInTestingEnv(true) - defer func() { - buffer.SetBufferingModeInTestingEnv(false) - }() - - keyspace := "ks1" - shard := "-80" - tabletType := topodatapb.TabletType_PRIMARY - host := "1.1.1.1" - hostReplica := "1.1.1.2" - port := int32(1001) - portReplica := int32(1002) - target := &querypb.Target{ - Keyspace: keyspace, - Shard: shard, - TabletType: tabletType, - } - - ts := &econtext.FakeTopoServer{} - // create a new fake health check.
We want to check the buffering code which uses Subscribe, so we must also pass a channel - hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth)) - // create a new tablet gateway - tg := NewTabletGateway(ctx, hc, ts, "cell") - defer tg.Close(ctx) - - // add a primary tablet which is serving - sbc := hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) - // also add a replica which is serving - sbcReplica := hc.AddTestTablet("cell", hostReplica, portReplica, keyspace, shard, topodatapb.TabletType_REPLICA, true, 0, nil) - - // add a result to the sandbox connection - sqlResult1 := &sqltypes.Result{ - Fields: []*querypb.Field{{ - Name: "col1", - Type: sqltypes.VarChar, - Charset: uint32(collations.MySQL8().DefaultConnectionCharset()), - }}, - RowsAffected: 1, - Rows: [][]sqltypes.Value{{ - sqltypes.MakeTrusted(sqltypes.VarChar, []byte("bb")), - }}, - } - sbc.SetResults([]*sqltypes.Result{sqlResult1}) - - // run a query that we indeed get the result added to the sandbox connection back - // this also checks that the query reaches the primary tablet and not the replica - res, err := tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil) - require.NoError(t, err) - require.Equal(t, res, sqlResult1) - - // get the primary and replica tablet from the fake health check - tablets := hc.GetAllTablets() - var primaryTablet *topodatapb.Tablet - var replicaTablet *topodatapb.Tablet - - for _, tablet := range tablets { - if tablet.Type == topodatapb.TabletType_PRIMARY { - primaryTablet = tablet - } else { - replicaTablet = tablet + synctest.Test(t, func(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + buffer.SetBufferingModeInTestingEnv(true) + defer func() { + buffer.SetBufferingModeInTestingEnv(false) + }() + + keyspace := "ks1" + shard := "-80" + tabletType := topodatapb.TabletType_PRIMARY + host := "1.1.1.1" + hostReplica := "1.1.1.2" + port := int32(1001) + portReplica := int32(1002) + target := &querypb.Target{ + Keyspace: 
keyspace, + Shard: shard, + TabletType: tabletType, } - } - require.NotNil(t, primaryTablet) - require.NotNil(t, replicaTablet) - // broadcast its state initially - hc.Broadcast(primaryTablet) - // set the serving type for the primary tablet false and broadcast it so that the buffering code registers this change - hc.SetServing(primaryTablet, false) - // We call the broadcast twice to ensure that the change has been processed by the keyspace event watcher. - // The second broadcast call is blocking until the first one has been processed. - hc.Broadcast(primaryTablet) - hc.Broadcast(primaryTablet) - - require.Len(t, tg.hc.GetHealthyTabletStats(target), 0, "GetHealthyTabletStats has tablets even though it shouldn't") - _, shouldStartBuffering := tg.kev.ShouldStartBufferingForTarget(ctx, target) - require.True(t, shouldStartBuffering) - - // add a result to the sandbox connection of the new primary - sbcReplica.SetResults([]*sqltypes.Result{sqlResult1}) - - // execute the query in a go routine since it should be buffered, and check that it eventually succeed - queryChan := make(chan struct{}) - go func() { - res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil) - queryChan <- struct{}{} - }() + ts := &econtext.FakeTopoServer{} + // create a new fake health check. 
We want to check the buffering code which uses Subscribe, so we must also pass a channel + hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth)) + // create a new tablet gateway + tg := NewTabletGateway(ctx, hc, ts, "cell") + defer tg.Close(ctx) + + // add a primary tablet which is serving + sbc := hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) + // also add a replica which is serving + sbcReplica := hc.AddTestTablet("cell", hostReplica, portReplica, keyspace, shard, topodatapb.TabletType_REPLICA, true, 0, nil) + + // add a result to the sandbox connection + sqlResult1 := &sqltypes.Result{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: sqltypes.VarChar, + Charset: uint32(collations.MySQL8().DefaultConnectionCharset()), + }}, + RowsAffected: 1, + Rows: [][]sqltypes.Value{{ + sqltypes.MakeTrusted(sqltypes.VarChar, []byte("bb")), + }}, + } + sbc.SetResults([]*sqltypes.Result{sqlResult1}) - // set the serving type for the new primary tablet true and broadcast it so that the buffering code registers this change - // this should stop the buffering and the query executed in the go routine should work. This should be done with some delay so - // that we know that the query was buffered - time.Sleep(1 * time.Second) - // change the tablets types to simulate a PRS. - hc.SetTabletType(primaryTablet, topodatapb.TabletType_REPLICA) - hc.Broadcast(primaryTablet) - hc.SetTabletType(replicaTablet, topodatapb.TabletType_PRIMARY) - hc.SetPrimaryTimestamp(replicaTablet, 100) // We set a higher timestamp than before to simulate a PRS. 
- hc.SetServing(replicaTablet, true) - hc.Broadcast(replicaTablet) - - timeout := time.After(1 * time.Minute) -outer: - for { - select { - case <-timeout: - require.Fail(t, "timed out - could not verify the new primary") - case <-time.After(10 * time.Millisecond): - newPrimary, shouldBuffer := tg.kev.ShouldStartBufferingForTarget(ctx, target) - if newPrimary != nil && newPrimary.Uid == replicaTablet.Alias.Uid && !shouldBuffer { - break outer + // run a query that we indeed get the result added to the sandbox connection back + // this also checks that the query reaches the primary tablet and not the replica + res, err := tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil) + require.NoError(t, err) + require.Equal(t, res, sqlResult1) + + // get the primary and replica tablet from the fake health check + tablets := hc.GetAllTablets() + var primaryTablet *topodatapb.Tablet + var replicaTablet *topodatapb.Tablet + + for _, tablet := range tablets { + if tablet.Type == topodatapb.TabletType_PRIMARY { + primaryTablet = tablet + } else { + replicaTablet = tablet + } + } + require.NotNil(t, primaryTablet) + require.NotNil(t, replicaTablet) + + // broadcast its state initially + hc.Broadcast(primaryTablet) + // set the serving type for the primary tablet false and broadcast it so that the buffering code registers this change + hc.SetServing(primaryTablet, false) + // We call the broadcast twice to ensure that the change has been processed by the keyspace event watcher. + // The second broadcast call is blocking until the first one has been processed. 
+ hc.Broadcast(primaryTablet) + hc.Broadcast(primaryTablet) + + require.Len(t, tg.hc.GetHealthyTabletStats(target), 0, "GetHealthyTabletStats has tablets even though it shouldn't") + _, shouldStartBuffering := tg.kev.ShouldStartBufferingForTarget(ctx, target) + require.True(t, shouldStartBuffering) + + // add a result to the sandbox connection of the new primary + sbcReplica.SetResults([]*sqltypes.Result{sqlResult1}) + + // execute the query in a go routine since it should be buffered, and check that it eventually succeed + queryChan := make(chan struct{}) + go func() { + res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil) + queryChan <- struct{}{} + }() + + // set the serving type for the new primary tablet true and broadcast it so that the buffering code registers this change + // this should stop the buffering and the query executed in the go routine should work. This should be done with some delay so + // that we know that the query was buffered + time.Sleep(1 * time.Second) + // change the tablets types to simulate a PRS. + hc.SetTabletType(primaryTablet, topodatapb.TabletType_REPLICA) + hc.Broadcast(primaryTablet) + hc.SetTabletType(replicaTablet, topodatapb.TabletType_PRIMARY) + hc.SetPrimaryTimestamp(replicaTablet, 100) // We set a higher timestamp than before to simulate a PRS. 
+ hc.SetServing(replicaTablet, true) + hc.Broadcast(replicaTablet) + + timeout := time.After(1 * time.Minute) + outer: + for { + select { + case <-timeout: + require.Fail(t, "timed out - could not verify the new primary") + case <-time.After(10 * time.Millisecond): + newPrimary, shouldBuffer := tg.kev.ShouldStartBufferingForTarget(ctx, target) + if newPrimary != nil && newPrimary.Uid == replicaTablet.Alias.Uid && !shouldBuffer { + break outer + } } } - } - // wait for the query to execute before checking for results - select { - case <-queryChan: - require.NoError(t, err) - require.Equal(t, sqlResult1, res) - case <-time.After(15 * time.Second): - t.Fatalf("timed out waiting for query to execute") - } + // wait for the query to execute before checking for results + select { + case <-queryChan: + require.NoError(t, err) + require.Equal(t, sqlResult1, res) + case <-time.After(15 * time.Second): + t.Fatalf("timed out waiting for query to execute") + } + }) } // TestInconsistentStateDetectedBuffering simulates the case where we have used up all our buffering retries and in the diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 04487917f79..3ff64194f8a 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" @@ -58,6 +59,7 @@ func TestVStreamSkew(t *testing.T) { time.Sleep(time.Duration(idx*100) * time.Millisecond) } } + type skewTestCase struct { numEventsPerShard int64 shard0idx, shard1idx int64 @@ -84,55 +86,57 @@ func TestVStreamSkew(t *testing.T) { cell := "aa" for idx, tcase := range tcases { t.Run("", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ks := fmt.Sprintf("TestVStreamSkew-%d", idx) - _ = createSandbox(ks) - hc := discovery.NewFakeHealthCheck(nil) - st := getSandboxTopo(ctx, cell, ks, []string{"-20", 
"20-40"}) - vsm := newTestVStreamManager(ctx, hc, st, cell) - vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} - want := int64(0) - var sbc0, sbc1 *sandboxconn.SandboxConn - if tcase.shard0idx != 0 { - sbc0 = hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) - addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) - sbc0.VStreamCh = make(chan *binlogdatapb.VEvent) - want += 2 * tcase.numEventsPerShard - vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "-20"}) - go stream(sbc0, ks, "-20", tcase.numEventsPerShard, tcase.shard0idx) - } - if tcase.shard1idx != 0 { - sbc1 = hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) - addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) - sbc1.VStreamCh = make(chan *binlogdatapb.VEvent) - want += 2 * tcase.numEventsPerShard - vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "20-40"}) - go stream(sbc1, ks, "20-40", tcase.numEventsPerShard, tcase.shard1idx) - } + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ks := fmt.Sprintf("TestVStreamSkew-%d", idx) + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} + want := int64(0) + var sbc0, sbc1 *sandboxconn.SandboxConn + if tcase.shard0idx != 0 { + sbc0 = hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc0.VStreamCh = make(chan *binlogdatapb.VEvent) + want += 2 * tcase.numEventsPerShard + vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: 
"pos", Shard: "-20"}) + go stream(sbc0, ks, "-20", tcase.numEventsPerShard, tcase.shard0idx) + } + if tcase.shard1idx != 0 { + sbc1 = hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) + sbc1.VStreamCh = make(chan *binlogdatapb.VEvent) + want += 2 * tcase.numEventsPerShard + vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "20-40"}) + go stream(sbc1, ks, "20-40", tcase.numEventsPerShard, tcase.shard1idx) + } - vstreamCtx, vstreamCancel := context.WithTimeout(ctx, 1*time.Minute) - defer vstreamCancel() + vstreamCtx, vstreamCancel := context.WithTimeout(ctx, 1*time.Minute) + defer vstreamCancel() - receivedEvents := make([]*binlogdatapb.VEvent, 0) - err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{MinimizeSkew: true}, func(events []*binlogdatapb.VEvent) error { - receivedEvents = append(receivedEvents, events...) + receivedEvents := make([]*binlogdatapb.VEvent, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{MinimizeSkew: true}, func(events []*binlogdatapb.VEvent) error { + receivedEvents = append(receivedEvents, events...) - if int64(len(receivedEvents)) == want { - // Stop streaming after receiving both expected responses. - vstreamCancel() - } + if int64(len(receivedEvents)) == want { + // Stop streaming after receiving both expected responses. 
+ vstreamCancel() + } - return nil - }) + return nil + }) - require.Error(t, err) - require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) - require.Equal(t, int(want), int(len(receivedEvents))) - require.Equal(t, tcase.expectedDelays, vsm.GetTotalStreamDelay()-previousDelays) - previousDelays = vsm.GetTotalStreamDelay() + require.Equal(t, int(want), int(len(receivedEvents))) + require.Equal(t, tcase.expectedDelays, vsm.GetTotalStreamDelay()-previousDelays) + previousDelays = vsm.GetTotalStreamDelay() + }) }) } }