Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"path"
"slices"
"strings"
"testing"
Expand Down Expand Up @@ -1014,6 +1015,210 @@ func TestMoveTablesNoRoutingRules(t *testing.T) {
require.Zerof(t, len(rr.Rules), "routing rules should be empty, found %+v", rr.Rules)
}

func TestMoveTablesCreateShardedVSchemaRollback(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
}},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := newTestMaterializerEnv(t, ctx, ms, []string{"-"}, []string{"-"})
defer env.close()

targetVSchema := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "id",
}},
},
},
}
err := env.ws.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{
Name: ms.TargetKeyspace,
Keyspace: targetVSchema,
})
require.NoError(t, err)

env.tmc.expectFetchAsAllPrivsQuery(startingTargetTabletUID, getNonEmptyTable, &sqltypes.Result{})

sourceDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_sourceks"), encodeString(ReverseWorkflowName(ms.Workflow)))
targetDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_targetks"), encodeString(ms.Workflow))
env.tmc.expectVRQuery(startingSourceTabletUID, sourceDeleteQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(startingTargetTabletUID, targetDeleteQuery, &sqltypes.Result{})

readCalls := 0
env.tmc.readVReplicationWorkflow = func(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
readCalls++
if readCalls == 1 {
return &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
Workflow: request.Workflow,
WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables,
Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
{
Id: 1,
Bls: &binlogdatapb.BinlogSource{
Keyspace: ms.SourceKeyspace,
Shard: "-",
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
},
},
},
},
}, nil
}
return nil, errors.New("read vreplication failed")
}

_, err = env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Workflow: ms.Workflow,
SourceKeyspace: ms.SourceKeyspace,
TargetKeyspace: ms.TargetKeyspace,
IncludeTables: []string{"t1"},
})
require.ErrorContains(t, err, "read vreplication failed")

got, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace)
require.NoError(t, err)
require.True(t, proto.Equal(got.Keyspace, targetVSchema), "got: %v, want: %v", got.Keyspace, targetVSchema)
}

func TestMoveTablesCreateUnshardedVSchemaRollback(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
}},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := newTestMaterializerEnv(t, ctx, ms, []string{"-"}, []string{"-"})
defer env.close()

originalVSchema := &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t0": {},
},
}
err := env.ws.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{
Name: ms.TargetKeyspace,
Keyspace: originalVSchema,
})
require.NoError(t, err)

env.tmc.expectFetchAsAllPrivsQuery(startingTargetTabletUID, getNonEmptyTable, &sqltypes.Result{})

sourceDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_sourceks"), encodeString(ReverseWorkflowName(ms.Workflow)))
targetDeleteQuery := fmt.Sprintf(sqlDeleteWorkflow, encodeString("vt_targetks"), encodeString(ms.Workflow))
env.tmc.expectVRQuery(startingSourceTabletUID, sourceDeleteQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(startingTargetTabletUID, targetDeleteQuery, &sqltypes.Result{})

conn, err := env.ws.ts.ConnForCell(ctx, topo.GlobalCell)
require.NoError(t, err)
current, changes, err := conn.Watch(ctx, path.Join(topo.KeyspacesPath, ms.TargetKeyspace, topo.VSchemaFile))
require.NoError(t, err)
initialVersion := ""
if current != nil && current.Version != nil {
initialVersion = current.Version.String()
}

failCh := make(chan struct{})
readCalls := 0
env.tmc.readVReplicationWorkflow = func(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
readCalls++
if readCalls == 1 {
return &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
Workflow: request.Workflow,
WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables,
Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
{
Id: 1,
Bls: &binlogdatapb.BinlogSource{
Keyspace: ms.SourceKeyspace,
Shard: "-",
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
},
},
},
},
}, nil
}
<-failCh
return nil, errors.New("read vreplication failed")
}

errCh := make(chan error, 1)
go func() {
_, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Workflow: ms.Workflow,
SourceKeyspace: ms.SourceKeyspace,
TargetKeyspace: ms.TargetKeyspace,
IncludeTables: []string{"t1"},
})
errCh <- err
}()

updatedVersion := ""
assert.Eventually(t, func() bool {
select {
case wd := <-changes:
if wd == nil || wd.Err != nil || wd.Contents == nil {
return false
}
ks := &vschemapb.Keyspace{}
if err := ks.UnmarshalVT(wd.Contents); err != nil {
return false
}
if ks.Tables["t1"] == nil {
return false
}
if wd.Version != nil {
updatedVersion = wd.Version.String()
}
return true
default:
return false
}
}, 5*time.Second, 50*time.Millisecond)
require.NotEmpty(t, updatedVersion)
if initialVersion != "" {
require.NotEqual(t, initialVersion, updatedVersion)
}

close(failCh)
err = <-errCh
require.ErrorContains(t, err, "read vreplication failed")

got, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace)
require.NoError(t, err)
require.True(t, proto.Equal(got.Keyspace, originalVSchema), "got: %v, want: %v", got.Keyspace, originalVSchema)
}

func TestCreateLookupVindexFull(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "lookup",
Expand Down
15 changes: 8 additions & 7 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,9 +897,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
s.Logger().Infof("Successfully opened external topo: %+v", externalTopo)
}

origVSchema := &topo.KeyspaceVSchemaInfo{ // If we need to rollback a failed create
Name: targetKeyspace,
}
var origVSchema *topo.KeyspaceVSchemaInfo // If we need to rollback a failed create
vschema, err := s.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
return nil, err
Expand Down Expand Up @@ -957,9 +955,11 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if !vschema.Sharded {
// Save the original in case we need to restore it for a late failure in
// the defer(). We do NOT want to clone the version field as we will
// intentionally be going back in time. So we only clone the internal
// vschemapb.Keyspace field.
origVSchema.Keyspace = vschema.Keyspace.CloneVT()
// intentionally be going back in time.
origVSchema = &topo.KeyspaceVSchemaInfo{
Name: targetKeyspace,
Keyspace: vschema.Keyspace.CloneVT(),
}
if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema.Keyspace, tables, externalTopo == nil); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1061,7 +1061,8 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if origVSchema == nil { // There's no previous version to restore
if vschema.Sharded || origVSchema == nil {
// We don't modify the vschema for sharded keyspaces, so there's nothing to restore.
return
}
if cerr := s.ts.SaveVSchema(ctx, origVSchema); cerr != nil {
Expand Down
Loading