Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
37316cc
Update materialize source shard/position when in resharded keyspace
mattlord Mar 21, 2024
e5017e8
Draft fix
mattlord Mar 21, 2024
62fab28
Comment corrections
mattlord Mar 21, 2024
a0840f7
Add test
mattlord Mar 21, 2024
41a26f0
Merge remote-tracking branch 'origin/main' into reshard_materialize_s…
mattlord Mar 21, 2024
966b571
More test work
mattlord Mar 21, 2024
5c32193
Further adjustments
mattlord Mar 22, 2024
88b5314
WiP fixes
mattlord Mar 23, 2024
de73562
Merge remote-tracking branch 'origin/main' into reshard_materialize_s…
mattlord Mar 23, 2024
2b61e47
Cleanup debug logging
mattlord Mar 23, 2024
63f9ec6
Remaining tweaks and fixes to get existing tests working
mattlord Mar 23, 2024
491f291
WiP fixes
mattlord Mar 25, 2024
6e55219
Merge remote-tracking branch 'origin/main' into reshard_materialize_s…
mattlord Mar 29, 2024
eff3a6b
Post merge fixups
mattlord Mar 29, 2024
f1b4a3e
WiP tweaks
mattlord Mar 29, 2024
b2bdda1
Further tweaks
mattlord Mar 31, 2024
fbaba41
Alter controller picker options via comment
mattlord Apr 1, 2024
bcb1c92
Don't create redundant streams on multi switches
mattlord Apr 2, 2024
35eb3b2
Enable all e2e test code again
mattlord Apr 2, 2024
6bdb582
Merge remote-tracking branch 'origin/main' into reshard_materialize_s…
mattlord Apr 2, 2024
f1414f5
Fix unit tests
mattlord Apr 2, 2024
cd5ff0f
Properly prevent creation of duplicate streams
mattlord Apr 2, 2024
0ed4d72
Relax throttler
mattlord Apr 2, 2024
444a6f6
Minor changes after self review
mattlord Apr 2, 2024
af518eb
Add background customer data generation
mattlord Apr 3, 2024
1504bd5
Handle multiple intra-keyspace workflows
mattlord Apr 3, 2024
9ec5cc1
Test with multiple intra-keyspace materializations
mattlord Apr 3, 2024
ffef83d
Update test comment and misc
mattlord Apr 3, 2024
84a756f
Specify simulated NULL for new StopPosition field
mattlord Apr 3, 2024
995d687
Several minor changes...
mattlord Apr 3, 2024
0d090f6
Specify simulated NULL for StopPosition in vtctldclient usage
mattlord Apr 3, 2024
0ce2cde
Missed protobuf change
mattlord Apr 3, 2024
3027347
Merge remote-tracking branch 'origin/main' into reshard_materialize_s…
mattlord Apr 3, 2024
b2388a2
Generate much more data to stress the system
mattlord Apr 5, 2024
3adf5cb
Address review comment
mattlord Apr 12, 2024
9a37232
Address review comment part 2
mattlord Apr 12, 2024
bb0c7be
Address minor review comments
mattlord Apr 15, 2024
20ed6be
Remove unused stop_position related code
mattlord Apr 15, 2024
e9320b5
Minor changes after self review post addressing comments
mattlord Apr 15, 2024
d9af228
Merge remote-tracking branch 'origin/main' into reshard_materialize_s…
mattlord Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

type Config struct {
Expand Down
42 changes: 42 additions & 0 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,22 @@ create table nopk (name varchar(128), age int unsigned);
"sequence": "customer_seq"
}
},
"customer_name": {
"column_vindexes": [
{
"column": "cid",
"name": "xxhash"
}
]
},
"customer_type": {
"column_vindexes": [
{
"column": "cid",
"name": "xxhash"
}
]
},
"customer2": {
"column_vindexes": [
{
Expand Down Expand Up @@ -401,6 +417,32 @@ create table nopk (name varchar(128), age int unsigned);
"create_ddl": "create table cproduct(pid bigint, description varchar(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4"
}]
}
`

materializeCustomerNameSpec = `
{
"workflow": "customer_name",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_name",
"source_expression": "select cid, name from customer",
"create_ddl": "create table if not exists customer_name (cid bigint not null, name varchar(128), primary key(cid), key(name))"
}]
}
`

materializeCustomerTypeSpec = `
{
"workflow": "customer_type",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_type",
"source_expression": "select cid, typ from customer",
"create_ddl": "create table if not exists customer_type (cid bigint not null, typ enum('individual','soho','enterprise'), primary key(cid), key(typ))"
}]
}
`

merchantOrdersVSchema = `
Expand Down
127 changes: 101 additions & 26 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package vreplication

