Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions data/test/vtgate/dml_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@
}
}

# update by primary keyspace id, changing same vindex twice
"update user_metadata set email = 'a', email = 'b' where user_id = 1"
"column has duplicate set values: 'email'"

# update by primary keyspace id, changing multiple vindex columns
"update user_metadata set email = 'juan@vitess.io', address = '155 5th street' where user_id = 1"
{
Expand Down
37 changes: 36 additions & 1 deletion data/test/vtgate/vindex_func_cases.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,39 @@
# vindex func read
# vindex func read all cols
"select id, keyspace_id, range_start, range_end from user_index where id = :id"
{
"Original": "select id, keyspace_id, range_start, range_end from user_index where id = :id",
"Instructions": {
"Opcode": "VindexMap",
"Fields": [
{
"name": "id",
"type": 10262
},
{
"name": "keyspace_id",
"type": 10262
},
{
"name": "range_start",
"type": 10262
},
{
"name": "range_end",
"type": 10262
}
],
"Cols": [
0,
1,
2,
3
],
"Vindex": "user_index",
"Value": ":id"
}
}

# vindex func read with id repeated
"select id, keyspace_id, id from user_index where id = :id"
{
"Original": "select id, keyspace_id, id from user_index where id = :id",
Expand Down
18 changes: 15 additions & 3 deletions go/vt/srvtopo/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func MapKeyRangesToShards(ctx context.Context, topoServ Server, cell, keyspace s
}
uniqueShards := make(map[string]bool)
for _, kr := range krs {
ResolveKeyRangeToShards(allShards, uniqueShards, kr)
keyRangeToShardMap(allShards, uniqueShards, kr)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can remove keyRangeToShardMap altogether and do something like:

func MapKeyRangesToShards(ctx context.Context, topoServ Server, cell, keyspace string, tabletType topodatapb.TabletType, krs []*topodatapb.KeyRange) (string, []string, error) {
	keyspace, _, allShards, err := GetKeyspaceShards(ctx, topoServ, cell, keyspace, tabletType)
	if err != nil {
		return "", nil, err
	}
	var res []string
	for _, kr := range krs {
		res = append(res, GetShardsForKeyRange(allShards, kr)...)
	}
	return keyspace, res, nil
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was about to do it. Then realized that we can eventually deprecate this (all V2). So, left it mostly untouched.

}
var res = make([]string, 0, len(uniqueShards))
for s := range uniqueShards {
Expand All @@ -153,8 +153,8 @@ func MapKeyRangesToShards(ctx context.Context, topoServ Server, cell, keyspace s
return keyspace, res, nil
}

// ResolveKeyRangeToShards maps a list of keyranges to shard names.
func ResolveKeyRangeToShards(allShards []*topodatapb.ShardReference, matches map[string]bool, kr *topodatapb.KeyRange) {
// keyRangeToShardMap adds shards to a map based on the input KeyRange.
func keyRangeToShardMap(allShards []*topodatapb.ShardReference, matches map[string]bool, kr *topodatapb.KeyRange) {
if !key.KeyRangeIsPartial(kr) {
for _, shard := range allShards {
matches[shard.Name] = true
Expand All @@ -168,6 +168,18 @@ func ResolveKeyRangeToShards(allShards []*topodatapb.ShardReference, matches map
}
}

// GetShardsForKeyRange maps keyranges to shards.
func GetShardsForKeyRange(allShards []*topodatapb.ShardReference, kr *topodatapb.KeyRange) []string {
isPartial := key.KeyRangeIsPartial(kr)
var shards []string
for _, shard := range allShards {
if !isPartial || key.KeyRangesIntersect(kr, shard.KeyRange) {
shards = append(shards, shard.Name)
}
}
return shards
}

// MapExactShards maps a keyrange to shards only if there's a complete
// match. If there's any partial match the function returns no match.
func MapExactShards(ctx context.Context, topoServ Server, cell, keyspace string, tabletType topodatapb.TabletType, kr *topodatapb.KeyRange) (newkeyspace string, shards []string, err error) {
Expand Down
76 changes: 75 additions & 1 deletion go/vt/srvtopo/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,80 @@ func TestMapKeyRangesToShards(t *testing.T) {
}
}

func TestGetShardsForKeyRange(t *testing.T) {
ctx := context.Background()
rs, err := initTopo("TestGetShardsForKeyRange")
if err != nil {
t.Fatal(err)
}
_, _, allShards, err := GetKeyspaceShards(ctx, rs, "cell1", "sks", topodatapb.TabletType_MASTER)
if err != nil {
t.Fatal(err)
}

testCases := []struct {
input *topodatapb.KeyRange
output []string
}{{
input: &topodatapb.KeyRange{
Start: []byte{0x40},
End: []byte{0x60},
},
output: []string{
"40-60",
},
}, {
input: &topodatapb.KeyRange{
Start: []byte{0x40},
End: []byte{0x80},
},
output: []string{
"40-60",
"60-80",
},
}, {
input: &topodatapb.KeyRange{
Start: []byte{0x50},
End: []byte{0x70},
},
output: []string{
"40-60",
"60-80",
},
}, {
input: &topodatapb.KeyRange{},
output: []string{
"-20",
"20-40",
"40-60",
"60-80",
"80-a0",
"a0-c0",
"c0-e0",
"e0-",
},
}, {
input: nil,
output: []string{
"-20",
"20-40",
"40-60",
"60-80",
"80-a0",
"a0-c0",
"c0-e0",
"e0-",
},
}}

for _, testCase := range testCases {
shards := GetShardsForKeyRange(allShards, testCase.input)
if !reflect.DeepEqual(shards, testCase.output) {
t.Errorf("GetShardsForKeyRange(%s): %v, want %v", key.KeyRangeString(testCase.input), shards, testCase.output)
}
}
}

func TestMapExactShards(t *testing.T) {
ctx := context.Background()
rs, err := initTopo("TestMapExactShards")
Expand Down Expand Up @@ -201,6 +275,6 @@ func BenchmarkResolveKeyRangeToShards(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
ResolveKeyRangeToShards(allShards, uniqueShards, kr)
keyRangeToShardMap(allShards, uniqueShards, kr)
}
}
7 changes: 2 additions & 5 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,8 @@ func TestAutocommitUpdateVindexChange(t *testing.T) {
Sql: "select name, lastname from user2 where id = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]*querypb.BindVariable{
"_name0": sqltypes.BytesBindVariable([]byte("myname")),
"_lastname0": sqltypes.BytesBindVariable([]byte("mylastname")),
},
Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */",
BindVariables: map[string]*querypb.BindVariable{},
}})
testAsTransactionCount(t, "sbc1", sbc1, 0)
testCommitCount(t, "sbc1", sbc1, 1)
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/merge_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ func (t *fakeVcursor) Execute(method string, query string, bindvars map[string]*
panic("unimplemented")
}

func (t *fakeVcursor) ExecuteAutocommit(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) {
panic("unimplemented")
}

func (t *fakeVcursor) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) {
panic("unimplemented")
}
Expand Down Expand Up @@ -374,6 +378,10 @@ func (t *fakeVcursor) GetKeyspaceShards(vkeyspace *vindexes.Keyspace) (string, [
panic("unimplemented")
}

func (t *fakeVcursor) GetShardsForKsids(allShards []*topodatapb.ShardReference, ksids vindexes.Ksids) ([]string, error) {
panic("unimplemented")
}

func (t *fakeVcursor) GetShardForKeyspaceID(allShards []*topodatapb.ShardReference, keyspaceID []byte) (string, error) {
panic("unimplemented")
}
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,20 @@ const ListVarName = "__vals"
type VCursor interface {
// Context returns the context of the current request.
Context() context.Context

// V3 functions.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the newcomer to the code it would be great to provide some more context on what "V3" means -- something like "Routing functions used by Vindex operations" would be clearer than "V3".

Perhaps you eventually plan to have the VCursor used in other contexts, but AFAIK Execute (and now ExecuteAutocommit) are only used by the vindex implementations so having this be clear in the interface would definitely help those of us who have tried to wade through what's going on here.

Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error)
ExecuteAutocommit(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error)

// Shard-level functions.
ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error)
ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, keyspace, shard string) (*sqltypes.Result, error)
StreamExecuteMulti(query string, keyspace string, shardVars map[string]map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error

// Topo functions.
GetKeyspaceShards(vkeyspace *vindexes.Keyspace) (string, []*topodatapb.ShardReference, error)
GetShardForKeyspaceID(allShards []*topodatapb.ShardReference, keyspaceID []byte) (string, error)
GetShardsForKsids(allShards []*topodatapb.ShardReference, ksids vindexes.Ksids) ([]string, error)
}

// Plan represents the execution strategy for a given query.
Expand Down
46 changes: 20 additions & 26 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,11 @@ func (route *Route) resolveShards(vcursor VCursor, bindVars map[string]*querypb.
return "", nil, err
}
for i, ksids := range ksidss {
for _, ksid := range ksids {
shard, err := vcursor.GetShardForKeyspaceID(allShards, ksid)
if err != nil {
return "", nil, err
}
shards, err := vcursor.GetShardsForKsids(allShards, ksids)
if err != nil {
return "", nil, err
}
for _, shard := range shards {
routing.Add(shard, sqltypes.ValueToProto(vindexKeys[i]))
}
}
Expand Down Expand Up @@ -648,40 +648,34 @@ func (route *Route) resolveSingleShard(vcursor VCursor, bindVars map[string]*que
}

func (route *Route) updateChangedVindexes(subQueryResult *sqltypes.Result, vcursor VCursor, bindVars map[string]*querypb.BindVariable, ksid []byte) error {
if len(route.ChangedVindexValues) == 0 {
return nil
}
if len(subQueryResult.Rows) == 0 {
// NOOP, there are no actual rows changing due to this statement
return nil
}
for tIdx, colVindex := range route.Table.Owned {
if colValues, ok := route.ChangedVindexValues[colVindex.Name]; ok {
if len(subQueryResult.Rows) > 1 {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: update changes multiple columns in the vindex")
}
if len(subQueryResult.Rows) > 1 {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: update changes multiple columns in the vindex")
}
colnum := 0
for _, colVindex := range route.Table.Owned {
// Fetch the column values. colnum must keep incrementing.
fromIds := make([]sqltypes.Value, 0, len(colVindex.Columns))
for range colVindex.Columns {
fromIds = append(fromIds, subQueryResult.Rows[0][colnum])
colnum++
}

fromIds := make([][]sqltypes.Value, len(subQueryResult.Rows))
var vindexColumnKeys []sqltypes.Value
// Update columns only if they're being changed.
if colValues, ok := route.ChangedVindexValues[colVindex.Name]; ok {
vindexColumnKeys := make([]sqltypes.Value, 0, len(colValues))
for _, colValue := range colValues {
resolvedVal, err := colValue.ResolveValue(bindVars)
if err != nil {
return err
}
vindexColumnKeys = append(vindexColumnKeys, resolvedVal)

}

for rowIdx, row := range subQueryResult.Rows {
for colIdx := range colVindex.Columns {
fromIds[rowIdx] = append(fromIds[rowIdx], row[tIdx+colIdx])
}
}

if err := colVindex.Vindex.(vindexes.Lookup).Delete(vcursor, fromIds, ksid); err != nil {
return err
}
if err := route.processOwned(vcursor, [][]sqltypes.Value{vindexColumnKeys}, colVindex, bindVars, [][]byte{ksid}); err != nil {
if err := colVindex.Vindex.(vindexes.Lookup).Update(vcursor, fromIds, ksid, vindexColumnKeys); err != nil {
return err
}
}
Expand Down
42 changes: 33 additions & 9 deletions go/vt/vtgate/engine/vindex_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/youtube/vitess/go/vt/vtgate/vindexes"

querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)

var _ Primitive = (*VindexFunc)(nil)
Expand Down Expand Up @@ -121,7 +122,7 @@ func (vf *VindexFunc) mapVindex(vcursor VCursor, bindVars, joinVars map[string]*
}
if ksids[0] != nil {
result.Rows = [][]sqltypes.Value{
vf.buildRow(vkey, ksids[0]),
vf.buildRow(vkey, ksids[0], nil),
}
result.RowsAffected = 1
}
Expand All @@ -130,24 +131,47 @@ func (vf *VindexFunc) mapVindex(vcursor VCursor, bindVars, joinVars map[string]*
if err != nil {
return nil, err
}
for _, ksid := range ksidss[0] {
result.Rows = append(result.Rows, vf.buildRow(vkey, ksid))
if ksidss[0].Range != nil {
result.Rows = append(result.Rows, vf.buildRow(vkey, nil, ksidss[0].Range))
result.RowsAffected = 1
} else {
for _, ksid := range ksidss[0].IDs {
result.Rows = append(result.Rows, vf.buildRow(vkey, ksid, nil))
}
result.RowsAffected = uint64(len(ksidss[0].IDs))
}
result.RowsAffected = uint64(len(ksidss[0]))
default:
panic("unexpected")
}
return result, nil
}

func (vf *VindexFunc) buildRow(id sqltypes.Value, ksid []byte) []sqltypes.Value {
func (vf *VindexFunc) buildRow(id sqltypes.Value, ksid []byte, kr *topodatapb.KeyRange) []sqltypes.Value {
row := make([]sqltypes.Value, 0, len(vf.Fields))
keyspaceID := sqltypes.MakeTrusted(sqltypes.VarBinary, ksid)
for _, col := range vf.Cols {
if col == 0 {
switch col {
case 0:
row = append(row, id)
} else {
row = append(row, keyspaceID)
case 1:
if ksid != nil {
row = append(row, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid))
} else {
row = append(row, sqltypes.NULL)
}
case 2:
if kr != nil {
row = append(row, sqltypes.MakeTrusted(sqltypes.VarBinary, kr.Start))
} else {
row = append(row, sqltypes.NULL)
}
case 3:
if kr != nil {
row = append(row, sqltypes.MakeTrusted(sqltypes.VarBinary, kr.End))
} else {
row = append(row, sqltypes.NULL)
}
default:
panic("BUG: unexpected column number")
}
}
return row
Expand Down
Loading