diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index 98600b5f9b5..900e3581499 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -134,7 +134,7 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) { <-ch }) // Add a fake query that makes the semi-sync monitor believe that the tablet is blocked on semi-sync ACKs. - fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1")) + fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "1")) // Verify that in the beginning the tablet is serving. require.True(t, tm.QueryServiceControl.IsServing()) @@ -159,7 +159,7 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) { require.False(t, fakeMysqlDaemon.SuperReadOnly.Load()) // Now we unblock the semi-sync monitor. - fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "0")) close(ch) // This should unblock the demote primary operation eventually. diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go index 0ea37e29284..0bc04e1fd39 100644 --- a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go +++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go @@ -18,7 +18,7 @@ package semisyncmonitor import ( "context" - "errors" + "fmt" "sync" "time" @@ -205,10 +205,10 @@ func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) { } // Read the status value and check if it is non-zero. - if len(res.Rows) != 1 || len(res.Rows[0]) != 2 { - return false, errors.New("unexpected number of rows received") + if len(res.Rows) != 1 || len(res.Rows[0]) != 1 { + return false, fmt.Errorf("unexpected number of rows received - %v", res.Rows) } - value, err := res.Rows[0][1].ToCastInt64() + value, err := res.Rows[0][0].ToCastInt64() return value != 0, err } diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go index 97716569563..83bf2c2259c 100644 --- a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go +++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go @@ -73,22 +73,22 @@ func TestMonitorIsSemiSyncBlocked(t *testing.T) { }, { name: "incorrect number of rows", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1", "Rpl_semi_sync_master_wait_sessions|1"), + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1", "1"), wantErr: "Row count exceeded 1", }, { name: "incorrect number of fields", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value|a", "varchar|varchar|int"), "Rpl_semi_sync_source_wait_sessions|1|2"), - wantErr: "unexpected number of rows received", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value|a", "varchar|varchar"), "1|2"), + wantErr: `unexpected number of rows received - [[VARCHAR("1") VARCHAR("2")]]`, }, { name: "Unblocked", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"), + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"), want: false, }, { name: "Blocked", - res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"), + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1"), want: true, }, } @@ -479,7 +479,7 @@ func TestCheckAndFixSemiSyncBlocked(t *testing.T) { defer m.Close() // Initially everything is unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) // Add a universal insert query pattern that would block until we make it unblock. ch := make(chan int) db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { @@ -493,7 +493,7 @@ func TestCheckAndFixSemiSyncBlocked(t *testing.T) { m.mu.Unlock() // Now we set the monitor to be blocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|2")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "2")) m.checkAndFixSemiSyncBlocked() m.mu.Lock() @@ -514,7 +514,7 @@ func TestCheckAndFixSemiSyncBlocked(t *testing.T) { }, 2*time.Second, 100*time.Millisecond) // Now we set the monitor to be unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) close(ch) m.checkAndFixSemiSyncBlocked() @@ -541,7 +541,7 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) { db.SetNeverFail(true) // Initially everything is unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) // When everything is unblocked, then this should return without blocking. err := m.WaitUntilSemiSyncUnblocked(context.Background()) @@ -553,7 +553,7 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) { <-ch }) // Now we set the monitor to be blocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|3")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "3")) // wg is used to keep track of all the go routines. wg := sync.WaitGroup{} @@ -599,7 +599,7 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) { mu.Unlock() // Now we set the monitor to be unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) close(ch) err = m.WaitUntilSemiSyncUnblocked(context.Background()) require.NoError(t, err) @@ -643,7 +643,7 @@ func TestSemiSyncMonitor(t *testing.T) { db.SetNeverFail(true) // Initially everything is unblocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) // Open the monitor. m.Open() @@ -661,7 +661,7 @@ func TestSemiSyncMonitor(t *testing.T) { <-ch }) // Now we set the monitor to be blocked. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1")) // Start a go routine waiting for semi-sync being unblocked. var waitFinished atomic.Bool @@ -676,7 +676,7 @@ func TestSemiSyncMonitor(t *testing.T) { require.False(t, waitFinished.Load()) // If we unblock the semi-sync, then the wait should finish. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) close(ch) require.Eventually(t, func() bool { return waitFinished.Load() @@ -689,7 +689,7 @@ func TestSemiSyncMonitor(t *testing.T) { <-ch }) // We block the semi-sync again. - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1")) // Start another go routine, also waiting for semi-sync being unblocked. waitFinished.Store(false) @@ -709,7 +709,7 @@ func TestSemiSyncMonitor(t *testing.T) { // Now we unblock the writes and semi-sync. close(ch) - db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0")) // The wait should now finish. require.Eventually(t, func() bool {