Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/martini-contrib/render v0.0.0-20150707142108-ec18f8345a11
github.com/mattn/go-sqlite3 v1.14.0
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
github.com/mitchellh/mapstructure v1.2.3 // indirect
github.com/montanaflynn/stats v0.6.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI=
github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
Expand Down
20 changes: 10 additions & 10 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
)

const (
workflowActionStart = "Start"
workflowActionCreate = "Create"
workflowActionSwitchTraffic = "SwitchTraffic"
workflowActionReverseTraffic = "ReverseTraffic"
workflowActionComplete = "Complete"
Expand All @@ -58,9 +58,9 @@ var (
currentWorkflowType wrangler.VReplicationWorkflowType
)

func reshard2Start(t *testing.T, sourceShards, targetShards string) error {
func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) error {
err := tstWorkflowExec(t, defaultCellName, workflowName, targetKs, targetKs,
"", workflowActionStart, "", sourceShards, targetShards)
"", workflowActionCreate, "", sourceShards, targetShards)
require.NoError(t, err)
time.Sleep(1 * time.Second)
catchup(t, targetTab1, workflowName, "Reshard")
Expand All @@ -69,12 +69,12 @@ func reshard2Start(t *testing.T, sourceShards, targetShards string) error {
return nil
}

func moveTables2Start(t *testing.T, tables string) error {
func createMoveTablesWorkflow(t *testing.T, tables string) error {
if tables == "" {
tables = tablesToMove
}
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
tables, workflowActionStart, "", "", "")
tables, workflowActionCreate, "", "", "")
require.NoError(t, err)
catchup(t, targetTab1, workflowName, "MoveTables")
catchup(t, targetTab2, workflowName, "MoveTables")
Expand All @@ -96,7 +96,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
}
args = append(args, "-v2")
switch action {
case workflowActionStart:
case workflowActionCreate:
if currentWorkflowType == wrangler.MoveTablesWorkflow {
args = append(args, "-source", sourceKs, "-tables", tables)
} else {
Expand Down Expand Up @@ -244,7 +244,7 @@ func testReshardV2Workflow(t *testing.T) {
currentWorkflowType = wrangler.ReshardWorkflow

createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-")
reshard2Start(t, "-80,80-", "-40,40-80,80-c0,c0-")
createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-")
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
Expand All @@ -259,7 +259,7 @@ func testMoveTablesV2Workflow(t *testing.T) {

// test basic forward and reverse flows
setupCustomerKeyspace(t)
moveTables2Start(t, "customer")
createMoveTablesWorkflow(t, "customer")
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
Expand All @@ -272,7 +272,7 @@ func testMoveTablesV2Workflow(t *testing.T) {
output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "No workflows found in keyspace customer")

moveTables2Start(t, "customer2")
createMoveTablesWorkflow(t, "customer2")
output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1")

Expand Down Expand Up @@ -523,7 +523,7 @@ func moveCustomerTableSwitchFlows(t *testing.T, cells []*Cell, sourceCellOrAlias
switchWrites(t, ksWorkflow, false)
validateWritesRouteToTarget(t)

switchWrites(t, ksWorkflow, true)
switchWrites(t, reverseKsWorkflow, true)
validateWritesRouteToSource(t)

validateReadsRouteToSource(t, "replica")
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,9 @@ func switchReadsDryRun(t *testing.T, cells, ksWorkflow string, dryRunResults []s
}

func switchReads(t *testing.T, cells, ksWorkflow string) {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=rdonly", ksWorkflow)
var output string
var err error
output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=rdonly", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output))
output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=replica", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output))
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA]",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]",
"Routing rules for tables [customer] will be updated",
"Unlock keyspace product",
}
Expand Down
16 changes: 10 additions & 6 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
type VReplicationWorkflowAction string

const (
vReplicationWorkflowActionStart = "start"
vReplicationWorkflowActionCreate = "create"
vReplicationWorkflowActionSwitchTraffic = "switchtraffic"
vReplicationWorkflowActionReverseTraffic = "reversetraffic"
vReplicationWorkflowActionComplete = "complete"
Expand Down Expand Up @@ -2052,7 +2052,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
originalAction := action
action = strings.ToLower(action) // allow users to input action in a case-insensitive manner
switch action {
case vReplicationWorkflowActionStart:
case vReplicationWorkflowActionCreate:
switch workflowType {
case wrangler.MoveTablesWorkflow:
if *sourceKeyspace == "" {
Expand Down Expand Up @@ -2085,6 +2085,9 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic:
vrwp.Cells = *cells
vrwp.TabletTypes = *tabletTypes
if vrwp.TabletTypes == "" {
vrwp.TabletTypes = "master,replica,rdonly"
}
vrwp.Timeout = *timeout
vrwp.EnableReverseReplication = *reverseReplication
case vReplicationWorkflowActionCancel:
Expand All @@ -2105,7 +2108,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
log.Warningf("NewVReplicationWorkflow returned error %+v", wf)
return err
}
if !wf.Exists() && action != vReplicationWorkflowActionStart {
if !wf.Exists() && action != vReplicationWorkflowActionCreate {
return fmt.Errorf("workflow %s does not exist", ksWorkflow)
}

Expand All @@ -2117,7 +2120,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
if copyProgress == nil {
wr.Logger().Printf("\nCopy Completed.\n")
} else {
wr.Logger().Printf("\nCopy Progress (approx.):\n")
wr.Logger().Printf("\nCopy Progress (approx):\n")
var tables []string
for table := range *copyProgress {
tables = append(tables, table)
Expand Down Expand Up @@ -2154,8 +2157,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return printDetails()
case vReplicationWorkflowActionProgress:
return printCopyProgress()
case vReplicationWorkflowActionStart:
err = wf.Start()
case vReplicationWorkflowActionCreate:
err = wf.Create()
if err != nil {
return err
}
Expand Down Expand Up @@ -2200,6 +2203,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
case progress := <-progressCh:
if progress.running == progress.total {
wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total)
printDetails()
return nil
}
wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total)
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/discoverygateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func TestDiscoveryGatewayWaitForTablets(t *testing.T) {
},
},
}

dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2)

// replica should only use local ones
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func init() {
withDDL = withddl.New(allddls)
}

var tabletTypesStr = flag.String("vreplication_tablet_type", "REPLICA", "comma separated list of tablet types used as a source")
// this are the default tablet_types that will be used by the tablet picker to find sources for a vreplication stream
// it can be overridden by passing a different list to the MoveTables or Reshard commands
var tabletTypesStr = flag.String("vreplication_tablet_type", "MASTER,REPLICA", "comma separated list of tablet types used as a source")

// waitRetryTime can be changed to a smaller value for tests.
// A VReplication stream can be created by sending an insert statement
Expand Down
60 changes: 58 additions & 2 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/topotools"

"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -346,6 +348,39 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
return ts, ws, nil
}

func (wr *Wrangler) doCellsHaveRdonlyTablets(ctx context.Context, cells []string) (bool, error) {
areAnyRdonly := func(tablets []*topo.TabletInfo) bool {
for _, tablet := range tablets {
if tablet.Type == topodatapb.TabletType_RDONLY {
return true
}
}
return false
}

if len(cells) == 0 {
tablets, err := topotools.GetAllTabletsAcrossCells(ctx, wr.ts)
if err != nil {
return false, err
}
if areAnyRdonly(tablets) {
return true, nil
}

} else {
for _, cell := range cells {
tablets, err := topotools.GetAllTablets(ctx, wr.ts, cell)
if err != nil {
return false, err
}
if areAnyRdonly(tablets) {
return true, nil
}
}
}
return false, nil
}

// SwitchReads is a generic way of switching read traffic for a resharding workflow.
func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow string, servedTypes []topodatapb.TabletType,
cells []string, direction TrafficSwitchDirection, dryRun bool) (*[]string, error) {
Expand All @@ -360,7 +395,8 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st
wr.Logger().Errorf(errorMsg)
return nil, fmt.Errorf(errorMsg)
}
wr.Logger().Infof("SwitchReads: %s.%s tt %+v, cells %+v, workflow state: %+v", targetKeyspace, workflow, servedTypes, cells, ws)
log.Infof("SwitchReads: %s.%s tt %+v, cells %+v, workflow state: %+v", targetKeyspace, workflow, servedTypes, cells, ws)
var switchReplicas, switchRdonly bool
for _, servedType := range servedTypes {
if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY {
return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType)
Expand All @@ -371,6 +407,26 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st
if direction == DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
}
switch servedType {
case topodatapb.TabletType_REPLICA:
switchReplicas = true
case topodatapb.TabletType_RDONLY:
switchRdonly = true
}
}

// if there are no rdonly tablets in the cells ask to switch rdonly tablets as well so that routing rules
// are updated for rdonly as well. Otherwise vitess will not know that the workflow has completed and will
// incorrectly report that not all reads have been switched. User currently is forced to switch non-existent rdonly tablets
if switchReplicas && !switchRdonly {
var err error
rdonlyTabletsExist, err := wr.doCellsHaveRdonlyTablets(ctx, cells)
if err != nil {
return nil, err
}
if !rdonlyTabletsExist {
servedTypes = append(servedTypes, topodatapb.TabletType_RDONLY)
}
}

// If journals exist notify user and fail
Expand All @@ -380,7 +436,7 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st
return nil, err
}
if journalsExist {
wr.Logger().Errorf("Found a previous journal entry for %d", ts.id)
log.Infof("Found a previous journal entry for %d", ts.id)
}
var sw iswitcher
if dryRun {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,11 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient())
tme.sourceShards = sourceShards
tme.targetShards = targetShards
tme.tmeDB = fakesqldb.New(t)

tabletID := 10
for _, shard := range sourceShards {
tme.sourceMasters = append(tme.sourceMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, nil, TabletKeyspaceShard(t, "ks", shard)))
tme.sourceMasters = append(tme.sourceMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, tme.tmeDB, TabletKeyspaceShard(t, "ks", shard)))
tabletID += 10

