Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 130 additions & 4 deletions go/vt/vttablet/tabletmanager/rpc_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/mysqlctl"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletserver"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// TestWaitForGrantsToHaveApplied tests that waitForGrantsToHaveApplied only succeeds after waitForDBAGrants has been called.
Expand Down Expand Up @@ -130,12 +131,17 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) {

tm.SemiSyncMonitor.Open()
// Add a universal insert query pattern that would block until we make it unblock.
// ExecuteFetchMulti will execute each statement separately, so we need to add SET query.
fakeDb.AddQueryPattern("SET SESSION lock_wait_timeout=.*", &sqltypes.Result{})
ch := make(chan int)
fakeDb.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
<-ch
})
// Add a fake query that makes the semi-sync monitor believe that the tablet is blocked on semi-sync ACKs.
fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "1"))
fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(500) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult(
sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"),
"Rpl_semi_sync_source_wait_sessions|1",
"Rpl_semi_sync_source_yes_tx|5"))

// Verify that in the beginning the tablet is serving.
require.True(t, tm.QueryServiceControl.IsServing())
Expand All @@ -160,7 +166,10 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) {
require.False(t, fakeMysqlDaemon.SuperReadOnly.Load())

// Now we unblock the semi-sync monitor.
fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "0"))
fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult(
sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"),
"Rpl_semi_sync_source_wait_sessions|0",
"Rpl_semi_sync_source_yes_tx|5"))
close(ch)

// This should unblock the demote primary operation eventually.
Expand All @@ -171,6 +180,123 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) {
require.True(t, fakeMysqlDaemon.SuperReadOnly.Load())
}

// TestDemotePrimaryWithSemiSyncProgressDetection tests that demote primary proceeds
// without blocking when transactions are making progress (ackedTrxs increasing between checks).
//
// The test registers semi-sync status results on the fake DB that always report a
// waiting session but an increasing acked-transaction counter, runs demotePrimary in
// a goroutine, and requires that it completes and leaves the fake MySQL daemon in
// super-read-only mode.
func TestDemotePrimaryWithSemiSyncProgressDetection(t *testing.T) {
	// Status query registered on the fake DB below; kept in one place so every
	// registration uses exactly the same text.
	const semiSyncStatsQuery = "SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')"

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	ts := memorytopo.NewServer(ctx, "cell1")
	tm := newTestTM(t, ts, 1, "ks", "0", nil)
	// Make the tablet a primary.
	err := tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY, false)
	require.NoError(t, err)
	fakeMysqlDaemon := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon)
	fakeDb := fakeMysqlDaemon.DB()
	fakeDb.SetNeverFail(true)

	tm.SemiSyncMonitor.Open()

	// Set up the query to show waiting sessions, but with progress (ackedTrxs increasing).
	// The monitor makes TWO calls to getSemiSyncStats with a sleep between them.
	// NOTE(review): the registrations below rely on fakesqldb queuing repeated AddQuery
	// results for the same query and serving them in FIFO order. If AddQuery instead
	// replaces the previous result, only the last registration (ackedTrxs=6) is ever
	// served — confirm against fakesqldb.
	// First few calls: waiting sessions present, ackedTrxs=5.
	for range 3 {
		fakeDb.AddQuery(semiSyncStatsQuery, sqltypes.MakeTestResult(
			sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"),
			"Rpl_semi_sync_source_wait_sessions|1",
			"Rpl_semi_sync_source_yes_tx|5"))
	}
	// Next calls: waiting sessions present, but ackedTrxs=6 (progress!).
	for range 10 {
		fakeDb.AddQuery(semiSyncStatsQuery, sqltypes.MakeTestResult(
			sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"),
			"Rpl_semi_sync_source_wait_sessions|1",
			"Rpl_semi_sync_source_yes_tx|6"))
	}

	// Verify that in the beginning the tablet is serving.
	require.True(t, tm.QueryServiceControl.IsServing())

	// Start the demote primary operation in a goroutine. Its error is handed back to
	// the test goroutine rather than asserted here, because require calls t.FailNow,
	// which must only be called from the goroutine running the test.
	var (
		demotePrimaryFinished atomic.Bool
		demotePrimaryErr      error
	)
	go func() {
		_, demotePrimaryErr = tm.demotePrimary(ctx, false)
		// The atomic store publishes demotePrimaryErr to any reader that observes true.
		demotePrimaryFinished.Store(true)
	}()

	// Wait for the demote primary operation to have changed the serving state.
	require.Eventually(t, func() bool {
		return !tm.QueryServiceControl.IsServing()
	}, 5*time.Second, 100*time.Millisecond)

	// DemotePrimary should finish quickly because progress is being made.
	// It should NOT wait for semi-sync to unblock since ackedTrxs is increasing.
	require.Eventually(t, func() bool {
		return demotePrimaryFinished.Load()
	}, 5*time.Second, 100*time.Millisecond)
	// Safe to read: the goroutine wrote the error before storing true above.
	require.NoError(t, demotePrimaryErr)

	// We should have seen the super-read only query.
	require.True(t, fakeMysqlDaemon.SuperReadOnly.Load())
}

