Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/binlog/keyspace_id_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func (r *keyspaceIDResolverFactoryV3) keyspaceID(v sqltypes.Value) ([]byte, erro
if len(ksids) != 1 {
return nil, fmt.Errorf("mapping row to keyspace id returned an invalid array of keyspace ids: %v", ksids)
}
if ksids[0] == nil {
if ksids[0].Range != nil || ksids[0].ID == nil {
return nil, fmt.Errorf("could not map %v to a keyspace id", v)
}
return ksids[0], nil
return ksids[0].ID, nil
}
38 changes: 30 additions & 8 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,15 +612,28 @@ func (route *Route) resolveShards(vcursor VCursor, bindVars map[string]*querypb.
if err != nil {
return "", nil, err
}
var shards []string
for i, ksid := range ksids {
if ksid == nil {
continue
switch {
case ksid.Range != nil:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a comment here explaining in which cases it will be a range vs an ID (i.e when using this feature). Otherwise, in this context it's going to be hard to understand why it could be one way or the other.

// Even for a unique vindex, a KeyRange can be returned if a keypace
// id cannot be identified. For example, this can happen during backfill.
// In such cases, we scatter over the KeyRange.
// Use the multi-keyspace id API to convert a keyrange to shards.
shards, err = vcursor.GetShardsForKsids(allShards, vindexes.Ksids{Range: ksid.Range})
if err != nil {
return "", nil, err
}
case ksid.ID != nil:
shard, err := vcursor.GetShardForKeyspaceID(allShards, ksid.ID)
if err != nil {
return "", nil, err
}
shards = []string{shard}
}
shard, err := vcursor.GetShardForKeyspaceID(allShards, ksid)
if err != nil {
return "", nil, err
for _, shard := range shards {
routing.Add(shard, sqltypes.ValueToProto(vindexKeys[i]))
}
routing.Add(shard, sqltypes.ValueToProto(vindexKeys[i]))
}
case vindexes.NonUnique:
ksidss, err := mapper.Map(vcursor, vindexKeys)
Expand Down Expand Up @@ -652,7 +665,10 @@ func (route *Route) resolveSingleShard(vcursor VCursor, bindVars map[string]*que
if err != nil {
return "", "", nil, err
}
ksid = ksids[0]
if err := ksids[0].ValidateUnique(); err != nil {
return "", "", nil, err
}
ksid = ksids[0].ID
if ksid == nil {
return "", "", ksid, nil
}
Expand Down Expand Up @@ -875,10 +891,16 @@ func (route *Route) processPrimary(vcursor VCursor, vindexKeys [][]sqltypes.Valu
flattenedVidexKeys = append(flattenedVidexKeys, internalVal)
}
}
keyspaceIDs, err = mapper.Map(vcursor, flattenedVidexKeys)
ksids, err := mapper.Map(vcursor, flattenedVidexKeys)
if err != nil {
return nil, err
}
for _, ksid := range ksids {
if err := ksid.ValidateUnique(); err != nil {
return nil, err
}
keyspaceIDs = append(keyspaceIDs, ksid.ID)
}

for rowNum, vindexKey := range flattenedVidexKeys {
if keyspaceIDs[rowNum] == nil {
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtgate/engine/vindex_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,13 @@ func (vf *VindexFunc) mapVindex(vcursor VCursor, bindVars, joinVars map[string]*
if err != nil {
return nil, err
}
if ksids[0] != nil {
switch {
case ksids[0].Range != nil:
result.Rows = append(result.Rows, vf.buildRow(vkey, nil, ksids[0].Range))
result.RowsAffected = 1
case ksids[0].ID != nil:
result.Rows = [][]sqltypes.Value{
vf.buildRow(vkey, ksids[0], nil),
vf.buildRow(vkey, ksids[0].ID, nil),
}
result.RowsAffected = 1
}
Expand Down
47 changes: 38 additions & 9 deletions go/vt/vtgate/engine/vindex_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,35 @@ import (
"testing"

"github.com/youtube/vitess/go/sqltypes"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/vtgate/vindexes"

topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)

// uvindex is Unique.
type uvindex struct{ match bool }
type uvindex struct{ matchid, matchkr bool }

func (*uvindex) String() string { return "uvindex" }
func (*uvindex) Cost() int { return 1 }
func (*uvindex) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) {
panic("unimplemented")
}

func (v *uvindex) Map(vindexes.VCursor, []sqltypes.Value) ([][]byte, error) {
if v.match {
return [][]byte{
[]byte("foo"),
func (v *uvindex) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.KsidOrRange, error) {
if v.matchkr {
return []vindexes.KsidOrRange{{
Range: &topodatapb.KeyRange{
Start: []byte{0x40},
End: []byte{0x60},
},
}}, nil
}
if v.matchid {
return []vindexes.KsidOrRange{
{ID: []byte("foo")},
}, nil
}
return [][]byte{nil}, nil
return []vindexes.KsidOrRange{{}}, nil
}

// nvindex is NonUnique.
Expand Down Expand Up @@ -87,7 +96,7 @@ func TestVindexFuncMap(t *testing.T) {
}

// Unique Vindex returning 1 row.
vf = testVindexFunc(&uvindex{match: true})
vf = testVindexFunc(&uvindex{matchid: true})
got, err = vf.Execute(nil, nil, nil, false)
if err != nil {
t.Fatal(err)
Expand All @@ -100,6 +109,26 @@ func TestVindexFuncMap(t *testing.T) {
t.Errorf("Execute(Map, uvindex(none)):\n%v, want\n%v", got, want)
}

// Unique Vindex returning keyrange.
vf = testVindexFunc(&uvindex{matchkr: true})
got, err = vf.Execute(nil, nil, nil, false)
if err != nil {
t.Fatal(err)
}
want = &sqltypes.Result{
Fields: sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"),
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("1"),
sqltypes.NULL,
sqltypes.MakeTrusted(sqltypes.VarBinary, []byte{0x40}),
sqltypes.MakeTrusted(sqltypes.VarBinary, []byte{0x60}),
}},
RowsAffected: 1,
}
if !reflect.DeepEqual(got, want) {
t.Errorf("Execute(Map, uvindex(none)):\n%v, want\n%v", got, want)
}

// NonUnique Vindex returning 0 rows.
vf = testVindexFunc(&nvindex{})
got, err = vf.Execute(nil, nil, nil, false)
Expand Down Expand Up @@ -179,7 +208,7 @@ func TestVindexFuncStreamExecute(t *testing.T) {
}

func TestVindexFuncGetFields(t *testing.T) {
vf := testVindexFunc(&uvindex{match: true})
vf := testVindexFunc(&uvindex{matchid: true})
got, err := vf.GetFields(nil, nil, nil)
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,14 @@ func (e *Executor) MessageAck(ctx context.Context, keyspace, name string, ids []
if err != nil {
return 0, err
}
rss, rssValues, err = e.resolver.resolver.ResolveKeyspaceIdsValues(ctx, table.Keyspace.Name, ids, ksids, topodatapb.TabletType_MASTER)
keyspaceids := make([][]byte, len(ksids))
for i, ksid := range ksids {
if err := ksid.ValidateUnique(); err != nil {
return 0, err
}
keyspaceids[i] = ksid.ID
}
rss, rssValues, err = e.resolver.resolver.ResolveKeyspaceIdsValues(ctx, table.Keyspace.Name, ids, keyspaceids, topodatapb.TabletType_MASTER)
if err != nil {
return 0, err
}
Expand Down
33 changes: 33 additions & 0 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ func TestUpdateEqual(t *testing.T) {
}
}

func TestUpdateEqualKeyrange(t *testing.T) {
executor, _, _, _ := createExecutorEnv()

// If a unique vindex returns a keyrange, we fail the update
_, err := executorExec(executor, "update keyrange_table set a=2 where krcol_unique = 1", nil)
want := "execUpdateEqual: vindex could not map the value to a unique keyspace id"
if err == nil || err.Error() != want {
t.Errorf("executorExec error: %v, want %s", err, want)
}
}

func TestUpdateMultiOwned(t *testing.T) {
vschema := `
{
Expand Down Expand Up @@ -491,6 +502,17 @@ func TestDeleteEqual(t *testing.T) {
}
}

func TestDeleteEqualKeyrange(t *testing.T) {
executor, _, _, _ := createExecutorEnv()

// If a unique vindex returns a keyrange, we fail the delete
_, err := executorExec(executor, "delete from keyrange_table where krcol_unique = 1", nil)
want := "execDeleteEqual: vindex could not map the value to a unique keyspace id"
if err == nil || err.Error() != want {
t.Errorf("executorExec error: %v, want %s", err, want)
}
}

func TestDeleteSharded(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()
_, err := executorExec(executor, "delete from user_extra", nil)
Expand Down Expand Up @@ -667,6 +689,17 @@ func TestInsertSharded(t *testing.T) {
}
}

func TestInsertShardedKeyrange(t *testing.T) {
executor, _, _, _ := createExecutorEnv()

// If a unique vindex returns a keyrange, we fail the insert
_, err := executorExec(executor, "insert into keyrange_table(krcol_unique, krcol) values(1, 1)", nil)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a question around this that @sougou responded in Slack. For future reference, adding here. It was not clear to me what would happen if you have an unique LookupIndex that is in write mode and then it enters in this code path.

In that case, it would mean that lookup was the primary vindex and it should fail.

want := "execInsertSharded: getInsertShardedRoute: vindex could not map the value to a unique keyspace id"
if err == nil || err.Error() != want {
t.Errorf("executorExec error: %v, want %s", err, want)
}
}

func TestInsertShardedAutocommitLookup(t *testing.T) {

vschema := `
Expand Down
33 changes: 29 additions & 4 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ var executorVSchema = `
"keyspace_id": {
"type": "numeric"
},
"krcol_unique_vdx": {
"type": "keyrange_lookuper_unique"
},
"krcol_vdx": {
"type": "keyrange_lookuper"
}
Expand Down Expand Up @@ -209,8 +212,8 @@ var executorVSchema = `
"keyrange_table": {
"column_vindexes": [
{
"column": "id",
"name": "hash_index"
"column": "krcol_unique",
"name": "krcol_unique_vdx"
},
{
"column": "krcol",
Expand Down Expand Up @@ -279,12 +282,34 @@ func (*keyRangeLookuper) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.Ksi
}}, nil
}

func newLookupMigrator(name string, params map[string]string) (vindexes.Vindex, error) {
func newKeyRangeLookuper(name string, params map[string]string) (vindexes.Vindex, error) {
return &keyRangeLookuper{}, nil
}

// keyRangeLookuperUnique is for testing a unique lookup that returns a keyrange.
type keyRangeLookuperUnique struct {
}

func (v *keyRangeLookuperUnique) String() string { return "keyrange_lookuper" }
func (*keyRangeLookuperUnique) Cost() int { return 0 }
func (*keyRangeLookuperUnique) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) {
return []bool{}, nil
}
func (*keyRangeLookuperUnique) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.KsidOrRange, error) {
return []vindexes.KsidOrRange{{
Range: &topodatapb.KeyRange{
End: []byte{0x10},
},
}}, nil
}

func newKeyRangeLookuperUnique(name string, params map[string]string) (vindexes.Vindex, error) {
return &keyRangeLookuperUnique{}, nil
}

func init() {
vindexes.Register("keyrange_lookuper", newLookupMigrator)
vindexes.Register("keyrange_lookuper", newKeyRangeLookuper)
vindexes.Register("keyrange_lookuper_unique", newKeyRangeLookuperUnique)
}

const testBufferSize = 10
Expand Down
24 changes: 22 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,12 +586,32 @@ func TestStreamSelectEqual(t *testing.T) {
func TestSelectKeyRange(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := executorExec(executor, "select id, krcol from keyrange_table where krcol = 1", nil)
_, err := executorExec(executor, "select krcol_unique, krcol from keyrange_table where krcol = 1", nil)
if err != nil {
t.Error(err)
}
wantQueries := []*querypb.BoundQuery{{
Sql: "select id, krcol from keyrange_table where krcol = 1",
Sql: "select krcol_unique, krcol from keyrange_table where krcol = 1",
BindVariables: map[string]*querypb.BindVariable{},
}}
if !reflect.DeepEqual(sbc1.Queries, wantQueries) {
t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, wantQueries)
}
if sbc2.Queries != nil {
t.Errorf("sbc2.Queries: %+v, want nil\n", sbc2.Queries)
}
sbc1.Queries = nil
}

func TestSelectKeyRangeUnique(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := executorExec(executor, "select krcol_unique, krcol from keyrange_table where krcol_unique = 1", nil)
if err != nil {
t.Error(err)
}
wantQueries := []*querypb.BoundQuery{{
Sql: "select krcol_unique, krcol from keyrange_table where krcol_unique = 1",
BindVariables: map[string]*querypb.BindVariable{},
}}
if !reflect.DeepEqual(sbc1.Queries, wantQueries) {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,12 +580,13 @@ func TestExecutorShow(t *testing.T) {
buildVarCharRow("TestExecutor", "idx_noauto", "hash", "", "noauto_table"),
buildVarCharRow("TestExecutor", "insert_ignore_idx", "lookup_hash", "from=fromcol; table=ins_lookup; to=tocol", "insert_ignore_test"),
buildVarCharRow("TestExecutor", "keyspace_id", "numeric", "", ""),
buildVarCharRow("TestExecutor", "krcol_unique_vdx", "keyrange_lookuper_unique", "", ""),
buildVarCharRow("TestExecutor", "krcol_vdx", "keyrange_lookuper", "", ""),
buildVarCharRow("TestExecutor", "music_user_map", "lookup_hash_unique", "from=music_id; table=music_user_map; to=user_id", "music"),
buildVarCharRow("TestExecutor", "name_lastname_keyspace_id_map", "lookup", "from=name,lastname; table=name_lastname_keyspace_id_map; to=keyspace_id", "user2"),
buildVarCharRow("TestExecutor", "name_user_map", "lookup_hash", "from=name; table=name_user_map; to=user_id", "user"),
},
RowsAffected: 9,
RowsAffected: 10,
}
if !reflect.DeepEqual(qr, wantqr) {
t.Errorf("show vindexes:\n%+v, want\n%+v", qr, wantqr)
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func (*hashIndex) Cost() int { return 1 }
func (*hashIndex) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) {
return []bool{}, nil
}
func (*hashIndex) Map(vindexes.VCursor, []sqltypes.Value) ([][]byte, error) { return nil, nil }
func (*hashIndex) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.KsidOrRange, error) {
return nil, nil
}

func newHashIndex(name string, _ map[string]string) (vindexes.Vindex, error) {
return &hashIndex{name: name}, nil
Expand All @@ -58,7 +60,9 @@ func (*lookupIndex) Cost() int { return 2 }
func (*lookupIndex) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) {
return []bool{}, nil
}
func (*lookupIndex) Map(vindexes.VCursor, []sqltypes.Value) ([][]byte, error) { return nil, nil }
func (*lookupIndex) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.KsidOrRange, error) {
return nil, nil
}
func (*lookupIndex) Create(vindexes.VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil }
func (*lookupIndex) Delete(vindexes.VCursor, [][]sqltypes.Value, []byte) error { return nil }
func (*lookupIndex) Update(vindexes.VCursor, []sqltypes.Value, []byte, []sqltypes.Value) error {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/vindexes/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (vind *Binary) Verify(_ VCursor, ids []sqltypes.Value, ksids [][]byte) ([]b
}

// Map returns the corresponding keyspace id values for the given ids.
func (vind *Binary) Map(_ VCursor, ids []sqltypes.Value) ([][]byte, error) {
out := make([][]byte, 0, len(ids))
func (vind *Binary) Map(_ VCursor, ids []sqltypes.Value) ([]KsidOrRange, error) {
out := make([]KsidOrRange, 0, len(ids))
for _, id := range ids {
out = append(out, id.ToBytes())
out = append(out, KsidOrRange{ID: id.ToBytes()})
}
return out, nil
}
Expand Down
Loading