Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) {
<-ch
})
// Add a fake query that makes the semi-sync monitor believe that the tablet is blocked on semi-sync ACKs.
fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"))
fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "1"))

// Verify that in the beginning the tablet is serving.
require.True(t, tm.QueryServiceControl.IsServing())
Expand All @@ -159,7 +159,7 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) {
require.False(t, fakeMysqlDaemon.SuperReadOnly.Load())

// Now we unblock the semi-sync monitor.
fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_value", "varchar"), "0"))
close(ch)

// This should unblock the demote primary operation eventually.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package semisyncmonitor

import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -205,10 +205,10 @@ func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) {
}

// Read the status value and check if it is non-zero.
if len(res.Rows) != 1 || len(res.Rows[0]) != 2 {
return false, errors.New("unexpected number of rows received")
if len(res.Rows) != 1 || len(res.Rows[0]) != 1 {
return false, fmt.Errorf("unexpected number of rows received - %v", res.Rows)
}
value, err := res.Rows[0][1].ToCastInt64()
value, err := res.Rows[0][0].ToCastInt64()
return value != 0, err
}

Expand Down
32 changes: 16 additions & 16 deletions go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,22 @@ func TestMonitorIsSemiSyncBlocked(t *testing.T) {
},
{
name: "incorrect number of rows",
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1", "Rpl_semi_sync_master_wait_sessions|1"),
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1", "1"),
wantErr: "Row count exceeded 1",
},
{
name: "incorrect number of fields",
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value|a", "varchar|varchar|int"), "Rpl_semi_sync_source_wait_sessions|1|2"),
wantErr: "unexpected number of rows received",
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value|a", "varchar|varchar"), "1|2"),
wantErr: `unexpected number of rows received - [[VARCHAR("1") VARCHAR("2")]]`,
},
{
name: "Unblocked",
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"),
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"),
want: false,
},
{
name: "Blocked",
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"),
res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1"),
want: true,
},
}
Expand Down Expand Up @@ -479,7 +479,7 @@ func TestCheckAndFixSemiSyncBlocked(t *testing.T) {
defer m.Close()

// Initially everything is unblocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"))
// Add a universal insert query pattern that would block until we make it unblock.
ch := make(chan int)
db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) {
Expand All @@ -493,7 +493,7 @@ func TestCheckAndFixSemiSyncBlocked(t *testing.T) {
m.mu.Unlock()

// Now we set the monitor to be blocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|2"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "2"))
m.checkAndFixSemiSyncBlocked()

m.mu.Lock()
Expand All @@ -514,7 +514,7 @@ func TestCheckAndFixSemiSyncBlocked(t *testing.T) {
}, 2*time.Second, 100*time.Millisecond)

// Now we set the monitor to be unblocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"))
close(ch)
m.checkAndFixSemiSyncBlocked()

Expand All @@ -541,7 +541,7 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) {

db.SetNeverFail(true)
// Initially everything is unblocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"))

// When everything is unblocked, then this should return without blocking.
err := m.WaitUntilSemiSyncUnblocked(context.Background())
Expand All @@ -553,7 +553,7 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) {
<-ch
})
// Now we set the monitor to be blocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|3"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "3"))

// wg is used to keep track of all the go routines.
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -599,7 +599,7 @@ func TestWaitUntilSemiSyncUnblocked(t *testing.T) {
mu.Unlock()

// Now we set the monitor to be unblocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"))
close(ch)
err = m.WaitUntilSemiSyncUnblocked(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -643,7 +643,7 @@ func TestSemiSyncMonitor(t *testing.T) {

db.SetNeverFail(true)
// Initially everything is unblocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"))

// Open the monitor.
m.Open()
Expand All @@ -661,7 +661,7 @@ func TestSemiSyncMonitor(t *testing.T) {
<-ch
})
// Now we set the monitor to be blocked.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1"))

// Start a go routine waiting for semi-sync being unblocked.
var waitFinished atomic.Bool
Expand All @@ -676,7 +676,7 @@ func TestSemiSyncMonitor(t *testing.T) {
require.False(t, waitFinished.Load())

// If we unblock the semi-sync, then the wait should finish.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"))
close(ch)
require.Eventually(t, func() bool {
return waitFinished.Load()
Expand All @@ -689,7 +689,7 @@ func TestSemiSyncMonitor(t *testing.T) {
<-ch
})
// We block the semi-sync again.
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "1"))

// Start another go routine, also waiting for semi-sync being unblocked.
waitFinished.Store(false)
Expand All @@ -709,7 +709,7 @@ func TestSemiSyncMonitor(t *testing.T) {

// Now we unblock the writes and semi-sync.
close(ch)
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"))
db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("variable_value", "varchar"), "0"))

// The wait should now finish.
require.Eventually(t, func() bool {
Expand Down
Loading