Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions go/vt/vtctl/reparentutil/emergency_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@ import (
"sync"
"time"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sets"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools/events"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vtctl/reparentutil/promotionrule"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand Down Expand Up @@ -154,7 +153,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
shardInfo *topo.ShardInfo
prevPrimary *topodatapb.Tablet
tabletMap map[string]*topo.TabletInfo
validCandidates map[string]replication.Position
validCandidates map[string]*RelayLogPositions
intermediateSource *topodatapb.Tablet
validCandidateTablets []*topodatapb.Tablet
validReplacementCandidates []*topodatapb.Tablet
Expand Down Expand Up @@ -330,7 +329,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve

func (erp *EmergencyReparenter) waitForAllRelayLogsToApply(
ctx context.Context,
validCandidates map[string]replication.Position,
validCandidates map[string]*RelayLogPositions,
tabletMap map[string]*topo.TabletInfo,
statusMap map[string]*replicationdatapb.StopReplicationStatus,
waitReplicasTimeout time.Duration,
Expand Down Expand Up @@ -396,7 +395,7 @@ func (erp *EmergencyReparenter) waitForAllRelayLogsToApply(

// findMostAdvanced finds the intermediate source for ERS. We always choose the most advanced one from our valid candidates list. Further ties are broken by looking at the promotion rules.
func (erp *EmergencyReparenter) findMostAdvanced(
validCandidates map[string]replication.Position,
validCandidates map[string]*RelayLogPositions,
tabletMap map[string]*topo.TabletInfo,
opts EmergencyReparentOptions,
) (*topodatapb.Tablet, []*topodatapb.Tablet, error) {
Expand Down Expand Up @@ -791,11 +790,11 @@ func (erp *EmergencyReparenter) filterValidCandidates(validTablets []*topodatapb
// The caller of this function (ERS) will then choose from among the list of candidate tablets, based on higher-level criteria.
func (erp *EmergencyReparenter) findErrantGTIDs(
ctx context.Context,
validCandidates map[string]replication.Position,
validCandidates map[string]*RelayLogPositions,
statusMap map[string]*replicationdatapb.StopReplicationStatus,
tabletMap map[string]*topo.TabletInfo,
waitReplicasTimeout time.Duration,
) (map[string]replication.Position, error) {
) (map[string]*RelayLogPositions, error) {
// First we need to collect the reparent journal length for all the candidates.
// This will tell us, which of the tablets are severly lagged, and haven't even seen all the primary promotions.
// Such severely lagging tablets cannot be used to find errant GTIDs in other tablets, seeing that they themselves don't have enough information.
Expand All @@ -820,8 +819,13 @@ func (erp *EmergencyReparenter) findErrantGTIDs(

// We use all the candidates with the maximum length of the reparent journal to find the errant GTIDs amongst them.
var maxLenPositions []replication.Position
updatedValidCandidates := make(map[string]replication.Position)
updatedValidCandidates := make(map[string]*RelayLogPositions)
for _, candidate := range maxLenCandidates {
candidatePositions := validCandidates[candidate]
if candidatePositions == nil || candidatePositions.IsZero() {
continue
}

status, ok := statusMap[candidate]
if !ok {
// If the tablet is not in the status map, and has the maximum length of the reparent journal,
Expand All @@ -834,7 +838,7 @@ func (erp *EmergencyReparenter) findErrantGTIDs(
// 4. During this ERS call, both A and B are seen online. They would both report being primary tablets with the same reparent journal length.
// Even in this case, the best we can do is not run errant GTID detection on either, and let the split brain detection code
// deal with it, if A in fact has errant GTIDs.
maxLenPositions = append(maxLenPositions, validCandidates[candidate])
maxLenPositions = append(maxLenPositions, candidatePositions.Combined)
updatedValidCandidates[candidate] = validCandidates[candidate]
continue
}
Expand All @@ -844,7 +848,10 @@ func (erp *EmergencyReparenter) findErrantGTIDs(
if otherCandidate == candidate {
continue
}
otherPositions = append(otherPositions, validCandidates[otherCandidate])
otherPosition := validCandidates[otherCandidate]
if otherPosition != nil || !otherPosition.IsZero() {
otherPositions = append(otherPositions, otherPosition.Combined)
}
}
// Run errant GTID detection and throw away any tablet that has errant GTIDs.
afterStatus := replication.ProtoToReplicationStatus(status.After)
Expand All @@ -856,7 +863,7 @@ func (erp *EmergencyReparenter) findErrantGTIDs(
log.Errorf("skipping %v with GTIDSet:%v because we detected errant GTIDs - %v", candidate, afterStatus.RelayLogPosition.GTIDSet, errantGTIDs)
continue
}
maxLenPositions = append(maxLenPositions, validCandidates[candidate])
maxLenPositions = append(maxLenPositions, candidatePositions.Combined)
updatedValidCandidates[candidate] = validCandidates[candidate]
}

Expand All @@ -882,7 +889,7 @@ func (erp *EmergencyReparenter) findErrantGTIDs(
// This exact scenario outlined above, can be found in the test for this function, subtest `Case 5a`.
// The idea is that if the tablet is lagged, then even the server UUID that it is replicating from
// should not be considered a valid source of writes that no other tablet has.
errantGTIDs, err := replication.FindErrantGTIDs(validCandidates[alias], replication.SID{}, maxLenPositions)
errantGTIDs, err := replication.FindErrantGTIDs(validCandidates[alias].Combined, replication.SID{}, maxLenPositions)
if err != nil {
return nil, err
}
Expand All @@ -899,7 +906,7 @@ func (erp *EmergencyReparenter) findErrantGTIDs(
// gatherReparenJournalInfo reads the reparent journal information from all the tablets in the valid candidates list.
func (erp *EmergencyReparenter) gatherReparenJournalInfo(
ctx context.Context,
validCandidates map[string]replication.Position,
validCandidates map[string]*RelayLogPositions,
tabletMap map[string]*topo.TabletInfo,
waitReplicasTimeout time.Duration,
) (map[string]int32, error) {
Expand Down
102 changes: 80 additions & 22 deletions go/vt/vtctl/reparentutil/emergency_reparenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2402,7 +2402,7 @@ func TestEmergencyReparenter_waitForAllRelayLogsToApply(t *testing.T) {
tests := []struct {
name string
tmc *testutil.TabletManagerClient
candidates map[string]replication.Position
candidates map[string]*RelayLogPositions
tabletMap map[string]*topo.TabletInfo
statusMap map[string]*replicationdatapb.StopReplicationStatus
shouldErr bool
Expand All @@ -2419,7 +2419,7 @@ func TestEmergencyReparenter_waitForAllRelayLogsToApply(t *testing.T) {
},
},
},
candidates: map[string]replication.Position{
candidates: map[string]*RelayLogPositions{
"zone1-0000000100": {},
"zone1-0000000101": {},
},
Expand Down Expand Up @@ -2467,7 +2467,7 @@ func TestEmergencyReparenter_waitForAllRelayLogsToApply(t *testing.T) {
},
},
},
candidates: map[string]replication.Position{
candidates: map[string]*RelayLogPositions{
"zone1-0000000100": {},
"zone1-0000000101": {},
},
Expand Down Expand Up @@ -2518,7 +2518,7 @@ func TestEmergencyReparenter_waitForAllRelayLogsToApply(t *testing.T) {
},
},
},
candidates: map[string]replication.Position{
candidates: map[string]*RelayLogPositions{
"zone1-0000000100": {},
"zone1-0000000101": {},
"zone1-0000000102": {},
Expand Down Expand Up @@ -2583,7 +2583,7 @@ func TestEmergencyReparenter_waitForAllRelayLogsToApply(t *testing.T) {
},
},
},
candidates: map[string]replication.Position{
candidates: map[string]*RelayLogPositions{
"zone1-0000000100": {},
"zone1-0000000101": {},
},
Expand Down Expand Up @@ -2793,37 +2793,69 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
Sequence: 11,
}

positionMostAdvanced := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}}
positionMostAdvanced.GTIDSet = positionMostAdvanced.GTIDSet.AddGTID(mysqlGTID1)
positionMostAdvanced.GTIDSet = positionMostAdvanced.GTIDSet.AddGTID(mysqlGTID2)
positionMostAdvanced.GTIDSet = positionMostAdvanced.GTIDSet.AddGTID(mysqlGTID3)
// most advanced gtid set
positionMostAdvanced := &RelayLogPositions{
Combined: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
Executed: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
}
positionMostAdvanced.Combined.GTIDSet = positionMostAdvanced.Combined.GTIDSet.AddGTID(mysqlGTID1)
positionMostAdvanced.Combined.GTIDSet = positionMostAdvanced.Combined.GTIDSet.AddGTID(mysqlGTID2)
positionMostAdvanced.Combined.GTIDSet = positionMostAdvanced.Combined.GTIDSet.AddGTID(mysqlGTID3)
positionMostAdvanced.Executed.GTIDSet = positionMostAdvanced.Executed.GTIDSet.AddGTID(mysqlGTID1)
positionMostAdvanced.Executed.GTIDSet = positionMostAdvanced.Executed.GTIDSet.AddGTID(mysqlGTID2)

// same combined gtid set as positionMostAdvanced, but 1 position behind in gtid executed
positionAlmostMostAdvanced := &RelayLogPositions{
Combined: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
Executed: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
}
positionAlmostMostAdvanced.Combined.GTIDSet = positionAlmostMostAdvanced.Combined.GTIDSet.AddGTID(mysqlGTID1)
positionAlmostMostAdvanced.Combined.GTIDSet = positionAlmostMostAdvanced.Combined.GTIDSet.AddGTID(mysqlGTID2)
positionAlmostMostAdvanced.Combined.GTIDSet = positionAlmostMostAdvanced.Combined.GTIDSet.AddGTID(mysqlGTID3)
positionAlmostMostAdvanced.Executed.GTIDSet = positionAlmostMostAdvanced.Executed.GTIDSet.AddGTID(mysqlGTID1)

positionIntermediate1 := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}}
positionIntermediate1.GTIDSet = positionIntermediate1.GTIDSet.AddGTID(mysqlGTID1)
positionIntermediate1 := &RelayLogPositions{
Combined: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
Executed: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
}
positionIntermediate1.Combined.GTIDSet = positionIntermediate1.Combined.GTIDSet.AddGTID(mysqlGTID1)
positionIntermediate1.Executed.GTIDSet = positionIntermediate1.Executed.GTIDSet.AddGTID(mysqlGTID1)

positionIntermediate2 := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}}
positionIntermediate2.GTIDSet = positionIntermediate2.GTIDSet.AddGTID(mysqlGTID1)
positionIntermediate2.GTIDSet = positionIntermediate2.GTIDSet.AddGTID(mysqlGTID2)
positionIntermediate2 := &RelayLogPositions{
Combined: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
Executed: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
}
positionIntermediate2.Combined.GTIDSet = positionIntermediate2.Combined.GTIDSet.AddGTID(mysqlGTID1)
positionIntermediate2.Combined.GTIDSet = positionIntermediate2.Combined.GTIDSet.AddGTID(mysqlGTID2)
positionIntermediate2.Executed.GTIDSet = positionIntermediate2.Executed.GTIDSet.AddGTID(mysqlGTID1)

positionOnly2 := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}}
positionOnly2.GTIDSet = positionOnly2.GTIDSet.AddGTID(mysqlGTID2)
positionOnly2 := &RelayLogPositions{
Combined: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
Executed: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
}
positionOnly2.Combined.GTIDSet = positionOnly2.Combined.GTIDSet.AddGTID(mysqlGTID2)
positionOnly2.Executed.GTIDSet = positionOnly2.Executed.GTIDSet.AddGTID(mysqlGTID2)

positionEmpty := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}}
positionEmpty := &RelayLogPositions{
Combined: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
Executed: replication.Position{GTIDSet: replication.Mysql56GTIDSet{}},
}