// TestDemotePrimaryWhenSemiSyncBecomesUnblockedBetweenChecks tests that demote primary
// proceeds immediately when waiting sessions drops to 0 between the two checks.
//
// The test registers a semi-sync status result with waiting sessions followed by
// results with none, runs demotePrimary in a goroutine, and requires that it
// completes and leaves the fake MySQL daemon in super-read-only mode.
func TestDemotePrimaryWhenSemiSyncBecomesUnblockedBetweenChecks(t *testing.T) {
	// Status query registered on the fake DB below; kept in one place so every
	// registration uses exactly the same text.
	const semiSyncStatsQuery = "SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')"

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	ts := memorytopo.NewServer(ctx, "cell1")
	tm := newTestTM(t, ts, 1, "ks", "0", nil)
	// Make the tablet a primary.
	err := tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY, false)
	require.NoError(t, err)
	fakeMysqlDaemon := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon)
	fakeDb := fakeMysqlDaemon.DB()
	fakeDb.SetNeverFail(true)

	tm.SemiSyncMonitor.Open()

	// Set up the query to show waiting sessions on first call, but 0 on second call.
	// This simulates the semi-sync becoming unblocked between the two checks.
	// NOTE(review): this relies on fakesqldb queuing repeated AddQuery results for the
	// same query and serving them in FIFO order. If AddQuery instead replaces the
	// previous result, the "waiting sessions" result is never served — confirm
	// against fakesqldb.
	// First call: waiting sessions present.
	fakeDb.AddQuery(semiSyncStatsQuery, sqltypes.MakeTestResult(
		sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"),
		"Rpl_semi_sync_source_wait_sessions|2",
		"Rpl_semi_sync_source_yes_tx|5"))
	// Second and subsequent calls: no waiting sessions (unblocked!).
	for range 10 {
		fakeDb.AddQuery(semiSyncStatsQuery, sqltypes.MakeTestResult(
			sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"),
			"Rpl_semi_sync_source_wait_sessions|0",
			"Rpl_semi_sync_source_yes_tx|5"))
	}

	// Verify that in the beginning the tablet is serving.
	require.True(t, tm.QueryServiceControl.IsServing())

	// Start the demote primary operation in a goroutine. Its error is handed back to
	// the test goroutine rather than asserted here, because require calls t.FailNow,
	// which must only be called from the goroutine running the test.
	var (
		demotePrimaryFinished atomic.Bool
		demotePrimaryErr      error
	)
	go func() {
		_, demotePrimaryErr = tm.demotePrimary(ctx, false)
		// The atomic store publishes demotePrimaryErr to any reader that observes true.
		demotePrimaryFinished.Store(true)
	}()

	// Wait for the demote primary operation to have changed the serving state.
	require.Eventually(t, func() bool {
		return !tm.QueryServiceControl.IsServing()
	}, 5*time.Second, 100*time.Millisecond)

	// DemotePrimary should finish quickly because semi-sync became unblocked.
	require.Eventually(t, func() bool {
		return demotePrimaryFinished.Load()
	}, 5*time.Second, 100*time.Millisecond)
	// Safe to read: the goroutine wrote the error before storing true above.
	require.NoError(t, demotePrimaryErr)

	// We should have seen the super-read only query.
	require.True(t, fakeMysqlDaemon.SuperReadOnly.Load())
}

// TestUndoDemotePrimaryStateChange tests that UndoDemotePrimary
// is able to change the state of the tablet to Primary if there
// is a mismatch with the tablet record.
Expand All @@ -188,7 +314,7 @@ func TestUndoDemotePrimaryStateChange(t *testing.T) {

// Check that the tablet is initially a replica.
require.EqualValues(t, topodatapb.TabletType_REPLICA, tm.Tablet().Type)
// Verify that the tablet record says the tablet should be a primary
// Verify that the tablet record says the tablet should be a primary.
require.EqualValues(t, topodatapb.TabletType_PRIMARY, ti.Type)

err = tm.UndoDemotePrimary(ctx, false)
Expand Down
Loading
Loading