diff --git a/go/mysql/mysql56_gtid_set.go b/go/mysql/mysql56_gtid_set.go index 408c3cac0d6..071e0d929b7 100644 --- a/go/mysql/mysql56_gtid_set.go +++ b/go/mysql/mysql56_gtid_set.go @@ -573,7 +573,11 @@ func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet { diffIntervals = append(diffIntervals, intervals...) } - differenceSet[sid] = diffIntervals + if len(diffIntervals) == 0 { + delete(differenceSet, sid) + } else { + differenceSet[sid] = diffIntervals + } } return differenceSet diff --git a/go/mysql/mysql56_gtid_set_test.go b/go/mysql/mysql56_gtid_set_test.go index fdb0569c1a2..e30e6fbef0e 100644 --- a/go/mysql/mysql56_gtid_set_test.go +++ b/go/mysql/mysql56_gtid_set_test.go @@ -481,6 +481,20 @@ func TestMysql56GTIDSetDifference(t *testing.T) { if !got.Equal(want) { t.Errorf("got %#v; want %#v", got, want) } + + sid10 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + sid11 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + set10 := Mysql56GTIDSet{ + sid10: []interval{{1, 30}}, + } + set11 := Mysql56GTIDSet{ + sid11: []interval{{1, 30}}, + } + got = set10.Difference(set11) + want = Mysql56GTIDSet{} + if !got.Equal(want) { + t.Errorf("got %#v; want %#v", got, want) + } } func TestMysql56GTIDSetSIDBlock(t *testing.T) { diff --git a/go/mysql/replication_status.go b/go/mysql/replication_status.go index 58f17835b5f..38503e10d0e 100644 --- a/go/mysql/replication_status.go +++ b/go/mysql/replication_status.go @@ -112,7 +112,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus { // provided as a list of ReplicationStatus's. This method only works if the flavor for all retrieved ReplicationStatus's is MySQL. // The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID. func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationStatus) (Mysql56GTIDSet, error) { - set, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet) + relayLogSet, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet) if !ok { return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor") } @@ -136,8 +136,8 @@ func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationS } // Copy set for final diffSet so we don't mutate receiver. - diffSet := make(Mysql56GTIDSet, len(set)) - for sid, intervals := range set { + diffSet := make(Mysql56GTIDSet, len(relayLogSet)) + for sid, intervals := range relayLogSet { if sid == s.MasterUUID { continue } diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index c8f0719dbb9..0e1f3ac043e 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -117,6 +117,23 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { resurrectTablet(ctx, t, tab1) } +func TestTrivialERS(t *testing.T) { + defer cluster.PanicHandler(t) + setupReparentCluster(t) + defer teardownCluster() + + confirmReplication(t, tab1, []*cluster.Vttablet{tab2, tab3, tab4}) + + // We should be able to do a series of ERS-es, even if nothing + // is down, without issue + for i := 1; i <= 4; i++ { + out, err := ers(t, nil, "30s") + log.Infof("ERS loop %d. EmergencyReparentShard Output: %v", i, out) + require.NoError(t, err) + time.Sleep(5 * time.Second) + } +} + func TestReparentIgnoreReplicas(t *testing.T) { defer cluster.PanicHandler(t) setupReparentCluster(t) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index eeb08050009..9c497bf6742 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -49,7 +49,7 @@ func (tm *TabletManager) ReplicationStatus(ctx context.Context) (*replicationdat return mysql.ReplicationStatusToProto(status), nil } -// MasterStatus returns the replication status fopr a master tablet. +// MasterStatus returns the replication status for a master tablet. func (tm *TabletManager) MasterStatus(ctx context.Context) (*replicationdatapb.MasterStatus, error) { status, err := tm.MysqlDaemon.MasterStatus(ctx) if err != nil { @@ -69,6 +69,7 @@ func (tm *TabletManager) MasterPosition(ctx context.Context) (string, error) { // WaitForPosition returns the master position func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error { + log.Infof("WaitForPosition: %v", pos) mpos, err := mysql.DecodePosition(pos) if err != nil { return err @@ -79,6 +80,7 @@ func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error // StopReplication will stop the mysql. Works both when Vitess manages // replication or not (using hook if not). func (tm *TabletManager) StopReplication(ctx context.Context) error { + log.Infof("StopReplication") if err := tm.lock(ctx); err != nil { return err } @@ -131,6 +133,7 @@ func (tm *TabletManager) stopIOThreadLocked(ctx context.Context) error { // provided position. Works both when Vitess manages // replication or not (using hook if not). func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position string, waitTime time.Duration) (string, error) { + log.Infof("StopReplicationMinimum: position: %v waitTime: %v", position, waitTime) if err := tm.lock(ctx); err != nil { return "", err } @@ -158,6 +161,7 @@ func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position st // StartReplication will start the mysql. Works both when Vitess manages // replication or not (using hook if not). func (tm *TabletManager) StartReplication(ctx context.Context) error { + log.Infof("StartReplication") if err := tm.lock(ctx); err != nil { return err } @@ -185,6 +189,7 @@ func (tm *TabletManager) StartReplication(ctx context.Context) error { // StartReplicationUntilAfter will start the replication and let it catch up // until and including the transactions in `position` func (tm *TabletManager) StartReplicationUntilAfter(ctx context.Context, position string, waitTime time.Duration) error { + log.Infof("StartReplicationUntilAfter: position: %v waitTime: %v", position, waitTime) if err := tm.lock(ctx); err != nil { return err } @@ -209,6 +214,7 @@ func (tm *TabletManager) GetReplicas(ctx context.Context) ([]string, error) { // ResetReplication completely resets the replication on the host. // All binary and relay logs are flushed. All replication positions are reset. func (tm *TabletManager) ResetReplication(ctx context.Context) error { + log.Infof("ResetReplication") if err := tm.lock(ctx); err != nil { return err } @@ -220,6 +226,7 @@ func (tm *TabletManager) ResetReplication(ctx context.Context) error { // InitMaster enables writes and returns the replication position. func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) { + log.Infof("InitMaster") if err := tm.lock(ctx); err != nil { return "", err } @@ -259,6 +266,7 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) { // PopulateReparentJournal adds an entry into the reparent_journal table. func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreatedNS int64, actionName string, masterAlias *topodatapb.TabletAlias, position string) error { + log.Infof("PopulateReparentJournal: action: %v parent: %v position: %v", actionName, masterAlias, position) pos, err := mysql.DecodePosition(position) if err != nil { return err @@ -272,6 +280,7 @@ func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreate // InitReplica sets replication master and position, and waits for the // reparent_journal table entry up to context timeout func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.TabletAlias, position string, timeCreatedNS int64) error { + log.Infof("InitReplica: parent: %v position: %v", parent, position) if err := tm.lock(ctx); err != nil { return err } @@ -335,6 +344,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab // // If a step fails in the middle, it will try to undo any changes it made. func (tm *TabletManager) DemoteMaster(ctx context.Context) (*replicationdatapb.MasterStatus, error) { + log.Infof("DemoteMaster") // The public version always reverts on partial failure. return tm.demoteMaster(ctx, true /* revertPartialFailure */) } @@ -441,6 +451,7 @@ func (tm *TabletManager) demoteMaster(ctx context.Context, revertPartialFailure // it sets read-only to false, fixes semi-sync // and returns its master position. func (tm *TabletManager) UndoDemoteMaster(ctx context.Context) error { + log.Infof("UndoDemoteMaster") if err := tm.lock(ctx); err != nil { return err } @@ -477,12 +488,14 @@ func (tm *TabletManager) UndoDemoteMaster(ctx context.Context) error { // ReplicaWasPromoted promotes a replica to master, no questions asked. func (tm *TabletManager) ReplicaWasPromoted(ctx context.Context) error { + log.Infof("ReplicaWasPromoted") return tm.ChangeType(ctx, topodatapb.TabletType_MASTER) } // SetMaster sets replication master, and waits for the // reparent_journal table entry up to context timeout func (tm *TabletManager) SetMaster(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool) error { + log.Infof("SetMaster: parent: %v position: %v force: %v", parentAlias, waitPosition, forceStartReplication) if err := tm.lock(ctx); err != nil { return err } @@ -613,6 +626,8 @@ func (tm *TabletManager) setMasterLocked(ctx context.Context, parentAlias *topod return err } } + // Clear replication sentinel flag for this replica + tm.replManager.setReplicationStopped(false) } return nil @@ -620,6 +635,7 @@ func (tm *TabletManager) setMasterLocked(ctx context.Context, parentAlias *topod // ReplicaWasRestarted updates the parent record for a tablet. func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topodatapb.TabletAlias) error { + log.Infof("ReplicaWasRestarted: parent: %v", parent) if err := tm.lock(ctx); err != nil { return err } @@ -637,6 +653,7 @@ func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topoda // StopReplicationAndGetStatus stops MySQL replication, and returns the // current status. func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopReplicationMode replicationdatapb.StopReplicationMode) (StopReplicationAndGetStatusResponse, error) { + log.Infof("StopReplicationAndGetStatus: mode: %v", stopReplicationMode) if err := tm.lock(ctx); err != nil { return StopReplicationAndGetStatusResponse{}, err } @@ -727,6 +744,7 @@ type StopReplicationAndGetStatusResponse struct { // PromoteReplica makes the current tablet the master func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { + log.Infof("PromoteReplica") if err := tm.lock(ctx); err != nil { return "", err } @@ -746,6 +764,10 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { return "", err } + // Clear replication sentinel flag for this master, + // or we might block replication the next time we demote it + tm.replManager.setReplicationStopped(false) + return mysql.EncodePosition(pos), nil }