9 changes: 9 additions & 0 deletions go/mysql/flavor_mysql.go
@@ -121,6 +121,15 @@ func (mysqlFlavor) status(c *Conn) (SlaveStatus, error) {
if err != nil {
return SlaveStatus{}, vterrors.Wrapf(err, "SlaveStatus can't parse MySQL 5.6 GTID (Executed_Gtid_Set: %#v)", resultMap["Executed_Gtid_Set"])
}
relayLogGTIDSet, err := parseMysql56GTIDSet(resultMap["Retrieved_Gtid_Set"])
if err != nil {
return SlaveStatus{}, vterrors.Wrapf(err, "SlaveStatus can't parse MySQL 5.6 GTID (Retrieved_Gtid_Set: %#v)", resultMap["Retrieved_Gtid_Set"])
}
// We take the union of the executed and retrieved gtidset, because the retrieved gtidset only represents GTIDs since
// the relay log has been reset. To get the full Position, we need to take a union of executed GTIDSets, since these would
// have been in the relay log's GTIDSet in the past, prior to a reset.
status.RelayLogPosition.GTIDSet = status.Position.GTIDSet.Union(relayLogGTIDSet)

return status, nil
}
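The union logic above can be sketched with plain string sets. This is a simplification, and all names in it are illustrative: the real code merges `mysql.Mysql56GTIDSet` values, which track per-server-UUID interval lists rather than individual GTID strings.

```go
package main

import "fmt"

// union returns the set union of two GTID "sets", modeled here as
// map[string]bool; the real Mysql56GTIDSet merges per-server-UUID
// interval lists instead of individual GTID strings.
func union(a, b map[string]bool) map[string]bool {
	out := make(map[string]bool, len(a)+len(b))
	for g := range a {
		out[g] = true
	}
	for g := range b {
		out[g] = true
	}
	return out
}

func main() {
	// Executed: everything the replica has applied, including transactions
	// applied before the last relay log reset.
	executed := map[string]bool{"uuid:1": true, "uuid:2": true}
	// Retrieved: only what the current relay log has fetched since the reset.
	retrieved := map[string]bool{"uuid:2": true, "uuid:3": true}

	full := union(executed, retrieved)
	fmt.Println(len(full)) // 3: the union recovers the full relay log position
}
```

This mirrors why the diff takes `Position.GTIDSet.Union(relayLogGTIDSet)`: the retrieved set alone would silently drop everything executed before the last relay log reset.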

13 changes: 12 additions & 1 deletion go/mysql/slave_status.go
@@ -23,7 +23,12 @@ import (

// SlaveStatus holds replication information from SHOW SLAVE STATUS.
type SlaveStatus struct {
Position Position
Position Position
// RelayLogPosition is the Position that the replica would be at if it
// were to finish executing everything that's currently in its relay log.
// However, some MySQL flavors don't expose this information,
// in which case RelayLogPosition.IsZero() will be true.
RelayLogPosition Position
Review comment (Member):
It turns out we do have the info we need to fill this in for the filepos flavor: #6190 (comment)

Leaving this here since there's not a better place to stick a line comment about it.

SlaveIORunning bool
SlaveSQLRunning bool
SecondsBehindMaster uint
@@ -42,6 +47,7 @@ func (s *SlaveStatus) SlaveRunning() bool {
func SlaveStatusToProto(s SlaveStatus) *replicationdatapb.Status {
return &replicationdatapb.Status{
Position: EncodePosition(s.Position),
RelayLogPosition: EncodePosition(s.RelayLogPosition),
SlaveIoRunning: s.SlaveIORunning,
SlaveSqlRunning: s.SlaveSQLRunning,
SecondsBehindMaster: uint32(s.SecondsBehindMaster),
@@ -57,8 +63,13 @@ func ProtoToSlaveStatus(s *replicationdatapb.Status) SlaveStatus {
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode Position"))
}
relayPos, err := DecodePosition(s.RelayLogPosition)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogPosition"))
}
return SlaveStatus{
Position: pos,
RelayLogPosition: relayPos,
SlaveIORunning: s.SlaveIoRunning,
SlaveSQLRunning: s.SlaveSqlRunning,
SecondsBehindMaster: uint(s.SecondsBehindMaster),
48 changes: 29 additions & 19 deletions go/vt/proto/replicationdata/replicationdata.pb.go

(Generated file; diff not rendered by default.)

202 changes: 147 additions & 55 deletions go/vt/wrangler/reparent.go
@@ -23,6 +23,7 @@ This file handles the reparenting operations.
import (
"context"
"fmt"
"sort"
"sync"
"time"

@@ -757,47 +758,75 @@ func (wr *Wrangler) findCurrentMaster(tabletMap map[string]*topo.TabletInfo) *to
return currentMaster
}

// maxReplPosSearch is a struct helping to search for a tablet with the largest replication
// position querying status from all tablets in parallel.
type maxReplPosSearch struct {
// tabletReplInfoAggregator is a struct that can be used to concurrently get all tablets' replication positions tied to their aliases.
Review comment (Member):
We should mention that this looks at the relay log position whenever possible, since this is different from what people usually mean when they refer to the replication position of an instance. Usually, it refers to which snapshot of data you'd access if you sent a query to that instance, meaning the executed GTID set. We pretty much only care about the relay log position in the context of checking for semi-sync ACKed transactions that would otherwise be lost in the case of an unplanned failover.

In fact, I'm just now realizing that we should not look at relay log pos when this is called from PlannedReparentShard. In PRS, the next thing we're going to do after choosing a candidate is wait for its executed position to catch up to the demoted (frozen) master. Since the demoted master is available in this case, we don't need to look for semi-sync ACKed transactions that were otherwise lost -- we know the master must have them and we know the master's position.

The only reason we look at candidate replication positions at all in PRS is to minimize the time we spend waiting for the chosen candidate to catch up. We therefore want to choose the one that has executed the farthest, not the one that has merely retrieved the farthest; execution is what's going to take time. So I think we should make it optional to look at relay log pos and keep that off when calling this from PRS.

@deepthi Can you check my logic on this?

// It allows for a sorting of the retrieved replication infos, based on position, such that the earliest members
// of the sorted list have the largest replication position.
type tabletReplInfoAggregator struct {
wrangler *Wrangler
ctx context.Context
waitReplicasTimeout time.Duration
waitGroup sync.WaitGroup
maxPosLock sync.Mutex
maxPos mysql.Position
maxPosTablet *topodatapb.Tablet
statusLock sync.Mutex
tabletReplInfos []*replicationInfo
}

func (maxPosSearch *maxReplPosSearch) processTablet(tablet *topodatapb.Tablet) {
defer maxPosSearch.waitGroup.Done()
maxPosSearch.wrangler.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias))
type replicationInfo struct {
alias *topodatapb.TabletAlias
position mysql.Position
}

func (t *tabletReplInfoAggregator) processTablet(tablet *topodatapb.Tablet) {
defer t.waitGroup.Done()
t.wrangler.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias))

slaveStatusCtx, cancelSlaveStatus := context.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitReplicasTimeout)
slaveStatusCtx, cancelSlaveStatus := context.WithTimeout(t.ctx, t.waitReplicasTimeout)
defer cancelSlaveStatus()

status, err := maxPosSearch.wrangler.tmc.SlaveStatus(slaveStatusCtx, tablet)
status, err := t.wrangler.tmc.SlaveStatus(slaveStatusCtx, tablet)
if err != nil {
maxPosSearch.wrangler.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", topoproto.TabletAliasString(tablet.Alias), err)
t.wrangler.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", topoproto.TabletAliasString(tablet.Alias), err)
return
}
replPos, err := mysql.DecodePosition(status.Position)
replPos, err := decodePosition(status)
if err != nil {
maxPosSearch.wrangler.logger.Warningf("cannot decode slave %v position %v: %v", topoproto.TabletAliasString(tablet.Alias), status.Position, err)
t.wrangler.logger.Warningf("cannot decode slave %v position %v: %v", topoproto.TabletAliasString(tablet.Alias), status.Position, err)
return
}

maxPosSearch.maxPosLock.Lock()
if maxPosSearch.maxPosTablet == nil || !maxPosSearch.maxPos.AtLeast(replPos) {
maxPosSearch.maxPos = replPos
maxPosSearch.maxPosTablet = tablet
t.statusLock.Lock()
Review comment (Member):
Add a TODO: Refactor this into a send on a channel. Multiple goroutines needing to append results to a list is a textbook use case for a channel.

t.tabletReplInfos = append(t.tabletReplInfos, &replicationInfo{
alias: tablet.Alias,
position: replPos,
})
t.statusLock.Unlock()
}

// unpackTabletAliases will turn a []*replicationInfo from the tabletReplInfos field into an []*TabletAlias.
func (t *tabletReplInfoAggregator) unpackTabletAliases() []*topodatapb.TabletAlias {
t.statusLock.Lock()
tabletAliases := make([]*topodatapb.TabletAlias, 0, len(t.tabletReplInfos))

for i := range t.tabletReplInfos {
tabletAliases = append(tabletAliases, t.tabletReplInfos[i].alias)
}
maxPosSearch.maxPosLock.Unlock()
t.statusLock.Unlock()
Review comment (Member):
You could defer this right after the Lock to make it safer in case additional return paths are ever added.


return tabletAliases
}

func (t *tabletReplInfoAggregator) Len() int {
return len(t.tabletReplInfos)
}
func (t *tabletReplInfoAggregator) Swap(i, j int) {
t.tabletReplInfos[i], t.tabletReplInfos[j] = t.tabletReplInfos[j], t.tabletReplInfos[i]
}
func (t *tabletReplInfoAggregator) Less(i, j int) bool {
return !t.tabletReplInfos[i].position.AtLeast(t.tabletReplInfos[j].position)
}

// chooseNewMaster finds a tablet that is going to become master after reparent. The criteria
// for the new master-elect are (preferably) to be in the same cell as the current master, and
// to be different from avoidMasterTabletAlias. The tablet with the largest replication
// for the new master-elect are to be in the same cell as the current master, and
Review comment (Collaborator):
@enisoc do you know why we limit the search to master's current cell?

Reply (Member):
It makes sense that we'd want to prefer the same cell when possible, as the comment originally said, but I don't know if it was intentional that this code has actually been requiring the same cell. It does make sense from a safety perspective, but I was also surprised by this since the vtctl command docs don't mention it.

In any case, I advised @PrismaPhonic that we should probably preserve the behavior rather than fixing it to match the comment, since users may have come to rely on this and unexpectedly reparenting to a remote cell could cause an outage if the app isn't prepared for it. What do you think?

On a related note, PRS used to only contact tablets in the same cell to get their positions, but now we will contact tablets in all cells and throw away most of them. Do you think it's worth fixing this to avoid the unnecessary calls?

// to be different from avoidMasterTabletAlias, if supplied. The tablet with the largest replication
// position is chosen to minimize the time of catching up with the master. Note that the search
// for largest replication position will race with transactions being executed on the master at
// the same time, so when all tablets are roughly at the same position then the choice of the
@@ -808,37 +837,71 @@ func (wr *Wrangler) chooseNewMaster(
tabletMap map[string]*topo.TabletInfo,
avoidMasterTabletAlias *topodatapb.TabletAlias,
waitReplicasTimeout time.Duration) (*topodatapb.TabletAlias, error) {

if avoidMasterTabletAlias == nil {
return nil, fmt.Errorf("tablet to avoid for reparent is not provided, cannot choose new master")
candidates, err := wr.FindMasterCandidates(ctx, tabletMap, waitReplicasTimeout)
if err != nil {
return nil, err
}

var masterCell string
if shardInfo.MasterAlias != nil {
masterCell = shardInfo.MasterAlias.Cell
}

maxPosSearch := maxReplPosSearch{
// Filter out avoidMasterTabletAlias, if it exists in the candidates list, and also filter out
// any candidates that aren't in the master tablet's cell.
filteredCandidates := make([]*topodatapb.TabletAlias, 0, len(candidates))
for i := range candidates {
if candidates[i] != avoidMasterTabletAlias && candidates[i].Cell == masterCell {
filteredCandidates = append(filteredCandidates, candidates[i])
}
}

if len(filteredCandidates) == 0 {
return nil, nil
}
return filteredCandidates[0], nil
}
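The filtering step can be sketched with a simplified alias type (all names here are hypothetical). The sketch compares aliases by value; the real code would need `topoproto.TabletAliasEqual` for proto pointers, since plain pointer equality misses equal aliases arriving from different messages:

```go
package main

import "fmt"

// tabletAlias is a simplified stand-in for topodatapb.TabletAlias.
type tabletAlias struct {
	Cell string
	UID  uint32
}

// filterCandidates keeps only candidates in masterCell, skipping the
// avoid alias if one was supplied. Aliases are compared by value.
func filterCandidates(candidates []tabletAlias, masterCell string, avoid *tabletAlias) []tabletAlias {
	out := make([]tabletAlias, 0, len(candidates))
	for _, c := range candidates {
		if avoid != nil && c == *avoid {
			continue
		}
		if c.Cell != masterCell {
			continue
		}
		out = append(out, c)
	}
	return out
}

func main() {
	cands := []tabletAlias{{"zone1", 101}, {"zone2", 201}, {"zone1", 102}}
	avoid := &tabletAlias{"zone1", 101}
	fmt.Println(filterCandidates(cands, "zone1", avoid)) // [{zone1 102}]
}
```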

// FindMasterCandidates will look at all of the tablets in the supplied tabletMap, and return a list of tabletAliases that
// are all caught up. This means that all of the returned TabletAliases will have the same replication position.
Review comment (Collaborator):
If I'm reading this right, I don't see a guarantee (or a check) that the returned replicas are actually caught up. So we should fix the comments: "all caught up" -> "the most up-to-date" OR "the most caught up".

Reply (Member):
Since we ignore tablets we can't reach, we should clarify that it's additionally "the most up-to-date that we were able to reach". It's therefore important to still call CheckTabletIsFarthestAhead.

func (wr *Wrangler) FindMasterCandidates(
ctx context.Context,
tabletMap map[string]*topo.TabletInfo,
waitReplicasTimeout time.Duration) ([]*topodatapb.TabletAlias, error) {
aggregator := &tabletReplInfoAggregator{
wrangler: wr,
ctx: ctx,
waitReplicasTimeout: waitReplicasTimeout,
waitGroup: sync.WaitGroup{},
maxPosLock: sync.Mutex{},
statusLock: sync.Mutex{},
tabletReplInfos: make([]*replicationInfo, 0, len(tabletMap)),
}
for _, tabletInfo := range tabletMap {
if (masterCell != "" && tabletInfo.Alias.Cell != masterCell) ||
topoproto.TabletAliasEqual(tabletInfo.Alias, avoidMasterTabletAlias) ||
tabletInfo.Tablet.Type != topodatapb.TabletType_REPLICA {
if tabletInfo.Tablet.Type != topodatapb.TabletType_REPLICA {
continue
}
maxPosSearch.waitGroup.Add(1)
go maxPosSearch.processTablet(tabletInfo.Tablet)
aggregator.waitGroup.Add(1)
go aggregator.processTablet(tabletInfo.Tablet)
}
maxPosSearch.waitGroup.Wait()
aggregator.waitGroup.Wait()

if maxPosSearch.maxPosTablet == nil {
return nil, nil
sort.Sort(aggregator)

if len(aggregator.tabletReplInfos) < 2 {
// Either 1 result, or 0. Regardless, we can return the list at this point as we don't need to
// filter out tablets that aren't fully caught up.
Review comment (Collaborator):
See above comment. It seems that "fully caught up" should be changed to "the most caught up"

Reply (Contributor Author):
Ahh, good point, will update.

return aggregator.unpackTabletAliases(), nil
}
return maxPosSearch.maxPosTablet.Alias, nil

for i := 1; i < len(aggregator.tabletReplInfos); i++ {
if aggregator.Less(i, i-1) {
// We found the first one that isn't fully caught up. Remove this and all further list items.
aggregator.tabletReplInfos = aggregator.tabletReplInfos[0:i]
break
}
}

return aggregator.unpackTabletAliases(), nil
}
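The trimming loop at the end of the function can be sketched with integer positions (assuming, as in the real code, the list is already sorted most-advanced first): scan for the first entry that is strictly behind its predecessor and cut there, keeping only the tied leaders.

```go
package main

import "fmt"

// trimToLeaders assumes positions is sorted descending and returns the
// prefix tied with the most advanced entry, mirroring the loop that cuts
// the candidate list at the first replica that is not fully caught up.
func trimToLeaders(positions []int) []int {
	for i := 1; i < len(positions); i++ {
		if positions[i] < positions[i-1] {
			return positions[:i]
		}
	}
	return positions
}

func main() {
	fmt.Println(trimToLeaders([]int{9, 9, 7, 3})) // [9 9]
}
```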

// EmergencyReparentShard will make the provided tablet the master for
@@ -947,26 +1010,9 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
return fmt.Errorf("lost topology lock, aborting: %v", err)
}

// Verify masterElect is alive and has the most advanced position
masterElectStatus, ok := statusMap[masterElectTabletAliasStr]
if !ok {
return fmt.Errorf("couldn't get master elect %v replication position", topoproto.TabletAliasString(masterElectTabletAlias))
}
masterElectPos, err := mysql.DecodePosition(masterElectStatus.Position)
if err != nil {
return fmt.Errorf("cannot decode master elect position %v: %v", masterElectStatus.Position, err)
}
for alias, status := range statusMap {
if alias == masterElectTabletAliasStr {
continue
}
pos, err := mysql.DecodePosition(status.Position)
if err != nil {
return fmt.Errorf("cannot decode replica %v position %v: %v", alias, status.Position, err)
}
if !masterElectPos.AtLeast(pos) {
return fmt.Errorf("tablet %v is more advanced than master elect tablet %v: %v > %v", alias, masterElectTabletAliasStr, status.Position, masterElectStatus.Position)
}
// Bail out if the master tablet candidate is not the farthest ahead.
Review comment (Member):
I didn't realize that we ignore errors from StopReplicationAndGetStatus() above. Doesn't that mean ERS might cause you to lose transactions? I guess we need a force mode like that, but I would have expected an intermediate option as well that says, reparent only if it's safe.

@deepthi @sougou Did you know about this?

I'm also worried that it doesn't look like we have any step to wait for replicas to finish playing their relay logs. It seems like we just stop them and leave them stopped if our checks fail? If that's the case, then we might never meet the new criterion of ensuring the new master candidate has executed at least as far as anybody's relay log.

if err := wr.CheckTabletIsFarthestAhead(masterElectTabletAliasStr, statusMap); err != nil {
return err
}

// Promote the masterElect
@@ -1043,6 +1089,52 @@
return nil
}

// CheckTabletIsFarthestAhead will take a tablet alias string, along with a statusMap of tablet alias strings to tablet Status objects,
// and return an error if the tablet is not the farthest ahead, or otherwise return nil if it is the farthest ahead.
func (wr *Wrangler) CheckTabletIsFarthestAhead(tabletAliasStr string, statusMap map[string]*replicationdatapb.Status) error {
Review comment (Member):
I think we actually need to ensure that the candidate master's executed position is at least as advanced as every replica's relay log pos. Promoting a replica to master purges its relay log, so only the transactions that have been executed should count.

tabletStatus, ok := statusMap[tabletAliasStr]
if !ok {
return fmt.Errorf("couldn't get tablet %v replication position", tabletAliasStr)
}
candidatePos, err := decodePosition(tabletStatus)
if err != nil {
return fmt.Errorf("cannot decode tablet position %v: %v", tabletStatus.Position, err)
}
for alias, status := range statusMap {
if alias == tabletAliasStr {
continue
}
pos, err := decodePosition(status)
if err != nil {
return fmt.Errorf("cannot decode and merge replica %v executed and retrieved positions %v: %v", alias, status.Position, err)
}
if !candidatePos.AtLeast(pos) {
return fmt.Errorf("tablet %v is more advanced than master elect tablet %v: %v > %v", alias, tabletAliasStr, status.Position, tabletStatus.Position)
}
}

return nil
}
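A minimal sketch of the same check, with integer positions standing in for `mysql.Position` and its `AtLeast` comparison (names are illustrative): the candidate must be at least as advanced as every peer, and an unreachable candidate is an error.

```go
package main

import "fmt"

// checkFarthestAhead mirrors CheckTabletIsFarthestAhead with integer
// positions: the candidate must be at least as advanced as every peer.
func checkFarthestAhead(candidate string, positions map[string]int) error {
	candidatePos, ok := positions[candidate]
	if !ok {
		return fmt.Errorf("couldn't get tablet %v replication position", candidate)
	}
	for alias, pos := range positions {
		if alias == candidate {
			continue
		}
		if candidatePos < pos {
			return fmt.Errorf("tablet %v is more advanced than %v: %v > %v",
				alias, candidate, pos, candidatePos)
		}
	}
	return nil
}

func main() {
	positions := map[string]int{"zone1-101": 9, "zone1-102": 9, "zone2-201": 7}
	fmt.Println(checkFarthestAhead("zone1-101", positions)) // <nil>
	fmt.Println(checkFarthestAhead("zone2-201", positions) != nil) // true
}
```

Note that ties pass the check (as in the real code, where two tablets can satisfy `AtLeast` of each other), so several candidates can simultaneously be "farthest ahead".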

// decodePosition is a helper that will decode the RelayLogPosition, if it's non-empty, and otherwise fall back
// to the executed position.
func decodePosition(status *replicationdatapb.Status) (mysql.Position, error) {
Review comment (Member):
We should name this something to indicate that it's special. We don't normally want to consider the relay log position - only when trying to use semi-sync to save transactions that otherwise might be lost due to an unplanned failover.

Suggested change:
- func decodePosition(status *replicationdatapb.Status) (mysql.Position, error) {
+ func bestEffortRelayLogPosition(status *replicationdatapb.Status) (mysql.Position, error) {

relayPos, err := mysql.DecodePosition(status.RelayLogPosition)
if err != nil {
log.Infof("Decode relay log position failed, err: %v", err)
return mysql.Position{}, err
}
if !relayPos.IsZero() {
return relayPos, nil
}
executedPos, err := mysql.DecodePosition(status.Position)
if err != nil {
log.Infof("Decode position failed, err: %v", err)
return mysql.Position{}, err
}
return executedPos, nil
}
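The fallback pattern can be sketched with plain strings standing in for encoded positions (a simplification of `mysql.DecodePosition` and `Position.IsZero`; the type and function names here are illustrative): prefer the relay log position, and fall back to the executed position when the flavor left it empty.

```go
package main

import "fmt"

// status is a simplified stand-in for replicationdatapb.Status.
type status struct {
	Position         string
	RelayLogPosition string
}

// bestPosition prefers the relay log position and falls back to the
// executed position when the flavor left RelayLogPosition empty.
func bestPosition(s status) string {
	if s.RelayLogPosition != "" {
		return s.RelayLogPosition
	}
	return s.Position
}

func main() {
	fmt.Println(bestPosition(status{Position: "uuid:1-5", RelayLogPosition: "uuid:1-7"})) // uuid:1-7
	fmt.Println(bestPosition(status{Position: "uuid:1-5"}))                               // uuid:1-5
}
```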

// TabletExternallyReparented changes the type of new master for this shard to MASTER
// and updates its tablet record in the topo. Updating the shard record is handled
// by the new master tablet