tests := []struct {
name string
validCandidates map[string]replication.Position
validCandidates map[string]*RelayLogPositions
tabletMap map[string]*topo.TabletInfo
emergencyReparentOps EmergencyReparentOptions
result *topodatapb.Tablet
err string
}{
{
name: "choose most advanced",
validCandidates: map[string]replication.Position{
validCandidates: map[string]*RelayLogPositions{
"zone1-0000000100": positionMostAdvanced,
"zone1-0000000101": positionIntermediate1,
"zone1-0000000102": positionIntermediate2,
"zone1-0000000103": positionAlmostMostAdvanced,
},
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Expand All @@ -2850,6 +2882,14 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
},
},
},
"zone1-0000000103": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 103,
},
},
},
"zone1-0000000404": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Expand All @@ -2868,10 +2908,11 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
},
}, {
name: "choose most advanced with the best promotion rule",
validCandidates: map[string]replication.Position{
validCandidates: map[string]*RelayLogPositions{
"zone1-0000000100": positionMostAdvanced,
"zone1-0000000101": positionIntermediate1,
"zone1-0000000102": positionMostAdvanced,
"zone1-0000000103": positionAlmostMostAdvanced,
},
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Expand Down Expand Up @@ -2900,6 +2941,14 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
Type: topodatapb.TabletType_RDONLY,
},
},
"zone1-0000000103": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 103,
},
},
},
"zone1-0000000404": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Expand All @@ -2922,10 +2971,11 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
Cell: "zone1",
Uid: 102,
}},
validCandidates: map[string]replication.Position{
validCandidates: map[string]*RelayLogPositions{
"zone1-0000000100": positionMostAdvanced,
"zone1-0000000101": positionIntermediate1,
"zone1-0000000102": positionMostAdvanced,
"zone1-0000000103": positionAlmostMostAdvanced,
},
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Expand Down Expand Up @@ -2954,6 +3004,14 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
Type: topodatapb.TabletType_RDONLY,
},
},
"zone1-0000000103": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 103,
},
},
},
"zone1-0000000404": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Expand All @@ -2976,7 +3034,7 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
Cell: "zone1",
Uid: 102,
}},
validCandidates: map[string]replication.Position{
validCandidates: map[string]*RelayLogPositions{
"zone1-0000000100": positionOnly2,
"zone1-0000000101": positionIntermediate1,
"zone1-0000000102": positionEmpty,
Expand Down
15 changes: 9 additions & 6 deletions go/vt/vtctl/reparentutil/reparent_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package reparentutil
import (
"sort"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vterrors"

Expand All @@ -31,13 +30,13 @@ import (
// candidate for intermediate promotion in emergency reparent shard, and the new primary in planned reparent shard
type reparentSorter struct {
tablets []*topodatapb.Tablet
positions []replication.Position
positions []*RelayLogPositions
innodbBufferPool []int
durability policy.Durabler
}

// newReparentSorter creates a new reparentSorter
func newReparentSorter(tablets []*topodatapb.Tablet, positions []replication.Position, innodbBufferPool []int, durability policy.Durabler) *reparentSorter {
func newReparentSorter(tablets []*topodatapb.Tablet, positions []*RelayLogPositions, innodbBufferPool []int, durability policy.Durabler) *reparentSorter {
return &reparentSorter{
tablets: tablets,
positions: positions,
Expand Down Expand Up @@ -72,11 +71,15 @@ func (rs *reparentSorter) Less(i, j int) bool {
return true
}

if !rs.positions[i].AtLeast(rs.positions[j]) {
// sort by combined positions. if equal, also sort by the executed GTID positions.
jPositions := rs.positions[j]
iPositions := rs.positions[i]

if !iPositions.AtLeast(jPositions) {
// [i] does not have all GTIDs that [j] does
return false
}
if !rs.positions[j].AtLeast(rs.positions[i]) {
if !jPositions.AtLeast(iPositions) {
// [j] does not have all GTIDs that [i] does
return true
}
Expand All @@ -101,7 +104,7 @@ func (rs *reparentSorter) Less(i, j int) bool {

// sortTabletsForReparent sorts the tablets, given their positions for emergency reparent shard and planned reparent shard.
// Tablets are sorted first by their replication positions, with ties broken by the promotion rules.
func sortTabletsForReparent(tablets []*topodatapb.Tablet, positions []replication.Position, innodbBufferPool []int, durability policy.Durabler) error {
func sortTabletsForReparent(tablets []*topodatapb.Tablet, positions []*RelayLogPositions, innodbBufferPool []int, durability policy.Durabler) error {
// throw an error internal error in case of unequal number of tablets and positions
// fail-safe code prevents panic in sorting in case the lengths are unequal
if len(tablets) != len(positions) {
Expand Down
Loading
Loading