Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,20 @@ func (vttablet *VttabletProcess) TearDown() error {
return vttablet.TearDownWithTimeout(vttabletStateTimeout)
}

// Stop suspends the vttablet process by sending it SIGSTOP.
// It is a no-op when either proc or exit is nil.
func (vttablet *VttabletProcess) Stop() {
	if vttablet.proc != nil && vttablet.exit != nil {
		// Best effort: the method has no error return, so the result of
		// delivering the signal is intentionally discarded.
		_ = vttablet.proc.Process.Signal(syscall.SIGSTOP)
	}
}

// Resume continues a previously suspended vttablet process by sending it SIGCONT.
// It is a no-op when either proc or exit is nil.
func (vttablet *VttabletProcess) Resume() {
	if vttablet.proc != nil && vttablet.exit != nil {
		// Best effort: the method has no error return, so the result of
		// delivering the signal is intentionally discarded.
		_ = vttablet.proc.Process.Signal(syscall.SIGCONT)
	}
}

// Kill shuts down the running vttablet service immediately.
func (vttablet *VttabletProcess) Kill() error {
if vttablet.proc == nil || vttablet.exit == nil {
Expand Down
129 changes: 129 additions & 0 deletions go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ package emergencyreparent
import (
"context"
"os/exec"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func TestTrivialERS(t *testing.T) {
Expand Down Expand Up @@ -131,6 +135,131 @@ func TestReparentDownPrimary(t *testing.T) {
utils.ResurrectTablet(ctx, t, clusterInstance, tablets[0])
}

// TestEmergencyReparentWithBlockedPrimary verifies that an EmergencyReparentShard
// succeeds while the primary is blocked waiting for semi-sync ACKs and its vttablet
// process is suspended, and that the old primary ends up as a non-serving REPLICA
// once it is resumed.
//
// Checks performed inside the helper goroutines use assert (with early returns)
// instead of require: require calls t.FailNow, which the testing package only
// supports on the goroutine running the test function.
func TestEmergencyReparentWithBlockedPrimary(t *testing.T) {
	clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
	defer utils.TeardownCluster(clusterInstance)

	if clusterInstance.VtTabletMajorVersion < 24 {
		t.Skip("Skipping test since `DemotePrimary` on earlier versions does not handle blocked primaries correctly")
	}

	// start vtgate w/disabled buffering
	clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
		"--enable_buffer=false",
		"--query-timeout", "3000")
	err := clusterInstance.StartVtgate()
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	conn, err := mysql.Connect(ctx, &mysql.ConnParams{
		Host: clusterInstance.Hostname,
		Port: clusterInstance.VtgateMySQLPort,
	})
	require.NoError(t, err)
	defer conn.Close()

	_, err = conn.ExecuteFetch("CREATE TABLE test (id INT PRIMARY KEY, msg VARCHAR(64))", 0, false)
	require.NoError(t, err)

	tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

	// Simulate no semi-sync replicas being available by disabling semi-sync on all replicas
	for _, tablet := range tablets[1:] {
		utils.RunSQL(ctx, t, "STOP REPLICA IO_THREAD", tablet)

		// Disable semi-sync on replicas to simulate blocking
		semisyncType, err := utils.SemiSyncExtensionLoaded(context.Background(), tablet)
		require.NoError(t, err)
		switch semisyncType {
		case mysql.SemiSyncTypeSource:
			utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_replica_enabled = false", tablet)
		case mysql.SemiSyncTypeMaster:
			utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_slave_enabled = false", tablet)
		}

		utils.RunSQL(context.Background(), t, "START REPLICA IO_THREAD", tablet)
	}

	// Try performing a write and ensure that it blocks.
	writeSQL := `insert into test(id, msg) values (1, 'test 1')`
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()

		// Attempt writing via vtgate against the primary. This should block (because there's no replicas to ack the semi-sync),
		// and fail on the vtgate query timeout. Async replicas will still receive this write (probably), because it is written
		// to the PRIMARY binlog even when no ackers exist. This means we need to disable the vtgate buffer (above), because it
		// will attempt the write on the promoted, unblocked primary - and this will hit a dupe key error.
		_, err := conn.ExecuteFetch(writeSQL, 0, false)
		if !assert.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100) during query: "+writeSQL) {
			return
		}

		// Verify vtgate really processed the insert in case something unrelated caused the deadline exceeded.
		vtgateVars := clusterInstance.VtgateProcess.GetVars()
		if !assert.NotNil(t, vtgateVars) ||
			!assert.NotNil(t, vtgateVars["QueryRoutes"]) ||
			!assert.NotNil(t, vtgateVars["VtgateApiErrorCounts"]) {
			return
		}
		if !assert.EqualValues(t, map[string]interface{}{
			"DDL.DirectDDL.PRIMARY":      float64(1),
			"INSERT.Passthrough.PRIMARY": float64(1),
		}, vtgateVars["QueryRoutes"]) {
			return
		}
		assert.EqualValues(t, map[string]interface{}{
			"Execute.ks.primary.DEADLINE_EXCEEDED": float64(1),
		}, vtgateVars["VtgateApiErrorCounts"])
	}()

	wg.Add(1)
	waitReplicasTimeout := time.Second * 10
	go func() {
		defer wg.Done()

		// Ensure the write (other goroutine above) is blocked waiting on ACKs on the primary.
		// NOTE(review): this helper receives t and may itself call require internally,
		// which would still invoke FailNow off the test goroutine - confirm and consider
		// running this step on the test goroutine instead.
		utils.WaitForQueryWithStateInProcesslist(context.Background(), t, tablets[0], writeSQL, "Waiting for semi-sync ACK from replica", time.Second*20)

		// Send SIGSTOP to primary to simulate it being unresponsive.
		tablets[0].VttabletProcess.Stop()

		// Run forced reparent operation, this should now proceed unimpeded.
		out, err := utils.Ers(clusterInstance, tablets[1], "15s", waitReplicasTimeout.String())
		assert.NoError(t, err, out)
	}()

	wg.Wait()

	// We need to wait at least 10 seconds here to ensure the wait-for-replicas-timeout has passed,
	// before we resume the old primary - otherwise the old primary will receive a `SetReplicationSource` call.
	time.Sleep(waitReplicasTimeout * 2)

	// Bring back the demoted primary
	tablets[0].VttabletProcess.Resume()

	// Give the old primary some time to realize it's no longer the primary,
	// and for a new primary to be promoted.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		// Ensure the old primary was demoted correctly
		tabletInfo, err := clusterInstance.VtctldClientProcess.GetTablet(tablets[0].Alias)
		require.NoError(c, err)

		// The old primary should have noticed there's a new primary tablet now and should
		// have demoted itself to REPLICA.
		require.Equal(c, topodatapb.TabletType_REPLICA, tabletInfo.GetType())

		// The old primary should be in not serving mode because we should be unable to re-attach it
		// as a replica due to the errant GTID caused by semi-sync writes that were never replicated out.
		//
		// Note: The writes that were not replicated were caused by the semi sync unblocker, which
		// performed writes after ERS.
		require.Equal(c, "NOT_SERVING", tablets[0].VttabletProcess.GetTabletStatus())
		require.Equal(c, "replica", tablets[0].VttabletProcess.GetTabletType())

		// Check the 2nd tablet becomes PRIMARY.
		require.Equal(c, "SERVING", tablets[1].VttabletProcess.GetTabletStatus())
		require.Equal(c, "primary", tablets[1].VttabletProcess.GetTabletType())
	}, 30*time.Second, time.Second, "could not validate primary was demoted")
}

