Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go/cmd/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ var (
schemaChangeController = flag.String("schema_change_controller", "", "schema change controller is responsible for finding schema changes and responding to schema change events")
schemaChangeCheckInterval = flag.Int("schema_change_check_interval", 60, "this value decides how often we check schema change dir, in seconds")
schemaChangeUser = flag.String("schema_change_user", "", "The user who submits this schema change.")
schemaChangeSlaveTimeout = flag.Duration("schema_change_slave_timeout", 10*time.Second, "how long to wait for replicas to receive the schema change")
// for backwards compatibility
deprecatedTimeout = flag.Duration("schema_change_slave_timeout", wrangler.DefaultWaitReplicasTimeout, "DEPRECATED -- use -schema_change_replicas_timeout instead")
schemaChangeReplicasTimeout = flag.Duration("schema_change_replicas_timeout", wrangler.DefaultWaitReplicasTimeout, "how long to wait for replicas to receive the schema change")
)

func initSchema() {
Expand All @@ -46,6 +48,9 @@ func initSchema() {
if *schemaChangeCheckInterval > 0 {
interval = *schemaChangeCheckInterval
}
if *deprecatedTimeout != 10*time.Second {
*schemaChangeReplicasTimeout = *deprecatedTimeout
}
timer := timer.NewTimer(time.Duration(interval) * time.Second)
controllerFactory, err :=
schemamanager.GetControllerFactory(*schemaChangeController)
Expand All @@ -67,7 +72,7 @@ func initSchema() {
err = schemamanager.Run(
ctx,
controller,
schemamanager.NewTabletExecutor(wr, *schemaChangeSlaveTimeout),
schemamanager.NewTabletExecutor(wr, *schemaChangeReplicasTimeout),
)
if err != nil {
log.Errorf("Schema change failed, error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type VtctlClientProcess struct {
func (vtctlclient *VtctlClientProcess) InitShardMaster(Keyspace string, Shard string, Cell string, TabletUID int) (err error) {
return vtctlclient.ExecuteCommand(
"InitShardMaster",
"-force",
"-force", "-wait_replicas_timeout", "31s",
fmt.Sprintf("%s/%s", Keyspace, Shard),
fmt.Sprintf("%s-%d", Cell, TabletUID))
}
Expand Down
20 changes: 12 additions & 8 deletions go/test/endtoend/reparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func TestReparentDownMaster(t *testing.T) {
err = clusterInstance.VtctlclientProcess.ExecuteCommand(
"EmergencyReparentShard",
"-keyspace_shard", keyspaceShard,
"-new_master", tablet62044.Alias)
"-new_master", tablet62044.Alias,
"-wait_replicas_timeout", "31s")
require.Nil(t, err)

validateTopology(t, false)
Expand Down Expand Up @@ -324,7 +325,9 @@ func TestReparentReplicaOffline(t *testing.T) {
out, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
"PlannedReparentShard",
"-keyspace_shard", keyspaceShard,
"-new_master", tablet62044.Alias)
"-new_master", tablet62044.Alias,
"-wait_replicas_timeout", "31s")

require.Error(t, err)
assert.Contains(t, out, "tablet zone2-0000031981 SetMaster failed")

Expand Down Expand Up @@ -635,9 +638,10 @@ func TestChangeTypeSemiSync(t *testing.T) {
}

// Updated rdonly tablet and set tablet type to rdonly
// TODO: replace with ChangeTabletType once ChangeSlaveType is removed
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonly1.Alias, "rdonly")
require.Nil(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonly2.Alias, "rdonly")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly2.Alias, "rdonly")
require.Nil(t, err)

validateTopology(t, true)
Expand All @@ -646,7 +650,7 @@ func TestChangeTypeSemiSync(t *testing.T) {

// Stop replication on rdonly1, to make sure when we make it replica it doesn't start again.
// Note we do a similar test for replica -> rdonly below.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopSlave", rdonly1.Alias)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", rdonly1.Alias)
require.Nil(t, err)

// Check semi-sync on replicas.
Expand All @@ -661,27 +665,27 @@ func TestChangeTypeSemiSync(t *testing.T) {
checkDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "OFF")

// Change replica to rdonly while replicating, should turn off semi-sync, and restart replication.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", replica.Alias, "rdonly")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "rdonly")
require.Nil(t, err)
checkDBvar(ctx, t, replica, "rpl_semi_sync_slave_enabled", "OFF")
checkDBstatus(ctx, t, replica, "Rpl_semi_sync_slave_status", "OFF")

// Change rdonly1 to replica, should turn on semi-sync, and not start replication.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonly1.Alias, "replica")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "replica")
require.Nil(t, err)
checkDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "ON")
checkDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF")
checkReplicaStatus(ctx, t, rdonly1)

// Now change from replica back to rdonly, make sure replication is still not enabled.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonly1.Alias, "rdonly")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "rdonly")
require.Nil(t, err)
checkDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "OFF")
checkDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF")
checkReplicaStatus(ctx, t, rdonly1)

// Change rdonly2 to replica, should turn on semi-sync, and restart replication.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonly2.Alias, "replica")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly2.Alias, "replica")
require.Nil(t, err)
checkDBvar(ctx, t, rdonly2, "rpl_semi_sync_slave_enabled", "ON")
checkDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "ON")
Expand Down
15 changes: 8 additions & 7 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func TestHealthCheck(t *testing.T) {
exec(t, masterConn, "stop slave")

// stop replication, make sure we don't go unhealthy.
// TODO: replace with StopReplication once StopSlave has been removed
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopSlave", rTablet.Alias)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("RunHealthCheck", rTablet.Alias)
Expand All @@ -141,7 +142,7 @@ func TestHealthCheck(t *testing.T) {
verifyStreamHealth(t, result)

// then restart replication, make sure we stay healthy
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopSlave", rTablet.Alias)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", rTablet.Alias)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("RunHealthCheck", rTablet.Alias)
require.NoError(t, err)
Expand Down Expand Up @@ -220,13 +221,13 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
// actions are similar to the SplitClone vtworker command
// implementation.) The tablet will stay healthy, and the
// query service is still running.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonlyTablet.Alias, "drained")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "drained")
require.NoError(t, err)
// Trying to drain the same tablet again, should error
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonlyTablet.Alias, "drained")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "drained")
assert.Error(t, err, "already drained")

