diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index 959a0169950..66cc81dcaff 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -21,6 +21,7 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" @@ -38,6 +39,50 @@ import ( As part of a separate cleanup we will build on this framework to replace the existing one. */ +var ( + seqVSchema = `{ + "sharded": false, + "tables": { + "customer_seq": { + "type": "sequence" + } + } + }` + seqSchema = `create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';` + commerceSchema = `create table customer(cid int, name varchar(128), ts timestamp(3) not null default current_timestamp(3), primary key(cid));` + commerceVSchema = ` + { + "tables": { + "customer": {} + } + } +` + customerSequenceVSchema = ` + { + "sharded": true, + "vindexes": { + "reverse_bits": { + "type": "reverse_bits" + } + }, + "tables": { + "customer": { + "column_vindexes": [ + { + "column": "cid", + "name": "reverse_bits" + } + ], + "auto_increment": { + "column": "cid", + "sequence": "customer_seq" + } + } + } + } + ` +) + type keyspace struct { name string vschema string @@ -74,50 +119,6 @@ type vrepTestCase struct { } func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase { - const ( - seqVSchema = `{ - "sharded": false, - "tables": { - "customer_seq": { - "type": "sequence" - } - } - }` - seqSchema = `create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';` - commerceSchema = `create table customer(cid int, name varchar(128), ts timestamp(3) not null default current_timestamp(3), primary key(cid));` - commerceVSchema = ` - { - "tables": { - "customer": {} - } - } -` - customerSchema = "" - customerVSchema = ` - { - "sharded": true, - "vindexes": { - "reverse_bits": { - "type": "reverse_bits" - } - }, - "tables": { - "customer": { - "column_vindexes": [ - { - "column": "cid", - "name": "reverse_bits" - } - ], - "auto_increment": { - "column": "cid", - "sequence": "customer_seq" - } - } - } - } - ` - ) tc := &vrepTestCase{ t: t, testName: t.Name(), @@ -134,14 +135,14 @@ func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase { } tc.keyspaces["customer"] = &keyspace{ name: "customer", - vschema: customerVSchema, - schema: customerSchema, + vschema: customerSequenceVSchema, + schema: "", baseID: 200, shards: []string{"-80", "80-"}, } tc.keyspaces["customer2"] = &keyspace{ name: "customer2", - vschema: customerVSchema, + vschema: customerSequenceVSchema, schema: "", baseID: 1200, shards: []string{"-80", "80-"}, @@ -165,6 +166,40 @@ func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase { return tc } +func initSequenceResetTestCase(t *testing.T) *vrepTestCase { + tc := &vrepTestCase{ + t: t, + testName: t.Name(), + keyspaces: make(map[string]*keyspace), + defaultCellName: "zone1", + workflows: make(map[string]*workflow), + } + tc.keyspaces["commerce"] = &keyspace{ + name: "commerce", + vschema: commerceVSchema, + schema: commerceSchema, + baseID: 100, + shards: []string{"0"}, + } + tc.keyspaces["customer"] = &keyspace{ + name: "customer", + vschema: customerSequenceVSchema, + schema: "", + baseID: 200, + shards: []string{"-80", "80-"}, + } + tc.keyspaces["seqSrc"] = &keyspace{ + name: "seqSrc", + vschema: seqVSchema, + schema: seqSchema, + baseID: 400, + shards: []string{"0"}, + } + tc.setupCluster() + tc.initData() + return tc +} + func (tc *vrepTestCase) teardown() { tc.vtgateConn.Close() vc.TearDown() @@ -268,6 +303,93 @@ func (wf *workflow) complete() { require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions)) } +// TestSequenceResetOnSwitchTraffic tests that in-memory sequence info is +// reset when switching traffic back and forth between keyspaces during +// MoveTables workflow. This catches a bug where cached sequence values would +// persist after traffic switches, causing sequence generation to produce +// duplicate values in target keyspace. +func TestSequenceResetOnSwitchTraffic(t *testing.T) { + origExtraVTGateArgs := extraVTGateArgs + extraVTGateArgs = append(extraVTGateArgs, []string{ + "--enable-partial-keyspace-migration", + "--schema_change_signal=false", + }...) + defer func() { + extraVTGateArgs = origExtraVTGateArgs + }() + + tc := initSequenceResetTestCase(t) + defer tc.teardown() + + currentCustomerCount = getCustomerCount(t, "") + newCustomerCount = 4 + t.Run("Verify sequence reset during traffic switching", func(t *testing.T) { + tc.setupKeyspaces([]string{"customer"}) + wf := tc.newWorkflow("MoveTables", "customer", "commerce", "customer", &workflowOptions{ + tables: []string{"customer"}, + }) + wf.create() + + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + + getSequenceNextID := func() int64 { + qr := execVtgateQuery(t, vtgateConn, "", "SELECT next_id FROM seqSrc.customer_seq WHERE id = 0") + nextID, _ := qr.Rows[0][0].ToInt64() + return nextID + } + + initialSeqValue := getSequenceNextID() + t.Logf("Initial sequence next_id: %d", initialSeqValue) + + wf.switchTraffic() + + insertCustomers(t) + + afterFirstSwitchSeqValue := getSequenceNextID() + t.Logf("After first switch sequence next_id: %d", afterFirstSwitchSeqValue) + require.Greater(t, afterFirstSwitchSeqValue, initialSeqValue, "Sequence should increment after inserting customers") + + wf.reverseTraffic() + + afterReverseSeqValue := getSequenceNextID() + t.Logf("After reverse switch sequence next_id: %d", afterReverseSeqValue) + + // Insert some random values when all writes are reversed back to + // source keyspace. We are inserting here rows with IDs 1004, 1005, + // 1006 (since the cache value was 1000) which would be the next + // in-memory sequence IDs for inserting any new rows in `customer` + // table if the sequence info isn't reset. This will result in + // duplicate primary key value error in the next insert. + // + // Hence, this way we verify that even if there are any new + // values inserted after the traffic has been reversed, the in-memory + // sequence info is reset, so that on switching back the traffic to + // the target keyspace the tablet refetches the next_id from the + // sequence table for generating the next insert ID. + _, err := tc.vtgateConn.ExecuteFetch("insert into customer(cid, name) values(1004, 'customer8'), (1005, 'customer9'),(1006, 'customer10')", 1000, false) + require.NoError(t, err) + _, err = tc.vtgateConn.ExecuteFetch("insert into customer(cid, name) values(2004, 'customer11'), (2005, 'customer12'),(2006, 'customer13')", 1000, false) + require.NoError(t, err) + + wf.switchTraffic() + + afterSecondSwitchSeqValue := getSequenceNextID() + // Since the highest ID before switching traffic was 2026, which is + // greater than 2000 (the expected next_id from sequence table before switch.) + assert.Equal(t, int64(2007), afterSecondSwitchSeqValue) + + currentCustomerCount = getCustomerCount(t, "after second switch") + newCustomerCount = 4 + insertCustomers(t) + + finalSeqValue := getSequenceNextID() + assert.Equal(t, int64(3007), finalSeqValue, "Since the cache is set to 1000, next_id is expected to be incremented to 3007") + + wf.complete() + }) +} + // TestPartialMoveTablesWithSequences enhances TestPartialMoveTables by adding an unsharded keyspace which has a // sequence. This tests that the sequence is migrated correctly and that we can reverse traffic back to the source func TestPartialMoveTablesWithSequences(t *testing.T) { @@ -324,7 +446,6 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { targetKs := "customer2" shard := "80-" var wf80Dash, wfDash80 *workflow - currentCustomerCount = getCustomerCount(t, "before customer2.80-") vtgateConn, closeConn := getVTGateConn() t.Run("Start MoveTables on customer2.80-", func(t *testing.T) { // Now setup the customer2 keyspace so we can do a partial move tables for one of the two shards: 80-. @@ -336,14 +457,11 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { }) wf80Dash.create() - currentCustomerCount = getCustomerCount(t, "after customer2.80-") waitForRowCount(t, vtgateConn, "customer2:80-", "customer", 2) // customer2: 80- waitForRowCount(t, vtgateConn, "customer", "customer", 3) // customer: all shards waitForRowCount(t, vtgateConn, "customer2", "customer", 3) // customer2: all shards }) - currentCustomerCount = getCustomerCount(t, "after customer2.80-/2") - // This query uses an ID that should always get routed to shard 80- shard80DashRoutedQuery := "select name from customer where cid = 1 and noexistcol = 'foo'" // This query uses an ID that should always get routed to shard -80 @@ -382,14 +500,12 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { _, err = vtgateConn.ExecuteFetch("use `customer`", 0, false) // switch vtgate default db back to customer require.NoError(t, err) - currentCustomerCount = getCustomerCount(t, "") // Switch all traffic for the shard wf80Dash.switchTraffic() expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) - currentCustomerCount = getCustomerCount(t, "") // Confirm global routing rules -- everything should still be routed // to the source side, customer, globally. @@ -431,7 +547,6 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { _, err = vtgateConn.ExecuteFetch("use `customer`", 0, false) // switch vtgate default db back to customer require.NoError(t, err) }) - currentCustomerCount = getCustomerCount(t, "") // Now move the other shard: -80 t.Run("Move shard -80 and validate routing rules", func(t *testing.T) { diff --git a/go/vt/vtctl/workflow/sequences.go b/go/vt/vtctl/workflow/sequences.go index 2ef83013e94..b2f6e8e70e0 100644 --- a/go/vt/vtctl/workflow/sequences.go +++ b/go/vt/vtctl/workflow/sequences.go @@ -290,6 +290,13 @@ func (ts *trafficSwitcher) updateSequenceValue(ctx context.Context, seq *sequenc MaxRows: 1, }) if err == nil { + // It is important to reset in-memory sequence counters on the table, + // since it is possible for it to be outdated, this will prevent duplicate + // key errors. + err := ts.TabletManagerClient().ResetSequences(ctx, sequenceTablet.Tablet, []string{seq.backingTableName}) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to reset sequences on %q: %v", seq.backingTableKeyspace, err) + } return nil } diff --git a/test/config.json b/test/config.json index 0bd874af373..a7135c4c13f 100644 --- a/test/config.json +++ b/test/config.json @@ -1209,6 +1209,15 @@ "RetryMax": 1, "Tags": [] }, + "vreplication_sequence_reset_on_switch_traffic": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestSequenceResetOnSwitchTraffic"], + "Command": [], + "Manual": false, + "Shard": "vreplication_partial_movetables_and_materialize", + "RetryMax": 1, + "Tags": [] + }, "vstream_flush_binlog": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVStreamFlushBinlog"],