func TestReparentNoChoiceDownPrimary(t *testing.T) {
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
Expand Down
29 changes: 24 additions & 5 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/utils"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/utils"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
)

var (
Expand Down Expand Up @@ -842,6 +841,7 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
}
}

// WaitForTabletToBeServing waits for a tablet to reach a serving state.
func WaitForTabletToBeServing(ctx context.Context, t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, timeout time.Duration) {
vTablet, err := clusterInstance.VtctldClientProcess.GetTablet(tablet.Alias)
require.NoError(t, err)
Expand All @@ -862,3 +862,22 @@ func WaitForTabletToBeServing(ctx context.Context, t *testing.T, clusterInstance
t.Fatal(err.Error())
}
}

// WaitForQueryWithStateInProcesslist waits for a query to be present in the processlist with a specific state.
//
// It polls information_schema.processlist on the given tablet once per second until a row
// is found whose Command is "Query", whose State equals state and whose Info equals sql
// (all compared case-insensitively). The test fails if no such row appears within timeout.
func WaitForQueryWithStateInProcesslist(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sql, state string, timeout time.Duration) {
	require.Eventually(t, func() bool {
		qr := RunSQL(ctx, t, "select Command, State, Info from information_schema.processlist", tablet)
		for _, row := range qr.Rows {
			if len(row) != 3 {
				continue
			}
			// Only threads that are executing a statement are of interest; skip
			// every row whose Command is not "Query" (e.g. idle connections).
			if !strings.EqualFold(row[0].ToString(), "Query") {
				continue
			}
			if strings.EqualFold(row[1].ToString(), state) && strings.EqualFold(row[2].ToString(), sql) {
				return true
			}
		}
		return false
	}, timeout, time.Second, "query with state not in processlist")
}
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,11 @@ func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus(ctx context.Context) (bool
return fmd.SemiSyncReplicaEnabled, nil
}

