diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index cec0027fb55..7ba8fc6729c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -28,11 +28,12 @@ import ( "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/mysqlctl" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" "vitess.io/vitess/go/vt/vttablet/tabletserver" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // TestWaitForGrantsToHaveApplied tests that waitForGrantsToHaveApplied only succeeds after waitForDBAGrants has been called. @@ -130,12 +131,17 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) { tm.SemiSyncMonitor.Open() // Add a universal insert query pattern that would block until we make it unblock. + // ExecuteFetchMulti will execute each statement separately, so we need to add SET query. + fakeDb.AddQueryPattern("SET SESSION lock_wait_timeout=.*", &sqltypes.Result{}) ch := make(chan int) fakeDb.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { <-ch }) // Add a fake query that makes the semi-sync monitor believe that the tablet is blocked on semi-sync ACKs. - fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "1")) + fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(500) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|1", + "Rpl_semi_sync_source_yes_tx|5")) // Verify that in the beginning the tablet is serving. require.True(t, tm.QueryServiceControl.IsServing()) @@ -160,7 +166,10 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) { require.False(t, fakeMysqlDaemon.SuperReadOnly.Load()) // Now we unblock the semi-sync monitor. - fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "0")) + fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|0", + "Rpl_semi_sync_source_yes_tx|5")) close(ch) // This should unblock the demote primary operation eventually. @@ -171,6 +180,123 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) { require.True(t, fakeMysqlDaemon.SuperReadOnly.Load()) } +// TestDemotePrimaryWithSemiSyncProgressDetection tests that demote primary proceeds +// without blocking when transactions are making progress (ackedTrxs increasing between checks). +func TestDemotePrimaryWithSemiSyncProgressDetection(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ts := memorytopo.NewServer(ctx, "cell1") + tm := newTestTM(t, ts, 1, "ks", "0", nil) + // Make the tablet a primary. + err := tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY, false) + require.NoError(t, err) + fakeMysqlDaemon := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) + fakeDb := fakeMysqlDaemon.DB() + fakeDb.SetNeverFail(true) + + tm.SemiSyncMonitor.Open() + + // Set up the query to show waiting sessions, but with progress (ackedTrxs increasing). + // The monitor makes TWO calls to getSemiSyncStats with a sleep between them. + // We add the query result multiple times. The fakesqldb will return them in order (FIFO). + // First few calls: waiting sessions present, ackedTrxs=5. + for range 3 { + fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|1", + "Rpl_semi_sync_source_yes_tx|5")) + } + // Next calls: waiting sessions present, but ackedTrxs=6 (progress!). + for range 10 { + fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|1", + "Rpl_semi_sync_source_yes_tx|6")) + } + + // Verify that in the beginning the tablet is serving. + require.True(t, tm.QueryServiceControl.IsServing()) + + // Start the demote primary operation in a go routine. + var demotePrimaryFinished atomic.Bool + go func() { + _, err := tm.demotePrimary(ctx, false) + require.NoError(t, err) + demotePrimaryFinished.Store(true) + }() + + // Wait for the demote primary operation to have changed the serving state. + require.Eventually(t, func() bool { + return !tm.QueryServiceControl.IsServing() + }, 5*time.Second, 100*time.Millisecond) + + // DemotePrimary should finish quickly because progress is being made. + // It should NOT wait for semi-sync to unblock since ackedTrxs is increasing. + require.Eventually(t, func() bool { + return demotePrimaryFinished.Load() + }, 5*time.Second, 100*time.Millisecond) + + // We should have seen the super-read only query. + require.True(t, fakeMysqlDaemon.SuperReadOnly.Load()) +} + +// TestDemotePrimaryWhenSemiSyncBecomesUnblockedBetweenChecks tests that demote primary +// proceeds immediately when waiting sessions drops to 0 between the two checks. +func TestDemotePrimaryWhenSemiSyncBecomesUnblockedBetweenChecks(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ts := memorytopo.NewServer(ctx, "cell1") + tm := newTestTM(t, ts, 1, "ks", "0", nil) + // Make the tablet a primary. + err := tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY, false) + require.NoError(t, err) + fakeMysqlDaemon := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) + fakeDb := fakeMysqlDaemon.DB() + fakeDb.SetNeverFail(true) + + tm.SemiSyncMonitor.Open() + + // Set up the query to show waiting sessions on first call, but 0 on second call. + // This simulates the semi-sync becoming unblocked between the two checks. + // The fakesqldb returns results in FIFO order. + // First call: waiting sessions present. + fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|2", + "Rpl_semi_sync_source_yes_tx|5")) + // Second and subsequent calls: no waiting sessions (unblocked!). + for range 10 { + fakeDb.AddQuery("SELECT /*+ MAX_EXECUTION_TIME(1000) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|0", + "Rpl_semi_sync_source_yes_tx|5")) + } + + // Verify that in the beginning the tablet is serving. + require.True(t, tm.QueryServiceControl.IsServing()) + + // Start the demote primary operation in a go routine. + var demotePrimaryFinished atomic.Bool + go func() { + _, err := tm.demotePrimary(ctx, false) + require.NoError(t, err) + demotePrimaryFinished.Store(true) + }() + + // Wait for the demote primary operation to have changed the serving state. + require.Eventually(t, func() bool { + return !tm.QueryServiceControl.IsServing() + }, 5*time.Second, 100*time.Millisecond) + + // DemotePrimary should finish quickly because semi-sync became unblocked. + require.Eventually(t, func() bool { + return demotePrimaryFinished.Load() + }, 5*time.Second, 100*time.Millisecond) + + // We should have seen the super-read only query. + require.True(t, fakeMysqlDaemon.SuperReadOnly.Load()) +} + // TestUndoDemotePrimaryStateChange tests that UndoDemotePrimary // if able to change the state of the tablet to Primary if there // is a mismatch with the tablet record. @@ -188,7 +314,7 @@ func TestUndoDemotePrimaryStateChange(t *testing.T) { // Check that the tablet is initially a replica. require.EqualValues(t, topodatapb.TabletType_REPLICA, tm.Tablet().Type) - // Verify that the tablet record says the tablet should be a primary + // Verify that the tablet record says the tablet should be a primary. require.EqualValues(t, topodatapb.TabletType_PRIMARY, ti.Type) err = tm.UndoDemotePrimary(ctx, false) diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go index 8f96a3bf0cd..13cad547a50 100644 --- a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go +++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "vitess.io/vitess/go/constants/sidecar" @@ -30,25 +31,36 @@ import ( "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) const ( - semiSyncWaitSessionsRead = "select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')" - semiSyncHeartbeatWrite = "INSERT INTO %s.semisync_heartbeat (ts) VALUES (NOW())" - semiSyncHeartbeatClear = "TRUNCATE TABLE %s.semisync_heartbeat" - maxWritesPermitted = 15 - clearTimerDuration = 24 * time.Hour + // How many seconds we should wait for table/metadata locks. + // We do NOT want our TRUNCATE statement to block things indefinitely, and + // we do NOT want our INSERTs blocking indefinitely on any locks to appear + // as though they are blocking on a semi-sync ACK, which is what we really + // care about in the monitor as when we hit the limit of writers blocked + // on semi-sync ACKs we signal to VTOrc that we need help to unblock + // things and it will perform an ERS to do so. + // Note: this is something we are entirely fine being set in all of the + // monitor connection pool sessions, so we do not ever bother to set the + // session value back to the global default. + setLockWaitTimeoutQuery = "SET SESSION lock_wait_timeout=%d" + + semiSyncStatsQuery = "SELECT /*+ MAX_EXECUTION_TIME(%d) */ variable_name, variable_value FROM performance_schema.global_status WHERE REGEXP_LIKE(variable_name, 'Rpl_semi_sync_(source|master)_(wait_sessions|yes_tx)')" + semiSyncHeartbeatWrite = "INSERT INTO %s.semisync_heartbeat (ts) VALUES (NOW())" + semiSyncHeartbeatClear = "TRUNCATE TABLE %s.semisync_heartbeat" + maxWritesPermitted = 15 + clearTimerDuration = 24 * time.Hour ) -var ( - // waitBetweenWrites is the time to wait between consecutive writes. - // This is a variable instead of a constant only to be tweaked in tests. - waitBetweenWrites = 1 * time.Second -) +type semiSyncStats struct { + waitingSessions, ackedTrxs int64 +} // Monitor is a monitor that checks if the primary tablet // is blocked on a semi-sync ack from the replica. @@ -77,7 +89,7 @@ type Monitor struct { // isWriting stores if the monitor is currently writing to the DB. // We don't want two different threads initiating writes, so we use this // for synchronization. - isWriting bool + isWriting atomic.Bool // inProgressWriteCount is the number of writes currently in progress. // The writes from the monitor themselves might get blocked and hence a count for them is required. // After enough writes are blocked, we want to notify VTOrc to run an ERS. @@ -91,6 +103,11 @@ type Monitor struct { // errorCount is the number of errors that the semi-sync monitor ran into. // We ignore some of the errors, so the counter is a good way to track how many errors we have seen. errorCount *stats.Counter + + // actionDelay is the time to wait between various actions. + actionDelay time.Duration + // actionTimeout is when we should time out a given action. + actionTimeout time.Duration } // NewMonitor creates a new Monitor. @@ -105,6 +122,8 @@ func NewMonitor(config *tabletenv.TabletConfig, exporter *servenv.Exporter) *Mon errorCount: exporter.NewCounter("SemiSyncMonitorErrorCount", "Number of errors encountered by the semi-sync monitor"), appPool: dbconnpool.NewConnectionPool("SemiSyncMonitorAppPool", exporter, maxWritesPermitted+5, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution), waiters: make([]chan struct{}, 0), + actionDelay: config.SemiSyncMonitor.Interval / 10, + actionTimeout: config.SemiSyncMonitor.Interval / 2, } } @@ -174,10 +193,8 @@ func (m *Monitor) Close() { // and manufactures a write to unblock the primary. This function is safe to // be called multiple times in parallel. func (m *Monitor) checkAndFixSemiSyncBlocked() { - // Check if semi-sync is blocked or not - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - isBlocked, err := m.isSemiSyncBlocked(ctx) + // Check if semi-sync is blocked or not. + isBlocked, err := m.isSemiSyncBlocked() if err != nil { m.errorCount.Add(1) // If we are unable to determine whether the primary is blocked or not, @@ -197,7 +214,9 @@ func (m *Monitor) checkAndFixSemiSyncBlocked() { } // isSemiSyncBlocked checks if the primary is blocked on semi-sync. -func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) { +func (m *Monitor) isSemiSyncBlocked() (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), m.ticks.Interval()) + defer cancel() // Get a connection from the pool conn, err := m.appPool.Get(ctx) if err != nil { @@ -205,23 +224,16 @@ func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) { } defer conn.Recycle() - // Execute the query to check if the primary is blocked on semi-sync. - res, err := conn.Conn.ExecuteFetch(semiSyncWaitSessionsRead, 1, false) - if err != nil { + stats, err := m.getSemiSyncStats(conn) + if err != nil || stats.waitingSessions == 0 { return false, err } - // If we have no rows, then the primary doesn't have semi-sync enabled. - // It then follows, that the primary isn't blocked :) - if len(res.Rows) == 0 { - return false, nil - } - - // Read the status value and check if it is non-zero. - if len(res.Rows) != 1 || len(res.Rows[0]) != 1 { - return false, fmt.Errorf("unexpected number of rows received - %v", res.Rows) + time.Sleep(m.actionDelay) + followUpStats, err := m.getSemiSyncStats(conn) + if err != nil || followUpStats.waitingSessions == 0 || followUpStats.ackedTrxs > stats.ackedTrxs { + return false, err } - value, err := res.Rows[0][0].ToCastInt64() - return value != 0, err + return true, nil } // isClosed returns if the monitor is currently closed or not. @@ -272,25 +284,19 @@ func (m *Monitor) stillBlocked() bool { // checkAndSetIsWriting checks if the monitor is already writing to the DB. // If it is not, then it sets the isWriting field and signals the caller. func (m *Monitor) checkAndSetIsWriting() bool { - m.mu.Lock() - defer m.mu.Unlock() - if m.isWriting { - return false - } - m.isWriting = true - return true + return m.isWriting.CompareAndSwap(false, true) } // clearIsWriting clears the isWriting field. func (m *Monitor) clearIsWriting() { - m.mu.Lock() - defer m.mu.Unlock() - m.isWriting = false + m.isWriting.Store(false) } // startWrites starts writing to the DB. // It is re-entrant and will return if we are already writing. func (m *Monitor) startWrites() { + ctx, cancel := context.WithTimeout(context.Background(), m.ticks.Interval()) + defer cancel() // If we are already writing, then we can just return. if !m.checkAndSetIsWriting() { return @@ -300,13 +306,27 @@ func (m *Monitor) startWrites() { // Check if we need to continue writing or not. for m.stillBlocked() { + select { + case <-ctx.Done(): + return + default: + // We only need to do another write if there were no other successful + // writes and we're indeed still blocked. + blocked, err := m.isSemiSyncBlocked() + if err != nil { + return + } + if !blocked { + m.setIsBlocked(false) + return + } + } // We do the writes in a go-routine because if the network disruption // is somewhat long-lived, then the writes themselves can also block. // By doing them in a go-routine we give the system more time to recover while // exponentially backing off. We will not do more than maxWritesPermitted writes and once // all maxWritesPermitted writes are blocked, we'll wait for VTOrc to run an ERS. go m.write() - time.Sleep(waitBetweenWrites) } } @@ -330,7 +350,7 @@ func (m *Monitor) incrementWriteCount() bool { func (m *Monitor) AllWritesBlocked() bool { m.mu.Lock() defer m.mu.Unlock() - return m.isOpen && m.inProgressWriteCount == maxWritesPermitted + return m.isOpen && m.isBlocked && m.inProgressWriteCount == maxWritesPermitted } // decrementWriteCount decrements the write count. @@ -343,13 +363,12 @@ func (m *Monitor) decrementWriteCount() { // write writes a heartbeat to unblock semi-sync being stuck. func (m *Monitor) write() { - shouldWrite := m.incrementWriteCount() - if !shouldWrite { + if shouldWrite := m.incrementWriteCount(); !shouldWrite { return } defer m.decrementWriteCount() // Get a connection from the pool - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + ctx, cancel := context.WithTimeout(context.Background(), m.actionTimeout) defer cancel() conn, err := m.appPool.Get(ctx) if err != nil { @@ -357,7 +376,7 @@ func (m *Monitor) write() { log.Errorf("SemiSync Monitor: failed to get a connection when writing to semisync_heartbeat table: %v", err) return } - _, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncHeartbeatWrite), 0, false) + err = conn.Conn.ExecuteFetchMultiDrain(m.addLockWaitTimeout(m.bindSideCarDBName(semiSyncHeartbeatWrite))) conn.Recycle() if err != nil { m.errorCount.Add(1) @@ -388,14 +407,16 @@ func (m *Monitor) setIsBlocked(val bool) { // consumes too much space on the MySQL instance. func (m *Monitor) clearAllData() { // Get a connection from the pool - conn, err := m.appPool.Get(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), m.actionTimeout) + defer cancel() + conn, err := m.appPool.Get(ctx) if err != nil { m.errorCount.Add(1) log.Errorf("SemiSync Monitor: failed get a connection to clear semisync_heartbeat table: %v", err) return } defer conn.Recycle() - _, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncHeartbeatClear), 0, false) + _, _, err = conn.Conn.ExecuteFetchMulti(m.addLockWaitTimeout(m.bindSideCarDBName(semiSyncHeartbeatClear)), 0, false) if err != nil { m.errorCount.Add(1) log.Errorf("SemiSync Monitor: failed to clear semisync_heartbeat table: %v", err) @@ -416,3 +437,47 @@ func (m *Monitor) addWaiter() chan struct{} { func (m *Monitor) bindSideCarDBName(query string) string { return sqlparser.BuildParsedQuery(query, sidecar.GetIdentifier()).Query } + +func (m *Monitor) addLockWaitTimeout(query string) string { + timeoutQuery := fmt.Sprintf(setLockWaitTimeoutQuery, int(m.actionTimeout.Seconds())) + return timeoutQuery + ";" + query +} + +func (m *Monitor) getSemiSyncStats(conn *dbconnpool.PooledDBConnection) (semiSyncStats, error) { + stats := semiSyncStats{} + // Execute the query to check if the primary is blocked on semi-sync. + query := fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()) + res, err := conn.Conn.ExecuteFetch(query, 2, false) + if err != nil { + return stats, err + } + // If we have no rows, then the primary doesn't have semi-sync enabled. + // It then follows, that the primary isn't blocked :) + if len(res.Rows) == 0 { + return stats, nil + } + + // Read the status value and check if it is non-zero. + if len(res.Rows) != 2 { + return stats, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected number of rows received, expected 2 but got %d, for semi-sync stats query %s", len(res.Rows), query) + } + if len(res.Rows[0]) != 2 { + return stats, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected number of columns received, expected 2 but got %d, for semi-sync stats query %s", len(res.Rows[0]), query) + } + for i := range len(res.Rows) { + name := res.Rows[i][0].ToString() + value, err := res.Rows[i][1].ToCastInt64() + if err != nil { + return stats, vterrors.Wrapf(err, "unexpected results for semi-sync stats query %s: %v", query, res.Rows) + } + switch name { + case "Rpl_semi_sync_master_wait_sessions", "Rpl_semi_sync_source_wait_sessions": + stats.waitingSessions = value + case "Rpl_semi_sync_master_yes_tx", "Rpl_semi_sync_source_yes_tx": + stats.ackedTrxs = value + default: + return stats, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected results for semi-sync stats query %s: %v", query, res.Rows) + } + } + return stats, nil +} diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go index 9586514914b..5137d77ef97 100644 --- a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go +++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go @@ -26,8 +26,10 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -59,52 +61,339 @@ func createFakeDBAndMonitor(t *testing.T) (*fakesqldb.DB, *Monitor) { } // TestMonitorIsSemiSyncBlocked tests the functionality of isSemiSyncBlocked. +// NOTE: This test focuses on the first getSemiSyncStats call and early-return logic. +// The full two-call behavior is tested in TestMonitorIsSemiSyncBlockedProgressDetection. func TestMonitorIsSemiSyncBlocked(t *testing.T) { + defer utils.EnsureNoLeaks(t) + tests := []struct { name string - res *sqltypes.Result + result *sqltypes.Result want bool wantErr string }{ { - name: "no rows", - res: &sqltypes.Result{}, - want: false, + name: "no rows - semi-sync not enabled", + result: &sqltypes.Result{}, + want: false, }, { - name: "incorrect number of rows", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1", "1"), - wantErr: "Row count exceeded 1", + name: "incorrect results - invalid variable names", + result: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "foo|3", "foo|3"), + wantErr: "unexpected results for semi-sync stats query", }, { - name: "incorrect number of fields", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value|a", "varchar|varchar"), "1|2"), - wantErr: `unexpected number of rows received - [[VARCHAR("1") VARCHAR("2")]]`, + name: "unblocked - zero waiting sessions", + result: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0", "Rpl_semi_sync_source_yes_tx|1"), + want: false, }, { - name: "Unblocked", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"), - want: false, + name: "has waiting sessions - needs second check", + result: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1", "Rpl_semi_sync_source_yes_tx|5"), + // With fakesqldb limitation, second call returns same result, so it appears blocked. + want: true, }, { - name: "Blocked", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1"), + name: "master prefix for backwards compatibility", + result: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "Rpl_semi_sync_master_wait_sessions|2", "Rpl_semi_sync_master_yes_tx|50"), + // With fakesqldb limitation, second call returns same result, so it appears blocked. want: true, }, + { + name: "invalid variable value", + result: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|not_a_number", "Rpl_semi_sync_source_yes_tx|5"), + wantErr: "unexpected results for semi-sync stats query", + }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { db, m := createFakeDBAndMonitor(t) + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second defer db.Close() - defer m.Close() - db.AddQuery(semiSyncWaitSessionsRead, tt.res) - got, err := m.isSemiSyncBlocked(context.Background()) + defer func() { + m.Close() + waitUntilWritingStopped(t, m) + }() + + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), tt.result) + + got, err := m.isSemiSyncBlocked() if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) + require.ErrorContains(t, err, tt.wantErr) return } require.NoError(t, err) - require.EqualValues(t, tt.want, got) + require.Equal(t, tt.want, got) + }) + } +} + +// TestMonitorIsSemiSyncBlockedConnectionError tests that we do not +// consider semi-sync blocked when we encounter an error trying to check. +func TestMonitorIsSemiSyncBlockedConnectionError(t *testing.T) { + defer utils.EnsureNoLeaks(t) + db, m := createFakeDBAndMonitor(t) + defer db.Close() + + // Close the pool to simulate connection errors. + m.mu.Lock() + m.appPool.Close() + m.mu.Unlock() + + defer func() { + m.Close() + waitUntilWritingStopped(t, m) + }() + + // The function should return an error when it can't get a connection. + got, err := m.isSemiSyncBlocked() + require.Error(t, err) + require.False(t, got) +} + +// TestMonitorIsSemiSyncBlockedWithBadResults tests error handling when +// the query returns an unexpected result. +func TestMonitorIsSemiSyncBlockedWithBadResults(t *testing.T) { + defer utils.EnsureNoLeaks(t) + + tests := []struct { + name string + res *sqltypes.Result + wantErr string + }{ + { + name: "one row instead of two", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"), + wantErr: "unexpected number of rows received, expected 2 but got 1", + }, + { + name: "one column instead of two", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1", "1"), + wantErr: "unexpected number of columns received, expected 2 but got 1", + }, + { + name: "three rows instead of two", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1", "Rpl_semi_sync_source_yes_tx|5", "extra_row|10"), + // Note: The actual error is "Row count exceeded" because ExecuteFetch has maxrows=2. + wantErr: "Row count exceeded", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second + defer db.Close() + defer func() { + m.Close() + waitUntilWritingStopped(t, m) + }() + + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), tt.res) + + got, err := m.isSemiSyncBlocked() + require.False(t, got) + require.ErrorContains(t, err, tt.wantErr) + }) + } +} + +// TestMonitorIsSemiSyncBlockedProgressDetection tests various scenarios +// for detecting progress in semi-sync replication by directly calling +// getSemiSyncStats to verify the logic. +func TestMonitorIsSemiSyncBlockedProgressDetection(t *testing.T) { + defer utils.EnsureNoLeaks(t) + + tests := []struct { + name string + firstWaiting int64 + firstAcked int64 + secondWaiting int64 + secondAcked int64 + expectedBlocked bool + description string + }{ + { + name: "progress - acked increased by 1", + firstWaiting: 2, + firstAcked: 100, + secondWaiting: 2, + secondAcked: 101, + expectedBlocked: false, + description: "should detect progress when acked transactions increase", + }, + { + name: "progress - acked increased significantly", + firstWaiting: 1, + firstAcked: 50, + secondWaiting: 1, + secondAcked: 1000, + expectedBlocked: false, + description: "should detect progress with large acked transaction increase", + }, + { + name: "progress - waiting decreased to zero", + firstWaiting: 3, + firstAcked: 100, + secondWaiting: 0, + secondAcked: 100, + expectedBlocked: false, + description: "should detect progress when waiting sessions drop to zero", + }, + { + name: "blocked - waiting decreased but transactions not progressing", + firstWaiting: 5, + firstAcked: 100, + secondWaiting: 2, + secondAcked: 100, + expectedBlocked: true, + description: "should still be blocked when waiting sessions decrease but no transactions are acked", + }, + { + name: "blocked - no change in metrics", + firstWaiting: 2, + firstAcked: 100, + secondWaiting: 2, + secondAcked: 100, + expectedBlocked: true, + description: "should detect blocked state when no metrics change", + }, + { + name: "blocked - waiting increased", + firstWaiting: 1, + firstAcked: 100, + secondWaiting: 3, + secondAcked: 100, + expectedBlocked: true, + description: "should detect blocked state when waiting sessions increase", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Directly test the logic by simulating what isSemiSyncBlocked does. + // First call - check initial stats. + stats := semiSyncStats{ + waitingSessions: tt.firstWaiting, + ackedTrxs: tt.firstAcked, + } + + // Early return conditions from isSemiSyncBlocked. + if stats.waitingSessions == 0 { + require.False(t, tt.expectedBlocked, tt.description) + return + } + + // Second call - check follow-up stats. + followUpStats := semiSyncStats{ + waitingSessions: tt.secondWaiting, + ackedTrxs: tt.secondAcked, + } + + // Check if we're still blocked based on the actual logic in isSemiSyncBlocked. + // Returns false (not blocked) if: waitingSessions == 0 OR ackedTrxs increased. + isBlocked := !(followUpStats.waitingSessions == 0 || followUpStats.ackedTrxs > stats.ackedTrxs) + + require.Equal(t, tt.expectedBlocked, isBlocked, tt.description) + }) + } +} + +// TestGetSemiSyncStats tests the getSemiSyncStats helper function. +func TestGetSemiSyncStats(t *testing.T) { + defer utils.EnsureNoLeaks(t) + + tests := []struct { + name string + res *sqltypes.Result + expectedWaiting int64 + expectedAcked int64 + wantErr string + }{ + { + name: "valid source prefix", + res: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|3", + "Rpl_semi_sync_source_yes_tx|150"), + expectedWaiting: 3, + expectedAcked: 150, + }, + { + name: "valid master prefix", + res: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_master_wait_sessions|5", + "Rpl_semi_sync_master_yes_tx|200"), + expectedWaiting: 5, + expectedAcked: 200, + }, + { + name: "zero values", + res: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|0", + "Rpl_semi_sync_source_yes_tx|0"), + expectedWaiting: 0, + expectedAcked: 0, + }, + { + name: "large values", + res: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|999999", + "Rpl_semi_sync_source_yes_tx|123456789"), + expectedWaiting: 999999, + expectedAcked: 123456789, + }, + { + name: "no rows returns empty stats", + res: &sqltypes.Result{}, + expectedWaiting: 0, + expectedAcked: 0, + }, + { + name: "wrong number of rows", + res: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|3"), + wantErr: "unexpected number of rows received, expected 2 but got 1", + }, + { + name: "invalid variable name", + res: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "invalid_variable|3", + "Rpl_semi_sync_source_yes_tx|150"), + wantErr: "unexpected results for semi-sync stats query", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer func() { + m.Close() + waitUntilWritingStopped(t, m) + }() + + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), tt.res) + conn, err := m.appPool.Get(context.Background()) + require.NoError(t, err) + defer conn.Recycle() + + stats, err := m.getSemiSyncStats(conn) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + return + } + require.NoError(t, err) + require.Equal(t, tt.expectedWaiting, stats.waitingSessions) + require.Equal(t, tt.expectedAcked, stats.ackedTrxs) }) } } @@ -136,9 +425,13 @@ func TestMonitorClearAllData(t *testing.T) { defer db.Close() defer m.Close() db.SetNeverFail(true) + // ExecuteFetchMulti will execute each statement separately, so we need to add both queries. + db.AddQuery("SET SESSION lock_wait_timeout=5", &sqltypes.Result{}) + db.AddQuery("truncate table _vt.semisync_heartbeat", &sqltypes.Result{}) m.clearAllData() ql := db.QueryLog() - require.EqualValues(t, "truncate table _vt.semisync_heartbeat", ql) + require.Contains(t, ql, "set session lock_wait_timeout=5") + require.Contains(t, ql, "truncate table _vt.semisync_heartbeat") } // TestMonitorWaitMechanism tests that the wait mechanism works as intended. @@ -272,6 +565,9 @@ func TestMonitorAllWritesBlocked(t *testing.T) { defer m.Close() m.mu.Lock() m.inProgressWriteCount = tt.initVal + if m.inProgressWriteCount == tt.initVal { + m.isBlocked = true + } m.mu.Unlock() require.EqualValues(t, tt.expected, m.AllWritesBlocked()) }) @@ -280,21 +576,21 @@ func TestMonitorAllWritesBlocked(t *testing.T) { func TestMonitorWrite(t *testing.T) { tests := []struct { - initVal int - queryLog string + initVal int + shouldWrite bool }{ { - initVal: maxWritesPermitted - 2, - queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())", + initVal: maxWritesPermitted - 2, + shouldWrite: true, }, { - initVal: maxWritesPermitted - 1, - queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())", + initVal: maxWritesPermitted - 1, + shouldWrite: true, }, { - initVal: maxWritesPermitted, - queryLog: "", + initVal: maxWritesPermitted, + shouldWrite: false, }, { - initVal: 0, - queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())", + initVal: 0, + shouldWrite: true, }, } for _, tt := range tests { @@ -303,6 +599,9 @@ func TestMonitorWrite(t *testing.T) { defer db.Close() defer m.Close() db.SetNeverFail(true) + // ExecuteFetchMulti will execute each statement separately, so we need to add both queries. + db.AddQuery("SET SESSION lock_wait_timeout=5", &sqltypes.Result{}) + db.AddQuery("insert into _vt.semisync_heartbeat (ts) values (now())", &sqltypes.Result{}) m.mu.Lock() m.inProgressWriteCount = tt.initVal m.writesBlockedGauge.Set(int64(tt.initVal)) @@ -312,19 +611,23 @@ func TestMonitorWrite(t *testing.T) { require.EqualValues(t, tt.initVal, m.inProgressWriteCount) require.EqualValues(t, tt.initVal, m.writesBlockedGauge.Get()) m.mu.Unlock() - require.EqualValues(t, tt.queryLog, db.QueryLog()) + queryLog := db.QueryLog() + if tt.shouldWrite { + require.Contains(t, queryLog, "set session lock_wait_timeout=5") + require.Contains(t, queryLog, "insert into _vt.semisync_heartbeat (ts) values (now())") + } else { + require.Equal(t, "", queryLog) + } }) } } // TestMonitorWriteBlocked tests the write function when the writes are blocked. func TestMonitorWriteBlocked(t *testing.T) { - initialVal := waitBetweenWrites - waitBetweenWrites = 250 * time.Millisecond - defer func() { - waitBetweenWrites = initialVal - }() + defer utils.EnsureNoLeaks(t) db, m := createFakeDBAndMonitor(t) + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second defer db.Close() defer m.Close() @@ -333,11 +636,12 @@ func TestMonitorWriteBlocked(t *testing.T) { require.EqualValues(t, 0, m.inProgressWriteCount) m.mu.Unlock() - // Add a universal insert query pattern that would block until we make it unblock. - ch := make(chan int) - db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { - <-ch - }) + // ExecuteFetchMulti will execute each statement separately, so we need to add SET query and INSERT query. + // Add them multiple times so the writes can execute. + for range maxWritesPermitted { + db.AddQuery("SET SESSION lock_wait_timeout=1", &sqltypes.Result{}) + db.AddQuery("INSERT INTO _vt.semisync_heartbeat (ts) VALUES (NOW())", &sqltypes.Result{}) + } // Do a write, which we expect to block. var writeFinished atomic.Bool @@ -345,24 +649,24 @@ func TestMonitorWriteBlocked(t *testing.T) { m.write() writeFinished.Store(true) }() - // We should see the number of writes blocked to increase. - require.Eventually(t, func() bool { - m.mu.Lock() - defer m.mu.Unlock() - return m.inProgressWriteCount == 1 - }, 2*time.Second, 100*time.Millisecond) - // Once the writers are unblocked, we expect to see a zero value again. - close(ch) + // We should see the number of writers increase briefly, before it completes. + require.Zero(t, m.errorCount.Get()) require.Eventually(t, func() bool { m.mu.Lock() defer m.mu.Unlock() - return m.inProgressWriteCount == 0 - }, 2*time.Second, 100*time.Millisecond) + return m.inProgressWriteCount > 0 + }, 2*time.Second, 5*time.Microsecond) + // Check that the writes finished successfully. require.Eventually(t, func() bool { return writeFinished.Load() }, 2*time.Second, 100*time.Millisecond) + + // After write completes, count should be back to zero. + m.mu.Lock() + defer m.mu.Unlock() + require.EqualValues(t, 0, m.inProgressWriteCount) } // TestIsWriting checks the transitions for the isWriting field. @@ -373,51 +677,55 @@ func TestIsWriting(t *testing.T) { // Check the initial value of the isWriting field. m.mu.Lock() - require.False(t, m.isWriting) + require.False(t, m.isWriting.Load()) m.mu.Unlock() // Clearing a false field does nothing. m.clearIsWriting() m.mu.Lock() - require.False(t, m.isWriting) + require.False(t, m.isWriting.Load()) m.mu.Unlock() // Check and set should set the field. set := m.checkAndSetIsWriting() require.True(t, set) m.mu.Lock() - require.True(t, m.isWriting) + require.True(t, m.isWriting.Load()) m.mu.Unlock() // Checking and setting shouldn't do anything. set = m.checkAndSetIsWriting() require.False(t, set) m.mu.Lock() - require.True(t, m.isWriting) + require.True(t, m.isWriting.Load()) m.mu.Unlock() // Clearing should now make the field false. m.clearIsWriting() m.mu.Lock() - require.False(t, m.isWriting) + require.False(t, m.isWriting.Load()) m.mu.Unlock() } func TestStartWrites(t *testing.T) { - initialVal := waitBetweenWrites - waitBetweenWrites = 250 * time.Millisecond - defer func() { - waitBetweenWrites = initialVal - }() + defer utils.EnsureNoLeaks(t) db, m := createFakeDBAndMonitor(t) + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second defer db.Close() defer m.Close() - // Add a universal insert query pattern that would block until we make it unblock. - ch := make(chan int) - db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { - <-ch - }) + // Set up semi-sync stats query to return blocked state (waiting sessions > 0, no progress). + // This is what isSemiSyncBlocked will check inside startWrites. + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|2", + "Rpl_semi_sync_source_yes_tx|100")) + + // ExecuteFetchMulti will execute each statement separately. + // Use patterns for both SET and INSERT since they can be called multiple times. + db.AddQuery("SET SESSION lock_wait_timeout=1", &sqltypes.Result{}) + db.AddQuery("INSERT INTO _vt.semisync_heartbeat (ts) VALUES (NOW())", &sqltypes.Result{}) // If we aren't blocked, then start writes doesn't do anything. m.startWrites() @@ -426,65 +734,50 @@ func TestStartWrites(t *testing.T) { // Now we set the monitor to be blocked. m.setIsBlocked(true) - var writesFinished atomic.Bool - go func() { - m.startWrites() - writesFinished.Store(true) - }() - - // We should see the number of writes blocked to increase. - require.Eventually(t, func() bool { - m.mu.Lock() - defer m.mu.Unlock() - return m.inProgressWriteCount >= 1 - }, 2*time.Second, 100*time.Millisecond) - - // Once the writes have started, another call to startWrites shouldn't do anything + // Start writes and wait for them to complete. m.startWrites() - // We should continue to see the number of writes blocked increase. + // Check that some writes are in progress. require.Eventually(t, func() bool { m.mu.Lock() defer m.mu.Unlock() - return m.inProgressWriteCount >= 2 - }, 2*time.Second, 100*time.Millisecond) + return m.inProgressWriteCount > 0 + }, 2*time.Second, 5*time.Microsecond) - // Check that the writes are still going. - require.False(t, writesFinished.Load()) + // Verify the query log shows the writes were executed. + queryLog := db.QueryLog() + require.Contains(t, queryLog, "insert into _vt.semisync_heartbeat") - // Make the monitor unblocked. This should stop the writes eventually. + // Make the monitor unblocked. This should stop the writes. m.setIsBlocked(false) - close(ch) - - require.Eventually(t, func() bool { - return writesFinished.Load() - }, 2*time.Second, 100*time.Millisecond) // Check that no writes are in progress anymore. require.Eventually(t, func() bool { m.mu.Lock() defer m.mu.Unlock() return m.inProgressWriteCount == 0 - }, 2*time.Second, 100*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) } func TestCheckAndFixSemiSyncBlocked(t *testing.T) { - initialVal := waitBetweenWrites - waitBetweenWrites = 250 * time.Millisecond - defer func() { - waitBetweenWrites = initialVal - }() + defer utils.EnsureNoLeaks(t) db, m := createFakeDBAndMonitor(t) + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second defer db.Close() defer m.Close() - // Initially everything is unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) - // Add a universal insert query pattern that would block until we make it unblock. - ch := make(chan int) - db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { - <-ch - }) + db.SetNeverFail(true) + // Initially everything is unblocked (zero waiting sessions). + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|0", + "Rpl_semi_sync_source_yes_tx|10")) + + // ExecuteFetchMulti will execute each statement separately. + // Use patterns for both SET and INSERT since they can be called multiple times. + db.AddQuery("SET SESSION lock_wait_timeout=1", &sqltypes.Result{}) + db.AddQuery("INSERT INTO _vt.semisync_heartbeat (ts) VALUES (NOW())", &sqltypes.Result{}) // Check that the monitor thinks we are unblocked. m.checkAndFixSemiSyncBlocked() @@ -492,70 +785,130 @@ func TestCheckAndFixSemiSyncBlocked(t *testing.T) { require.False(t, m.isBlocked) m.mu.Unlock() - // Now we set the monitor to be blocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "2")) - m.checkAndFixSemiSyncBlocked() + // Now we set the monitor to be blocked (waiting sessions > 0, no progress). + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|2", + "Rpl_semi_sync_source_yes_tx|10")) - m.mu.Lock() - require.True(t, m.isBlocked) - m.mu.Unlock() + // Manually set isBlocked and start writes like the monitor would do. + m.setIsBlocked(true) - // Checking again shouldn't make a difference. - m.checkAndFixSemiSyncBlocked() - m.mu.Lock() - require.True(t, m.isBlocked) - m.mu.Unlock() + // Start writes and wait for them to complete. + m.startWrites() - // Meanwhile writes should have started and should be getting blocked. + // Wait a bit to let writes execute require.Eventually(t, func() bool { m.mu.Lock() defer m.mu.Unlock() - return m.inProgressWriteCount >= 2 - }, 2*time.Second, 100*time.Millisecond) + return m.inProgressWriteCount == 0 + }, 2*time.Second, 5*time.Microsecond) - // Now we set the monitor to be unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) - close(ch) - m.checkAndFixSemiSyncBlocked() + // Verify the query log shows the writes were executed. + queryLog := db.QueryLog() + require.Contains(t, queryLog, "insert into _vt.semisync_heartbeat") - // We expect the writes to clear out and also the monitor should think its unblocked. - m.mu.Lock() - require.False(t, m.isBlocked) - m.mu.Unlock() + // Now we set the monitor to be unblocked (waiting sessions = 0). + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|0", + "Rpl_semi_sync_source_yes_tx|10")) + + // Make the monitor unblocked. This should stop the writes. + m.setIsBlocked(false) + + // Check that no writes are in progress anymore. require.Eventually(t, func() bool { m.mu.Lock() defer m.mu.Unlock() - return m.inProgressWriteCount == 0 && m.isWriting == false - }, 2*time.Second, 100*time.Millisecond) + return m.inProgressWriteCount == 0 + }, 10*time.Second, 100*time.Millisecond) +} + +// statefulQueryHandler is a custom query handler that can return different results +// based on an atomic boolean state. This allows tests to dynamically change query +// results without relying on precise query result counts. +type statefulQueryHandler struct { + db *fakesqldb.DB + semisyncBlocked atomic.Bool + blockedResult *sqltypes.Result + unblockedResult *sqltypes.Result + semiSyncStatsQuery string +} + +func (h *statefulQueryHandler) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error { + if query == h.semiSyncStatsQuery { + if h.semisyncBlocked.Load() { + return callback(h.blockedResult) + } + return callback(h.unblockedResult) + } + // Fall back to default handler for all other queries (SET and INSERT patterns). + return h.db.HandleQuery(c, query, callback) } func TestWaitUntilSemiSyncUnblocked(t *testing.T) { - initialVal := waitBetweenWrites - waitBetweenWrites = 250 * time.Millisecond + defer utils.EnsureNoLeaks(t) + db := fakesqldb.New(t) + defer db.Close() + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "") + config := &tabletenv.TabletConfig{ + DB: dbc, + SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{ + Interval: 100 * time.Millisecond, + }, + } + m := NewMonitor(config, exporter) + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second defer func() { - waitBetweenWrites = initialVal + m.Close() + waitUntilWritingStopped(t, m) }() - db, m := createFakeDBAndMonitor(t) - defer db.Close() - defer m.Close() db.SetNeverFail(true) - // Initially everything is unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) + + // Set up a custom query handler that returns different results based on state + handler := &statefulQueryHandler{ + db: db, + semiSyncStatsQuery: fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), + blockedResult: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|3", + "Rpl_semi_sync_source_yes_tx|3"), + unblockedResult: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|0", + "Rpl_semi_sync_source_yes_tx|0"), + } + handler.semisyncBlocked.Store(false) // Initially unblocked + db.Handler = handler + + // ExecuteFetchMulti will execute each statement separately + // Use patterns for both SET and INSERT since they can be called multiple times + db.AddQuery("SET SESSION lock_wait_timeout=1", &sqltypes.Result{}) + db.AddQuery("INSERT INTO _vt.semisync_heartbeat (ts) VALUES (NOW())", &sqltypes.Result{}) + + // Open the monitor so the periodic timer runs. + m.Open() // When everything is unblocked, then this should return without blocking. err := m.WaitUntilSemiSyncUnblocked(context.Background()) require.NoError(t, err) - // Add a universal insert query pattern that would block until we make it unblock. - ch := make(chan int) - db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { - <-ch - }) - // Now we set the monitor to be blocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "3")) + // Now we set the monitor to be blocked by changing the state. + handler.semisyncBlocked.Store(true) + + // Wait until the writes have started. + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + // Check if we have any in-progress writes, which indicates writing has started. + return m.inProgressWriteCount > 0 || m.isWriting.Load() + }, 5*time.Second, 5*time.Microsecond) - // wg is used to keep track of all the go routines. wg := sync.WaitGroup{} // Start a cancellable context and use that to wait. ctx, cancel := context.WithCancel(context.Background()) @@ -579,13 +932,6 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) { require.NoError(t, err) }() - // Wait until the writes have started. - require.Eventually(t, func() bool { - m.mu.Lock() - defer m.mu.Unlock() - return m.isWriting - }, 2*time.Second, 100*time.Millisecond) - // Now we cancel the context. This should fail the first wait. cancel() // Since we cancel the context before the semi-sync has been unblocked, we expect a context timeout error. @@ -598,9 +944,9 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) { require.EqualError(t, ctxErr, "context canceled") mu.Unlock() - // Now we set the monitor to be unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) - close(ch) + // Now we set the monitor to be unblocked by changing the state + handler.semisyncBlocked.Store(false) + err = m.WaitUntilSemiSyncUnblocked(context.Background()) require.NoError(t, err) // This should unblock the second wait. @@ -609,7 +955,7 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) { require.Eventually(t, func() bool { m.mu.Lock() defer m.mu.Unlock() - return !m.isWriting + return !m.isWriting.Load() }, 2*time.Second, 100*time.Millisecond) // Also verify that if the monitor is closed, we don't wait. @@ -636,10 +982,12 @@ func TestDeadlockOnClose(t *testing.T) { }, } m := NewMonitor(config, exporter) + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second // Set up for semisync to be blocked db.SetNeverFail(true) - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1")) + db.AddQuery(fmt.Sprintf(semiSyncStatsQuery, m.actionTimeout.Milliseconds()), sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1", "Rpl_semi_sync_source_yes_tx|1")) // Open the monitor m.Open() @@ -649,7 +997,7 @@ func TestDeadlockOnClose(t *testing.T) { finishCh := make(chan int) go func() { count := 100 - for i := 0; i < count; i++ { + for range count { m.Close() m.Open() time.Sleep(20 * time.Millisecond) @@ -669,11 +1017,7 @@ func TestDeadlockOnClose(t *testing.T) { // TestSemiSyncMonitor tests the semi-sync monitor as a black box. // It only calls the exported methods to see they work as intended. func TestSemiSyncMonitor(t *testing.T) { - initialVal := waitBetweenWrites - waitBetweenWrites = 250 * time.Millisecond - defer func() { - waitBetweenWrites = initialVal - }() + defer utils.EnsureNoLeaks(t) db := fakesqldb.New(t) defer db.Close() params := db.ConnParams() @@ -682,15 +1026,39 @@ func TestSemiSyncMonitor(t *testing.T) { config := &tabletenv.TabletConfig{ DB: dbc, SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{ - Interval: 1 * time.Second, + Interval: 100 * time.Millisecond, }, } m := NewMonitor(config, exporter) - defer m.Close() + m.actionDelay = 10 * time.Millisecond + m.actionTimeout = 1 * time.Second + defer func() { + m.Close() + waitUntilWritingStopped(t, m) + }() db.SetNeverFail(true) - // Initially everything is unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) + + // Set up a custom query handler that returns different results based on state. + handler := &statefulQueryHandler{ + db: db, + semiSyncStatsQuery: fmt.Sprintf(semiSyncStatsQuery, m.actionDelay.Milliseconds()), + blockedResult: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|1", + "Rpl_semi_sync_source_yes_tx|1"), + unblockedResult: sqltypes.MakeTestResult( + sqltypes.MakeTestFields("variable_name|variable_value", "varchar|varchar"), + "Rpl_semi_sync_source_wait_sessions|0", + "Rpl_semi_sync_source_yes_tx|0"), + } + handler.semisyncBlocked.Store(false) // Initially unblocked + db.Handler = handler + + // ExecuteFetchMulti will execute each statement separately + // Use patterns for both SET and INSERT since they can be called multiple times. + db.AddQuery("SET SESSION lock_wait_timeout=1", &sqltypes.Result{}) + db.AddQuery("INSERT INTO _vt.semisync_heartbeat (ts) VALUES (NOW())", &sqltypes.Result{}) // Open the monitor. m.Open() @@ -702,15 +1070,14 @@ func TestSemiSyncMonitor(t *testing.T) { err := m.WaitUntilSemiSyncUnblocked(ctx) require.NoError(t, err) - // Add a universal insert query pattern that would block until we make it unblock. - ch := make(chan int) - db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { - <-ch - }) - // Now we set the monitor to be blocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1")) + // Test that WaitUntilSemiSyncUnblocked works correctly when the monitor starts returning unblocked. + // We don't need to test the blocking behavior in detail since TestWaitUntilSemiSyncUnblocked covers that. + // This test just verifies the basic black-box behavior. + + // Set to blocked state. + handler.semisyncBlocked.Store(true) - // Start a go routine waiting for semi-sync being unblocked. + // Start a waiter. var waitFinished atomic.Bool go func() { err := m.WaitUntilSemiSyncUnblocked(context.Background()) @@ -718,51 +1085,12 @@ func TestSemiSyncMonitor(t *testing.T) { waitFinished.Store(true) }() - // Even if we wait a second, the wait shouldn't be over. - time.Sleep(1 * time.Second) - require.False(t, waitFinished.Load()) + // Now unblock and verify the wait completes. + handler.semisyncBlocked.Store(false) - // If we unblock the semi-sync, then the wait should finish. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) - close(ch) require.Eventually(t, func() bool { return waitFinished.Load() - }, 2*time.Second, 100*time.Millisecond) - require.False(t, m.AllWritesBlocked()) - - // Add a universal insert query pattern that would block until we make it unblock. - ch = make(chan int) - db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { - <-ch - }) - // We block the semi-sync again. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1")) - - // Start another go routine, also waiting for semi-sync being unblocked. - waitFinished.Store(false) - go func() { - err := m.WaitUntilSemiSyncUnblocked(context.Background()) - require.NoError(t, err) - waitFinished.Store(true) - }() - - // Since the writes are now blocking, eventually all the writes should block. - require.Eventually(t, func() bool { - return m.AllWritesBlocked() - }, 10*time.Second, 100*time.Millisecond) - - // The wait should still not have ended. - require.False(t, waitFinished.Load()) - - // Now we unblock the writes and semi-sync. - close(ch) - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) - - // The wait should now finish. - require.Eventually(t, func() bool { - return waitFinished.Load() - }, 2*time.Second, 100*time.Millisecond) - require.False(t, m.AllWritesBlocked()) + }, 5*time.Second, 100*time.Millisecond) // Close the monitor. m.Close() @@ -770,6 +1098,31 @@ func TestSemiSyncMonitor(t *testing.T) { require.Eventually(t, func() bool { m.mu.Lock() defer m.mu.Unlock() - return !m.isWriting + return !m.isWriting.Load() }, 2*time.Second, 100*time.Millisecond) } + +// waitUntilWritingStopped is a utility functions that waits until all the go-routines of a semi-sync monitor +// have stopped. This is useful to prevent data-race errors. After a monitor has been closed, it stops writing +// in the next check of stillBlocked. +func waitUntilWritingStopped(t *testing.T, m *Monitor) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for writing to stop: %v", ctx.Err()) + case <-tick.C: + m.mu.Lock() + if !m.isWriting.Load() { + m.mu.Unlock() + return + } + m.mu.Unlock() + } + } +}