_, sourceKeyRange, err := topo.ValidateShardName(shard)
Expand All @@ -261,7 +262,7 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
}

for _, shard := range targetShards {
tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, nil, TabletKeyspaceShard(t, "ks", shard)))
tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, tme.tmeDB, TabletKeyspaceShard(t, "ks", shard)))
tabletID += 10

_, targetKeyRange, err := topo.ValidateShardName(shard)
Expand Down
33 changes: 15 additions & 18 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ func TestTableMigrateMainflow(t *testing.T) {
"ks2.t1": {"ks1.t1"},
"t2": {"ks1.t2"},
"ks2.t2": {"ks1.t2"},
"t1@rdonly": {"ks2.t1"},
"ks2.t1@rdonly": {"ks2.t1"},
"ks1.t1@rdonly": {"ks2.t1"},
"t2@rdonly": {"ks2.t2"},
"ks2.t2@rdonly": {"ks2.t2"},
"ks1.t2@rdonly": {"ks2.t2"},
"t1@rdonly": {"ks1.t1"},
"ks2.t1@rdonly": {"ks1.t1"},
"ks1.t1@rdonly": {"ks1.t1"},
"t2@rdonly": {"ks1.t2"},
"ks2.t2@rdonly": {"ks1.t2"},
"ks1.t2@rdonly": {"ks1.t2"},
"t1@replica": {"ks1.t1"},
"ks2.t1@replica": {"ks1.t1"},
"ks1.t1@replica": {"ks1.t1"},
Expand Down Expand Up @@ -526,10 +526,10 @@ func TestShardMigrateMainflow(t *testing.T) {
checkCellServedTypes(t, tme.ts, "ks:40-", "cell1", 2)
checkCellServedTypes(t, tme.ts, "ks:-80", "cell1", 1)
checkCellServedTypes(t, tme.ts, "ks:80-", "cell1", 1)
checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 2)
checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 2)
checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 2)
checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 2)
verifyQueries(t, tme.allDBClients)

