diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go index 3968883f526..36b4c131505 100644 --- a/go/vt/vttablet/tabletmanager/action_agent.go +++ b/go/vt/vttablet/tabletmanager/action_agent.go @@ -38,8 +38,6 @@ import ( "flag" "fmt" "math/rand" - "os" - "path" "regexp" "sync" "time" @@ -69,12 +67,6 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -const ( - // slaveStoppedFile is the file name for the file whose existence informs - // vttablet to NOT try to repair replication. - slaveStoppedFile = "do_not_replicate" -) - var ( tabletHostname = flag.String("tablet_hostname", "", "if not empty, this hostname will be assumed instead of trying to resolve it") ) @@ -190,10 +182,6 @@ type ActionAgent struct { // _ignoreHealthErrorExpr can be set by RPC to selectively disable certain // healthcheck errors. It should only be accessed while holding actionMutex. _ignoreHealthErrorExpr *regexp.Regexp - - // _slaveStopped remembers if we've been told to stop replicating. - // If it's nil, we'll try to check for the slaveStoppedFile. - _slaveStopped *bool } // NewActionAgent creates a new ActionAgent and registers all the @@ -461,48 +449,6 @@ func (agent *ActionAgent) EnableUpdateStream() bool { return agent._enableUpdateStream } -func (agent *ActionAgent) slaveStopped() bool { - agent.mutex.Lock() - defer agent.mutex.Unlock() - - // If we already know the value, don't bother checking the file. - if agent._slaveStopped != nil { - return *agent._slaveStopped - } - - // If the marker file exists, we're stopped. - // Treat any read error as if the file doesn't exist. - _, err := os.Stat(path.Join(agent.MysqlDaemon.TabletDir(), slaveStoppedFile)) - slaveStopped := err == nil - agent._slaveStopped = &slaveStopped - return slaveStopped -} - -func (agent *ActionAgent) setSlaveStopped(slaveStopped bool) { - agent.mutex.Lock() - defer agent.mutex.Unlock() - - agent._slaveStopped = &slaveStopped - - // Make a best-effort attempt to persist the value across tablet restarts. - // We store a marker in the filesystem so it works regardless of whether - // mysqld is running, and so it's tied to this particular instance of the - // tablet data dir (the one that's paused at a known replication position). - tabletDir := agent.MysqlDaemon.TabletDir() - if tabletDir == "" { - return - } - markerFile := path.Join(tabletDir, slaveStoppedFile) - if slaveStopped { - file, err := os.Create(markerFile) - if err == nil { - file.Close() - } - } else { - os.Remove(markerFile) - } -} - func (agent *ActionAgent) setServicesDesiredState(disallowQueryService string, enableUpdateStream bool) { agent.mutex.Lock() agent._disallowQueryService = disallowQueryService diff --git a/go/vt/vttablet/tabletmanager/replication_reporter.go b/go/vt/vttablet/tabletmanager/replication_reporter.go index 753507c9c89..56053598150 100644 --- a/go/vt/vttablet/tabletmanager/replication_reporter.go +++ b/go/vt/vttablet/tabletmanager/replication_reporter.go @@ -18,16 +18,10 @@ package tabletmanager import ( "flag" - "fmt" "html/template" "time" - "golang.org/x/net/context" - - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/health" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/mysqlctl" ) var ( @@ -53,28 +47,6 @@ func (r *replicationReporter) Report(isSlaveType, shouldQueryServiceBeRunning bo } status, statusErr := r.agent.MysqlDaemon.SlaveStatus() - if statusErr == mysql.ErrNotSlave || - (statusErr == nil && !status.SlaveSQLRunning && !status.SlaveIORunning) { - // MySQL is up, but slave is either not configured or not running. - // Both SQL and IO threads are stopped, so it's probably either - // stopped on purpose, or stopped because of a mysqld restart. - if !r.agent.slaveStopped() { - // As far as we've been told, it isn't stopped on purpose, - // so let's try to start it. - if *mysqlctl.DisableActiveReparents { - log.Infof("Slave is stopped. Running with --disable_active_reparents so will not try to reconnect to master...") - } else { - log.Infof("Slave is stopped. Trying to reconnect to master...") - ctx, cancel := context.WithTimeout(r.agent.batchCtx, 5*time.Second) - if err := repairReplication(ctx, r.agent); err != nil { - log.Infof("Failed to reconnect to master: %v", err) - } - cancel() - // Check status again. - status, statusErr = r.agent.MysqlDaemon.SlaveStatus() - } - } - } if statusErr != nil { // mysqld is not running or slave is not configured. // We can't report healthy. @@ -106,25 +78,6 @@ func (r *replicationReporter) HTMLName() template.HTML { return template.HTML("MySQLReplicationLag") } -// repairReplication tries to connect this slave to whoever is -// the current master of the shard, and start replicating. -func repairReplication(ctx context.Context, agent *ActionAgent) error { - if *mysqlctl.DisableActiveReparents { - return fmt.Errorf("can't repair replication with --disable_active_reparents") - } - - ts := agent.TopoServer - tablet := agent.Tablet() - si, err := ts.GetShard(ctx, tablet.Keyspace, tablet.Shard) - if err != nil { - return err - } - if !si.HasMaster() { - return fmt.Errorf("no master tablet for shard %v/%v", tablet.Keyspace, tablet.Shard) - } - return agent.setMasterLocked(ctx, si.MasterAlias, 0, true) -} - func registerReplicationReporter(agent *ActionAgent) { if *enableReplicationReporter { health.DefaultAggregator.Register("replication_reporter", diff --git a/go/vt/vttablet/tabletmanager/replication_reporter_test.go b/go/vt/vttablet/tabletmanager/replication_reporter_test.go index 5365ac06313..b1923925542 100644 --- a/go/vt/vttablet/tabletmanager/replication_reporter_test.go +++ b/go/vt/vttablet/tabletmanager/replication_reporter_test.go @@ -29,10 +29,9 @@ func TestBasicMySQLReplicationLag(t *testing.T) { mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) mysqld.Replicating = true mysqld.SecondsBehindMaster = 10 - slaveStopped := true rep := &replicationReporter{ - agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped}, + agent: &ActionAgent{MysqlDaemon: mysqld}, now: time.Now, } dur, err := rep.Report(true, true) @@ -44,10 +43,9 @@ func TestBasicMySQLReplicationLag(t *testing.T) { func TestNoKnownMySQLReplicationLag(t *testing.T) { mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) mysqld.Replicating = false - slaveStopped := true rep := &replicationReporter{ - agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped}, + agent: &ActionAgent{MysqlDaemon: mysqld}, now: time.Now, } dur, err := rep.Report(true, true) @@ -60,11 +58,10 @@ func TestExtrapolatedMySQLReplicationLag(t *testing.T) { mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) mysqld.Replicating = true mysqld.SecondsBehindMaster = 10 - slaveStopped := true now := time.Now() rep := &replicationReporter{ - agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped}, + agent: &ActionAgent{MysqlDaemon: mysqld}, now: func() time.Time { return now }, } @@ -88,11 +85,10 @@ func TestNoExtrapolatedMySQLReplicationLag(t *testing.T) { mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) mysqld.Replicating = true mysqld.SecondsBehindMaster = 10 - slaveStopped := true now := time.Now() rep := &replicationReporter{ - agent: &ActionAgent{MysqlDaemon: mysqld, _slaveStopped: &slaveStopped}, + agent: &ActionAgent{MysqlDaemon: mysqld}, now: func() time.Time { return now }, } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index c4b109d776c..d1af6b0e3cd 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -68,12 +68,7 @@ func (agent *ActionAgent) StopSlave(ctx context.Context) error { } func (agent *ActionAgent) stopSlaveLocked(ctx context.Context) error { - - // Remember that we were told to stop, so we don't try to - // restart ourselves (in replication_reporter). - agent.setSlaveStopped(true) - - // Also tell Orchestrator we're stopped on purpose for some Vitess task. + // Tell Orchestrator we're stopped on purpose for some Vitess task. // Do this in the background, as it's best-effort. go func() { if agent.orc == nil { @@ -123,8 +118,6 @@ func (agent *ActionAgent) StartSlave(ctx context.Context) error { } defer agent.unlock() - agent.setSlaveStopped(false) - // Tell Orchestrator we're no longer stopped on purpose. // Do this in the background, as it's best-effort. go func() { @@ -154,8 +147,6 @@ func (agent *ActionAgent) ResetReplication(ctx context.Context) error { return err } defer agent.unlock() - - agent.setSlaveStopped(true) return agent.MysqlDaemon.ResetReplication(ctx) } @@ -166,9 +157,6 @@ func (agent *ActionAgent) InitMaster(ctx context.Context) (string, error) { } defer agent.unlock() - // Initializing as master implies undoing any previous "do not replicate". - agent.setSlaveStopped(false) - // we need to insert something in the binlogs, so we can get the // current position. Let's just use the mysqlctl.CreateReparentJournal commands. cmds := mysqlctl.CreateReparentJournal() @@ -241,8 +229,6 @@ func (agent *ActionAgent) InitSlave(ctx context.Context, parent *topodatapb.Tabl return err } - agent.setSlaveStopped(false) - // If using semi-sync, we need to enable it before connecting to master. // If we were a master type, we need to switch back to replica settings. // Otherwise we won't be able to commit anything. diff --git a/test/reparent.py b/test/reparent.py index c4957465e81..7aff1010dc3 100755 --- a/test/reparent.py +++ b/test/reparent.py @@ -195,7 +195,12 @@ def test_reparent_down_master(self): # bring back the old master as a slave, check that it catches up tablet_62344.start_mysql().wait() tablet_62344.init_tablet('replica', 'test_keyspace', '0', start=True, - wait_for_start=False) + wait_for_start=True) + # Fix replication after mysqld restart. + utils.run_vtctl(['ReparentTablet', + tablet_62344.tablet_alias], auto_log=True) + utils.run_vtctl(['StartSlave', + tablet_62344.tablet_alias], auto_log=True) self._check_vt_insert_test(tablet_62344, 2) tablet.kill_tablets( diff --git a/test/tabletmanager.py b/test/tabletmanager.py index b5c84d1dbd1..8e32b712632 100755 --- a/test/tabletmanager.py +++ b/test/tabletmanager.py @@ -350,54 +350,10 @@ def test_health_check(self): self.assertEqual(ti['type'], topodata_pb2.MASTER, 'unexpected master type: %s' % ti['type']) - # stop replication at the mysql level. - tablet_62044.mquery('', 'stop slave') - # vttablet replication_reporter should restart it. - utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias]) - # insert something on the master and wait for it on the slave. - tablet_62344.mquery('vt_test_keyspace', [ - 'create table repl_test_table (id int)', - 'insert into repl_test_table values (123)'], write=True) - timeout = 10.0 - while True: - try: - result = tablet_62044.mquery('vt_test_keyspace', - 'select * from repl_test_table') - if result: - self.assertEqual(result[0][0], 123L) - break - except MySQLdb.ProgrammingError: - # Maybe the create table hasn't gone trough yet, we wait more - logging.exception('got this exception waiting for data, ignoring it') - timeout = utils.wait_step( - 'slave replication repaired by replication_reporter', timeout) - - # stop replication, make sure we don't go unhealthy. - # (we have a baseline as well, so the time should be good). - utils.run_vtctl(['StopSlave', tablet_62044.tablet_alias]) - utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias]) - self.check_healthz(tablet_62044, True) - - # make sure status web page is healthy - self.assertRegexpMatches(tablet_62044.get_status(), healthy_expr) - - # make sure the health stream is updated - health = utils.run_vtctl_json(['VtTabletStreamHealth', - '-count', '1', - tablet_62044.tablet_alias]) - self.assertTrue(('seconds_behind_master' not in health['realtime_stats']) or - (health['realtime_stats']['seconds_behind_master'] < 30), - 'got unexpected health: %s' % str(health)) - self.assertIn('serving', health) - - # then restart replication, make sure we stay healthy - utils.run_vtctl(['StartSlave', tablet_62044.tablet_alias]) - utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias]) - # make sure status web page is healthy self.assertRegexpMatches(tablet_62044.get_status(), healthy_expr) - # now test VtTabletStreamHealth returns the right thing + # test VtTabletStreamHealth returns the right thing stdout, _ = utils.run_vtctl(['VtTabletStreamHealth', '-count', '2', tablet_62044.tablet_alias],