import (
"context"
"encoding/json"
"fmt"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -30,6 +32,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"

Expand Down Expand Up @@ -335,8 +338,8 @@ func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.Vttabl
for _, tt := range []string{"replica", "rdonly"} {
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
if strings.Contains(tabletTypes, tt) {
readQuery := "select * from customer"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, readQuery)
readQuery := "select cid from customer limit 10"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}
}
}
Expand All @@ -355,7 +358,7 @@ func validateWritesRouteToSource(t *testing.T) {
insertQuery := "insert into customer(name, cid) values('tempCustomer2', 200)"
matchInsertQuery := "insert into customer(`name`, cid) values"
assertQueryExecutesOnTablet(t, vtgateConn, sourceTab, "customer", insertQuery, matchInsertQuery)
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid > 100")
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid = 200")
}

func validateWritesRouteToTarget(t *testing.T) {
Expand All @@ -366,7 +369,7 @@ func validateWritesRouteToTarget(t *testing.T) {
assertQueryExecutesOnTablet(t, vtgateConn, targetTab2, "customer", insertQuery, matchInsertQuery)
insertQuery = "insert into customer(name, cid) values('tempCustomer3', 102)"
assertQueryExecutesOnTablet(t, vtgateConn, targetTab1, "customer", insertQuery, matchInsertQuery)
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid > 100")
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid in (101, 102)")
}

func revert(t *testing.T, workflowType string) {
Expand Down Expand Up @@ -534,6 +537,31 @@ func testReshardV2Workflow(t *testing.T) {
defer closeConn()
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard

// Generate customer records in the background for the rest of the test
// in order to confirm that no writes are lost in either the customer
// table or the customer_name and customer_type materializations
// against it during the Reshard and all of the traffic switches.
dataGenCtx, dataGenCancel := context.WithCancel(context.Background())
defer dataGenCancel()
dataGenConn, dataGenCloseConn := getVTGateConn()
defer dataGenCloseConn()
dataGenWg := sync.WaitGroup{}
dataGenWg.Add(1)
go func() {
defer dataGenWg.Done()
id := 1000
for {
select {
case <-dataGenCtx.Done():
return
default:
_ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name) values (%d, 'tempCustomer%d')", id, id))
}
time.Sleep(1 * time.Millisecond)
id++
}
}()

// create internal tables on the original customer shards that should be
// ignored and not show up on the new shards
execMultipleQueries(t, vtgateConn, targetKs+"/-80", internalSchema)
Expand All @@ -553,32 +581,45 @@ func testReshardV2Workflow(t *testing.T) {
testWorkflowUpdate(t)

testRestOfWorkflow(t)

// Confirm that we lost no customer related writes during the Reshard.
dataGenCancel()
dataGenWg.Wait()
cres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer")
require.Len(t, cres.Rows, 1)
waitForNoWorkflowLag(t, vc, "customer", "customer_name")
cnres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_name")
require.Len(t, cnres.Rows, 1)
require.EqualValues(t, cres.Rows, cnres.Rows)
waitForNoWorkflowLag(t, vc, "customer", "customer_type")
ctres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_type")
require.Len(t, ctres.Rows, 1)
require.EqualValues(t, cres.Rows, ctres.Rows)
if debugMode {
t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, customer_type: %s",
cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ctres.Rows[0][0].ToString())
}
// We also do a vdiff on the materialize workflows for good measure.
doVtctldclientVDiff(t, "customer", "customer_name", "", nil)
doVtctldclientVDiff(t, "customer", "customer_type", "", nil)
}

func testMoveTablesV2Workflow(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables

// test basic forward and reverse flows
setupCustomerKeyspace(t)
// The purge table should get skipped/ignored
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

// Verify that we've properly ignored any internal operational tables
// and that they were not copied to the new target keyspace
verifyNoInternalTables(t, vtgateConn, targetKs)

testReplicatingWithPKEnumCols(t)

// Confirm that updating MoveTable workflows works.
testWorkflowUpdate(t)
materializeShow := func() {
if !debugMode {
return
}
output, err := vc.VtctldClient.ExecuteCommandWithOutput("materialize", "--target-keyspace=customer", "show", "--workflow=customer_name", "--compact", "--include-logs=false")
require.NoError(t, err)
t.Logf("Materialize show output: %s", output)
}

testRestOfWorkflow(t)
// Test basic forward and reverse flows.
setupCustomerKeyspace(t)

listOutputContainsWorkflow := func(output string, workflow string) bool {
workflows := []string{}
Expand All @@ -597,12 +638,39 @@ func testMoveTablesV2Workflow(t *testing.T) {
require.NoError(t, err)
return len(workflows) == 0
}

listAllArgs := []string{"workflow", "--keyspace", "customer", "list"}

output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputIsEmpty(output))

// The purge table should get skipped/ignored
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

// Verify that we've properly ignored any internal operational tables
// and that they were not copied to the new target keyspace
verifyNoInternalTables(t, vtgateConn, targetKs)

testReplicatingWithPKEnumCols(t)

// Confirm that updating MoveTable workflows works.
testWorkflowUpdate(t)

testRestOfWorkflow(t)
// Create our primary intra-keyspace materialization.
materialize(t, materializeCustomerNameSpec, false)
// Create a second one to confirm that multiple ones get migrated correctly.
materialize(t, materializeCustomerTypeSpec, false)
materializeShow()

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))

