Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 47 additions & 14 deletions go/test/endtoend/vtgate/transaction/single/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (
"context"
_ "embed"
"flag"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/utils"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/vtgate/planbuilder"
)

var (
Expand Down Expand Up @@ -70,6 +71,7 @@ func TestMain(m *testing.M) {
}

// Start vtgate
clusterInstance.VtGatePlannerVersion = planbuilder.Gen4
clusterInstance.VtGateExtraArgs = []string{"--transaction_mode", "SINGLE"}
err = clusterInstance.StartVtgate()
if err != nil {
Expand Down Expand Up @@ -168,12 +170,8 @@ func TestLookupDangleRowLaterMultiDB(t *testing.T) {
}

func TestLookupDangleRowRecordInSameShard(t *testing.T) {
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
defer func() {
utils.Exec(t, conn, `delete from txn_unique_constraints where txn_id = 'txn1'`)
}()
conn, cleanup := setup(t)
defer cleanup()

// insert a dangling row in lookup table
utils.Exec(t, conn, `INSERT INTO uniqueConstraint_vdx(unique_constraint, keyspace_id) VALUES ('foo', 'J\xda\xf0p\x0e\xcc(\x8fਁ\xa7P\x86\xa5=')`)
Expand All @@ -190,12 +188,8 @@ func TestLookupDangleRowRecordInSameShard(t *testing.T) {
}

func TestMultiDbSecondRecordLookupDangle(t *testing.T) {
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
defer func() {
utils.Exec(t, conn, `delete from uniqueConstraint_vdx where unique_constraint = 'bar'`)
}()
conn, cleanup := setup(t)
defer cleanup()

// insert a dangling row in lookup table
utils.Exec(t, conn, `INSERT INTO uniqueConstraint_vdx(unique_constraint, keyspace_id) VALUES ('bar', '\x86\xc8\xc5\x1ac\xfb\x8c+6\xe4\x1f\x03\xd8ϝB')`)
Expand All @@ -220,3 +214,42 @@ func TestMultiDbSecondRecordLookupDangle(t *testing.T) {
// no row should exist.
utils.AssertMatches(t, conn, `select txn_id from txn_unique_constraints`, `[]`)
}

// TestNoRecordInTableNotFail test that vindex lookup query creates a transaction on one shard say x.
// To fetch the fields for the actual table, the Select Impossible query should also be reouted to x.
// If it routes to other shard then the test will fail with multi-shard transaction attempted error.
// The fix ensures it does not happen.
func TestNoRecordInTableNotFail(t *testing.T) {
conn, cleanup := setup(t)
defer cleanup()

utils.AssertMatches(t, conn, `select @@transaction_mode`, `[[VARCHAR("SINGLE")]]`)
// Need to run this test multiple times as shards are picked randomly for Impossible query.
// After the fix it is not random if a shard session already exists then it reuses that same shard session.
for i := 0; i < 100; i++ {
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `INSERT INTO t1(id, txn_id) VALUES (1, "t1")`)
utils.Exec(t, conn, `SELECT * FROM t2 WHERE id = 1`)
utils.Exec(t, conn, `rollback`)
}
}

func setup(t *testing.T) (*mysql.Conn, func()) {
t.Helper()
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

tables := []string{
"txn_unique_constraints", "uniqueConstraint_vdx",
"t1", "t1_id_vdx", "t2", "t2_id_vdx",
}
cleanup := func() {
utils.Exec(t, conn, "set transaction_mode=multi")
for _, table := range tables {
utils.Exec(t, conn, fmt.Sprintf("delete from %s /* cleanup */", table))
}
utils.Exec(t, conn, "set transaction_mode=single")
}
cleanup()
return conn, cleanup
}
26 changes: 25 additions & 1 deletion go/test/endtoend/vtgate/transaction/single/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,28 @@ CREATE TABLE uniqueConstraint_vdx(
`unique_constraint` VARCHAR(50) NOT NULL,
`keyspace_id` VARBINARY(50) NOT NULL,
PRIMARY KEY(unique_constraint)
) ENGINE=InnoDB;
) ENGINE=InnoDB;

CREATE TABLE `t1` (
`id` bigint(20) NOT NULL,
`txn_id` varchar(50) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `t1_id_vdx` (
`id` bigint(20) NOT NULL,
`keyspace_id` varbinary(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `t2` (
`id` bigint(20) NOT NULL,
`txn_id` varchar(50) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `t2_id_vdx` (
`id` bigint(20) NOT NULL,
`keyspace_id` varbinary(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
63 changes: 63 additions & 0 deletions go/test/endtoend/vtgate/transaction/single/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,29 @@
"autocommit": "true"
},
"owner": "txn_unique_constraints"
},
"hash_vdx": {
"type": "hash"
},
"t1_id_vdx": {
"type": "consistent_lookup_unique",
"params": {
"autocommit": "true",
"from": "id",
"table": "t1_id_vdx",
"to": "keyspace_id"
},
"owner": "t1"
},
"t2_id_vdx": {
"type": "consistent_lookup_unique",
"params": {
"autocommit": "true",
"from": "id",
"table": "t2_id_vdx",
"to": "keyspace_id"
},
"owner": "t2"
}
},
"tables": {
Expand All @@ -35,6 +58,46 @@
"name": "unicode_loose_md5_vdx"
}
]
},
"t1": {
"columnVindexes": [
{
"column": "txn_id",
"name": "unicode_loose_md5_vdx"
},
{
"column": "id",
"name": "t1_id_vdx"
}
]
},
"t2": {
"columnVindexes": [
{
"column": "txn_id",
"name": "unicode_loose_md5_vdx"
},
{
"column": "id",
"name": "t2_id_vdx"
}
]
},
"t1_id_vdx": {
"columnVindexes": [
{
"column": "id",
"name": "hash_vdx"
}
]
},
"t2_id_vdx": {
"columnVindexes": [
{
"column": "id",
"name": "hash_vdx"
}
]
}
}
}
29 changes: 22 additions & 7 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,30 @@ func (route *Route) mergeSort(

// GetFields fetches the field info.
func (route *Route) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(ctx, route.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return nil, err
var rs *srvtopo.ResolvedShard

// Use an existing shard session
sss := vcursor.Session().ShardSession()
for _, ss := range sss {
if ss.Target.Keyspace == route.Keyspace.Name {
rs = ss
break
}
}
if len(rss) != 1 {
// This code is unreachable. It's just a sanity check.
return nil, fmt.Errorf("no shards for keyspace: %s", route.Keyspace.Name)

// If not find, then pick any shard.
if rs == nil {
rss, _, err := vcursor.ResolveDestinations(ctx, route.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return nil, err
}
if len(rss) != 1 {
// This code is unreachable. It's just a sanity check.
return nil, fmt.Errorf("no shards for keyspace: %s", route.Keyspace.Name)
}
rs = rss[0]
}
qr, err := execShard(ctx, route, vcursor, route.FieldQuery, bindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */)
qr, err := execShard(ctx, route, vcursor, route.FieldQuery, bindVars, rs, false /* rollbackOnError */, false /* canAutocommit */)
if err != nil {
return nil, err
}
Expand Down