diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 197ebf1a709..39d3e7db687 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -234,6 +234,7 @@ type testTMClient struct { vrQueries map[int][]*queryResult createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest + primaryPositions map[uint32]string env *testEnv // For access to the env config from tmc methods. reverse atomic.Bool // Are we reversing traffic? @@ -245,6 +246,7 @@ func newTestTMClient(env *testEnv) *testTMClient { vrQueries: make(map[int][]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), + primaryPositions: make(map[uint32]string), env: env, } } @@ -435,7 +437,21 @@ func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet }, nil } +func (tmc *testTMClient) setPrimaryPosition(tablet *topodatapb.Tablet, position string) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if tmc.primaryPositions == nil { + tmc.primaryPositions = make(map[uint32]string) + } + tmc.primaryPositions[tablet.Alias.Uid] = position +} + func (tmc *testTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if tmc.primaryPositions != nil && tmc.primaryPositions[tablet.Alias.Uid] != "" { + return tmc.primaryPositions[tablet.Alias.Uid], nil + } return position, nil } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c9a512e4bf5..f2cf2463c3b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3279,6 +3279,13 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + // Get the source positions now that writes are stopped, the streams were stopped (e.g. + // intra-keyspace materializations that write on the source), and we know for certain + // that any in progress writes are done. + if err := ts.gatherSourcePositions(ctx); err != nil { + return handleError("failed to gather replication positions on migration sources", err) + } + ts.Logger().Infof("Waiting for streams to catchup") if err := sw.waitForCatchup(ctx, timeout); err != nil { sw.cancelMigration(ctx, sm) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 5af6bb35348..0fdbe0eaaf1 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -952,19 +952,10 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites) } if err != nil { - log.Warningf("Error: %s", err) + ts.Logger().Warningf("Error stopping writes on migration sources: %v", err) return err } - return ts.ForAllSources(func(source *MigrationSource) error { - var err error - source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet) - log.Infof("Stopped Source Writes. Position for source %v:%v: %v", - ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position) - if err != nil { - log.Warningf("Error: %s", err) - } - return err - }) + return nil } func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { @@ -1223,6 +1214,24 @@ func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error { }) } +// gatherSourcePositions will get the current replication position for all +// migration sources. +func (ts *trafficSwitcher) gatherSourcePositions(ctx context.Context) error { + return ts.ForAllSources(func(source *MigrationSource) error { + var err error + tablet := source.GetPrimary().Tablet + tabletAlias := topoproto.TabletAliasString(tablet.Alias) + source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, tablet) + if err != nil { + ts.Logger().Errorf("Error getting migration source position on %s: %s", tabletAlias, err) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get position on migration source %s: %v", + tabletAlias, err) + } + ts.Logger().Infof("Position on migration source %s after having stopped writes: %s", tabletAlias, source.Position) + return nil + }) +} + func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) { vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace) if err != nil { diff --git a/go/vt/vtctl/workflow/traffic_switcher_test.go b/go/vt/vtctl/workflow/traffic_switcher_test.go index c416baa18f9..85e1dc3da6d 100644 --- a/go/vt/vtctl/workflow/traffic_switcher_test.go +++ b/go/vt/vtctl/workflow/traffic_switcher_test.go @@ -17,10 +17,17 @@ limitations under the License. package workflow import ( + "context" + "fmt" + "strconv" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/vtgate/vindexes" ) @@ -56,3 +63,72 @@ func TestReverseWorkflowName(t *testing.T) { assert.Equal(t, test.out, got) } } + +// TestSwitchTrafficPositionHandling confirms that if any writes are somehow +// executed against the source between the stop source writes and wait for +// catchup steps, that we have the correct position and do not lose the write(s). +func TestTrafficSwitchPositionHandling(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + sourceKeyspace := &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"0"}, + } + + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + env.tmc.schema = schema + + ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) + require.NoError(t, err) + sw := &switcher{ts: ts, s: env.ws} + + lockCtx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "test") + require.NoError(t, lockErr) + ctx = lockCtx + defer sourceUnlock(&err) + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "test") + require.NoError(t, lockErr) + ctx = lockCtx + defer targetUnlock(&err) + + err = ts.stopSourceWrites(ctx) + require.NoError(t, err) + + // Now we simulate a write on the source. + newPosition := position[:strings.LastIndex(position, "-")+1] + oldSeqNo, err := strconv.Atoi(position[strings.LastIndex(position, "-")+1:]) + require.NoError(t, err) + newPosition = fmt.Sprintf("%s%d", newPosition, oldSeqNo+1) + env.tmc.setPrimaryPosition(env.tablets[sourceKeyspaceName][startingSourceTabletUID], newPosition) + + // And confirm that we picked up the new position. + err = ts.gatherSourcePositions(ctx) + require.NoError(t, err) + err = ts.ForAllSources(func(ms *MigrationSource) error { + require.Equal(t, newPosition, ms.Position) + return nil + }) + require.NoError(t, err) +} diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 80b981026d8..84ce57a68a4 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -636,7 +636,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf wg.Wait() if allErrors.HasErrors() { - log.Errorf("%s", allErrors.Error()) + ts.Logger().Errorf("%s", allErrors.Error()) return allErrors.Error() } return nil