Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions go/vt/vttablet/tabletmanager/action_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
"flag"
"fmt"
"math/rand"
"os"
"path"
"regexp"
"sync"
"time"
Expand Down Expand Up @@ -69,12 +67,6 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// slaveStoppedFile names the marker file kept in the tablet directory.
// When the file exists, vttablet must NOT try to repair replication.
const slaveStoppedFile = "do_not_replicate"

// tabletHostname is the value of the -tablet_hostname flag. Per its usage
// text, a non-empty value is assumed as the hostname instead of resolving one.
var tabletHostname = flag.String("tablet_hostname", "", "if not empty, this hostname will be assumed instead of trying to resolve it")
Expand Down Expand Up @@ -190,10 +182,6 @@ type ActionAgent struct {
// _ignoreHealthErrorExpr can be set by RPC to selectively disable certain
// healthcheck errors. It should only be accessed while holding actionMutex.
_ignoreHealthErrorExpr *regexp.Regexp

// _slaveStopped remembers if we've been told to stop replicating.
// If it's nil, we'll try to check for the slaveStoppedFile.
_slaveStopped *bool
}

// NewActionAgent creates a new ActionAgent and registers all the
Expand Down Expand Up @@ -461,48 +449,6 @@ func (agent *ActionAgent) EnableUpdateStream() bool {
return agent._enableUpdateStream
}

// slaveStopped reports whether this tablet has been told to stop replicating.
//
// The answer is cached in agent._slaveStopped: once a value is known it is
// returned directly. Otherwise the value is derived from the presence of the
// slaveStoppedFile marker inside the tablet directory and then cached.
// agent.mutex is held for the duration of the call.
func (agent *ActionAgent) slaveStopped() bool {
	agent.mutex.Lock()
	defer agent.mutex.Unlock()

	// If we already know the value, don't bother checking the file.
	if agent._slaveStopped != nil {
		return *agent._slaveStopped
	}

	// With no tablet dir, setSlaveStopped never writes a marker, so there is
	// nothing to find. Skipping the check also avoids path.Join collapsing to
	// a bare "do_not_replicate" relative to the current working directory,
	// where an unrelated file could be mistaken for our marker.
	stopped := false
	if tabletDir := agent.MysqlDaemon.TabletDir(); tabletDir != "" {
		// If the marker file exists, we're stopped.
		// Treat any stat error as if the file doesn't exist.
		_, err := os.Stat(path.Join(tabletDir, slaveStoppedFile))
		stopped = err == nil
	}
	agent._slaveStopped = &stopped
	return stopped
}

// setSlaveStopped records whether replication has been stopped on purpose.
//
// The in-memory value (agent._slaveStopped) is always updated. In addition,
// a best-effort attempt is made to persist the value across tablet restarts
// by creating or removing the slaveStoppedFile marker in the tablet
// directory. Using the filesystem means this works regardless of whether
// mysqld is running, and ties the marker to this particular tablet data dir
// (the one that's paused at a known replication position). Filesystem
// errors are intentionally ignored. agent.mutex is held throughout.
func (agent *ActionAgent) setSlaveStopped(slaveStopped bool) {
	agent.mutex.Lock()
	defer agent.mutex.Unlock()

	agent._slaveStopped = &slaveStopped

	dir := agent.MysqlDaemon.TabletDir()
	if dir == "" {
		// No tablet dir: nothing to persist to.
		return
	}
	marker := path.Join(dir, slaveStoppedFile)

	if !slaveStopped {
		// Best-effort: a failed removal is ignored.
		os.Remove(marker)
		return
	}

	// Best-effort: if the marker can't be created we simply don't persist.
	if f, err := os.Create(marker); err == nil {
		f.Close()
	}
}

