Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions go/cmd/vtctldclient/command/vreplication/reshard/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,18 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.ReshardCreateRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,

Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
TabletTypes: common.CreateOptions.TabletTypes,
TabletSelectionPreference: tsp,
Cells: common.CreateOptions.Cells,
OnDdl: common.CreateOptions.OnDDL,
DeferSecondaryKeys: common.CreateOptions.DeferSecondaryKeys,
AutoStart: common.CreateOptions.AutoStart,
StopAfterCopy: common.CreateOptions.StopAfterCopy,

SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
}
resp, err := common.GetClient().ReshardCreate(common.GetCommandCtx(), req)
if err != nil {
Expand Down
191 changes: 165 additions & 26 deletions go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/cluster"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -54,20 +53,33 @@ func TestVtctldclientCLI(t *testing.T) {
require.NotNil(t, zone2)
defer vc.TearDown()

sourceKeyspace := "product"
targetKeyspace := "customer"
sourceKeyspaceName := "product"
targetKeyspaceName := "customer"
var mt iMoveTables
workflowName := "wf1"
targetTabs := setupMinimalCustomerKeyspace(t)

t.Run("MoveTablesCreateFlags1", func(t *testing.T) {
testMoveTablesFlags1(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs)
testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
})
t.Run("MoveTablesCreateFlags2", func(t *testing.T) {
testMoveTablesFlags2(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs)
testMoveTablesFlags2(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
})
t.Run("MoveTablesCompleteFlags3", func(t *testing.T) {
testMoveTablesFlags3(t, sourceKeyspaceName, targetKeyspaceName, targetTabs)
})
t.Run("MoveTablesCompleteFlags", func(t *testing.T) {
testMoveTablesFlags3(t, sourceKeyspace, targetKeyspace, targetTabs)
t.Run("Reshard", func(t *testing.T) {
cell := vc.Cells["zone1"]
targetKeyspace := cell.Keyspaces[targetKeyspaceName]
sourceShard := "-80"
newShards := "-40,40-80"
require.NoError(t, vc.AddShards(t, []*Cell{cell}, targetKeyspace, newShards, 1, 0, 400, nil))
reshardWorkflowName := "reshard"
tablets := map[string]*cluster.VttabletProcess{
"-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet,
"40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet,
}
splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets)
})
}

Expand All @@ -81,34 +93,31 @@ func testMoveTablesFlags1(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK
}
completeFlags := []string{"--keep-routing-rules", "--keep-data"}
switchFlags := []string{}
// Test one set of MoveTable flags.
*mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, tables, createFlags, completeFlags, switchFlags)
(*mt).Show()
moveTablesOutput := (*mt).GetLastOutput()
// Test one set of MoveTable flags.
moveTablesResponse := getMoveTablesShowResponse(mt)
workflowResponse := getWorkflow(targetKeyspace, workflowName)

workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "customer", "show", "--workflow", "wf1")
require.NoError(t, err)
var moveTablesResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse)
require.NoError(t, err)

var workflowResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse)
require.NoError(t, err)

moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0
workflowResponse.Workflows[0].MaxVReplicationLag = 0
// also validates that MoveTables Show and Workflow Show return the same output.
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse.CloneVT())
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse)

// Validate that the flags are set correctly in the database.
validateWorkflow1(t, workflowResponse.Workflows)
validateMoveTablesWorkflow(t, workflowResponse.Workflows)
// Since we used --no-routing-rules, there should be no routing rules.
confirmNoRoutingRules(t)
}

func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsResponse {
moveTablesOutput := (*mt).GetLastOutput()
var moveTablesResponse vtctldatapb.GetWorkflowsResponse
err := protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse)
require.NoError(vc.t, err)
moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0
return moveTablesResponse.CloneVT()
}

// Validates some of the flags created from the previous test.
func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) {
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
Expand Down Expand Up @@ -184,6 +193,135 @@ func createMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName
return mt
}

// reshard helpers

