Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
13aba46
Don't allow resume if VDiff not completed.
mattlord Jul 6, 2022
8c8d024
Record and display VDiff errors per shard
mattlord Jul 6, 2022
b6d687f
Merge branch 'main' into vdiff2_retry
mattlord Jul 6, 2022
c9bddf3
Better align errors in text based output
mattlord Jul 6, 2022
cc02035
Work with underlying database errors
mattlord Jul 6, 2022
1025394
Retry when appropriate on vdiff engine startup
mattlord Jul 7, 2022
e690daa
Address bugs in resume/retry logic after adding support for both
mattlord Jul 8, 2022
08f68f8
Make auto-retry the default
mattlord Jul 8, 2022
070e2ec
Ensure report is valid json before unmarshaling
mattlord Jul 8, 2022
c992fe8
Merge remote-tracking branch 'origin/main' into vdiff2_retry
mattlord Jul 8, 2022
bcca833
Auto retry error'd VDiffs
mattlord Jul 8, 2022
eb560b1
Don't retry error'd VDiffs on engine start anymore
mattlord Jul 8, 2022
05b6070
Limit withDDL usage to entry points
mattlord Jul 8, 2022
53d4824
Add e2e test and fix bugs it exposed
mattlord Jul 8, 2022
6d30ecd
Merge remote-tracking branch 'origin/main' into vdiff2_retry
mattlord Jul 8, 2022
c859fb4
Ensure we always signal the retry goroutine to stop on engine.Close()
mattlord Jul 8, 2022
421d6b3
Use vdiff engine mutex during retries
mattlord Jul 9, 2022
7ff193b
Close retry goroutine on vde ctx cancel w/o done channel
mattlord Jul 9, 2022
a528811
Open & Close of VDiff engine controls retry goroutine
mattlord Jul 9, 2022
0fc3ad7
Rely on sync.Once to apply VDiff schema
mattlord Jul 9, 2022
8846231
Minor change after self review
mattlord Jul 9, 2022
37475a6
Tidy up vdiff retry goroutine mgmt
mattlord Jul 9, 2022
2554fb5
Improving gorouting mgmt -- trying to eliminate flakes
mattlord Jul 9, 2022
2467045
Moar safety
mattlord Jul 9, 2022
53c7b09
Making more tweaks to exec and term more quickly
mattlord Jul 9, 2022
91e4385
Aye dios mio
mattlord Jul 9, 2022
a27ec3a
Go back to 30s ticker
mattlord Jul 9, 2022
c1766b3
Make engine open more efficient to improve PRS times
mattlord Jul 9, 2022
b6670ce
isOpen=true before initControllers to ensure proper Close
mattlord Jul 9, 2022
daf33f3
Do lazy init in 2 entry points when doing actual work.
mattlord Jul 9, 2022
71c6b94
Bug fixes and improvements
mattlord Jul 10, 2022
6341281
Final (🤞) set of bug fixes
mattlord Jul 11, 2022
d05a1bf
Add progress reporting
mattlord Jul 11, 2022
34d122b
Minor changes after self review
mattlord Jul 12, 2022
cb31385
Improved template for error handling in text format
mattlord Jul 12, 2022
16083ce
Bug fixes around progress calculation
mattlord Jul 12, 2022
41bc5b3
Comment improvement
mattlord Jul 12, 2022
399266e
Ensure we get correct DB name from vreplication workflow.
mattlord Jul 12, 2022
05a85a7
Bug fixes and improvements to progress reporting
mattlord Jul 12, 2022
b1fe4b9
Minor changes after self review
mattlord Jul 12, 2022
ad1e212
Remove unnecessary case in select
mattlord Jul 14, 2022
5284fb7
Add unit test for progress reporting
mattlord Jul 15, 2022
6fed378
Be more realistic in half way unit test
mattlord Jul 15, 2022
7f83e36
Use more descriptive var names in unit test to make logic clear
mattlord Jul 15, 2022
50aa615
Merge remote-tracking branch 'origin/main' into vdiff2_retry
mattlord Jul 15, 2022
77057eb
Protect all access to vde.controllers
mattlord Jul 17, 2022
bcef795
De-flake ers_prs tests using state waits with timeouts
mattlord Jul 17, 2022
1225cb7
Minor improvement -- also to exec the test suite again
mattlord Jul 18, 2022
8004e07
Merge branch 'main' into vdiff2_retry
mattlord Jul 18, 2022
af89cfc
Use simpler defer stmt
mattlord Jul 19, 2022
86d9215
Correct VDiff final state handling
mattlord Jul 21, 2022
d7c5276
Merge remote-tracking branch 'origin/main' into vdiff2_retry
mattlord Jul 21, 2022
c4fd477
Improve WorkflowDiffer logging & error handling
mattlord Jul 21, 2022
cce31de
Minor changes after final self review
mattlord Jul 21, 2022
8fc5f94
Try to make PRS tests less flaky
mattlord Jul 21, 2022
27c7574
Try to address TestReparentGracefulRangeBased flakes
mattlord Jul 22, 2022
c23ad54
Increase timeout for PRS test
mattlord Jul 22, 2022
7f0e261
revert my PRS changes as they traded one issue for another
mattlord Jul 22, 2022
06c847b
Use withDDL.ExecIgnore during engine startup
mattlord Jul 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,47 @@ func IsConnLostDuringQuery(err error) bool {
return false
}

