Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ const (
ERKeyDoesNotExist = ErrorCode(1176)

// permissions
ERDBAccessDenied = ErrorCode(1044)
ERAccessDeniedError = ErrorCode(1045)
ERKillDenied = ErrorCode(1095)
ERNoPermissionToCreateUsers = ErrorCode(1211)
ERSpecifiedAccessDenied = ErrorCode(1227)
ERDBAccessDenied = ErrorCode(1044)
ERAccessDeniedError = ErrorCode(1045)
ERKillDenied = ErrorCode(1095)
ERNoPermissionToCreateUsers = ErrorCode(1211)
ERSpecifiedAccessDenied = ErrorCode(1227)
ERBinlogCreateRoutineNeedSuper = ErrorCode(1419)

// failed precondition
ERNoDb = ErrorCode(1046)
Expand Down
44 changes: 44 additions & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -101,6 +102,49 @@ func TestFKWorkflow(t *testing.T) {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace]
targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet

// Stop the LoadSimulator while we are testing for workflow error, so that
// we don't error out in the LoadSimulator as we will be shutting down source dbServer.
if withLoad {
cancel()
<-ch
}

sourceTab := vc.Cells[cellName].Keyspaces[sourceKeyspace].Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, 100)]

// Stop the source database server to simulate an error during replication phase
// This should cause recoverable errors that atomic workflows should retry
// as it is already out of copy phase.
err := sourceTab.DbServer.Stop()
require.NoError(t, err)

// Give some time for the workflow to encounter errors and potentially retry
time.Sleep(2 * vttablet.GetDefaultVReplicationConfig().RetryDelay)

// Verify workflow is still running and hasn't terminated due to errors
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

// Restart the source database to allow workflow to continue
err = sourceTab.DbServer.StartProvideInit(false)
require.NoError(t, err)

err = vc.VtctldClient.ExecuteCommand("SetWritable", fmt.Sprintf("%s-%d", cellName, 100), "true")
require.NoError(t, err)

// Restart the LoadSimulator.
if withLoad {
ctx, cancel = context.WithCancel(context.Background())
ls = newFKLoadSimulator(t, ctx)
defer func() {
select {
case <-ctx.Done():
default:
cancel()
}
}()
go ls.simulateLoad()
}

require.NotNil(t, targetTab)
catchup(t, targetTab, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, cellName, nil)
Expand Down
15 changes: 13 additions & 2 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,19 @@ func (vsm *vstreamManager) GetTotalStreamDelay() int64 {

func (vs *vstream) stream(ctx context.Context) error {
ctx, vs.cancel = context.WithCancel(ctx)
defer vs.cancel()

go vs.sendEvents(ctx)
vs.wg.Add(1)
go func() {
defer vs.wg.Done()

// sendEvents returns either if the given context has been canceled or if
// an error is returned from the callback. If the callback returns an error,
// we need to cancel the context to stop the other stream goroutines
// and to unblock the VStream call.
defer vs.cancel()

vs.sendEvents(ctx)
}()

// Make a copy first, because the ShardGtids list can change once streaming starts.
copylist := append(([]*binlogdatapb.ShardGtid)(nil), vs.vgtid.ShardGtids...)
Expand Down Expand Up @@ -358,6 +368,7 @@ func (vs *vstream) sendEvents(ctx context.Context) {
}
return nil
}

for {
select {
case <-ctx.Done():
Expand Down
Loading
Loading