-
Notifications
You must be signed in to change notification settings - Fork 2.3k
VSCopy: Enable to copy from all shards in either a specified keyspace or all keyspaces #11909
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
077136b
2f7adbc
3afc402
05e5ea0
013b2f0
fdb2b68
d83daaf
71e1681
436568d
fdc13e1
c39af7c
7e67f27
ca2dafd
24e69b9
dc98a42
c2138b5
6910cbc
16f10b3
6a791ba
c70fa39
cdc4077
6b2d9fe
a013980
cd119d4
f7e480f
e8f0e60
ba8c231
d730563
3b41d76
d7169ec
8970182
38eb575
256400f
94beeac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -177,6 +177,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { | |||
|
|
||||
| const schemaUnsharded = ` | ||||
| create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; | ||||
| insert into customer_seq(id, next_id, cache) values(0, 1, 3); | ||||
| ` | ||||
| const vschemaUnsharded = ` | ||||
| { | ||||
|
|
@@ -218,14 +219,18 @@ const vschemaSharded = ` | |||
| func insertRow(keyspace, table string, id int) { | ||||
| vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false) | ||||
| vtgateConn.ExecuteFetch("begin", 1000, false) | ||||
| vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false) | ||||
| _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false) | ||||
| if err != nil { | ||||
| log.Infof("error inserting row %d: %v", id, err) | ||||
| } | ||||
| vtgateConn.ExecuteFetch("commit", 1000, false) | ||||
| } | ||||
|
|
||||
| type numEvents struct { | ||||
| numRowEvents, numJournalEvents int64 | ||||
| numLessThan80Events, numGreaterThan80Events int64 | ||||
| numLessThan40Events, numGreaterThan40Events int64 | ||||
| numRowEvents, numJournalEvents int64 | ||||
| numLessThan80Events, numGreaterThan80Events int64 | ||||
| numLessThan40Events, numGreaterThan40Events int64 | ||||
| numShard0BeforeReshardEvents, numShard0AfterReshardEvents int64 | ||||
| } | ||||
|
|
||||
| // tests the StopOnReshard flag | ||||
|
|
@@ -376,6 +381,150 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID | |||
| return &ne | ||||
| } | ||||
|
|
||||
| // Validate that we can continue streaming from multiple keyspaces after first copying some tables and then resharding one of the keyspaces | ||||
| // Ensure that there are no missing row events during the resharding process. | ||||
| func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEvents { | ||||
| defaultCellName := "zone1" | ||||
| allCellNames = defaultCellName | ||||
| allCells := []string{allCellNames} | ||||
| vc = NewVitessCluster(t, "VStreamCopyMultiKeyspaceReshard", allCells, mainClusterConfig) | ||||
|
|
||||
| require.NotNil(t, vc) | ||||
| ogdr := defaultReplicas | ||||
| defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets | ||||
| defer func(dr int) { defaultReplicas = dr }(ogdr) | ||||
|
|
||||
| defer vc.TearDown(t) | ||||
|
|
||||
| defaultCell = vc.Cells[defaultCellName] | ||||
| vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) | ||||
| vtgate = defaultCell.Vtgates[0] | ||||
| require.NotNil(t, vtgate) | ||||
| vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1) | ||||
|
|
||||
| vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) | ||||
| defer vtgateConn.Close() | ||||
| verifyClusterHealth(t, vc) | ||||
|
|
||||
| vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) | ||||
|
|
||||
| ctx := context.Background() | ||||
| vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) | ||||
| if err != nil { | ||||
| log.Fatal(err) | ||||
| } | ||||
| defer vstreamConn.Close() | ||||
| vgtid := &binlogdatapb.VGtid{ | ||||
| ShardGtids: []*binlogdatapb.ShardGtid{{ | ||||
| Keyspace: "/.*", | ||||
| }}} | ||||
|
|
||||
| filter := &binlogdatapb.Filter{ | ||||
| Rules: []*binlogdatapb.Rule{{ | ||||
| // We want to confirm that the following two tables are streamed. | ||||
| // 1. the customer_seq in the unsharded keyspace | ||||
| // 2. the customer table in the sharded keyspace | ||||
| Match: "/customer.*/", | ||||
| }}, | ||||
| } | ||||
| flags := &vtgatepb.VStreamFlags{} | ||||
| done := false | ||||
|
|
||||
| id := 1000 | ||||
| // First goroutine that keeps inserting rows into the table being streamed until a minute after reshard | ||||
| // We should keep getting events on the new shards | ||||
| go func() { | ||||
| for { | ||||
| if done { | ||||
| return | ||||
| } | ||||
| id++ | ||||
| time.Sleep(1 * time.Second) | ||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason to sleep for so long? Not a problem, just curious how we landed on 1s when I think we're concerned with timing issues (although the race unit tests help there too).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I referred to a similar test case that also verifies the VStream API during resharding and confirmed that 1 second works for this case as well.
If you feel that we need more time for this, let's discuss the rationale in the chat you set up for us. Thanks! |
||||
| insertRow("sharded", "customer", id) | ||||
| } | ||||
| }() | ||||
| // stream events from the VStream API | ||||
| var ne numEvents | ||||
| reshardDone := false | ||||
| go func() { | ||||
| var reader vtgateconn.VStreamReader | ||||
| reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) | ||||
| require.NoError(t, err) | ||||
| for { | ||||
| evs, err := reader.Recv() | ||||
|
|
||||
| switch err { | ||||
| case nil: | ||||
| for _, ev := range evs { | ||||
| switch ev.Type { | ||||
| case binlogdatapb.VEventType_ROW: | ||||
| shard := ev.RowEvent.Shard | ||||
| switch shard { | ||||
| case "0": | ||||
| if reshardDone { | ||||
| ne.numShard0AfterReshardEvents++ | ||||
| } else { | ||||
| ne.numShard0BeforeReshardEvents++ | ||||
| } | ||||
| case "-80": | ||||
| ne.numLessThan80Events++ | ||||
| case "80-": | ||||
| ne.numGreaterThan80Events++ | ||||
| case "-40": | ||||
| ne.numLessThan40Events++ | ||||
| case "40-": | ||||
| ne.numGreaterThan40Events++ | ||||
| } | ||||
| ne.numRowEvents++ | ||||
| case binlogdatapb.VEventType_JOURNAL: | ||||
| ne.numJournalEvents++ | ||||
| } | ||||
| } | ||||
| case io.EOF: | ||||
| log.Infof("Stream Ended") | ||||
| done = true | ||||
| default: | ||||
| log.Errorf("Returned err %v", err) | ||||
| done = true | ||||
| } | ||||
| if done { | ||||
| return | ||||
| } | ||||
| } | ||||
| }() | ||||
|
|
||||
| ticker := time.NewTicker(1 * time.Second) | ||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this is to line up with the other sleep. If so, we should make the duration a variable used in both places.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NOTE: I think we need to address #11909 (comment) first.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Not a big deal, was mostly curious. |
||||
| tickCount := 0 | ||||
| for { | ||||
| <-ticker.C | ||||
| tickCount++ | ||||
| switch tickCount { | ||||
| case 1: | ||||
| reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName, 1) | ||||
| reshardDone = true | ||||
| case 60: | ||||
| done = true | ||||
| } | ||||
| if done { | ||||
| break | ||||
| } | ||||
| } | ||||
| log.Infof("ne=%v", ne) | ||||
|
|
||||
| // The number of row events streamed by the VStream API should match the number of rows inserted. | ||||
| // This is important for sharded tables, where we need to ensure that no row events are missed during the resharding process. | ||||
| // | ||||
| // On the other hand, we don't verify the exact number of row events for the unsharded keyspace | ||||
| // because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1. | ||||
| // We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding, | ||||
| // is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward. | ||||
| customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer") | ||||
| insertedCustomerRows, err := evalengine.ToInt64(customerResult.Rows[0][0]) | ||||
| require.NoError(t, err) | ||||
| require.Equal(t, insertedCustomerRows, ne.numLessThan80Events+ne.numGreaterThan80Events+ne.numLessThan40Events+ne.numGreaterThan40Events) | ||||
| return ne | ||||
| } | ||||
|
|
||||
| func TestVStreamFailover(t *testing.T) { | ||||
| testVStreamWithFailover(t, true) | ||||
| } | ||||
|
|
@@ -407,3 +556,15 @@ func TestVStreamWithKeyspacesToWatch(t *testing.T) { | |||
|
|
||||
| testVStreamWithFailover(t, false) | ||||
| } | ||||
|
|
||||
| func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) { | ||||
| ne := testVStreamCopyMultiKeyspaceReshard(t, 3000) | ||||
| require.Equal(t, int64(0), ne.numJournalEvents) | ||||
| require.NotZero(t, ne.numRowEvents) | ||||
| require.NotZero(t, ne.numShard0BeforeReshardEvents) | ||||
| require.NotZero(t, ne.numShard0AfterReshardEvents) | ||||
| require.NotZero(t, ne.numLessThan80Events) | ||||
| require.NotZero(t, ne.numGreaterThan80Events) | ||||
| require.NotZero(t, ne.numLessThan40Events) | ||||
| require.NotZero(t, ne.numGreaterThan40Events) | ||||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
| } | ||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package endtoend | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| osExec "os/exec" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
|
|
@@ -55,6 +56,16 @@ func TestCreateAndDropDatabase(t *testing.T) { | |
| require.NoError(t, err) | ||
| defer conn.Close() | ||
|
|
||
| // cleanup the keyspace from the topology. | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A leaked "testitest" keyspace makes To fix it, 05e5ea0 adds a new cleanup function to delete the keyspace explicitly. |
||
| defer func() { | ||
| // the corresponding database needs to be created in advance. | ||
| // a subsequent DeleteKeyspace command returns the error of 'node doesn't exist' without it. | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| _ = exec(t, conn, "create database testitest") | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| _, err := osExec.Command("vtctldclient", "--server", grpcAddress, "DeleteKeyspace", "--recursive", "--force", "testitest").CombinedOutput() | ||
| require.NoError(t, err) | ||
| }() | ||
|
|
||
| // run it 3 times. | ||
| for count := 0; count < 3; count++ { | ||
| t.Run(fmt.Sprintf("exec:%d", count), func(t *testing.T) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,13 +24,15 @@ import ( | |
| "github.com/stretchr/testify/require" | ||
|
|
||
| "vitess.io/vitess/go/mysql" | ||
| "vitess.io/vitess/go/test/endtoend/utils" | ||
| ) | ||
|
|
||
| func TestRowCount(t *testing.T) { | ||
| ctx := context.Background() | ||
| conn, err := mysql.Connect(ctx, &vtParams) | ||
| require.NoError(t, err) | ||
| defer conn.Close() | ||
| utils.Exec(t, conn, "use ks") | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This use statement is now necessary because the number of keyspace is multiple. 3afc402 |
||
| type tc struct { | ||
| query string | ||
| expected int | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.