func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards string, targetTabs map[string]*cluster.VttabletProcess) {
createFlags := []string{"--auto-start=false", "--defer-secondary-keys=false", "--stop-after-copy",
"--on-ddl", "STOP", "--tablet-types", "primary,rdonly", "--tablet-types-in-preference-order=true",
"--all-cells", "--format=json",
}
rs := newReshard(vc, &reshardWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: workflowName,
targetKeyspace: keyspace,
},
sourceShards: sourceShards,
targetShards: targetShards,
createFlags: createFlags,
}, workflowFlavorVtctld)

ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName)
rs.Create()
validateReshardResponse(rs)
workflowResponse := getWorkflow(keyspace, workflowName)
reshardShowResponse := getReshardShowResponse(&rs)
require.EqualValues(t, reshardShowResponse, workflowResponse)
validateReshardWorkflow(t, workflowResponse.Workflows)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Stopped.String())
rs.Start()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
for _, tab := range targetTabs {
alias := fmt.Sprintf("zone1-%d", tab.TabletUID)
query := "update _vt.vreplication set source := replace(source, 'stop_after_copy:true', 'stop_after_copy:false') where db_name = 'vt_customer' and workflow = '" + workflowName + "'"
output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, query)
require.NoError(t, err, output)
}
rs.Start()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
rs.Stop()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
rs.Start()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
for _, targetTab := range targetTabs {
catchup(t, targetTab, workflowName, "Reshard")
}
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)

rs.SwitchReadsAndWrites()
waitForLowLag(t, keyspace, workflowName+"_reverse")
vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil)

rs.ReverseReadsAndWrites()
waitForLowLag(t, keyspace, workflowName)
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)
rs.SwitchReadsAndWrites()
rs.Complete()
}

func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse {
(*rs).Show()
reshardOutput := (*rs).GetLastOutput()
var reshardResponse vtctldatapb.GetWorkflowsResponse
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse)
require.NoError(vc.t, err)
reshardResponse.Workflows[0].MaxVReplicationTransactionLag = 0
reshardResponse.Workflows[0].MaxVReplicationLag = 0
return reshardResponse.CloneVT()
}

func validateReshardResponse(rs iReshard) {
resp := getReshardResponse(rs)
require.NotNil(vc.t, resp)
require.NotNil(vc.t, resp.ShardStreams)
require.Equal(vc.t, len(resp.ShardStreams), 2)
keyspace := "customer"
for _, shard := range []string{"-40", "40-80"} {
streams := resp.ShardStreams[fmt.Sprintf("%s/%s", keyspace, shard)]
require.Equal(vc.t, 1, len(streams.Streams))
require.Equal(vc.t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), streams.Streams[0].Status)
}
}

func validateReshardWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) {
require.Equal(t, 1, len(workflows))
wf := workflows[0]
require.Equal(t, "reshard", wf.Name)
require.Equal(t, binlogdatapb.VReplicationWorkflowType_Reshard.String(), wf.WorkflowType)
require.Equal(t, "None", wf.WorkflowSubType)
require.Equal(t, "customer", wf.Target.Keyspace)
require.Equal(t, 2, len(wf.Target.Shards))
require.Equal(t, "customer", wf.Source.Keyspace)
require.Equal(t, 1, len(wf.Source.Shards))
require.False(t, wf.DeferSecondaryKeys)

require.GreaterOrEqual(t, len(wf.ShardStreams), int(1))
oneStream := maps.Values(wf.ShardStreams)[0]
require.NotNil(t, oneStream)

stream := oneStream.Streams[0]
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), stream.State)
require.Equal(t, stream.TabletSelectionPreference, tabletmanagerdatapb.TabletSelectionPreference_INORDER)
require.True(t, slices.Equal([]topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}, stream.TabletTypes))
require.True(t, slices.Equal([]string{"zone1", "zone2"}, stream.Cells))

bls := stream.BinlogSource
require.Equal(t, binlogdatapb.OnDDLAction_STOP, bls.OnDdl)
require.True(t, bls.StopAfterCopy)

}

func getReshardResponse(rs iReshard) *vtctldatapb.WorkflowStatusResponse {
reshardOutput := rs.GetLastOutput()
var reshardResponse vtctldatapb.WorkflowStatusResponse
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse)
require.NoError(vc.t, err)
return reshardResponse.CloneVT()
}

// helper functions

