Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -101,6 +102,49 @@ func TestFKWorkflow(t *testing.T) {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace]
targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet

// Stop the LoadSimulator while we are testing for workflow error, so that
// we don't error out in the LoadSimulator as we will be shutting down source dbServer.
if withLoad {
cancel()
<-ch
}

sourceTab := vc.Cells[cellName].Keyspaces[sourceKeyspace].Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, 100)]

// Stop the source database server to simulate an error during replication phase
// This should cause recoverable errors that atomic workflows should retry
// as it is already out of copy phase.
err := sourceTab.DbServer.Stop()
require.NoError(t, err)

// Give some time for the workflow to encounter errors and potentially retry
time.Sleep(2 * vttablet.GetDefaultVReplicationConfig().RetryDelay)

// Verify workflow is still running and hasn't terminated due to errors
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

// Restart the source database to allow workflow to continue
err = sourceTab.DbServer.StartProvideInit(false)
require.NoError(t, err)

err = vc.VtctldClient.ExecuteCommand("SetWritable", fmt.Sprintf("%s-%d", cellName, 100), "true")
require.NoError(t, err)

// Restart the LoadSimulator.
if withLoad {
ctx, cancel = context.WithCancel(context.Background())
ls = newFKLoadSimulator(t, ctx)
defer func() {
select {
case <-ctx.Done():
default:
cancel()
}
}()
go ls.simulateLoad()
}

require.NotNil(t, targetTab)
catchup(t, targetTab, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, cellName, nil)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
// it's a FAILED_PRECONDITION vterror, OR we cannot identify this as
// non-recoverable BUT it has persisted beyond the retry limit
// (maxTimeToRetryError). In addition, we cannot restart a workflow
// started with AtomicCopy which has _any_ error.
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
// started with AtomicCopy which has _any_ error during copy phase.
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy) && vr.state == binlogdatapb.VReplicationWorkflowState_Copying) ||
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {
err = vterrors.Wrapf(err, TerminalErrorIndicator)
Expand Down
Loading