tme.expectNoPreviousJournals()
Expand Down Expand Up @@ -1764,7 +1764,7 @@ func checkCellRouting(t *testing.T, wr *Wrangler, cell string, want map[string][
got[rr.FromTable] = append(got[rr.FromTable], rr.ToTables...)
}
if !reflect.DeepEqual(got, want) {
t.Errorf("srv rules for cell %s:\n%v, want\n%v", cell, got, want)
t.Fatalf("ERROR: routing rules don't match for cell %s:got\n%v, want\n%v", cell, got, want)
}
}

Expand Down Expand Up @@ -1799,10 +1799,8 @@ func checkServedTypes(t *testing.T, ts *topo.Server, keyspaceShard string, want
if err != nil {
t.Fatal(err)
}

if len(servedTypes) != want {
t.Errorf("shard %v has wrong served types: got: %v, want: %v", keyspaceShard, len(servedTypes), want)
}
require.Equal(t, want, len(servedTypes), fmt.Sprintf("shard %v has wrong served types: got: %v, want: %v",
keyspaceShard, len(servedTypes), want))
}

func checkCellServedTypes(t *testing.T, ts *topo.Server, keyspaceShard, cell string, want int) {
Expand All @@ -1823,9 +1821,8 @@ outer:
}
}
}
if count != want {
t.Errorf("serving types for keyspaceShard %s, cell %s: %d, want %d", keyspaceShard, cell, count, want)
}
require.Equal(t, want, count, fmt.Sprintf("serving types for keyspaceShard %s, cell %s: %d, want %d",
keyspaceShard, cell, count, want))
}

func checkIsMasterServing(t *testing.T, ts *topo.Server, keyspaceShard string, want bool) {
Expand Down
Loading