func getWorkflow(targetKeyspace, workflow string) *vtctldatapb.GetWorkflowsResponse {
workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKeyspace, "show", "--workflow", workflow)
require.NoError(vc.t, err)
var workflowResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse)
require.NoError(vc.t, err)
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0
workflowResponse.Workflows[0].MaxVReplicationLag = 0
return workflowResponse.CloneVT()
}

func checkTablesExist(t *testing.T, tabletAlias string, tables []string) bool {
tablesResponse, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", tabletAlias, "--tables", strings.Join(tables, ","), "--table-names-only")
require.NoError(t, err)
Expand Down Expand Up @@ -211,6 +349,7 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules {
require.NoError(t, err)
return &routingRulesResponse
}

func confirmNoRoutingRules(t *testing.T) {
routingRulesResponse := getRoutingRules(t)
require.Zero(t, len(routingRulesResponse.Rules))
Expand All @@ -223,7 +362,7 @@ func confirmRoutingRulesExist(t *testing.T) {

// We only want to validate non-standard attributes that are set by the CLI. The other end-to-end tests validate the rest.
// We also check some of the standard attributes to make sure they are set correctly.
func validateWorkflow1(t *testing.T, workflows []*vtctldatapb.Workflow) {
func validateMoveTablesWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) {
require.Equal(t, 1, len(workflows))
wf := workflows[0]
require.Equal(t, "wf1", wf.Name)
Expand Down
31 changes: 22 additions & 9 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ type moveTablesWorkflow struct {
tables string
atomicCopy bool
sourceShards string
createFlags []string // currently only used by vtctld

// currently only used by vtctld
lastOutput string
createFlags []string
completeFlags []string
switchFlags []string
}
Expand Down Expand Up @@ -270,7 +271,12 @@ type reshardWorkflow struct {
targetShards string
skipSchemaCopy bool

lastOutput string
// currently only used by vtctld
lastOutput string
createFlags []string
completeFlags []string
cancelFlags []string
switchFlags []string
}

type iReshard interface {
Expand Down Expand Up @@ -379,8 +385,9 @@ func (v VtctldReshard) Flavor() string {
func (v VtctldReshard) exec(args ...string) {
args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace}
args2 = append(args2, args...)
if err := vc.VtctldClient.ExecuteCommand(args2...); err != nil {
v.vc.t.Fatalf("failed to create Reshard workflow: %v", err)
var err error
if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil {
v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput)
}
}

Expand All @@ -395,20 +402,22 @@ func (v VtctldReshard) Create() {
if v.skipSchemaCopy {
args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy))
}
args = append(args, v.createFlags...)
v.exec(args...)
}

func (v VtctldReshard) SwitchReadsAndWrites() {
v.exec("SwitchTraffic")
args := []string{"SwitchTraffic"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldReshard) ReverseReadsAndWrites() {
v.exec("ReverseTraffic")
}

func (v VtctldReshard) Show() {
//TODO implement me
panic("implement me")
v.exec("Show")
}

func (v VtctldReshard) SwitchReads() {
Expand All @@ -422,11 +431,15 @@ func (v VtctldReshard) SwitchWrites() {
}

func (v VtctldReshard) Cancel() {
v.exec("Cancel")
args := []string{"Cancel"}
args = append(args, v.cancelFlags...)
v.exec(args...)
}

func (v VtctldReshard) Complete() {
v.exec("Complete")
args := []string{"Complete"}
args = append(args, v.completeFlags...)
v.exec(args...)
}

func (v VtctldReshard) GetLastOutput() string {
Expand Down
11 changes: 9 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,7 +1672,11 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea
log.Errorf("%w", err2)
return nil, err
}
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), "")
tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes)
if req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr
}
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), tabletTypesStr)
if err != nil {
return nil, vterrors.Wrap(err, "buildResharder")
}
Expand All @@ -1695,7 +1699,10 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea
} else {
log.Warningf("Streams will not be started since --auto-start is set to false")
}
return nil, nil
return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{
Keyspace: keyspace,
Workflow: req.Workflow,
})
}

// VDiffCreate is part of the vtctlservicepb.VtctldServer interface.
Expand Down