Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/mycnf/default.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ server-id = {{.ServerID}}
# additional configuration (like enabling semi-sync) before we connect to
# the master.
skip_slave_start
slave_load_tmpdir = {{.SlaveLoadTmpDir}}
socket = {{.SocketFile}}
tmpdir = {{.TmpDir}}

Expand Down
20 changes: 10 additions & 10 deletions go/cmd/vtbackup/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back

// Get the current master replication position, and wait until we catch up
// to that point. We do this instead of looking at Seconds_Behind_Master
// (replication lag reported by SHOW SLAVE STATUS) because that value can
// because that value can
// sometimes lie and tell you there's 0 lag when actually replication is
// stopped. Also, if replication is making progress but is too slow to ever
// catch up to live changes, we'd rather take a backup of something rather
Expand Down Expand Up @@ -360,7 +360,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
case <-time.After(time.Second):
}

status, statusErr := mysqld.SlaveStatus()
status, statusErr := mysqld.ReplicationStatus()
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continue
Expand All @@ -371,7 +371,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
log.Infof("Replication caught up to %v after %v", status.Position, time.Since(waitStartTime))
break
}
if !status.SlaveRunning() {
if !status.ReplicationRunning() {
log.Warning("Replication has stopped before backup could be taken. Trying to restart replication.")
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
Expand All @@ -380,12 +380,12 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

// Stop replication and see where we are.
if err := mysqld.StopSlave(nil); err != nil {
if err := mysqld.StopReplication(nil); err != nil {
return fmt.Errorf("can't stop replication: %v", err)
}

// Did we make any progress?
status, err := mysqld.SlaveStatus()
status, err := mysqld.ReplicationStatus()
if err != nil {
return fmt.Errorf("can't get replication status: %v", err)
}
Expand Down Expand Up @@ -414,14 +414,14 @@ func resetReplication(ctx context.Context, pos mysql.Position, mysqld mysqlctl.M
"RESET SLAVE ALL", // "ALL" makes it forget master host:port.
}
if err := mysqld.ExecuteSuperQueryList(ctx, cmds); err != nil {
return vterrors.Wrap(err, "failed to reset slave")
return vterrors.Wrap(err, "failed to reset replication")
}

// Check if we have a position to resume from, if not reset to the beginning of time
if !pos.IsZero() {
// Set the position at which to resume from the master.
if err := mysqld.SetSlavePosition(ctx, pos); err != nil {
return vterrors.Wrap(err, "failed to set slave position")
if err := mysqld.SetReplicationPosition(ctx, pos); err != nil {
return vterrors.Wrap(err, "failed to set replica position")
}
} else {
if err := mysqld.ResetReplication(ctx); err != nil {
Expand All @@ -448,8 +448,8 @@ func startReplication(ctx context.Context, mysqld mysqlctl.MysqlDaemon, topoServ
return vterrors.Wrapf(err, "Cannot read master tablet %v", si.MasterAlias)
}

// Stop slave (in case we're restarting), set master, and start slave.
if err := mysqld.SetMaster(ctx, ti.Tablet.MysqlHostname, int(ti.Tablet.MysqlPort), true /* slaveStopBefore */, true /* slaveStartAfter */); err != nil {
// Stop replication (in case we're restarting), set master, and start replication.
if err := mysqld.SetMaster(ctx, ti.Tablet.MysqlHostname, int(ti.Tablet.MysqlPort), true /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
return vterrors.Wrap(err, "MysqlDaemon.SetMaster failed")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// BinlogEvent represents a single event from a raw MySQL binlog dump stream.
// The implementation is provided by each supported flavor in go/vt/mysqlctl.
//
// binlog.Streamer receives these events through a mysqlctl.SlaveConnection and
// binlog.Streamer receives these events through a mysqlctl.BinlogConnection and
// processes them, grouping statements into BinlogTransactions as appropriate.
//
// Methods that only access header fields can't fail as long as IsValid()
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/binlog_event_make.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func NewQueryEvent(f BinlogFormat, s *FakeBinlogStream, q Query) BinlogEvent {
if q.Charset != nil {
statusVarLength += 1 + 2 + 2 + 2
}
length := 4 + // slave proxy id
length := 4 + // proxy id
4 + // execution time
1 + // schema length
2 + // error code
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/endtoend/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestTLS(t *testing.T) {
}
}

func TestSlaveStatus(t *testing.T) {
func TestReplicationStatus(t *testing.T) {
params := connParams
ctx := context.Background()
conn, err := mysql.Connect(ctx, &params)
Expand All @@ -312,8 +312,8 @@ func TestSlaveStatus(t *testing.T) {
}
defer conn.Close()

status, err := conn.ShowSlaveStatus()
if err != mysql.ErrNotSlave {
t.Errorf("Got unexpected result for ShowSlaveStatus: %v %v", status, err)
status, err := conn.ShowReplicationStatus()
if err != mysql.ErrNotReplica {
t.Errorf("Got unexpected result for ShowReplicationStatus: %v %v", status, err)
}
}
93 changes: 47 additions & 46 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
)

var (
// ErrNotSlave means there is no slave status.
// Returned by ShowSlaveStatus().
ErrNotSlave = errors.New("no slave status")
// ErrNotReplica means there is no replication status.
// Returned by ShowReplicationStatus().
ErrNotReplica = errors.New("no slave status")
)

const (
Expand All @@ -52,22 +52,22 @@ type flavor interface {
// masterGTIDSet returns the current GTIDSet of a server.
masterGTIDSet(c *Conn) (GTIDSet, error)

// startSlave returns the command to start the slave.
startSlaveCommand() string
// startReplicationCommand returns the command to start the replication.
startReplicationCommand() string

// restartSlave returns the commands to stop, reset and start the slave.
restartSlaveCommands() []string
// restartReplicationCommands returns the commands to stop, reset and start the replication.
restartReplicationCommands() []string

// startSlaveUntilAfter will restart replication, but only allow it
// startReplicationUntilAfter will restart replication, but only allow it
// to run until `pos` is reached. After reaching pos, replication will be stopped again
startSlaveUntilAfter(pos Position) string
startReplicationUntilAfter(pos Position) string

// stopSlave returns the command to stop the slave.
stopSlaveCommand() string
// stopReplicationCommand returns the command to stop the replication.
stopReplicationCommand() string

// sendBinlogDumpCommand sends the packet required to start
// dumping binlogs from the specified location.
sendBinlogDumpCommand(c *Conn, slaveID uint32, startPos Position) error
sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error

// readBinlogEvent reads the next BinlogEvent from the connection.
readBinlogEvent(c *Conn) (BinlogEvent, error)
Expand All @@ -76,17 +76,17 @@ type flavor interface {
// replication on the host.
resetReplicationCommands(c *Conn) []string

// setSlavePositionCommands returns the commands to set the
// replication position at which the slave will resume.
setSlavePositionCommands(pos Position) []string
// setReplicationPositionCommands returns the commands to set the
// replication position at which the replica will resume.
setReplicationPositionCommands(pos Position) []string

// changeMasterArg returns the specific parameter to add to
// a change master command.
changeMasterArg() string

// status returns the result of 'SHOW SLAVE STATUS',
// status returns the result of the appropriate status command,
// with parsed replication position.
status(c *Conn) (SlaveStatus, error)
status(c *Conn) (ReplicationStatus, error)

// waitUntilPositionCommand returns the SQL command to issue
// to wait until the given position, until the context
Expand Down Expand Up @@ -163,31 +163,31 @@ func (c *Conn) MasterPosition() (Position, error) {
}, nil
}

// StartSlaveCommand returns the command to start the slave.
func (c *Conn) StartSlaveCommand() string {
return c.flavor.startSlaveCommand()
// StartReplicationCommand returns the command to start the replication.
func (c *Conn) StartReplicationCommand() string {
return c.flavor.startReplicationCommand()
}

// RestartSlaveCommands returns the commands to stop, reset and start the slave.
func (c *Conn) RestartSlaveCommands() []string {
return c.flavor.restartSlaveCommands()
// RestartReplicationCommands returns the commands to stop, reset and start the replication.
func (c *Conn) RestartReplicationCommands() []string {
return c.flavor.restartReplicationCommands()
}

// StartSlaveUntilAfterCommand returns the command to start the slave.
func (c *Conn) StartSlaveUntilAfterCommand(pos Position) string {
return c.flavor.startSlaveUntilAfter(pos)
// StartReplicationUntilAfterCommand returns the command to start the replication.
func (c *Conn) StartReplicationUntilAfterCommand(pos Position) string {
return c.flavor.startReplicationUntilAfter(pos)
}

// StopSlaveCommand returns the command to stop the slave.
func (c *Conn) StopSlaveCommand() string {
return c.flavor.stopSlaveCommand()
// StopReplicationCommand returns the command to stop the replication.
func (c *Conn) StopReplicationCommand() string {
return c.flavor.stopReplicationCommand()
}

// SendBinlogDumpCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP command to start dumping raw binlog
// events over a slave connection, starting at a given GTID.
func (c *Conn) SendBinlogDumpCommand(slaveID uint32, startPos Position) error {
return c.flavor.sendBinlogDumpCommand(c, slaveID, startPos)
// events over a server connection, starting at a given GTID.
func (c *Conn) SendBinlogDumpCommand(serverID uint32, startPos Position) error {
return c.flavor.sendBinlogDumpCommand(c, serverID, startPos)
}

// ReadBinlogEvent reads the next BinlogEvent. This must be used
Expand All @@ -202,11 +202,11 @@ func (c *Conn) ResetReplicationCommands() []string {
return c.flavor.resetReplicationCommands(c)
}

// SetSlavePositionCommands returns the commands to set the
// replication position at which the slave will resume
// SetReplicationPositionCommands returns the commands to set the
// replication position at which the replica will resume
// when it is later reparented with SetMasterCommands.
func (c *Conn) SetSlavePositionCommands(pos Position) []string {
return c.flavor.setSlavePositionCommands(pos)
func (c *Conn) SetReplicationPositionCommands(pos Position) []string {
return c.flavor.setReplicationPositionCommands(pos)
}

// SetMasterCommand returns the command to use the provided master
Expand Down Expand Up @@ -240,7 +240,7 @@ func (c *Conn) SetMasterCommand(params *ConnParams, masterHost string, masterPor
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

// resultToMap is a helper function used by ShowSlaveStatus.
// resultToMap is a helper function used by ShowReplicationStatus.
func resultToMap(qr *sqltypes.Result) (map[string]string, error) {
if len(qr.Rows) == 0 {
// The query succeeded, but there is no data.
Expand All @@ -260,12 +260,13 @@ func resultToMap(qr *sqltypes.Result) (map[string]string, error) {
return result, nil
}

// parseSlaveStatus parses the common fields of SHOW SLAVE STATUS.
func parseSlaveStatus(fields map[string]string) SlaveStatus {
status := SlaveStatus{
MasterHost: fields["Master_Host"],
SlaveIORunning: fields["Slave_IO_Running"] == "Yes",
SlaveSQLRunning: fields["Slave_SQL_Running"] == "Yes",
// parseReplicationStatus parses the common (non-flavor-specific) fields of ReplicationStatus
func parseReplicationStatus(fields map[string]string) ReplicationStatus {
status := ReplicationStatus{
MasterHost: fields["Master_Host"],
// These fields are returned from the underlying DB and cannot be renamed
IOThreadRunning: fields["Slave_IO_Running"] == "Yes",
SQLThreadRunning: fields["Slave_SQL_Running"] == "Yes",
}
parseInt, _ := strconv.ParseInt(fields["Master_Port"], 10, 0)
status.MasterPort = int(parseInt)
Expand Down Expand Up @@ -302,9 +303,9 @@ func parseSlaveStatus(fields map[string]string) SlaveStatus {
return status
}

// ShowSlaveStatus executes the right SHOW SLAVE STATUS command,
// and returns a parse Position with other fields.
func (c *Conn) ShowSlaveStatus() (SlaveStatus, error) {
// ShowReplicationStatus executes the right command to fetch replication status,
// and returns a parsed Position with other fields.
func (c *Conn) ShowReplicationStatus() (ReplicationStatus, error) {
return c.flavor.status(c)
}

Expand Down
34 changes: 17 additions & 17 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,27 @@ func (flv *filePosFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) {
}, nil
}

func (flv *filePosFlavor) startSlaveCommand() string {
func (flv *filePosFlavor) startReplicationCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) restartSlaveCommands() []string {
func (flv *filePosFlavor) restartReplicationCommands() []string {
return []string{"unsupported"}
}

func (flv *filePosFlavor) stopSlaveCommand() string {
func (flv *filePosFlavor) stopReplicationCommand() string {
return "unsupported"
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, slaveID uint32, startPos Position) error {
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error {
rpos, ok := startPos.GTIDSet.(filePosGTID)
if !ok {
return fmt.Errorf("startPos.GTIDSet is wrong type - expected filePosGTID, got: %#v", startPos.GTIDSet)
}

flv.file = rpos.file
return c.WriteComBinlogDump(slaveID, rpos.file, uint32(rpos.pos), 0)
return c.WriteComBinlogDump(serverID, rpos.file, uint32(rpos.pos), 0)
}

// readBinlogEvent is part of the Flavor interface.
Expand Down Expand Up @@ -168,40 +168,40 @@ func (flv *filePosFlavor) resetReplicationCommands(c *Conn) []string {
}
}

// setSlavePositionCommands is part of the Flavor interface.
func (flv *filePosFlavor) setSlavePositionCommands(pos Position) []string {
// setReplicationPositionCommands is part of the Flavor interface.
func (flv *filePosFlavor) setReplicationPositionCommands(pos Position) []string {
return []string{
"unsupported",
}
}

// setSlavePositionCommands is part of the Flavor interface.
// setReplicationPositionCommands is part of the Flavor interface.
func (flv *filePosFlavor) changeMasterArg() string {
return "unsupported"
}

// status is part of the Flavor interface.
func (flv *filePosFlavor) status(c *Conn) (SlaveStatus, error) {
func (flv *filePosFlavor) status(c *Conn) (ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */)
if err != nil {
return SlaveStatus{}, err
return ReplicationStatus{}, err
}
if len(qr.Rows) == 0 {
// The query returned no data, meaning the server
// is not configured as a slave.
return SlaveStatus{}, ErrNotSlave
// is not configured as a replica.
return ReplicationStatus{}, ErrNotReplica
}

resultMap, err := resultToMap(qr)
if err != nil {
return SlaveStatus{}, err
return ReplicationStatus{}, err
}

return parseFilePosSlaveStatus(resultMap)
return parseFilePosReplicationStatus(resultMap)
}

func parseFilePosSlaveStatus(resultMap map[string]string) (SlaveStatus, error) {
status := parseSlaveStatus(resultMap)
func parseFilePosReplicationStatus(resultMap map[string]string) (ReplicationStatus, error) {
status := parseReplicationStatus(resultMap)

status.Position = status.FilePosition
status.RelayLogPosition = status.FileRelayLogPosition
Expand All @@ -227,7 +227,7 @@ func (flv *filePosFlavor) waitUntilPositionCommand(ctx context.Context, pos Posi
return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.file, filePosPos.pos), nil
}

func (*filePosFlavor) startSlaveUntilAfter(pos Position) string {
func (*filePosFlavor) startReplicationUntilAfter(pos Position) string {
return "unsupported"
}

Expand Down
Loading