diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index a349a94ffa1..5566dcbf6bc 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" + vttablet "vitess.io/vitess/go/vt/vttablet/common" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -101,6 +102,49 @@ func TestFKWorkflow(t *testing.T) { waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace] targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet + + // Stop the LoadSimulator while we are testing for workflow error, so that + // we don't error out in the LoadSimulator as we will be shutting down source dbServer. + if withLoad { + cancel() + <-ch + } + + sourceTab := vc.Cells[cellName].Keyspaces[sourceKeyspace].Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, 100)] + + // Stop the source database server to simulate an error during replication phase + // This should cause recoverable errors that atomic workflows should retry + // as it is already out of copy phase. + err := sourceTab.DbServer.Stop() + require.NoError(t, err) + + // Give some time for the workflow to encounter errors and potentially retry + time.Sleep(2 * vttablet.GetDefaultVReplicationConfig().RetryDelay) + + // Verify workflow is still running and hasn't terminated due to errors + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + + // Restart the source database to allow workflow to continue + err = sourceTab.DbServer.StartProvideInit(false) + require.NoError(t, err) + + err = vc.VtctldClient.ExecuteCommand("SetWritable", fmt.Sprintf("%s-%d", cellName, 100), "true") + require.NoError(t, err) + + // Restart the LoadSimulator. + if withLoad { + ctx, cancel = context.WithCancel(context.Background()) + ls = newFKLoadSimulator(t, ctx) + defer func() { + select { + case <-ctx.Done(): + default: + cancel() + } + }() + go ls.simulateLoad() + } + require.NotNil(t, targetTab) catchup(t, targetTab, workflowName, "MoveTables") vdiff(t, targetKeyspace, workflowName, cellName, nil) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 113cb2314a0..e75c4fea0e4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -305,8 +305,8 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { // it's a FAILED_PRECONDITION vterror, OR we cannot identify this as // non-recoverable BUT it has persisted beyond the retry limit // (maxTimeToRetryError). In addition, we cannot restart a workflow - // started with AtomicCopy which has _any_ error. - if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) || + // started with AtomicCopy which has _any_ error during copy phase. + if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy) && vr.state == binlogdatapb.VReplicationWorkflowState_Copying) || isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { err = vterrors.Wrapf(err, TerminalErrorIndicator)