// IsSemiSyncBlocked is part of the MysqlDaemon interface.
// The fake ignores ctx and always reports (false, nil).
func (fmd *FakeMysqlDaemon) IsSemiSyncBlocked(ctx context.Context) (bool, error) {
return false, nil
}

// GetVersionString is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetVersionString(ctx context.Context) (string, error) {
return fmd.Version, nil
Expand Down
1 change: 1 addition & 0 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type MysqlDaemon interface {
SemiSyncClients(ctx context.Context) (count uint32)
SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32)
SemiSyncReplicationStatus(ctx context.Context) (bool, error)
IsSemiSyncBlocked(ctx context.Context) (bool, error)
ResetReplicationParameters(ctx context.Context) error
GetBinlogInformation(ctx context.Context) (binlogFormat string, logEnabled bool, logReplicaUpdate bool, binlogRowImage string, err error)
GetGTIDMode(ctx context.Context) (gtidMode string, err error)
Expand Down
27 changes: 27 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,3 +819,30 @@ func (mysqld *Mysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSy

return conn.Conn.SemiSyncExtensionLoaded()
}

// IsSemiSyncBlocked reports whether the semi-sync "wait sessions" status
// variable (Rpl_semi_sync_source_wait_sessions or its legacy "master" spelling)
// is non-zero, i.e. whether the primary currently looks blocked on semi-sync.
//
// It returns (false, nil) when the status variable is absent, and an error when
// a connection cannot be obtained, the query fails, the result has an
// unexpected shape, or the value cannot be read as an integer.
func (mysqld *Mysqld) IsSemiSyncBlocked(ctx context.Context) (bool, error) {
	// Matches both the "source" and the older "master" variable name.
	const waitSessionsQuery = "select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')"

	conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
	if err != nil {
		return false, err
	}
	defer conn.Recycle()

	result, err := conn.Conn.ExecuteFetch(waitSessionsQuery, 1, false)
	if err != nil {
		return false, err
	}

	rows := result.Rows
	switch {
	case len(rows) == 0:
		// No status row: semi-sync is not enabled on this server, so it
		// cannot be blocked on it.
		return false, nil
	case len(rows) != 1 || len(rows[0]) != 1:
		return false, fmt.Errorf("unexpected number of rows received - %v", rows)
	}

	// Any non-zero count means at least one session is waiting.
	waiting, err := rows[0][0].ToCastInt64()
	return waiting != 0, err
}
13 changes: 11 additions & 2 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vtcombo

import (
"context"
"errors"
"fmt"
"os"
"path"
Expand Down Expand Up @@ -1017,8 +1018,8 @@ func (itmc *internalTabletManagerClient) ReadReparentJournalInfo(ctx context.Con
return 0, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
// DemotePrimary is not supported by the vtcombo tablet manager client:
// it ignores its arguments and always returns a "not implemented" error.
func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet, bool) (*replicationdatapb.PrimaryStatus, error) {
return nil, errors.New("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) UndoDemotePrimary(context.Context, *topodatapb.Tablet, bool) error {
Expand Down Expand Up @@ -1103,6 +1104,7 @@ func (itmc *internalTabletManagerClient) ResetReplicationParameters(context.Cont
// ReplicaWasRestarted is not supported by the vtcombo tablet manager client:
// it ignores its arguments and always returns a "not implemented" error.
func (itmc *internalTabletManagerClient) ReplicaWasRestarted(context.Context, *topodatapb.Tablet, *topodatapb.TabletAlias) error {
	// The message has no format verbs, so errors.New is used instead of fmt.Errorf.
	return errors.New("not implemented in vtcombo")
}

// ResetSequences is not supported by the vtcombo tablet manager client:
// it ignores its arguments and always returns a "not implemented" error.
func (itmc *internalTabletManagerClient) ResetSequences(ctx context.Context, tablet *topodatapb.Tablet, tables []string) error {
	// The message has no format verbs, so errors.New is used instead of fmt.Errorf.
	return errors.New("not implemented in vtcombo")
}
Loading
Loading