diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index c28beb933e8..e16a6ab2515 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "path" "slices" "strings" "testing" @@ -1014,6 +1015,210 @@ func TestMoveTablesNoRoutingRules(t *testing.T) { require.Zerof(t, len(rr.Rules), "routing rules should be empty, found %+v", rr.Rules) } +func TestMoveTablesCreateShardedVSchemaRollback(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t1", + }}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := newTestMaterializerEnv(t, ctx, ms, []string{"-"}, []string{"-"}) + defer env.close() + + targetVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "id", + }}, + }, + }, + } + err := env.ws.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{ + Name: ms.TargetKeyspace, + Keyspace: targetVSchema, + }) + require.NoError(t, err) + + env.tmc.expectFetchAsAllPrivsQuery(startingTargetTabletUID, getNonEmptyTable, &sqltypes.Result{}) + + sourceDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_sourceks"), encodeString(ReverseWorkflowName(ms.Workflow))) + targetDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_targetks"), encodeString(ms.Workflow)) + env.tmc.expectVRQuery(startingSourceTabletUID, sourceDeleteQuery, &sqltypes.Result{}) + env.tmc.expectVRQuery(startingTargetTabletUID, targetDeleteQuery, &sqltypes.Result{}) + + readCalls := 0 + env.tmc.readVReplicationWorkflow = func(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + readCalls++ + if readCalls == 1 { + return &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + Workflow: request.Workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: ms.SourceKeyspace, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + }, + }, + }, + }, + }, nil + } + return nil, errors.New("read vreplication failed") + } + + _, err = env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ + Workflow: ms.Workflow, + SourceKeyspace: ms.SourceKeyspace, + TargetKeyspace: ms.TargetKeyspace, + IncludeTables: []string{"t1"}, + }) + require.ErrorContains(t, err, "read vreplication failed") + + got, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) + require.NoError(t, err) + require.True(t, proto.Equal(got.Keyspace, targetVSchema), "got: %v, want: %v", got.Keyspace, targetVSchema) +} + +func TestMoveTablesCreateUnshardedVSchemaRollback(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t1", + }}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := newTestMaterializerEnv(t, ctx, ms, []string{"-"}, []string{"-"}) + defer env.close() + + originalVSchema := &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t0": {}, + }, + } + err := env.ws.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{ + Name: ms.TargetKeyspace, + Keyspace: originalVSchema, + }) + require.NoError(t, err) + + env.tmc.expectFetchAsAllPrivsQuery(startingTargetTabletUID, getNonEmptyTable, &sqltypes.Result{}) + + sourceDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_sourceks"), encodeString(ReverseWorkflowName(ms.Workflow))) + targetDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_targetks"), encodeString(ms.Workflow)) + env.tmc.expectVRQuery(startingSourceTabletUID, sourceDeleteQuery, &sqltypes.Result{}) + env.tmc.expectVRQuery(startingTargetTabletUID, targetDeleteQuery, &sqltypes.Result{}) + + conn, err := env.ws.ts.ConnForCell(ctx, topo.GlobalCell) + require.NoError(t, err) + current, changes, err := conn.Watch(ctx, path.Join(topo.KeyspacesPath, ms.TargetKeyspace, topo.VSchemaFile)) + require.NoError(t, err) + initialVersion := "" + if current != nil && current.Version != nil { + initialVersion = current.Version.String() + } + + failCh := make(chan struct{}) + readCalls := 0 + env.tmc.readVReplicationWorkflow = func(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + readCalls++ + if readCalls == 1 { + return &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + Workflow: request.Workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: ms.SourceKeyspace, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + }, + }, + }, + }, + }, nil + } + <-failCh + return nil, errors.New("read vreplication failed") + } + + errCh := make(chan error, 1) + go func() { + _, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ + Workflow: ms.Workflow, + SourceKeyspace: ms.SourceKeyspace, + TargetKeyspace: ms.TargetKeyspace, + IncludeTables: []string{"t1"}, + }) + errCh <- err + }() + + updatedVersion := "" + assert.Eventually(t, func() bool { + select { + case wd := <-changes: + if wd == nil || wd.Err != nil || wd.Contents == nil { + return false + } + ks := &vschemapb.Keyspace{} + if err := ks.UnmarshalVT(wd.Contents); err != nil { + return false + } + if ks.Tables["t1"] == nil { + return false + } + if wd.Version != nil { + updatedVersion = wd.Version.String() + } + return true + default: + return false + } + }, 5*time.Second, 50*time.Millisecond) + require.NotEmpty(t, updatedVersion) + if initialVersion != "" { + require.NotEqual(t, initialVersion, updatedVersion) + } + + close(failCh) + err = <-errCh + require.ErrorContains(t, err, "read vreplication failed") + + got, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) + require.NoError(t, err) + require.True(t, proto.Equal(got.Keyspace, originalVSchema), "got: %v, want: %v", got.Keyspace, originalVSchema) +} + func TestCreateLookupVindexFull(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "lookup", diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index e37a6bf1504..8d8ade19b83 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -897,9 +897,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl s.Logger().Infof("Successfully opened external topo: %+v", externalTopo) } - origVSchema := &topo.KeyspaceVSchemaInfo{ // If we need to rollback a failed create - Name: targetKeyspace, - } + var origVSchema *topo.KeyspaceVSchemaInfo // If we need to rollback a failed create vschema, err := s.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return nil, err @@ -957,9 +955,11 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if !vschema.Sharded { // Save the original in case we need to restore it for a late failure in // the defer(). We do NOT want to clone the version field as we will - // intentionally be going back in time. So we only clone the internal - // vschemapb.Keyspace field. - origVSchema.Keyspace = vschema.Keyspace.CloneVT() + // intentionally be going back in time. + origVSchema = &topo.KeyspaceVSchemaInfo{ + Name: targetKeyspace, + Keyspace: vschema.Keyspace.CloneVT(), + } if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema.Keyspace, tables, externalTopo == nil); err != nil { return nil, err } @@ -1061,7 +1061,8 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil { err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) } - if origVSchema == nil { // There's no previous version to restore + if vschema.Sharded || origVSchema == nil { + // We don't modify the vschema for sharded keyspaces, so there's nothing to restore. return } if cerr := s.ts.SaveVSchema(ctx, origVSchema); cerr != nil {