func (agent *ActionAgent) setServicesDesiredState(disallowQueryService string, enableUpdateStream bool) {
agent.mutex.Lock()
agent._disallowQueryService = disallowQueryService
Expand Down
47 changes: 0 additions & 47 deletions go/vt/vttablet/tabletmanager/replication_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@ package tabletmanager

import (
"flag"
"fmt"
"html/template"
"time"

"golang.org/x/net/context"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/health"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
)

var (
Expand All @@ -53,28 +47,6 @@ func (r *replicationReporter) Report(isSlaveType, shouldQueryServiceBeRunning bo
}

status, statusErr := r.agent.MysqlDaemon.SlaveStatus()
if statusErr == mysql.ErrNotSlave ||
(statusErr == nil && !status.SlaveSQLRunning && !status.SlaveIORunning) {
// MySQL is up, but slave is either not configured or not running.
// Both SQL and IO threads are stopped, so it's probably either
// stopped on purpose, or stopped because of a mysqld restart.
if !r.agent.slaveStopped() {
// As far as we've been told, it isn't stopped on purpose,
// so let's try to start it.
if *mysqlctl.DisableActiveReparents {
log.Infof("Slave is stopped. Running with --disable_active_reparents so will not try to reconnect to master...")
} else {
log.Infof("Slave is stopped. Trying to reconnect to master...")
ctx, cancel := context.WithTimeout(r.agent.batchCtx, 5*time.Second)
if err := repairReplication(ctx, r.agent); err != nil {
log.Infof("Failed to reconnect to master: %v", err)
}
cancel()
// Check status again.
status, statusErr = r.agent.MysqlDaemon.SlaveStatus()
}
}
}
if statusErr != nil {
// mysqld is not running or slave is not configured.
// We can't report healthy.
Expand Down Expand Up @@ -106,25 +78,6 @@ func (r *replicationReporter) HTMLName() template.HTML {
return template.HTML("MySQLReplicationLag")
}

// repairReplication points this slave at the shard's current master and
// starts replication.
//
// It refuses to act when --disable_active_reparents is set. Otherwise it
// reads the tablet's shard record from the topo server and hands the
// shard's master alias to agent.setMasterLocked. An error is returned if
// the shard cannot be read or has no master.
func repairReplication(ctx context.Context, agent *ActionAgent) error {
	if *mysqlctl.DisableActiveReparents {
		return fmt.Errorf("can't repair replication with --disable_active_reparents")
	}

	topoServer := agent.TopoServer
	tabletRecord := agent.Tablet()

	shardInfo, err := topoServer.GetShard(ctx, tabletRecord.Keyspace, tabletRecord.Shard)
	switch {
	case err != nil:
		return err
	case !shardInfo.HasMaster():
		return fmt.Errorf("no master tablet for shard %v/%v", tabletRecord.Keyspace, tabletRecord.Shard)
	}

	return agent.setMasterLocked(ctx, shardInfo.MasterAlias, 0, true)
}

func registerReplicationReporter(agent *ActionAgent) {
if *enableReplicationReporter {
health.DefaultAggregator.Register("replication_reporter",
Expand Down
12 changes: 4 additions & 8 deletions go/vt/vttablet/tabletmanager/replication_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ func TestBasicMySQLReplicationLag(t *testing.T) {
mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
mysqld.Replicating = true
mysqld.SecondsBehindMaster = 10
slaveStopped := true

rep := &replicationReporter{
agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped},
agent: &ActionAgent{MysqlDaemon: mysqld},
now: time.Now,
}
dur, err := rep.Report(true, true)
Expand All @@ -44,10 +43,9 @@ func TestBasicMySQLReplicationLag(t *testing.T) {
func TestNoKnownMySQLReplicationLag(t *testing.T) {
mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
mysqld.Replicating = false
slaveStopped := true

rep := &replicationReporter{
agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped},
agent: &ActionAgent{MysqlDaemon: mysqld},
now: time.Now,
}
dur, err := rep.Report(true, true)
Expand All @@ -60,11 +58,10 @@ func TestExtrapolatedMySQLReplicationLag(t *testing.T) {
mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
mysqld.Replicating = true
mysqld.SecondsBehindMaster = 10
slaveStopped := true

now := time.Now()
rep := &replicationReporter{
agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped},
agent: &ActionAgent{MysqlDaemon: mysqld},
now: func() time.Time { return now },
}

Expand All @@ -88,11 +85,10 @@ func TestNoExtrapolatedMySQLReplicationLag(t *testing.T) {
mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
mysqld.Replicating = true
mysqld.SecondsBehindMaster = 10
slaveStopped := true

now := time.Now()
rep := &replicationReporter{
agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped},
agent: &ActionAgent{MysqlDaemon: mysqld},
now: func() time.Time { return now },
}

Expand Down
16 changes: 1 addition & 15 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ func (agent *ActionAgent) StopSlave(ctx context.Context) error {
}

func (agent *ActionAgent) stopSlaveLocked(ctx context.Context) error {

// Remember that we were told to stop, so we don't try to
// restart ourselves (in replication_reporter).
agent.setSlaveStopped(true)

// Also tell Orchestrator we're stopped on purpose for some Vitess task.
// Tell Orchestrator we're stopped on purpose for some Vitess task.
// Do this in the background, as it's best-effort.
go func() {
if agent.orc == nil {
Expand Down Expand Up @@ -123,8 +118,6 @@ func (agent *ActionAgent) StartSlave(ctx context.Context) error {
}
defer agent.unlock()

agent.setSlaveStopped(false)

// Tell Orchestrator we're no longer stopped on purpose.
// Do this in the background, as it's best-effort.
go func() {
Expand Down Expand Up @@ -154,8 +147,6 @@ func (agent *ActionAgent) ResetReplication(ctx context.Context) error {
return err
}
defer agent.unlock()

agent.setSlaveStopped(true)
return agent.MysqlDaemon.ResetReplication(ctx)
}

Expand All @@ -166,9 +157,6 @@ func (agent *ActionAgent) InitMaster(ctx context.Context) (string, error) {
}
defer agent.unlock()

// Initializing as master implies undoing any previous "do not replicate".
agent.setSlaveStopped(false)

// we need to insert something in the binlogs, so we can get the
// current position. Let's just use the mysqlctl.CreateReparentJournal commands.
cmds := mysqlctl.CreateReparentJournal()
Expand Down Expand Up @@ -241,8 +229,6 @@ func (agent *ActionAgent) InitSlave(ctx context.Context, parent *topodatapb.Tabl
return err
}

agent.setSlaveStopped(false)

// If using semi-sync, we need to enable it before connecting to master.
// If we were a master type, we need to switch back to replica settings.
// Otherwise we won't be able to commit anything.
Expand Down
7 changes: 6 additions & 1 deletion test/reparent.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ def test_reparent_down_master(self):
# bring back the old master as a slave, check that it catches up
tablet_62344.start_mysql().wait()
tablet_62344.init_tablet('replica', 'test_keyspace', '0', start=True,
wait_for_start=False)
wait_for_start=True)
# Fix replication after mysqld restart.
utils.run_vtctl(['ReparentTablet',
tablet_62344.tablet_alias], auto_log=True)
utils.run_vtctl(['StartSlave',
tablet_62344.tablet_alias], auto_log=True)
self._check_vt_insert_test(tablet_62344, 2)

tablet.kill_tablets(
Expand Down
46 changes: 1 addition & 45 deletions test/tabletmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,54 +350,10 @@ def test_health_check(self):
self.assertEqual(ti['type'], topodata_pb2.MASTER,
'unexpected master type: %s' % ti['type'])

# stop replication at the mysql level.
tablet_62044.mquery('', 'stop slave')
# vttablet replication_reporter should restart it.
utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias])
# insert something on the master and wait for it on the slave.
tablet_62344.mquery('vt_test_keyspace', [
'create table repl_test_table (id int)',
'insert into repl_test_table values (123)'], write=True)
timeout = 10.0
while True:
try:
result = tablet_62044.mquery('vt_test_keyspace',
'select * from repl_test_table')
if result:
self.assertEqual(result[0][0], 123L)
break
except MySQLdb.ProgrammingError:
# Maybe the create table hasn't gone through yet, so we wait more
logging.exception('got this exception waiting for data, ignoring it')
timeout = utils.wait_step(
'slave replication repaired by replication_reporter', timeout)

# stop replication, make sure we don't go unhealthy.
# (we have a baseline as well, so the time should be good).
utils.run_vtctl(['StopSlave', tablet_62044.tablet_alias])
utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias])
self.check_healthz(tablet_62044, True)

# make sure status web page is healthy
self.assertRegexpMatches(tablet_62044.get_status(), healthy_expr)

# make sure the health stream is updated
health = utils.run_vtctl_json(['VtTabletStreamHealth',
'-count', '1',
tablet_62044.tablet_alias])
self.assertTrue(('seconds_behind_master' not in health['realtime_stats']) or
(health['realtime_stats']['seconds_behind_master'] < 30),
'got unexpected health: %s' % str(health))
self.assertIn('serving', health)

# then restart replication, make sure we stay healthy
utils.run_vtctl(['StartSlave', tablet_62044.tablet_alias])
utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias])

# make sure status web page is healthy
self.assertRegexpMatches(tablet_62044.get_status(), healthy_expr)

# now test VtTabletStreamHealth returns the right thing
# test VtTabletStreamHealth returns the right thing
stdout, _ = utils.run_vtctl(['VtTabletStreamHealth',
'-count', '2',
tablet_62044.tablet_alias],
Expand Down