Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion go/vt/vtgate/endtoend/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestConsistentLookupMultiInsert(t *testing.T) {
exec(t, conn, "delete from t1_id2_idx where id2=4")
}

func TestHashLookupMultiInsertIgnore(t *testing.T) {
func TestLookupMultiInsertIgnore(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
Expand Down Expand Up @@ -260,6 +260,46 @@ func TestHashLookupMultiInsertIgnore(t *testing.T) {
}
}

func TestConsistentLookupMultiInsertIgnore(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// conn2 is for queries that target shards.
conn2, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn2.Close()

// DB should start out clean
qr := exec(t, conn, "select count(*) from t1_id2_idx")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
qr = exec(t, conn, "select count(*) from t1")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

// Try inserting a bunch of ids at once
exec(t, conn, "begin")
exec(t, conn, "insert ignore into t1(id1, id2) values(50,60), (30,40), (10,20)")
exec(t, conn, "commit")

// Verify
qr = exec(t, conn, "select id1, id2 from t1 order by id1")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
qr = exec(t, conn, "select id2 from t1_id2_idx order by id2")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(20)] [INT64(40)] [INT64(60)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtgate"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -205,7 +206,7 @@ func (lu *clCommon) IsFunctional() bool {

// Verify returns true if ids maps to ksids.
func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte) ([]bool, error) {
return lu.lkp.Verify(vcursor, ids, ksidsToValues(ksids))
return lu.lkp.VerifyCustom(vcursor, ids, ksidsToValues(ksids), vtgate.CommitOrder_PRE)
}

// Create reserves the id by inserting it into the vindex table.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vindexes/consistent_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func TestConsistentLookupVerify(t *testing.T) {
t.Error(err)
}
vc.verifyLog(t, []string{
"Execute select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false",
"Execute select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false",
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false",
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false",
})

// Test query fail.
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vtgate/vindexes/lookup_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,21 @@ func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value) ([]*sql

// Verify returns true if ids map to values.
func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value) ([]bool, error) {
co := vtgatepb.CommitOrder_NORMAL
if lkp.Autocommit {
co = vtgatepb.CommitOrder_AUTOCOMMIT
}
return lkp.VerifyCustom(vcursor, ids, values, co)
}

func (lkp *lookupInternal) VerifyCustom(vcursor VCursor, ids, values []sqltypes.Value, co vtgatepb.CommitOrder) ([]bool, error) {
out := make([]bool, len(ids))
for i, id := range ids {
bindVars := map[string]*querypb.BindVariable{
lkp.FromColumns[0]: sqltypes.ValueBindVariable(id),
lkp.To: sqltypes.ValueBindVariable(values[i]),
}
var err error
var result *sqltypes.Result
co := vtgatepb.CommitOrder_NORMAL
if lkp.Autocommit {
co = vtgatepb.CommitOrder_AUTOCOMMIT
}
result, err = vcursor.Execute("VindexVerify", lkp.ver, bindVars, false /* isDML */, co)
result, err := vcursor.Execute("VindexVerify", lkp.ver, bindVars, false /* isDML */, co)
if err != nil {
return nil, fmt.Errorf("lookup.Verify: %v", err)
}
Expand Down