err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopSlave", rdonlyTablet.Alias)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", rdonlyTablet.Alias)
require.NoError(t, err)
// Trigger healthcheck explicitly to avoid waiting for the next interval.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("RunHealthCheck", rdonlyTablet.Alias)
Expand All @@ -239,7 +240,7 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
require.NoError(t, err)

// Restart replication. Tablet will become healthy again.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", rdonlyTablet.Alias, "rdonly")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "rdonly")
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartSlave", rdonlyTablet.Alias)
require.NoError(t, err)
Expand Down Expand Up @@ -382,9 +383,9 @@ func TestNoMysqlHealthCheck(t *testing.T) {
checkHealth(t, rTablet.HTTPPort, true)

// Tell replica to not try to repair replication in healthcheck.
// The StopSlave will ultimately fail because mysqld is not running,
// The StopReplication will ultimately fail because mysqld is not running,
// But vttablet should remember that it's not supposed to fix replication.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopSlave", rTablet.Alias)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", rTablet.Alias)
assert.Error(t, err, "Fail as mysqld not running")

//The above notice to not fix replication should survive tablet restart.
Expand Down
28 changes: 21 additions & 7 deletions go/vt/vtctl/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package vtctl
import (
"flag"
"fmt"
"time"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/topo"
Expand All @@ -40,17 +39,17 @@ func init() {
addCommand("Shards", command{
"InitShardMaster",
commandInitShardMaster,
"[-force] [-wait_slave_timeout=<duration>] <keyspace/shard> <tablet alias>",
"[-force] [-wait_replicas_timeout=<duration>] <keyspace/shard> <tablet alias>",
"Sets the initial master for a shard. Will make all other tablets in the shard replicas of the provided master. WARNING: this could cause data loss on an already replicating shard. PlannedReparentShard or EmergencyReparentShard should be used instead."})
addCommand("Shards", command{
"PlannedReparentShard",
commandPlannedReparentShard,
"-keyspace_shard=<keyspace/shard> [-new_master=<tablet alias>] [-avoid_master=<tablet alias>] [-wait_slave_timeout=<duration>]",
"-keyspace_shard=<keyspace/shard> [-new_master=<tablet alias>] [-avoid_master=<tablet alias>] [-wait_replicas_timeout=<duration>]",
"Reparents the shard to the new master, or away from old master. Both old and new master need to be up and running."})
addCommand("Shards", command{
"EmergencyReparentShard",
commandEmergencyReparentShard,
"-keyspace_shard=<keyspace/shard> -new_master=<tablet alias>",
"-keyspace_shard=<keyspace/shard> -new_master=<tablet alias> [-wait_replicas_timeout=<duration>]",
"Reparents the shard to the new master. Assumes the old master is dead and not responding."})
addCommand("Shards", command{
"TabletExternallyReparented",
Expand Down Expand Up @@ -84,7 +83,12 @@ func commandInitShardMaster(ctx context.Context, wr *wrangler.Wrangler, subFlags
}

force := subFlags.Bool("force", false, "will force the reparent even if the provided tablet is not a master or the shard master")
waitReplicasTimeout := subFlags.Duration("wait_slave_timeout", 30*time.Second, "time to wait for replicas to catch up in reparenting")
// for backwards compatibility
deprecatedTimeout := subFlags.Duration("wait_slave_timeout", *topo.RemoteOperationTimeout, "DEPRECATED -- use -wait_replicas_timeout")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up in reparenting")
if *deprecatedTimeout != *topo.RemoteOperationTimeout {
*waitReplicasTimeout = *deprecatedTimeout
}
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -107,7 +111,12 @@ func commandPlannedReparentShard(ctx context.Context, wr *wrangler.Wrangler, sub
return fmt.Errorf("active reparent commands disabled (unset the -disable_active_reparents flag to enable)")
}

waitReplicasTimeout := subFlags.Duration("wait_slave_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up on replication before and after reparenting")
// for backwards compatibility
deprecatedTimeout := subFlags.Duration("wait_slave_timeout", *topo.RemoteOperationTimeout, "DEPRECATED -- use -wait_replicas_timeout")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up on replication before and after reparenting")
if *deprecatedTimeout != *topo.RemoteOperationTimeout {
*waitReplicasTimeout = *deprecatedTimeout
}
keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented")
newMaster := subFlags.String("new_master", "", "alias of a tablet that should be the new master")
avoidMaster := subFlags.String("avoid_master", "", "alias of a tablet that should not be the master, i.e. reparent to any other tablet if this one is the master")
Expand Down Expand Up @@ -150,7 +159,12 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s
return fmt.Errorf("active reparent commands disabled (unset the -disable_active_reparents flag to enable)")
}

waitReplicasTimeout := subFlags.Duration("wait_slave_timeout", 30*time.Second, "time to wait for replicas to catch up in reparenting")
// for backwards compatibility
deprecatedTimeout := subFlags.Duration("wait_slave_timeout", *topo.RemoteOperationTimeout, "DEPRECATED -- use -wait_replicas_timeout")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up in reparenting")
if *deprecatedTimeout != *topo.RemoteOperationTimeout {
*waitReplicasTimeout = *deprecatedTimeout
}
keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented")
newMaster := subFlags.String("new_master", "", "alias of a tablet that should be the new master")
if err := subFlags.Parse(args); err != nil {
Expand Down
34 changes: 27 additions & 7 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,22 @@ var commands = []commandGroup{
"<tablet alias>",
"Sets the tablet as read-write."},
{"StartSlave", commandStartReplication,
"<tablet alias>",
"DEPRECATED -- Use StartReplication <tablet alias>",
"Starts replication on the specified tablet."},
{"StartReplication", commandStartReplication,
"<tablet alias>",
"Starts replication on the specified tablet."},
{"StopSlave", commandStopReplication,
"DEPRECATED -- Use StopReplication <tablet alias>",
"Stops replication on the specified tablet."},
{"StopReplication", commandStopReplication,
"<tablet alias>",
"Stops replication on the specified tablet."},
{"ChangeSlaveType", commandChangeTabletType,
"DEPRECATED -- Use ChangeTabletType [-dry-run] <tablet alias> <tablet type>",
"Changes the db type for the specified tablet, if possible. This command is used primarily to arrange replicas, and it will not convert a master.\n" +
"NOTE: This command automatically updates the serving graph.\n"},
{"ChangeTabletType", commandChangeTabletType,
"[-dry-run] <tablet alias> <tablet type>",
"Changes the db type for the specified tablet, if possible. This command is used primarily to arrange replicas, and it will not convert a master.\n" +
"NOTE: This command automatically updates the serving graph.\n"},
Expand Down Expand Up @@ -391,10 +401,10 @@ var commands = []commandGroup{
"[-exclude_tables=''] [-include-views] [-skip-no-master] <keyspace name>",
"Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."},
{"ApplySchema", commandApplySchema,
"[-allow_long_unavailability] [-wait_slave_timeout=10s] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
"[-allow_long_unavailability] [-wait_replicas_timeout=10s] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
"Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to replicas via replication. If -allow_long_unavailability is set, schema changes affecting a large number of rows (and possibly incurring a longer period of unavailability) will not be rejected."},
{"CopySchemaShard", commandCopySchemaShard,
"[-tables=<table1>,<table2>,...] [-exclude_tables=<table1>,<table2>,...] [-include-views] [-skip-verify] [-wait_slave_timeout=10s] {<source keyspace/shard> || <source tablet alias>} <destination keyspace/shard>",
"[-tables=<table1>,<table2>,...] [-exclude_tables=<table1>,<table2>,...] [-include-views] [-skip-verify] [-wait_replicas_timeout=10s] {<source keyspace/shard> || <source tablet alias>} <destination keyspace/shard>",
"Copies the schema from a source shard's master (or a specific tablet) to a destination shard. The schema is applied directly on the master of the destination shard, and it is propagated to the replicas through binlogs."},

{"ValidateVersionShard", commandValidateVersionShard,
Expand Down Expand Up @@ -887,7 +897,7 @@ func commandStartReplication(ctx context.Context, wr *wrangler.Wrangler, subFlag
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("action StartSlave requires <tablet alias>")
return fmt.Errorf("action StartReplication requires <tablet alias>")
}

tabletAlias, err := topoproto.ParseTabletAlias(subFlags.Arg(0))
Expand All @@ -906,7 +916,7 @@ func commandStopReplication(ctx context.Context, wr *wrangler.Wrangler, subFlags
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("action StopSlave requires <tablet alias>")
return fmt.Errorf("action StopReplication requires <tablet alias>")
}

tabletAlias, err := topoproto.ParseTabletAlias(subFlags.Arg(0))
Expand Down Expand Up @@ -2364,7 +2374,12 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
allowLongUnavailability := subFlags.Bool("allow_long_unavailability", false, "Allow large schema changes which incur a longer unavailability of the database.")
sql := subFlags.String("sql", "", "A list of semicolon-delimited SQL commands")
sqlFile := subFlags.String("sql-file", "", "Identifies the file that contains the SQL commands")
waitReplicasTimeout := subFlags.Duration("wait_slave_timeout", wrangler.DefaultWaitReplicasTimeout, "The amount of time to wait for replicas to receive the schema change via replication.")
// for backwards compatibility
deprecatedTimeout := subFlags.Duration("wait_slave_timeout", wrangler.DefaultWaitReplicasTimeout, "DEPRECATED -- use -wait_replicas_timeout")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", wrangler.DefaultWaitReplicasTimeout, "The amount of time to wait for replicas to receive the schema change via replication.")
if *deprecatedTimeout != wrangler.DefaultWaitReplicasTimeout {
*waitReplicasTimeout = *deprecatedTimeout
}
if err := subFlags.Parse(args); err != nil {
return err
}
Expand Down Expand Up @@ -2394,7 +2409,12 @@ func commandCopySchemaShard(ctx context.Context, wr *wrangler.Wrangler, subFlags
excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/")
includeViews := subFlags.Bool("include-views", true, "Includes views in the output")
skipVerify := subFlags.Bool("skip-verify", false, "Skip verification of source and target schema after copy")
waitReplicasTimeout := subFlags.Duration("wait_slave_timeout", 10*time.Second, "The amount of time to wait for replicas to receive the schema change via replication.")
// for backwards compatibility
deprecatedTimeout := subFlags.Duration("wait_slave_timeout", wrangler.DefaultWaitReplicasTimeout, "DEPRECATED -- use -wait_replicas_timeout")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", wrangler.DefaultWaitReplicasTimeout, "The amount of time to wait for replicas to receive the schema change via replication.")
if *deprecatedTimeout != wrangler.DefaultWaitReplicasTimeout {
*waitReplicasTimeout = *deprecatedTimeout
}
if err := subFlags.Parse(args); err != nil {
return err
}
Expand Down
Loading