Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 169 additions & 54 deletions go/test/endtoend/vreplication/partial_movetables_seq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

Expand All @@ -38,6 +39,50 @@ import (
As part of a separate cleanup we will build on this framework to replace the existing one.
*/

var (
seqVSchema = `{
"sharded": false,
"tables": {
"customer_seq": {
"type": "sequence"
}
}
}`
seqSchema = `create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';`
commerceSchema = `create table customer(cid int, name varchar(128), ts timestamp(3) not null default current_timestamp(3), primary key(cid));`
commerceVSchema = `
{
"tables": {
"customer": {}
}
}
`
customerSequenceVSchema = `
{
"sharded": true,
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"tables": {
"customer": {
"column_vindexes": [
{
"column": "cid",
"name": "reverse_bits"
}
],
"auto_increment": {
"column": "cid",
"sequence": "customer_seq"
}
}
}
}
`
)

type keyspace struct {
name string
vschema string
Expand Down Expand Up @@ -74,50 +119,6 @@ type vrepTestCase struct {
}

func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase {
const (
seqVSchema = `{
"sharded": false,
"tables": {
"customer_seq": {
"type": "sequence"
}
}
}`
seqSchema = `create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';`
commerceSchema = `create table customer(cid int, name varchar(128), ts timestamp(3) not null default current_timestamp(3), primary key(cid));`
commerceVSchema = `
{
"tables": {
"customer": {}
}
}
`
customerSchema = ""
customerVSchema = `
{
"sharded": true,
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"tables": {
"customer": {
"column_vindexes": [
{
"column": "cid",
"name": "reverse_bits"
}
],
"auto_increment": {
"column": "cid",
"sequence": "customer_seq"
}
}
}
}
`
)
tc := &vrepTestCase{
t: t,
testName: t.Name(),
Expand All @@ -134,14 +135,14 @@ func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase {
}
tc.keyspaces["customer"] = &keyspace{
name: "customer",
vschema: customerVSchema,
schema: customerSchema,
vschema: customerSequenceVSchema,
schema: "",
baseID: 200,
shards: []string{"-80", "80-"},
}
tc.keyspaces["customer2"] = &keyspace{
name: "customer2",
vschema: customerVSchema,
vschema: customerSequenceVSchema,
schema: "",
baseID: 1200,
shards: []string{"-80", "80-"},
Expand All @@ -165,6 +166,40 @@ func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase {
return tc
}

func initSequenceResetTestCase(t *testing.T) *vrepTestCase {
tc := &vrepTestCase{
t: t,
testName: t.Name(),
keyspaces: make(map[string]*keyspace),
defaultCellName: "zone1",
workflows: make(map[string]*workflow),
}
tc.keyspaces["commerce"] = &keyspace{
name: "commerce",
vschema: commerceVSchema,
schema: commerceSchema,
baseID: 100,
shards: []string{"0"},
}
tc.keyspaces["customer"] = &keyspace{
name: "customer",
vschema: customerSequenceVSchema,
schema: "",
baseID: 200,
shards: []string{"-80", "80-"},
}
tc.keyspaces["seqSrc"] = &keyspace{
name: "seqSrc",
vschema: seqVSchema,
schema: seqSchema,
baseID: 400,
shards: []string{"0"},
}
tc.setupCluster()
tc.initData()
return tc
}

func (tc *vrepTestCase) teardown() {
tc.vtgateConn.Close()
vc.TearDown()
Expand Down Expand Up @@ -268,6 +303,93 @@ func (wf *workflow) complete() {
require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions))
}

// TestSequenceResetOnSwitchTraffic tests that in-memory sequence info is
// reset when switching traffic back and forth between keyspaces during
// MoveTables workflow. This catches a bug where cached sequence values would
// persist after traffic switches, causing sequence generation to produce
// duplicate values in target keyspace.
func TestSequenceResetOnSwitchTraffic(t *testing.T) {
origExtraVTGateArgs := extraVTGateArgs
extraVTGateArgs = append(extraVTGateArgs, []string{
"--enable-partial-keyspace-migration",
"--schema_change_signal=false",
}...)
defer func() {
extraVTGateArgs = origExtraVTGateArgs
}()

tc := initSequenceResetTestCase(t)
defer tc.teardown()

currentCustomerCount = getCustomerCount(t, "")
newCustomerCount = 4
t.Run("Verify sequence reset during traffic switching", func(t *testing.T) {
tc.setupKeyspaces([]string{"customer"})
wf := tc.newWorkflow("MoveTables", "customer", "commerce", "customer", &workflowOptions{
tables: []string{"customer"},
})
wf.create()

vtgateConn, closeConn := getVTGateConn()
defer closeConn()

getSequenceNextID := func() int64 {
qr := execVtgateQuery(t, vtgateConn, "", "SELECT next_id FROM seqSrc.customer_seq WHERE id = 0")
nextID, _ := qr.Rows[0][0].ToInt64()
return nextID
}

initialSeqValue := getSequenceNextID()
t.Logf("Initial sequence next_id: %d", initialSeqValue)

wf.switchTraffic()

insertCustomers(t)

afterFirstSwitchSeqValue := getSequenceNextID()
t.Logf("After first switch sequence next_id: %d", afterFirstSwitchSeqValue)
require.Greater(t, afterFirstSwitchSeqValue, initialSeqValue, "Sequence should increment after inserting customers")

wf.reverseTraffic()

afterReverseSeqValue := getSequenceNextID()
t.Logf("After reverse switch sequence next_id: %d", afterReverseSeqValue)

// Insert some random values when all writes are reversed back to
// source keyspace. We are inserting here rows with IDs 1004, 1005,
// 1006 (since the cache value was 1000) which would be the next
// in-memory sequence IDs for inserting any new rows in `customer`
// table if the sequence info isn't reset. This will result in
// duplicate primary key value error in the next insert.
//
// Hence, this way we verify that even if there are any new
// values inserted after the traffic has been reversed, the in-memory
// sequence info is reset, so that on switching back the traffic to
// the target keyspace the tablet refetches the next_id from the
// sequence table for generating the next insert ID.
_, err := tc.vtgateConn.ExecuteFetch("insert into customer(cid, name) values(1004, 'customer8'), (1005, 'customer9'),(1006, 'customer10')", 1000, false)
require.NoError(t, err)
_, err = tc.vtgateConn.ExecuteFetch("insert into customer(cid, name) values(2004, 'customer11'), (2005, 'customer12'),(2006, 'customer13')", 1000, false)
require.NoError(t, err)

wf.switchTraffic()

afterSecondSwitchSeqValue := getSequenceNextID()
// Since the highest ID before switching traffic was 2026, which is
// greater than 2000 (the expected next_id from sequence table before switch.)
assert.Equal(t, int64(2007), afterSecondSwitchSeqValue)

currentCustomerCount = getCustomerCount(t, "after second switch")
newCustomerCount = 4
insertCustomers(t)

finalSeqValue := getSequenceNextID()
assert.Equal(t, int64(3007), finalSeqValue, "Since the cache is set to 1000, next_id is expected to be incremented to 3007")

wf.complete()
})
}

// TestPartialMoveTablesWithSequences enhances TestPartialMoveTables by adding an unsharded keyspace which has a
// sequence. This tests that the sequence is migrated correctly and that we can reverse traffic back to the source
func TestPartialMoveTablesWithSequences(t *testing.T) {
Expand Down Expand Up @@ -324,7 +446,6 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
targetKs := "customer2"
shard := "80-"
var wf80Dash, wfDash80 *workflow
currentCustomerCount = getCustomerCount(t, "before customer2.80-")
vtgateConn, closeConn := getVTGateConn()
t.Run("Start MoveTables on customer2.80-", func(t *testing.T) {
// Now setup the customer2 keyspace so we can do a partial move tables for one of the two shards: 80-.
Expand All @@ -336,14 +457,11 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
})
wf80Dash.create()

currentCustomerCount = getCustomerCount(t, "after customer2.80-")
waitForRowCount(t, vtgateConn, "customer2:80-", "customer", 2) // customer2: 80-
waitForRowCount(t, vtgateConn, "customer", "customer", 3) // customer: all shards
waitForRowCount(t, vtgateConn, "customer2", "customer", 3) // customer2: all shards
})

currentCustomerCount = getCustomerCount(t, "after customer2.80-/2")

// This query uses an ID that should always get routed to shard 80-
shard80DashRoutedQuery := "select name from customer where cid = 1 and noexistcol = 'foo'"
// This query uses an ID that should always get routed to shard -80
Expand Down Expand Up @@ -382,14 +500,12 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {

_, err = vtgateConn.ExecuteFetch("use `customer`", 0, false) // switch vtgate default db back to customer
require.NoError(t, err)
currentCustomerCount = getCustomerCount(t, "")

// Switch all traffic for the shard
wf80Dash.switchTraffic()
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
currentCustomerCount = getCustomerCount(t, "")

// Confirm global routing rules -- everything should still be routed
// to the source side, customer, globally.
Expand Down Expand Up @@ -431,7 +547,6 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
_, err = vtgateConn.ExecuteFetch("use `customer`", 0, false) // switch vtgate default db back to customer
require.NoError(t, err)
})
currentCustomerCount = getCustomerCount(t, "")

// Now move the other shard: -80
t.Run("Move shard -80 and validate routing rules", func(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtctl/workflow/sequences.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ func (ts *trafficSwitcher) updateSequenceValue(ctx context.Context, seq *sequenc
MaxRows: 1,
})
if err == nil {
// It is important to reset in-memory sequence counters on the table,
// since it is possible for it to be outdated, this will prevent duplicate
// key errors.
err := ts.TabletManagerClient().ResetSequences(ctx, sequenceTablet.Tablet, []string{seq.backingTableName})
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to reset sequences on %q: %v", seq.backingTableKeyspace, err)
}
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,15 @@
"RetryMax": 1,
"Tags": []
},
"vreplication_sequence_reset_on_switch_traffic": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestSequenceResetOnSwitchTraffic"],
"Command": [],
"Manual": false,
"Shard": "vreplication_partial_movetables_and_materialize",
"RetryMax": 1,
"Tags": []
},
"vstream_flush_binlog": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVStreamFlushBinlog"],
Expand Down
Loading