testVSchemaForSequenceAfterMoveTables(t)

// Confirm that the auto_increment clause on customer.cid was removed.
Expand All @@ -616,14 +684,14 @@ func testMoveTablesV2Workflow(t *testing.T) {
createMoveTablesWorkflow(t, "Lead,Lead-1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "wf1"))
require.True(t, listOutputContainsWorkflow(output, "wf1") && listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type"))

err = tstWorkflowCancel(t)
require.NoError(t, err)

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputIsEmpty(output))
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))
}

func testPartialSwitches(t *testing.T) {
Expand Down Expand Up @@ -671,6 +739,11 @@ func testPartialSwitches(t *testing.T) {
}

func testRestOfWorkflow(t *testing.T) {
// Relax the throttler so that it does not cause switches to fail because it can block
// the catchup for the intra-keyspace materialization.
res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "customer", true, false, throttlerConfig.Threshold*5, throttlerConfig.Query, nil)
require.NoError(t, err, res)

testPartialSwitches(t)

// test basic forward and reverse flows
Expand Down Expand Up @@ -732,12 +805,14 @@ func testRestOfWorkflow(t *testing.T) {
validateWritesRouteToSource(t)

// trying to complete an unswitched workflow should error
err := tstWorkflowComplete(t)
err = tstWorkflowComplete(t)
require.Error(t, err)
require.Contains(t, err.Error(), wrangler.ErrWorkflowNotFullySwitched)

// fully switch and complete
waitForLowLag(t, "customer", "wf1")
waitForLowLag(t, "customer", "customer_name")
waitForLowLag(t, "customer", "customer_type")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
Expand Down
29 changes: 21 additions & 8 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3302,22 +3302,35 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return 0, sw.logs(), nil
}

// We stop writes on the source before stopping the source streams so that the catchup time
// is lessened and other workflows that we have to migrate such as intra-keyspace materialize
// workflows also have a chance to catch up as well because those are internally generated
// GTIDs within the shards we're switching traffic away from.
// For intra-keyspace materialization streams that we migrate where the source and target are
// the keyspace being resharded, we wait for those to catch up in the stopStreams path before
// we actually stop them.
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

ts.Logger().Infof("Stopping streams")
sourceWorkflows, err = sw.stopStreams(ctx, sm)
// Use a shorter context for this since, when doing a Reshard, if there are intra-keyspace
// materializations then we have to wait for them to catch up before switching traffic for the
// Reshard workflow. We use the same timeout value here that is used for VReplication catchup
// with the inter-keyspace workflows.
stopCtx, stopCancel := context.WithTimeout(ctx, timeout)
defer stopCancel()
sourceWorkflows, err = sw.stopStreams(stopCtx, sm)
if err != nil {
for key, streams := range sm.Streams() {
for _, stream := range streams {
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
sw.cancelMigration(ctx, sm)
return handleError("failed to stop the workflow streams", err)
}

ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
Expand Down
Loading