// IsEphemeralError returns true if the error is ephemeral and the caller should
// retry if possible. Note: non-SQL errors are always treated as ephemeral.
func IsEphemeralError(err error) bool {
	sqlErr, ok := err.(*SQLError)
	if !ok {
		// Anything that is not a *SQLError is assumed to be ephemeral.
		return true
	}
	// Transient client- and server-side conditions that may clear on retry.
	switch sqlErr.Number() {
	case
		CRConnectionError,
		CRConnHostError,
		CRMalformedPacket,
		CRNamedPipeStateError,
		CRServerLost,
		CRSSLConnectionError,
		ERCantCreateThread,
		ERDiskFull,
		ERForcingClose,
		ERGotSignal,
		ERHostIsBlocked,
		ERLockTableFull,
		ERInnodbReadOnly,
		ERInternalError,
		ERLockDeadlock,
		ERLockWaitTimeout,
		EROutOfMemory,
		EROutOfResources,
		EROutOfSortMemory,
		ERQueryInterrupted,
		ERServerIsntAvailable,
		ERServerShutdown,
		ERTooManyUserConnections,
		ERUnknownError,
		ERUserLimitReached:
		return true
	}
	// Any other SQL error code is treated as permanent.
	return false
}

// IsTooManyConnectionsErr returns true if the error is due to too many connections.
func IsTooManyConnectionsErr(err error) bool {
if sqlErr, ok := err.(*SQLError); ok {
Expand Down
9 changes: 5 additions & 4 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,11 @@ func GetShardReplicationPositions(t *testing.T, clusterInstance *cluster.LocalPr
}

func WaitForReplicationToStart(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName, shardName string, tabletCnt int, doPrint bool) {
tck := time.NewTicker(500 * time.Millisecond)
tkr := time.NewTicker(500 * time.Millisecond)
defer tkr.Stop()
for {
select {
case <-tck.C:
case <-tkr.C:
strArray := GetShardReplicationPositions(t, clusterInstance, KeyspaceName, shardName, true)
if len(strArray) == tabletCnt && strings.Contains(strArray[0], "primary") { // primary first
return
Expand Down Expand Up @@ -575,13 +576,13 @@ func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProces
// WaitForReplicationPosition waits for tablet B to catch up to the replication
// position of tablet A, polling until replicationWaitTimeout elapses.
// It returns nil once B's position is at least A's, or an error on timeout.
// NOTE(review): the span contained both the pre- and post-change diff lines
// (duplicate `timeout :=` declarations and two sleeps), which cannot compile;
// this keeps only the final version using replicationWaitTimeout and a 500ms
// polling interval.
func WaitForReplicationPosition(t *testing.T, tabletA *cluster.Vttablet, tabletB *cluster.Vttablet) error {
	posA, _ := cluster.GetPrimaryPosition(t, *tabletA, Hostname)
	timeout := time.Now().Add(replicationWaitTimeout)
	for time.Now().Before(timeout) {
		posB, _ := cluster.GetPrimaryPosition(t, *tabletB, Hostname)
		if positionAtLeast(t, tabletB, posA, posB) {
			return nil
		}
		// Poll at a modest interval to avoid hammering the tablets.
		time.Sleep(500 * time.Millisecond)
	}
	return fmt.Errorf("failed to catch up on replication position")
}
Expand Down
167 changes: 127 additions & 40 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,47 +31,66 @@ type testCase struct {
tables string
workflow string
tabletBaseID int
resume bool // test resume functionality with this workflow
resumeInsert string // if testing resume, what new rows should be diff'd
testCLIErrors bool // test CLI errors against this workflow
autoRetryError bool // if true, test auto retry on error against this workflow
// If testing auto retry on error, what new rows should be diff'd. These rows must have a PK > all initial rows.
retryInsert string
resume bool // test resume functionality with this workflow
// If testing resume, what new rows should be diff'd. These rows must have a PK > all initial rows and retry rows.
resumeInsert string
testCLIErrors bool // test CLI errors against this workflow
}

const (
	// sqlSimulateError flips an existing VDiff (and its per-table records) into
	// the 'error' state with a deadlock message in last_error, simulating an
	// ephemeral failure so tests can verify the engine's auto-retry behavior.
	// The single %s placeholder is the (already SQL-encoded) vdiff UUID.
	sqlSimulateError = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'error', vdt.state = 'error', vd.completed_at = NULL,
		vd.last_error = 'vttablet: rpc error: code = Unknown desc = (errno 1213) (sqlstate 40001): Deadlock found when trying to get lock; try restarting transaction'
		where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id`
	// sqlAnalyzeTable refreshes table statistics; used so progress reporting
	// has up-to-date row estimates to work from. %s is the table name.
	sqlAnalyzeTable = `analyze table %s`
)

// testCases drives TestVDiff2 through three workflow shapes: unsharded→sharded
// MoveTables, a 2→3 Reshard, and a 3→1 Reshard merge. Each case also exercises
// auto-retry-on-error and resume; the retry/resume insert rows use PKs greater
// than all initial rows (and retry rows greater than none that follow) so that
// cumulative rows-compared counts are predictable.
// NOTE(review): the span contained both the old and new diff copies of each
// entry (duplicate struct-literal fields — invalid Go); this keeps only the
// final versions.
var testCases = []*testCase{
	{
		name:           "MoveTables/unsharded to two shards",
		workflow:       "p1c2",
		typ:            "MoveTables",
		sourceKs:       "product",
		targetKs:       "customer",
		sourceShards:   "0",
		targetShards:   "-80,80-",
		tabletBaseID:   200,
		tables:         "customer,Lead,Lead-1",
		autoRetryError: true,
		retryInsert:    `insert into customer(cid, name, typ) values(91234, 'Testy McTester', 'soho')`,
		resume:         true,
		resumeInsert:   `insert into customer(cid, name, typ) values(92234, 'Testy McTester (redux)', 'enterprise')`,
		testCLIErrors:  true, // test for errors in the simplest workflow
	},
	{
		name:           "Reshard Merge/split 2 to 3",
		workflow:       "c2c3",
		typ:            "Reshard",
		sourceKs:       "customer",
		targetKs:       "customer",
		sourceShards:   "-80,80-",
		targetShards:   "-40,40-a0,a0-",
		tabletBaseID:   400,
		autoRetryError: true,
		retryInsert:    `insert into customer(cid, name, typ) values(93234, 'Testy McTester Jr', 'enterprise'), (94234, 'Testy McTester II', 'enterprise')`,
		resume:         true,
		resumeInsert:   `insert into customer(cid, name, typ) values(95234, 'Testy McTester III', 'enterprise')`,
	},
	{
		name:           "Reshard/merge 3 to 1",
		workflow:       "c3c1",
		typ:            "Reshard",
		sourceKs:       "customer",
		targetKs:       "customer",
		sourceShards:   "-40,40-a0,a0-",
		targetShards:   "0",
		tabletBaseID:   700,
		autoRetryError: true,
		retryInsert:    `insert into customer(cid, name, typ) values(96234, 'Testy McTester IV', 'enterprise')`,
		resume:         true,
		resumeInsert:   `insert into customer(cid, name, typ) values(97234, 'Testy McTester V', 'enterprise'), (98234, 'Testy McTester VI', 'enterprise')`,
	},
}

Expand Down Expand Up @@ -105,12 +124,13 @@ func TestVDiff2(t *testing.T) {
verifyClusterHealth(t, vc)

insertInitialData(t)

// Insert null and empty enum values for testing vdiff comparisons for those values.
// If we add this to the initial data list, the counts in several other tests will need to change
query := `insert into customer(cid, name, typ, sport) values(1001, null, 'soho','')`
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query)

generateMoreCustomers(t, sourceKs, 100)

_, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
require.NoError(t, err)
for _, shard := range targetShards {
Expand Down Expand Up @@ -149,16 +169,17 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
updateTableStats(t, tab, tc.tables) // need to do this in order to test progress reports
}

vdiff(t, tc.targetKs, tc.workflow, cells[0].Name, true, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, cells[0].Name)
}

if tc.resume {
expectedRows := int64(0)
if tc.resumeInsert != "" {
res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.resumeInsert)
expectedRows = int64(res.RowsAffected)
}
vdiff2Resume(t, tc.targetKs, tc.workflow, cells[0].Name, expectedRows)
testResume(t, tc, cells[0].Name)
}

// This is done here so that we have a valid workflow to test the commands against
Expand Down Expand Up @@ -224,3 +245,69 @@ func testNoOrphanedData(t *testing.T, keyspace, workflow string, shards []string
}
})
}

// testResume verifies that a completed VDiff can be resumed and that the
// resumed run completes with a cumulative rows-compared count (prior runs
// plus any rows inserted since) and no mismatches.
func testResume(t *testing.T, tc *testCase, cells string) {
	t.Run("Resume", func(t *testing.T) {
		ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)

		// The most recent VDiff must be in the completed state before resuming.
		uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false)
		lastInfo := getVDiffInfo(output)
		require.Equal(t, "completed", lastInfo.State)
		previouslyCompared := lastInfo.RowsCompared // rows compared in previous runs
		startTime := time.Now()                     // completed_at should be later than this after resuming

		// Optionally add new rows so the resumed diff has additional work to do.
		var newRows int64
		if tc.resumeInsert != "" {
			res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.resumeInsert)
			newRows = int64(res.RowsAffected)
		}
		wantRows := previouslyCompared + newRows

		// Resume the VDiff and confirm it completes, finds no mismatches, and
		// reports the cumulative row count (original run plus resume).
		uuid, _ = performVDiff2Action(t, ksWorkflow, cells, "resume", uuid, false)
		info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, startTime)
		require.False(t, info.HasMismatch)
		require.Equal(t, wantRows, info.RowsCompared)
	})
}

// testAutoRetryError simulates an ephemeral error on a completed VDiff by
// flipping its state to error'd on every target shard, then verifies the
// engine automatically retries it to completion with a cumulative
// rows-compared count and no mismatches.
func testAutoRetryError(t *testing.T, tc *testCase, cells string) {
	t.Run("Auto retry on error", func(t *testing.T) {
		ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)

		// The most recent VDiff must be in the completed state to start with.
		uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false)
		lastInfo := getVDiffInfo(output)
		require.Equal(t, "completed", lastInfo.State)
		initiallyCompared := lastInfo.RowsCompared // rows compared in the first run
		startTime := time.Now()                    // completed_at should be later than this upon retry

		// Optionally insert new rows after the original run -- to confirm that
		// the rows-compared count is cumulative across the retry.
		var newRows int64
		if tc.retryInsert != "" {
			res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.retryInsert)
			newRows = int64(res.RowsAffected)
		}
		wantRows := initiallyCompared + newRows

		// Mark the VDiff as error'd on each target shard to simulate an
		// ephemeral failure having occurred.
		for _, shard := range strings.Split(tc.targetShards, ",") {
			tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
			res, err := tab.QueryTabletWithDB(fmt.Sprintf(sqlSimulateError, encodeString(uuid)), "vt_"+tc.targetKs)
			require.NoError(t, err)
			// The vdiff record and at least one vdiff_table record must be updated.
			require.GreaterOrEqual(t, int(res.RowsAffected), 2)
		}

		// Confirm the VDiff was retried, completed, found no mismatches, and
		// compared the cumulative number of rows (original run plus retry).
		info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, startTime)
		require.False(t, info.HasMismatch)
		require.Equal(t, wantRows, info.RowsCompared)
	})
}
Loading