diff --git a/data/test/vtgate/dml_cases.txt b/data/test/vtgate/dml_cases.txt index b426807c8ee..227d7d06173 100644 --- a/data/test/vtgate/dml_cases.txt +++ b/data/test/vtgate/dml_cases.txt @@ -200,6 +200,10 @@ } } +# update by primary keyspace id, changing same vindex twice +"update user_metadata set email = 'a', email = 'b' where user_id = 1" +"column has duplicate set values: 'email'" + # update by primary keyspace id, changing multiple vindex columns "update user_metadata set email = 'juan@vitess.io', address = '155 5th street' where user_id = 1" { diff --git a/data/test/vtgate/vindex_func_cases.txt b/data/test/vtgate/vindex_func_cases.txt index d98be955a86..bec3949165f 100644 --- a/data/test/vtgate/vindex_func_cases.txt +++ b/data/test/vtgate/vindex_func_cases.txt @@ -1,4 +1,39 @@ -# vindex func read +# vindex func read all cols +"select id, keyspace_id, range_start, range_end from user_index where id = :id" +{ + "Original": "select id, keyspace_id, range_start, range_end from user_index where id = :id", + "Instructions": { + "Opcode": "VindexMap", + "Fields": [ + { + "name": "id", + "type": 10262 + }, + { + "name": "keyspace_id", + "type": 10262 + }, + { + "name": "range_start", + "type": 10262 + }, + { + "name": "range_end", + "type": 10262 + } + ], + "Cols": [ + 0, + 1, + 2, + 3 + ], + "Vindex": "user_index", + "Value": ":id" + } +} + +# vindex func read with id repeated "select id, keyspace_id, id from user_index where id = :id" { "Original": "select id, keyspace_id, id from user_index where id = :id", diff --git a/go/vt/srvtopo/resolve.go b/go/vt/srvtopo/resolve.go index 08ccd40b992..a1a0b217822 100644 --- a/go/vt/srvtopo/resolve.go +++ b/go/vt/srvtopo/resolve.go @@ -144,7 +144,7 @@ func MapKeyRangesToShards(ctx context.Context, topoServ Server, cell, keyspace s } uniqueShards := make(map[string]bool) for _, kr := range krs { - ResolveKeyRangeToShards(allShards, uniqueShards, kr) + keyRangeToShardMap(allShards, uniqueShards, kr) } var res = make([]string, 
0, len(uniqueShards)) for s := range uniqueShards { @@ -153,8 +153,8 @@ func MapKeyRangesToShards(ctx context.Context, topoServ Server, cell, keyspace s return keyspace, res, nil } -// ResolveKeyRangeToShards maps a list of keyranges to shard names. -func ResolveKeyRangeToShards(allShards []*topodatapb.ShardReference, matches map[string]bool, kr *topodatapb.KeyRange) { +// keyRangeToShardMap adds shards to a map based on the input KeyRange. +func keyRangeToShardMap(allShards []*topodatapb.ShardReference, matches map[string]bool, kr *topodatapb.KeyRange) { if !key.KeyRangeIsPartial(kr) { for _, shard := range allShards { matches[shard.Name] = true @@ -168,6 +168,18 @@ func ResolveKeyRangeToShards(allShards []*topodatapb.ShardReference, matches map } } +// GetShardsForKeyRange maps keyranges to shards. +func GetShardsForKeyRange(allShards []*topodatapb.ShardReference, kr *topodatapb.KeyRange) []string { + isPartial := key.KeyRangeIsPartial(kr) + var shards []string + for _, shard := range allShards { + if !isPartial || key.KeyRangesIntersect(kr, shard.KeyRange) { + shards = append(shards, shard.Name) + } + } + return shards +} + // MapExactShards maps a keyrange to shards only if there's a complete // match. If there's any partial match the function returns no match. 
func MapExactShards(ctx context.Context, topoServ Server, cell, keyspace string, tabletType topodatapb.TabletType, kr *topodatapb.KeyRange) (newkeyspace string, shards []string, err error) { diff --git a/go/vt/srvtopo/resolve_test.go b/go/vt/srvtopo/resolve_test.go index 475ebf281d4..bc91ba8d92a 100644 --- a/go/vt/srvtopo/resolve_test.go +++ b/go/vt/srvtopo/resolve_test.go @@ -134,6 +134,80 @@ func TestMapKeyRangesToShards(t *testing.T) { } } +func TestGetShardsForKeyRange(t *testing.T) { + ctx := context.Background() + rs, err := initTopo("TestGetShardsForKeyRange") + if err != nil { + t.Fatal(err) + } + _, _, allShards, err := GetKeyspaceShards(ctx, rs, "cell1", "sks", topodatapb.TabletType_MASTER) + if err != nil { + t.Fatal(err) + } + + testCases := []struct { + input *topodatapb.KeyRange + output []string + }{{ + input: &topodatapb.KeyRange{ + Start: []byte{0x40}, + End: []byte{0x60}, + }, + output: []string{ + "40-60", + }, + }, { + input: &topodatapb.KeyRange{ + Start: []byte{0x40}, + End: []byte{0x80}, + }, + output: []string{ + "40-60", + "60-80", + }, + }, { + input: &topodatapb.KeyRange{ + Start: []byte{0x50}, + End: []byte{0x70}, + }, + output: []string{ + "40-60", + "60-80", + }, + }, { + input: &topodatapb.KeyRange{}, + output: []string{ + "-20", + "20-40", + "40-60", + "60-80", + "80-a0", + "a0-c0", + "c0-e0", + "e0-", + }, + }, { + input: nil, + output: []string{ + "-20", + "20-40", + "40-60", + "60-80", + "80-a0", + "a0-c0", + "c0-e0", + "e0-", + }, + }} + + for _, testCase := range testCases { + shards := GetShardsForKeyRange(allShards, testCase.input) + if !reflect.DeepEqual(shards, testCase.output) { + t.Errorf("GetShardsForKeyRange(%s): %v, want %v", key.KeyRangeString(testCase.input), shards, testCase.output) + } + } +} + func TestMapExactShards(t *testing.T) { ctx := context.Background() rs, err := initTopo("TestMapExactShards") @@ -201,6 +275,6 @@ func BenchmarkResolveKeyRangeToShards(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { 
- ResolveKeyRangeToShards(allShards, uniqueShards, kr) + keyRangeToShardMap(allShards, uniqueShards, kr) } } diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go index 658c4ad313d..252336fd448 100644 --- a/go/vt/vtgate/autocommit_test.go +++ b/go/vt/vtgate/autocommit_test.go @@ -105,11 +105,8 @@ func TestAutocommitUpdateVindexChange(t *testing.T) { Sql: "select name, lastname from user2 where id = 1 for update", BindVariables: map[string]*querypb.BindVariable{}, }, { - Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", - BindVariables: map[string]*querypb.BindVariable{ - "_name0": sqltypes.BytesBindVariable([]byte("myname")), - "_lastname0": sqltypes.BytesBindVariable([]byte("mylastname")), - }, + Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, }}) testAsTransactionCount(t, "sbc1", sbc1, 0) testCommitCount(t, "sbc1", sbc1, 1) diff --git a/go/vt/vtgate/engine/merge_sort_test.go b/go/vt/vtgate/engine/merge_sort_test.go index bd900acb14a..87335cb32e7 100644 --- a/go/vt/vtgate/engine/merge_sort_test.go +++ b/go/vt/vtgate/engine/merge_sort_test.go @@ -342,6 +342,10 @@ func (t *fakeVcursor) Execute(method string, query string, bindvars map[string]* panic("unimplemented") } +func (t *fakeVcursor) ExecuteAutocommit(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { + panic("unimplemented") +} + func (t *fakeVcursor) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) { panic("unimplemented") } @@ -374,6 +378,10 @@ func (t *fakeVcursor) GetKeyspaceShards(vkeyspace *vindexes.Keyspace) (string, [ panic("unimplemented") } +func (t *fakeVcursor) GetShardsForKsids(allShards []*topodatapb.ShardReference, ksids 
vindexes.Ksids) ([]string, error) { + panic("unimplemented") +} + func (t *fakeVcursor) GetShardForKeyspaceID(allShards []*topodatapb.ShardReference, keyspaceID []byte) (string, error) { panic("unimplemented") } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 07458c30ad7..e4f35eef6af 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -42,12 +42,20 @@ const ListVarName = "__vals" type VCursor interface { // Context returns the context of the current request. Context() context.Context + + // V3 functions. Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) + ExecuteAutocommit(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) + + // Shard-level functions. ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, keyspace, shard string) (*sqltypes.Result, error) StreamExecuteMulti(query string, keyspace string, shardVars map[string]map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error + + // Topo functions. GetKeyspaceShards(vkeyspace *vindexes.Keyspace) (string, []*topodatapb.ShardReference, error) GetShardForKeyspaceID(allShards []*topodatapb.ShardReference, keyspaceID []byte) (string, error) + GetShardsForKsids(allShards []*topodatapb.ShardReference, ksids vindexes.Ksids) ([]string, error) } // Plan represents the execution strategy for a given query. diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 1e51e73ef1e..94cbab83ba4 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -612,11 +612,11 @@ func (route *Route) resolveShards(vcursor VCursor, bindVars map[string]*querypb. 
return "", nil, err } for i, ksids := range ksidss { - for _, ksid := range ksids { - shard, err := vcursor.GetShardForKeyspaceID(allShards, ksid) - if err != nil { - return "", nil, err - } + shards, err := vcursor.GetShardsForKsids(allShards, ksids) + if err != nil { + return "", nil, err + } + for _, shard := range shards { routing.Add(shard, sqltypes.ValueToProto(vindexKeys[i])) } } @@ -648,40 +648,34 @@ func (route *Route) resolveSingleShard(vcursor VCursor, bindVars map[string]*que } func (route *Route) updateChangedVindexes(subQueryResult *sqltypes.Result, vcursor VCursor, bindVars map[string]*querypb.BindVariable, ksid []byte) error { - if len(route.ChangedVindexValues) == 0 { - return nil - } if len(subQueryResult.Rows) == 0 { // NOOP, there are no actual rows changing due to this statement return nil } - for tIdx, colVindex := range route.Table.Owned { - if colValues, ok := route.ChangedVindexValues[colVindex.Name]; ok { - if len(subQueryResult.Rows) > 1 { - return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: update changes multiple columns in the vindex") - } + if len(subQueryResult.Rows) > 1 { + return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: update changes multiple columns in the vindex") + } + colnum := 0 + for _, colVindex := range route.Table.Owned { + // Fetch the column values. colnum must keep incrementing. + fromIds := make([]sqltypes.Value, 0, len(colVindex.Columns)) + for range colVindex.Columns { + fromIds = append(fromIds, subQueryResult.Rows[0][colnum]) + colnum++ + } - fromIds := make([][]sqltypes.Value, len(subQueryResult.Rows)) - var vindexColumnKeys []sqltypes.Value + // Update columns only if they're being changed. 
+ if colValues, ok := route.ChangedVindexValues[colVindex.Name]; ok { + vindexColumnKeys := make([]sqltypes.Value, 0, len(colValues)) for _, colValue := range colValues { resolvedVal, err := colValue.ResolveValue(bindVars) if err != nil { return err } vindexColumnKeys = append(vindexColumnKeys, resolvedVal) - } - for rowIdx, row := range subQueryResult.Rows { - for colIdx := range colVindex.Columns { - fromIds[rowIdx] = append(fromIds[rowIdx], row[tIdx+colIdx]) - } - } - - if err := colVindex.Vindex.(vindexes.Lookup).Delete(vcursor, fromIds, ksid); err != nil { - return err - } - if err := route.processOwned(vcursor, [][]sqltypes.Value{vindexColumnKeys}, colVindex, bindVars, [][]byte{ksid}); err != nil { + if err := colVindex.Vindex.(vindexes.Lookup).Update(vcursor, fromIds, ksid, vindexColumnKeys); err != nil { return err } } diff --git a/go/vt/vtgate/engine/vindex_func.go b/go/vt/vtgate/engine/vindex_func.go index 70fb140d81b..9aa8ded99fb 100644 --- a/go/vt/vtgate/engine/vindex_func.go +++ b/go/vt/vtgate/engine/vindex_func.go @@ -23,6 +23,7 @@ import ( "github.com/youtube/vitess/go/vt/vtgate/vindexes" querypb "github.com/youtube/vitess/go/vt/proto/query" + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) var _ Primitive = (*VindexFunc)(nil) @@ -121,7 +122,7 @@ func (vf *VindexFunc) mapVindex(vcursor VCursor, bindVars, joinVars map[string]* } if ksids[0] != nil { result.Rows = [][]sqltypes.Value{ - vf.buildRow(vkey, ksids[0]), + vf.buildRow(vkey, ksids[0], nil), } result.RowsAffected = 1 } @@ -130,24 +131,47 @@ func (vf *VindexFunc) mapVindex(vcursor VCursor, bindVars, joinVars map[string]* if err != nil { return nil, err } - for _, ksid := range ksidss[0] { - result.Rows = append(result.Rows, vf.buildRow(vkey, ksid)) + if ksidss[0].Range != nil { + result.Rows = append(result.Rows, vf.buildRow(vkey, nil, ksidss[0].Range)) + result.RowsAffected = 1 + } else { + for _, ksid := range ksidss[0].IDs { + result.Rows = append(result.Rows, vf.buildRow(vkey, 
ksid, nil)) + } + result.RowsAffected = uint64(len(ksidss[0].IDs)) } - result.RowsAffected = uint64(len(ksidss[0])) default: panic("unexpected") } return result, nil } -func (vf *VindexFunc) buildRow(id sqltypes.Value, ksid []byte) []sqltypes.Value { +func (vf *VindexFunc) buildRow(id sqltypes.Value, ksid []byte, kr *topodatapb.KeyRange) []sqltypes.Value { row := make([]sqltypes.Value, 0, len(vf.Fields)) - keyspaceID := sqltypes.MakeTrusted(sqltypes.VarBinary, ksid) for _, col := range vf.Cols { - if col == 0 { + switch col { + case 0: row = append(row, id) - } else { - row = append(row, keyspaceID) + case 1: + if ksid != nil { + row = append(row, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid)) + } else { + row = append(row, sqltypes.NULL) + } + case 2: + if kr != nil { + row = append(row, sqltypes.MakeTrusted(sqltypes.VarBinary, kr.Start)) + } else { + row = append(row, sqltypes.NULL) + } + case 3: + if kr != nil { + row = append(row, sqltypes.MakeTrusted(sqltypes.VarBinary, kr.End)) + } else { + row = append(row, sqltypes.NULL) + } + default: + panic("BUG: unexpected column number") } } return row diff --git a/go/vt/vtgate/engine/vindex_func_test.go b/go/vt/vtgate/engine/vindex_func_test.go index d4f375709a4..0184b801470 100644 --- a/go/vt/vtgate/engine/vindex_func_test.go +++ b/go/vt/vtgate/engine/vindex_func_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/youtube/vitess/go/sqltypes" + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" "github.com/youtube/vitess/go/vt/vtgate/vindexes" ) @@ -43,7 +44,7 @@ func (v *uvindex) Map(vindexes.VCursor, []sqltypes.Value) ([][]byte, error) { } // nvindex is NonUnique. 
-type nvindex struct{ match bool } +type nvindex struct{ matchid, matchkr bool } func (*nvindex) String() string { return "nvindex" } func (*nvindex) Cost() int { return 1 } @@ -51,13 +52,24 @@ func (*nvindex) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, er panic("unimplemented") } -func (v *nvindex) Map(vindexes.VCursor, []sqltypes.Value) ([][][]byte, error) { - if v.match { - return [][][]byte{{ - []byte("foo"), []byte("bar"), +func (v *nvindex) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.Ksids, error) { + if v.matchid { + return []vindexes.Ksids{{ + IDs: [][]byte{ + []byte("foo"), + []byte("bar"), + }, + }}, nil + } + if v.matchkr { + return []vindexes.Ksids{{ + Range: &topodatapb.KeyRange{ + Start: []byte{0x40}, + End: []byte{0x60}, + }, }}, nil } - return [][][]byte{nil}, nil + return []vindexes.Ksids{{}}, nil } func TestVindexFuncMap(t *testing.T) { @@ -68,7 +80,7 @@ func TestVindexFuncMap(t *testing.T) { t.Fatal(err) } want := &sqltypes.Result{ - Fields: sqltypes.MakeTestFields("id|keyspace_id", "varbinary|varbinary"), + Fields: sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), } if !reflect.DeepEqual(got, want) { t.Errorf("Execute(Map, uvindex(none)):\n%v, want\n%v", got, want) @@ -81,7 +93,7 @@ func TestVindexFuncMap(t *testing.T) { t.Fatal(err) } want = sqltypes.MakeTestResult( - sqltypes.MakeTestFields("id|keyspace_id", "varbinary|varbinary"), + sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), "1|foo", ) if !reflect.DeepEqual(got, want) { @@ -95,37 +107,62 @@ func TestVindexFuncMap(t *testing.T) { t.Fatal(err) } want = &sqltypes.Result{ - Fields: sqltypes.MakeTestFields("id|keyspace_id", "varbinary|varbinary"), + Fields: sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), } if !reflect.DeepEqual(got, want) { t.Errorf("Execute(Map, uvindex(none)):\n%v, 
want\n%v", got, want) } // NonUnique Vindex returning 2 rows. - vf = testVindexFunc(&nvindex{match: true}) + vf = testVindexFunc(&nvindex{matchid: true}) got, err = vf.Execute(nil, nil, nil, false) if err != nil { t.Fatal(err) } want = sqltypes.MakeTestResult( - sqltypes.MakeTestFields("id|keyspace_id", "varbinary|varbinary"), - "1|foo", - "1|bar", + sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), + "1|foo||", + "1|bar||", ) + // Massage the rows because MakeTestResult doesn't do NULL values. + for _, row := range want.Rows { + row[2] = sqltypes.NULL + row[3] = sqltypes.NULL + } + if !reflect.DeepEqual(got, want) { + t.Errorf("Execute(Map, uvindex(none)):\n%v, want\n%v", got, want) + } + + // NonUnique Vindex returning keyrange + vf = testVindexFunc(&nvindex{matchkr: true}) + got, err = vf.Execute(nil, nil, nil, false) + if err != nil { + t.Fatal(err) + } + want = &sqltypes.Result{ + Fields: sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), + Rows: [][]sqltypes.Value{{ + sqltypes.NewVarBinary("1"), + sqltypes.NULL, + sqltypes.MakeTrusted(sqltypes.VarBinary, []byte{0x40}), + sqltypes.MakeTrusted(sqltypes.VarBinary, []byte{0x60}), + }}, + RowsAffected: 1, + } if !reflect.DeepEqual(got, want) { t.Errorf("Execute(Map, uvindex(none)):\n%v, want\n%v", got, want) } } func TestVindexFuncStreamExecute(t *testing.T) { - vf := testVindexFunc(&nvindex{match: true}) + vf := testVindexFunc(&nvindex{matchid: true}) want := []*sqltypes.Result{{ - Fields: sqltypes.MakeTestFields("id|keyspace_id", "varbinary|varbinary"), + Fields: sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), }, { Rows: [][]sqltypes.Value{{ - sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("foo"), + sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("foo"), sqltypes.NULL, sqltypes.NULL, }, { - sqltypes.NewVarBinary("1"), 
sqltypes.NewVarBinary("bar"), + sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("bar"), sqltypes.NULL, sqltypes.NULL, }}, }} i := 0 @@ -148,7 +185,7 @@ func TestVindexFuncGetFields(t *testing.T) { t.Fatal(err) } want := &sqltypes.Result{ - Fields: sqltypes.MakeTestFields("id|keyspace_id", "varbinary|varbinary"), + Fields: sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), } if !reflect.DeepEqual(got, want) { t.Errorf("Execute(Map, uvindex(none)):\n%v, want\n%v", got, want) @@ -156,7 +193,7 @@ func TestVindexFuncGetFields(t *testing.T) { } func TestFieldOrder(t *testing.T) { - vf := testVindexFunc(&nvindex{match: true}) + vf := testVindexFunc(&nvindex{matchid: true}) vf.Fields = sqltypes.MakeTestFields("keyspace_id|id|keyspace_id", "varbinary|varbinary|varbinary") vf.Cols = []int{1, 0, 1} got, err := vf.Execute(nil, nil, nil, true) @@ -175,8 +212,8 @@ func TestFieldOrder(t *testing.T) { func testVindexFunc(v vindexes.Vindex) *VindexFunc { return &VindexFunc{ - Fields: sqltypes.MakeTestFields("id|keyspace_id", "varbinary|varbinary"), - Cols: []int{0, 1}, + Fields: sqltypes.MakeTestFields("id|keyspace_id|range_start|range_end", "varbinary|varbinary|varbinary|varbinary"), + Cols: []int{0, 1, 2, 3}, Opcode: VindexMap, Vindex: v, Value: int64PlanValue(1), diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 756d0fe3d21..d95d7714e16 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -106,11 +106,8 @@ func TestUpdateEqual(t *testing.T) { BindVariables: map[string]*querypb.BindVariable{}, }, { - Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", - BindVariables: map[string]*querypb.BindVariable{ - "_name0": sqltypes.BytesBindVariable([]byte("myname")), - "_lastname0": sqltypes.BytesBindVariable([]byte("mylastname")), - }, + Sql: "update user2 set name = 'myname', lastname 
= 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, }, } if !reflect.DeepEqual(sbc1.Queries, wantQueries) { @@ -142,7 +139,127 @@ func TestUpdateEqual(t *testing.T) { if !reflect.DeepEqual(sbclookup.Queries, wantQueries) { t.Errorf("sbclookup.Queries: %+v, want %+v\n", sbclookup.Queries, wantQueries) } +} +func TestUpdateMultiOwned(t *testing.T) { + vschema := ` +{ + "sharded": true, + "vindexes": { + "hash_index": { + "type": "hash" + }, + "lookup1": { + "type": "lookup_hash_unique", + "owner": "user", + "params": { + "table": "music_user_map", + "from": "from1,from2", + "to": "user_id" + } + }, + "lookup2": { + "type": "lookup_hash_unique", + "owner": "user", + "params": { + "table": "music_user_map", + "from": "from1,from2", + "to": "user_id" + } + }, + "lookup3": { + "type": "lookup_hash_unique", + "owner": "user", + "params": { + "table": "music_user_map", + "from": "from1,from2", + "to": "user_id" + } + } + }, + "tables": { + "user": { + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + }, + { + "columns": ["a", "b"], + "name": "lookup1" + }, + { + "columns": ["c", "d"], + "name": "lookup2" + }, + { + "columns": ["e", "f"], + "name": "lookup3" + } + ] + } + } +} +` + executor, sbc1, sbc2, sbclookup := createCustomExecutor(vschema) + + sbc1.SetResults([]*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields("a|b|c|d|e|f", "int64|int64|int64|int64|int64|int64"), + "10|20|30|40|50|60", + ), + }) + _, err := executorExec(executor, "update user set a=1, b=2, f=4, e=3 where id=1", nil) + if err != nil { + t.Fatal(err) + } + wantQueries := []*querypb.BoundQuery{{ + Sql: "select a, b, c, d, e, f from user where id = 1 for update", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "update user set a = 1, b = 2, f = 4, e = 3 where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }} + if 
!reflect.DeepEqual(sbc1.Queries, wantQueries) { + t.Errorf("sbc1.Queries:\n%+v, want\n%+v\n", sbc1.Queries, wantQueries) + } + if sbc2.Queries != nil { + t.Errorf("sbc2.Queries: %+v, want nil\n", sbc2.Queries) + } + + wantQueries = []*querypb.BoundQuery{{ + Sql: "delete from music_user_map where from1 = :from1 and from2 = :from2 and user_id = :user_id", + BindVariables: map[string]*querypb.BindVariable{ + "from1": sqltypes.Int64BindVariable(10), + "from2": sqltypes.Int64BindVariable(20), + "user_id": sqltypes.Uint64BindVariable(1), + }, + }, { + Sql: "insert into music_user_map(from1, from2, user_id) values (:from10, :from20, :user_id0)", + BindVariables: map[string]*querypb.BindVariable{ + "from10": sqltypes.Int64BindVariable(1), + "from20": sqltypes.Int64BindVariable(2), + "user_id0": sqltypes.Uint64BindVariable(1), + }, + }, { + Sql: "delete from music_user_map where from1 = :from1 and from2 = :from2 and user_id = :user_id", + BindVariables: map[string]*querypb.BindVariable{ + "from1": sqltypes.Int64BindVariable(50), + "from2": sqltypes.Int64BindVariable(60), + "user_id": sqltypes.Uint64BindVariable(1), + }, + }, { + Sql: "insert into music_user_map(from1, from2, user_id) values (:from10, :from20, :user_id0)", + BindVariables: map[string]*querypb.BindVariable{ + "from10": sqltypes.Int64BindVariable(3), + "from20": sqltypes.Int64BindVariable(4), + "user_id0": sqltypes.Uint64BindVariable(1), + }, + }} + + if !reflect.DeepEqual(sbclookup.Queries, wantQueries) { + t.Errorf("sbclookup.Queries:\n%+v, want\n%+v\n", sbclookup.Queries, wantQueries) + } } func TestUpdateComments(t *testing.T) { @@ -531,6 +648,85 @@ func TestInsertSharded(t *testing.T) { } } +func TestInsertShardedAutocommitLookup(t *testing.T) { + + vschema := ` +{ + "sharded": true, + "vindexes": { + "hash_index": { + "type": "hash" + }, + "name_user_map": { + "type": "lookup_hash", + "owner": "user", + "params": { + "table": "name_user_map", + "from": "name", + "to": "user_id", + "autocommit": "true" + 
} + } + }, + "tables": { + "user": { + "column_vindexes": [ + { + "column": "Id", + "name": "hash_index" + }, + { + "column": "name", + "name": "name_user_map" + } + ], + "auto_increment": { + "column": "id", + "sequence": "user_seq" + }, + "columns": [ + { + "name": "textcol", + "type": "VARCHAR" + } + ] + } + } +} +` + executor, sbc1, sbc2, sbclookup := createCustomExecutor(vschema) + + _, err := executorExec(executor, "insert into user(id, v, name) values (1, 2, 'myname')", nil) + if err != nil { + t.Error(err) + } + wantQueries := []*querypb.BoundQuery{{ + Sql: "insert into user(id, v, name) values (:_Id0, 2, :_name0) /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_Id0": sqltypes.Int64BindVariable(1), + "_name0": sqltypes.BytesBindVariable([]byte("myname")), + "__seq0": sqltypes.Int64BindVariable(1), + }, + }} + if !reflect.DeepEqual(sbc1.Queries, wantQueries) { + t.Errorf("sbc1.Queries:\n%+v, want\n%+v\n", sbc1.Queries, wantQueries) + } + if sbc2.Queries != nil { + t.Errorf("sbc2.Queries: %+v, want nil\n", sbc2.Queries) + } + wantQueries = []*querypb.BoundQuery{{ + Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0) on duplicate key update name = values(name), user_id = values(user_id)", + BindVariables: map[string]*querypb.BindVariable{ + "name0": sqltypes.BytesBindVariable([]byte("myname")), + "user_id0": sqltypes.Uint64BindVariable(1), + }, + }} + // autocommit should go as ExecuteBatch + if !reflect.DeepEqual(sbclookup.BatchQueries[0], wantQueries) { + t.Errorf("sbclookup.BatchQueries[0]: \n%+v, want \n%+v", sbclookup.BatchQueries[0], wantQueries) + } +} + func TestInsertShardedIgnore(t *testing.T) { executor, sbc1, sbc2, sbclookup := createExecutorEnv() diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 2f6fb6af06d..6970b89bc2e 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ 
-26,6 +26,7 @@ import ( "github.com/youtube/vitess/go/sqltypes" "github.com/youtube/vitess/go/streamlog" "github.com/youtube/vitess/go/vt/discovery" + "github.com/youtube/vitess/go/vt/vtgate/vindexes" "github.com/youtube/vitess/go/vt/vttablet/sandboxconn" "golang.org/x/net/context" @@ -85,6 +86,9 @@ var executorVSchema = ` }, "keyspace_id": { "type": "numeric" + }, + "krcol_vdx": { + "type": "keyrange_lookuper" } }, "tables": { @@ -194,6 +198,18 @@ var executorVSchema = ` } ] }, + "keyrange_table": { + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + }, + { + "column": "krcol", + "name": "krcol_vdx" + } + ] + }, "ksid_table": { "column_vindexes": [ { @@ -205,6 +221,7 @@ var executorVSchema = ` } } ` + var badVSchema = ` { "sharded": false, @@ -236,6 +253,31 @@ var unshardedVSchema = ` } ` +// keyRangeLookuper is for testing a lookup that returns a keyrange. +type keyRangeLookuper struct { +} + +func (v *keyRangeLookuper) String() string { return "keyrange_lookuper" } +func (*keyRangeLookuper) Cost() int { return 0 } +func (*keyRangeLookuper) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { + return []bool{}, nil +} +func (*keyRangeLookuper) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.Ksids, error) { + return []vindexes.Ksids{{ + Range: &topodatapb.KeyRange{ + End: []byte{0x10}, + }, + }}, nil +} + +func newLookupMigrator(name string, params map[string]string) (vindexes.Vindex, error) { + return &keyRangeLookuper{}, nil +} + +func init() { + vindexes.Register("keyrange_lookuper", newLookupMigrator) +} + const testBufferSize = 10 const testCacheSize = int64(10) @@ -270,6 +312,24 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn return executor, sbc1, sbc2, sbclookup } +func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) { + cell := "aa" + hc := discovery.NewFakeHealthCheck() + s := createSandbox("TestExecutor") + s.VSchema = 
vschema + serv := new(sandboxTopo) + resolver := newTestResolver(hc, serv, cell) + sbc1 = hc.AddTestTablet(cell, "-20", 1, "TestExecutor", "-20", topodatapb.TabletType_MASTER, true, 1, nil) + sbc2 = hc.AddTestTablet(cell, "40-60", 1, "TestExecutor", "40-60", topodatapb.TabletType_MASTER, true, 1, nil) + + createSandbox(KsTestUnsharded) + sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_MASTER, true, 1, nil) + getSandbox(KsTestUnsharded).VSchema = unshardedVSchema + + executor = NewExecutor(context.Background(), serv, cell, "", resolver, false, testBufferSize, testCacheSize, false) + return executor, sbc1, sbc2, sbclookup +} + func executorExec(executor *Executor, sql string, bv map[string]*querypb.BindVariable) (*sqltypes.Result, error) { return executor.Execute( context.Background(), diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 824e2f13c4d..e0b2e945e32 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -583,6 +583,26 @@ func TestStreamSelectEqual(t *testing.T) { } } +func TestSelectKeyRange(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + _, err := executorExec(executor, "select id, krcol from keyrange_table where krcol = 1", nil) + if err != nil { + t.Error(err) + } + wantQueries := []*querypb.BoundQuery{{ + Sql: "select id, krcol from keyrange_table where krcol = 1", + BindVariables: map[string]*querypb.BindVariable{}, + }} + if !reflect.DeepEqual(sbc1.Queries, wantQueries) { + t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, wantQueries) + } + if sbc2.Queries != nil { + t.Errorf("sbc2.Queries: %+v, want nil\n", sbc2.Queries) + } + sbc1.Queries = nil +} + func TestSelectEqualFail(t *testing.T) { executor, _, _, sbclookup := createExecutorEnv() s := getSandbox("TestExecutor") diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 6c0009e27d0..2dca883a02f 100644 --- 
a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -580,11 +580,12 @@ func TestExecutorShow(t *testing.T) { buildVarCharRow("TestExecutor", "idx_noauto", "hash", "", "noauto_table"), buildVarCharRow("TestExecutor", "insert_ignore_idx", "lookup_hash", "from=fromcol; table=ins_lookup; to=tocol", "insert_ignore_test"), buildVarCharRow("TestExecutor", "keyspace_id", "numeric", "", ""), + buildVarCharRow("TestExecutor", "krcol_vdx", "keyrange_lookuper", "", ""), buildVarCharRow("TestExecutor", "music_user_map", "lookup_hash_unique", "from=music_id; table=music_user_map; to=user_id", "music"), buildVarCharRow("TestExecutor", "name_lastname_keyspace_id_map", "lookup", "from=name,lastname; table=name_lastname_keyspace_id_map; to=keyspace_id", "user2"), buildVarCharRow("TestExecutor", "name_user_map", "lookup_hash", "from=name; table=name_user_map; to=user_id", "user"), }, - RowsAffected: 8, + RowsAffected: 9, } if !reflect.DeepEqual(qr, wantqr) { t.Errorf("show vindexes:\n%+v, want\n%+v", qr, wantqr) diff --git a/go/vt/vtgate/planbuilder/dml.go b/go/vt/vtgate/planbuilder/dml.go index 3e33533d435..02d87b48079 100644 --- a/go/vt/vtgate/planbuilder/dml.go +++ b/go/vt/vtgate/planbuilder/dml.go @@ -105,37 +105,46 @@ func generateQuery(statement sqlparser.Statement) string { func buildChangedVindexesValues(route *engine.Route, update *sqlparser.Update, colVindexes []*vindexes.ColumnVindex) (map[string][]sqltypes.PlanValue, error) { changedVindexes := make(map[string][]sqltypes.PlanValue) for i, vindex := range colVindexes { - for _, assignment := range update.Exprs { - for _, vcol := range vindex.Columns { - if vcol.Equal(assignment.Name.Name) { - pv, err := extractValueFromUpdate(assignment, vcol) - if err != nil { - return changedVindexes, err - } - changedVindexes[vindex.Name] = append(changedVindexes[vindex.Name], pv) + var vindexValues []sqltypes.PlanValue + for _, vcol := range vindex.Columns { + // Searching in order of columns in colvindex. 
+ found := false + for _, assignment := range update.Exprs { + if !vcol.Equal(assignment.Name.Name) { + continue } + if found { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "column has duplicate set values: '%v'", assignment.Name.Name) + } + found = true + pv, err := extractValueFromUpdate(assignment, vcol) + if err != nil { + return nil, err + } + vindexValues = append(vindexValues, pv) } } - if len(changedVindexes[vindex.Name]) == 0 { + if len(vindexValues) == 0 { // Vindex not changing, continue continue } - if len(changedVindexes[vindex.Name]) != len(vindex.Columns) { - return changedVindexes, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: update does not have values for all the columns in vindex (%s)", vindex.Name) + if len(vindexValues) != len(vindex.Columns) { + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: update does not have values for all the columns in vindex (%s)", vindex.Name) } if update.Limit != nil && len(update.OrderBy) == 0 { - return changedVindexes, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: Need to provide order by clause when using limit. Invalid update on vindex: %v", vindex.Name) + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: Need to provide order by clause when using limit. Invalid update on vindex: %v", vindex.Name) } if i == 0 { - return changedVindexes, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can't update primary vindex columns. Invalid update on vindex: %v", vindex.Name) + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can't update primary vindex columns. Invalid update on vindex: %v", vindex.Name) } if _, ok := vindex.Vindex.(vindexes.Lookup); !ok { - return changedVindexes, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update lookup vindexes. 
Invalid update on vindex: %v", vindex.Name) + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update lookup vindexes. Invalid update on vindex: %v", vindex.Name) } if !vindex.Owned { - return changedVindexes, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update owned vindexes. Invalid update on vindex: %v", vindex.Name) + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update owned vindexes. Invalid update on vindex: %v", vindex.Name) } + changedVindexes[vindex.Name] = vindexValues } return changedVindexes, nil diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index e4b29dca02b..7893a2582be 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -48,6 +48,8 @@ func newHashIndex(name string, _ map[string]string) (vindexes.Vindex, error) { return &hashIndex{name: name}, nil } +var _ vindexes.Unique = (*hashIndex)(nil) + // lookupIndex satisfies Lookup, Unique. type lookupIndex struct{ name string } @@ -59,11 +61,17 @@ func (*lookupIndex) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool func (*lookupIndex) Map(vindexes.VCursor, []sqltypes.Value) ([][]byte, error) { return nil, nil } func (*lookupIndex) Create(vindexes.VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } func (*lookupIndex) Delete(vindexes.VCursor, [][]sqltypes.Value, []byte) error { return nil } +func (*lookupIndex) Update(vindexes.VCursor, []sqltypes.Value, []byte, []sqltypes.Value) error { + return nil +} func newLookupIndex(name string, _ map[string]string) (vindexes.Vindex, error) { return &lookupIndex{name: name}, nil } +var _ vindexes.Unique = (*lookupIndex)(nil) +var _ vindexes.Lookup = (*lookupIndex)(nil) + // multiIndex satisfies Lookup, NonUnique. 
type multiIndex struct{ name string } @@ -72,14 +80,20 @@ func (*multiIndex) Cost() int { return 3 } func (*multiIndex) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } -func (*multiIndex) Map(vindexes.VCursor, []sqltypes.Value) ([][][]byte, error) { return nil, nil } +func (*multiIndex) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.Ksids, error) { return nil, nil } func (*multiIndex) Create(vindexes.VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } func (*multiIndex) Delete(vindexes.VCursor, [][]sqltypes.Value, []byte) error { return nil } +func (*multiIndex) Update(vindexes.VCursor, []sqltypes.Value, []byte, []sqltypes.Value) error { + return nil +} func newMultiIndex(name string, _ map[string]string) (vindexes.Vindex, error) { return &multiIndex{name: name}, nil } +var _ vindexes.NonUnique = (*multiIndex)(nil) +var _ vindexes.Lookup = (*multiIndex)(nil) + // costlyIndex satisfies Lookup, NonUnique. type costlyIndex struct{ name string } @@ -88,14 +102,20 @@ func (*costlyIndex) Cost() int { return 10 } func (*costlyIndex) Verify(vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } -func (*costlyIndex) Map(vindexes.VCursor, []sqltypes.Value) ([][][]byte, error) { return nil, nil } +func (*costlyIndex) Map(vindexes.VCursor, []sqltypes.Value) ([]vindexes.Ksids, error) { return nil, nil } func (*costlyIndex) Create(vindexes.VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } func (*costlyIndex) Delete(vindexes.VCursor, [][]sqltypes.Value, []byte) error { return nil } +func (*costlyIndex) Update(vindexes.VCursor, []sqltypes.Value, []byte, []sqltypes.Value) error { + return nil +} func newCostlyIndex(name string, _ map[string]string) (vindexes.Vindex, error) { return &costlyIndex{name: name}, nil } +var _ vindexes.NonUnique = (*costlyIndex)(nil) +var _ vindexes.Lookup = (*costlyIndex)(nil) + func init() { vindexes.Register("hash_test", newHashIndex) 
vindexes.Register("lookup_test", newLookupIndex) diff --git a/go/vt/vtgate/planbuilder/vindex_func.go b/go/vt/vtgate/planbuilder/vindex_func.go index 3d03ad269fa..9aaa6fd3b00 100644 --- a/go/vt/vtgate/planbuilder/vindex_func.go +++ b/go/vt/vtgate/planbuilder/vindex_func.go @@ -172,6 +172,10 @@ func (vf *vindexFunc) PushSelect(expr *sqlparser.AliasedExpr, _ columnOriginator vf.eVindexFunc.Cols = append(vf.eVindexFunc.Cols, 0) case col.Name.EqualString("keyspace_id"): vf.eVindexFunc.Cols = append(vf.eVindexFunc.Cols, 1) + case col.Name.EqualString("range_start"): + vf.eVindexFunc.Cols = append(vf.eVindexFunc.Cols, 2) + case col.Name.EqualString("range_end"): + vf.eVindexFunc.Cols = append(vf.eVindexFunc.Cols, 3) default: return nil, 0, fmt.Errorf("unrecognized column %s for vindex: %s", col.Name, vf.eVindexFunc.Vindex) } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index a2e642e22f3..d8159da7833 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -100,7 +100,7 @@ func (vc *vcursorImpl) DefaultKeyspace() (*vindexes.Keyspace, error) { return ks.Keyspace, nil } -// Execute performs a V3 level execution of the query. It does not take any routing directives. +// Execute performs a V3 level execution of the query. func (vc *vcursorImpl) Execute(method string, query string, BindVars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { qr, err := vc.executor.Execute(vc.ctx, method, vc.safeSession, query+vc.trailingComments, BindVars) if err == nil { @@ -109,6 +109,15 @@ func (vc *vcursorImpl) Execute(method string, query string, BindVars map[string] return qr, err } +// ExecuteAutocommit performs a V3 level execution of the query in a separate autocommit session. 
+func (vc *vcursorImpl) ExecuteAutocommit(method string, query string, BindVars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { + qr, err := vc.executor.Execute(vc.ctx, method, NewAutocommitSession(vc.safeSession.Session), query+vc.trailingComments, BindVars) + if err == nil { + vc.hasPartialDML = true + } + return qr, err +} + // ExecuteMultiShard executes different queries on different shards and returns the combined result. func (vc *vcursorImpl) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) { atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(shardQueries))) @@ -157,6 +166,21 @@ func (vc *vcursorImpl) GetShardForKeyspaceID(allShards []*topodatapb.ShardRefere return srvtopo.GetShardForKeyspaceID(allShards, keyspaceID) } +func (vc *vcursorImpl) GetShardsForKsids(allShards []*topodatapb.ShardReference, ksids vindexes.Ksids) ([]string, error) { + if ksids.Range != nil { + return srvtopo.GetShardsForKeyRange(allShards, ksids.Range), nil + } + var shards []string + for _, ksid := range ksids.IDs { + shard, err := srvtopo.GetShardForKeyspaceID(allShards, ksid) + if err != nil { + return nil, err + } + shards = append(shards, shard) + } + return shards, nil +} + func commentedShardQueries(shardQueries map[string]*querypb.BoundQuery, trailingComments string) map[string]*querypb.BoundQuery { if trailingComments == "" { return shardQueries diff --git a/go/vt/vtgate/vindexes/lookup.go b/go/vt/vtgate/vindexes/lookup.go index 549245bfdd2..86f9622720a 100644 --- a/go/vt/vtgate/vindexes/lookup.go +++ b/go/vt/vtgate/vindexes/lookup.go @@ -18,9 +18,11 @@ package vindexes import ( "encoding/json" + "errors" "fmt" "github.com/youtube/vitess/go/sqltypes" + "github.com/youtube/vitess/go/vt/proto/topodata" ) var ( @@ -38,8 +40,9 @@ func init() { // LookupNonUnique defines a vindex that uses a lookup table and create a mapping between from ids and KeyspaceId. 
// It's NonUnique and a Lookup. type LookupNonUnique struct { - name string - lkp lookupInternal + name string + writeOnly bool + lkp lookupInternal } // String returns the name of the vindex. @@ -53,24 +56,42 @@ func (ln *LookupNonUnique) Cost() int { } // Map returns the corresponding KeyspaceId values for the given ids. -func (ln *LookupNonUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([][][]byte, error) { - out := make([][][]byte, 0, len(ids)) +func (ln *LookupNonUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([]Ksids, error) { + out := make([]Ksids, 0, len(ids)) + if ln.writeOnly { + for range ids { + out = append(out, Ksids{Range: &topodata.KeyRange{}}) + } + return out, nil + } + results, err := ln.lkp.Lookup(vcursor, ids) if err != nil { return nil, err } for _, result := range results { + if len(result.Rows) == 0 { + out = append(out, Ksids{}) + continue + } ksids := make([][]byte, 0, len(result.Rows)) for _, row := range result.Rows { ksids = append(ksids, row[0].ToBytes()) } - out = append(out, ksids) + out = append(out, Ksids{IDs: ksids}) } return out, nil } // Verify returns true if ids maps to ksids. func (ln *LookupNonUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte) ([]bool, error) { + if ln.writeOnly { + out := make([]bool, len(ids)) + for i := range ids { + out[i] = true + } + return out, nil + } return ln.lkp.Verify(vcursor, ids, ksidsToValues(ksids)) } @@ -84,15 +105,41 @@ func (ln *LookupNonUnique) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Va return ln.lkp.Delete(vcursor, rowsColValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid)) } +// Update updates the entry in the vindex table. +func (ln *LookupNonUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error { + return ln.lkp.Update(vcursor, oldValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues) +} + // MarshalJSON returns a JSON representation of LookupHash. 
func (ln *LookupNonUnique) MarshalJSON() ([]byte, error) { return json.Marshal(ln.lkp) } // NewLookup creates a LookupNonUnique vindex. +// The supplied map has the following required fields: +// table: name of the backing table. It can be qualified by the keyspace. +// from: list of columns in the table that have the 'from' values of the lookup vindex. +// to: The 'to' column name of the table. +// +// The following fields are optional: +// autocommit: setting this to "true" will cause inserts to upsert and deletes to be ignored. +// write_only: in this mode, Map functions return the full keyrange causing a full scatter. func NewLookup(name string, m map[string]string) (Vindex, error) { lookup := &LookupNonUnique{name: name} - lookup.lkp.Init(m) + + autocommit, err := boolFromMap(m, "autocommit") + if err != nil { + return nil, err + } + lookup.writeOnly, err = boolFromMap(m, "write_only") + if err != nil { + return nil, err + } + + // if autocommit is on for non-unique lookup, upsert should also be on. + if err := lookup.lkp.Init(m, autocommit, autocommit /* upsert */); err != nil { + return nil, err + } return lookup, nil } @@ -115,9 +162,32 @@ type LookupUnique struct { } // NewLookupUnique creates a LookupUnique vindex. +// The supplied map has the following required fields: +// table: name of the backing table. It can be qualified by the keyspace. +// from: list of columns in the table that have the 'from' values of the lookup vindex. +// to: The 'to' column name of the table. +// +// The following fields are optional: +// autocommit: setting this to "true" will cause deletes to be ignored. 
func NewLookupUnique(name string, m map[string]string) (Vindex, error) { lu := &LookupUnique{name: name} - lu.lkp.Init(m) + + autocommit, err := boolFromMap(m, "autocommit") + if err != nil { + return nil, err + } + scatter, err := boolFromMap(m, "write_only") + if err != nil { + return nil, err + } + if scatter { + return nil, errors.New("write_only cannot be true for a unique lookup vindex") + } + + // Don't allow upserts for unique vindexes. + if err := lu.lkp.Init(m, autocommit, false /* upsert */); err != nil { + return nil, err + } return lu, nil } @@ -161,6 +231,11 @@ func (lu *LookupUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value return lu.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode) } +// Update updates the entry in the vindex table. +func (lu *LookupUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error { + return lu.lkp.Update(vcursor, oldValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues) +} + // Delete deletes the entry from the vindex table. func (lu *LookupUnique) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksid []byte) error { return lu.lkp.Delete(vcursor, rowsColValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid)) diff --git a/go/vt/vtgate/vindexes/lookup_hash.go b/go/vt/vtgate/vindexes/lookup_hash.go index ea567d7056d..26797c5eea3 100644 --- a/go/vt/vtgate/vindexes/lookup_hash.go +++ b/go/vt/vtgate/vindexes/lookup_hash.go @@ -18,9 +18,11 @@ package vindexes import ( "encoding/json" + "errors" "fmt" "github.com/youtube/vitess/go/sqltypes" + "github.com/youtube/vitess/go/vt/proto/topodata" ) var ( @@ -42,14 +44,36 @@ func init() { // NonUnique and a Lookup. // Warning: This Vindex is being depcreated in favor of Lookup type LookupHash struct { - name string - lkp lookupInternal + name string + writeOnly bool + lkp lookupInternal } // NewLookupHash creates a LookupHash vindex. 
+// The supplied map has the following required fields: +// table: name of the backing table. It can be qualified by the keyspace. +// from: list of columns in the table that have the 'from' values of the lookup vindex. +// to: The 'to' column name of the table. +// +// The following fields are optional: +// autocommit: setting this to "true" will cause inserts to upsert and deletes to be ignored. +// write_only: in this mode, Map functions return the full keyrange causing a full scatter. func NewLookupHash(name string, m map[string]string) (Vindex, error) { lh := &LookupHash{name: name} - lh.lkp.Init(m) + + autocommit, err := boolFromMap(m, "autocommit") + if err != nil { + return nil, err + } + lh.writeOnly, err = boolFromMap(m, "write_only") + if err != nil { + return nil, err + } + + // if autocommit is on for non-unique lookup, upsert should also be on. + if err := lh.lkp.Init(m, autocommit, autocommit /* upsert */); err != nil { + return nil, err + } return lh, nil } @@ -64,13 +88,24 @@ func (lh *LookupHash) Cost() int { } // Map returns the corresponding KeyspaceId values for the given ids. 
-func (lh *LookupHash) Map(vcursor VCursor, ids []sqltypes.Value) ([][][]byte, error) { - out := make([][][]byte, 0, len(ids)) +func (lh *LookupHash) Map(vcursor VCursor, ids []sqltypes.Value) ([]Ksids, error) { + out := make([]Ksids, 0, len(ids)) + if lh.writeOnly { + for range ids { + out = append(out, Ksids{Range: &topodata.KeyRange{}}) + } + return out, nil + } + results, err := lh.lkp.Lookup(vcursor, ids) if err != nil { return nil, err } for _, result := range results { + if len(result.Rows) == 0 { + out = append(out, Ksids{}) + continue + } ksids := make([][]byte, 0, len(result.Rows)) for _, row := range result.Rows { num, err := sqltypes.ToUint64(row[0]) @@ -81,13 +116,20 @@ func (lh *LookupHash) Map(vcursor VCursor, ids []sqltypes.Value) ([][][]byte, er } ksids = append(ksids, vhash(num)) } - out = append(out, ksids) + out = append(out, Ksids{IDs: ksids}) } return out, nil } // Verify returns true if ids maps to ksids. func (lh *LookupHash) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte) ([]bool, error) { + if lh.writeOnly { + out := make([]bool, len(ids)) + for i := range ids { + out[i] = true + } + return out, nil + } values, err := unhashList(ksids) if err != nil { return nil, fmt.Errorf("lookup.Verify.vunhash: %v", err) @@ -104,6 +146,15 @@ func (lh *LookupHash) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode) } +// Update updates the entry in the vindex table. +func (lh *LookupHash) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error { + v, err := vunhash(ksid) + if err != nil { + return fmt.Errorf("lookup.Update.vunhash: %v", err) + } + return lh.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues) +} + // Delete deletes the entry from the vindex table. 
func (lh *LookupHash) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksid []byte) error { v, err := vunhash(ksid) @@ -143,9 +194,32 @@ type LookupHashUnique struct { } // NewLookupHashUnique creates a LookupHashUnique vindex. +// The supplied map has the following required fields: +// table: name of the backing table. It can be qualified by the keyspace. +// from: list of columns in the table that have the 'from' values of the lookup vindex. +// to: The 'to' column name of the table. +// +// The following fields are optional: +// autocommit: setting this to "true" will cause deletes to be ignored. func NewLookupHashUnique(name string, m map[string]string) (Vindex, error) { lhu := &LookupHashUnique{name: name} - lhu.lkp.Init(m) + + autocommit, err := boolFromMap(m, "autocommit") + if err != nil { + return nil, err + } + scatter, err := boolFromMap(m, "write_only") + if err != nil { + return nil, err + } + if scatter { + return nil, errors.New("write_only cannot be true for a unique lookup vindex") + } + + // Don't allow upserts for unique vindexes. + if err := lhu.lkp.Init(m, autocommit, false /* upsert */); err != nil { + return nil, err + } return lhu, nil } @@ -211,6 +285,15 @@ func (lhu *LookupHashUnique) Delete(vcursor VCursor, rowsColValues [][]sqltypes. return lhu.lkp.Delete(vcursor, rowsColValues, sqltypes.NewUint64(v)) } +// Update updates the entry in the vindex table. +func (lhu *LookupHashUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error { + v, err := vunhash(ksid) + if err != nil { + return fmt.Errorf("lookup.Update.vunhash: %v", err) + } + return lhu.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues) +} + // MarshalJSON returns a JSON representation of LookupHashUnique. 
func (lhu *LookupHashUnique) MarshalJSON() ([]byte, error) { return json.Marshal(lhu.lkp) diff --git a/go/vt/vtgate/vindexes/lookup_hash_test.go b/go/vt/vtgate/vindexes/lookup_hash_test.go index 56c24cfaed3..0d43034c648 100644 --- a/go/vt/vtgate/vindexes/lookup_hash_test.go +++ b/go/vt/vtgate/vindexes/lookup_hash_test.go @@ -22,25 +22,36 @@ import ( "testing" "github.com/youtube/vitess/go/sqltypes" + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) -var lookuphash Vindex -var lookuphashunique Vindex +func TestLookupHashNew(t *testing.T) { + l := createLookup(t, "lookup_hash", false) + if want, got := l.(*LookupHash).writeOnly, false; got != want { + t.Errorf("Create(lookup, false): %v, want %v", got, want) + } -func init() { - lh, err := CreateVindex("lookup_hash", "lookup_hash", map[string]string{"table": "t", "from": "fromc", "to": "toc"}) - if err != nil { - panic(err) + l = createLookup(t, "lookup_hash", true) + if want, got := l.(*LookupHash).writeOnly, true; got != want { + t.Errorf("Create(lookup, false): %v, want %v", got, want) } - lu, err := CreateVindex("lookup_hash_unique", "lookup_hash_unique", map[string]string{"table": "t", "from": "fromc", "to": "toc"}) - if err != nil { - panic(err) + + l, err := CreateVindex("lookup_hash", "lookup_hash", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "write_only": "invalid", + }) + want := "write_only value must be 'true' or 'false': 'invalid'" + if err == nil || err.Error() != want { + t.Errorf("Create(bad_scatter): %v, want %s", err, want) } - lookuphash = lh - lookuphashunique = lu } func TestLookupHashCost(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", false) + lookuphashunique := createLookup(t, "lookup_hash_unique", false) + if lookuphash.Cost() != 20 { t.Errorf("Cost(): %d, want 20", lookuphash.Cost()) } @@ -50,6 +61,9 @@ func TestLookupHashCost(t *testing.T) { } func TestLookupHashString(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", 
false) + lookuphashunique := createLookup(t, "lookup_hash_unique", false) + if strings.Compare("lookup_hash", lookuphash.String()) != 0 { t.Errorf("String(): %s, want lookup_hash", lookuphash.String()) } @@ -59,17 +73,23 @@ func TestLookupHashString(t *testing.T) { } func TestLookupHashMap(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", false) vc := &vcursor{numRows: 2} + got, err := lookuphash.(NonUnique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) if err != nil { t.Error(err) } - want := [][][]byte{{ - []byte("\x16k@\xb4J\xbaK\xd6"), - []byte("\x06\xe7\xea\"Βp\x8f"), + want := []Ksids{{ + IDs: [][]byte{ + []byte("\x16k@\xb4J\xbaK\xd6"), + []byte("\x06\xe7\xea\"Βp\x8f"), + }, }, { - []byte("\x16k@\xb4J\xbaK\xd6"), - []byte("\x06\xe7\xea\"Βp\x8f"), + IDs: [][]byte{ + []byte("\x16k@\xb4J\xbaK\xd6"), + []byte("\x06\xe7\xea\"Βp\x8f"), + }, }} if !reflect.DeepEqual(got, want) { t.Errorf("Map(): %#v, want %+v", got, want) @@ -84,7 +104,7 @@ func TestLookupHashMap(t *testing.T) { if err != nil { t.Error(err) } - want = [][][]byte{{}} + want = []Ksids{{IDs: [][]byte{}}} if !reflect.DeepEqual(got, want) { t.Errorf("Map(): %#v, want %#v", got, want) } @@ -99,8 +119,39 @@ func TestLookupHashMap(t *testing.T) { vc.mustFail = false } +func TestLookupHashMapAbsent(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", false) + vc := &vcursor{numRows: 0} + + got, err := lookuphash.(NonUnique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + want := []Ksids{{}, {}} + if !reflect.DeepEqual(got, want) { + t.Errorf("Map(): %#v, want %+v", got, want) + } + + // writeOnly true should return full keyranges. 
+ lookuphash = createLookup(t, "lookup_hash", true) + got, err = lookuphash.(NonUnique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + want = []Ksids{{ + Range: &topodatapb.KeyRange{}, + }, { + Range: &topodatapb.KeyRange{}, + }} + if !reflect.DeepEqual(got, want) { + t.Errorf("Map(): %#v, want %+v", got, want) + } +} + func TestLookupHashVerify(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", false) vc := &vcursor{numRows: 1} + // The check doesn't actually happen. But we give correct values // to avoid confusion. got, err := lookuphash.Verify(vc, @@ -129,11 +180,29 @@ func TestLookupHashVerify(t *testing.T) { if err == nil || err.Error() != wantErr { t.Errorf("lookuphash.Verify(bogus) err: %v, want %s", err, wantErr) } + + // writeOnly true should always yield true. + lookuphash = createLookup(t, "lookup_hash", true) + vc.queries = nil + + got, err = lookuphash.Verify(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")}) + if err != nil { + t.Error(err) + } + if vc.queries != nil { + t.Errorf("lookuphash.Verify(scatter), queries: %v, want nil", vc.queries) + } + wantBools := []bool{true, true} + if !reflect.DeepEqual(got, wantBools) { + t.Errorf("lookuphash.Verify(scatter): %v, want %v", got, wantBools) + } } func TestLookupHashCreate(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", false) vc := &vcursor{} - err := lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) + + err := lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) if err != nil { t.Error(err) } @@ -141,7 +210,7 @@ func TestLookupHashCreate(t *testing.T) { t.Errorf("vc.queries length: %v, want %v", got, want) } - err = lookuphash.(Lookup).Create(vc, 
[][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, [][]byte{[]byte("bogus")}, false /* ignoreMode */) + err = lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("bogus")}, false /* ignoreMode */) want := "lookup.Create.vunhash: invalid keyspace id: 626f677573" if err == nil || err.Error() != want { t.Errorf("lookuphash.Create(bogus) err: %v, want %s", err, want) @@ -149,8 +218,10 @@ func TestLookupHashCreate(t *testing.T) { } func TestLookupHashDelete(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", false) vc := &vcursor{} - err := lookuphash.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, []byte("\x16k@\xb4J\xbaK\xd6")) + + err := lookuphash.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, []byte("\x16k@\xb4J\xbaK\xd6")) if err != nil { t.Error(err) } @@ -158,9 +229,22 @@ func TestLookupHashDelete(t *testing.T) { t.Errorf("vc.queries length: %v, want %v", got, want) } - err = lookuphash.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, []byte("bogus")) + err = lookuphash.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, []byte("bogus")) want := "lookup.Delete.vunhash: invalid keyspace id: 626f677573" if err == nil || err.Error() != want { t.Errorf("lookuphash.Delete(bogus) err: %v, want %s", err, want) } } + +func TestLookupHashUpdate(t *testing.T) { + lookuphash := createLookup(t, "lookup_hash", false) + vc := &vcursor{} + + err := lookuphash.(Lookup).Update(vc, []sqltypes.Value{sqltypes.NewInt64(1)}, []byte("\x16k@\xb4J\xbaK\xd6"), []sqltypes.Value{sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + if got, want := len(vc.queries), 2; got != want { + t.Errorf("vc.queries length: %v, want %v", got, want) + } +} diff --git a/go/vt/vtgate/vindexes/lookup_hash_unique_test.go b/go/vt/vtgate/vindexes/lookup_hash_unique_test.go index 7a6b4a9cef7..80d900f7ee1 100644 --- 
a/go/vt/vtgate/vindexes/lookup_hash_unique_test.go +++ b/go/vt/vtgate/vindexes/lookup_hash_unique_test.go @@ -23,24 +23,43 @@ import ( "github.com/youtube/vitess/go/sqltypes" ) -var lhu Vindex +func TestLookupHashUniqueNew(t *testing.T) { + _ = createLookup(t, "lookup_hash_unique", false) + + _, err := CreateVindex("lookup_hash_unique", "lookup_hash_unique", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "write_only": "true", + }) + want := "write_only cannot be true for a unique lookup vindex" + if err == nil || err.Error() != want { + t.Errorf("Create(bad_scatter): %v, want %s", err, want) + } -func init() { - h, err := CreateVindex("lookup_hash_unique", "nn", map[string]string{"table": "t", "from": "fromc", "to": "toc"}) - if err != nil { - panic(err) + _, err = CreateVindex("lookup_hash_unique", "lookup_hash_unique", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "write_only": "invalid", + }) + want = "write_only value must be 'true' or 'false': 'invalid'" + if err == nil || err.Error() != want { + t.Errorf("Create(bad_scatter): %v, want %s", err, want) } - lhu = h } func TestLookupHashUniqueCost(t *testing.T) { + lhu := createLookup(t, "lookup_hash_unique", false) if lhu.Cost() != 10 { t.Errorf("Cost(): %d, want 10", lhu.Cost()) } } func TestLookupHashUniqueMap(t *testing.T) { + lhu := createLookup(t, "lookup_hash_unique", false) vc := &vcursor{numRows: 1} + got, err := lhu.(Unique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) if err != nil { t.Error(err) @@ -95,7 +114,9 @@ func TestLookupHashUniqueMap(t *testing.T) { } func TestLookupHashUniqueVerify(t *testing.T) { + lhu := createLookup(t, "lookup_hash_unique", false) vc := &vcursor{numRows: 1} + // The check doesn't actually happen. But we give correct values // to avoid confusion. 
got, err := lhu.Verify(vc, @@ -127,8 +148,10 @@ func TestLookupHashUniqueVerify(t *testing.T) { } func TestLookupHashUniqueCreate(t *testing.T) { + lhu := createLookup(t, "lookup_hash_unique", false) vc := &vcursor{} - err := lhu.(Lookup).Create(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) + + err := lhu.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) if err != nil { t.Error(err) } @@ -136,7 +159,7 @@ func TestLookupHashUniqueCreate(t *testing.T) { t.Errorf("vc.queries length: %v, want %v", got, want) } - err = lhu.(Lookup).Create(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, [][]byte{[]byte("bogus")}, false /* ignoreMode */) + err = lhu.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("bogus")}, false /* ignoreMode */) want := "lookup.Create.vunhash: invalid keyspace id: 626f677573" if err == nil || err.Error() != want { t.Errorf("lhu.Create(bogus) err: %v, want %s", err, want) @@ -144,8 +167,10 @@ func TestLookupHashUniqueCreate(t *testing.T) { } func TestLookupHashUniqueDelete(t *testing.T) { + lhu := createLookup(t, "lookup_hash_unique", false) vc := &vcursor{} - err := lhu.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, []byte("\x16k@\xb4J\xbaK\xd6")) + + err := lhu.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, []byte("\x16k@\xb4J\xbaK\xd6")) if err != nil { t.Error(err) } @@ -153,9 +178,22 @@ func TestLookupHashUniqueDelete(t *testing.T) { t.Errorf("vc.queries length: %v, want %v", got, want) } - err = lhu.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, []byte("bogus")) + err = lhu.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, []byte("bogus")) want := "lookup.Delete.vunhash: invalid keyspace id: 626f677573" if err == nil || err.Error() != want { 
t.Errorf("lhu.Delete(bogus) err: %v, want %s", err, want) } } + +func TestLookupHashUniqueUpdate(t *testing.T) { + lhu := createLookup(t, "lookup_hash_unique", false) + vc := &vcursor{} + + err := lhu.(Lookup).Update(vc, []sqltypes.Value{sqltypes.NewInt64(1)}, []byte("\x16k@\xb4J\xbaK\xd6"), []sqltypes.Value{sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + if got, want := len(vc.queries), 2; got != want { + t.Errorf("vc.queries length: %v, want %v", got, want) + } +} diff --git a/go/vt/vtgate/vindexes/lookup_internal.go b/go/vt/vtgate/vindexes/lookup_internal.go index 80797938d77..84afb94b218 100644 --- a/go/vt/vtgate/vindexes/lookup_internal.go +++ b/go/vt/vtgate/vindexes/lookup_internal.go @@ -32,10 +32,12 @@ type lookupInternal struct { Table string `json:"table"` FromColumns []string `json:"from_columns"` To string `json:"to"` + Autocommit bool `json:"autocommit,omitempty"` + Upsert bool `json:"upsert,omitempty"` sel, ver, del string } -func (lkp *lookupInternal) Init(lookupQueryParams map[string]string) { +func (lkp *lookupInternal) Init(lookupQueryParams map[string]string, autocommit, upsert bool) error { lkp.Table = lookupQueryParams["table"] lkp.To = lookupQueryParams["to"] var fromColumns []string @@ -44,12 +46,16 @@ func (lkp *lookupInternal) Init(lookupQueryParams map[string]string) { } lkp.FromColumns = fromColumns + lkp.Autocommit = autocommit + lkp.Upsert = upsert + // TODO @rafael: update sel and ver to support multi column vindexes. 
This will be done // as part of face 2 of https://github.com/youtube/vitess/issues/3481 // For now multi column behaves as a single column for Map and Verify operations lkp.sel = fmt.Sprintf("select %s from %s where %s = :%s", lkp.To, lkp.Table, lkp.FromColumns[0], lkp.FromColumns[0]) lkp.ver = fmt.Sprintf("select %s from %s where %s = :%s and %s = :%s", lkp.FromColumns[0], lkp.Table, lkp.FromColumns[0], lkp.FromColumns[0], lkp.To, lkp.To) - lkp.del = lkp.initDelStm() + lkp.del = lkp.initDelStmt() + return nil } // Lookup performs a lookup for the ids. @@ -59,7 +65,13 @@ func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value) ([]*sql bindVars := map[string]*querypb.BindVariable{ lkp.FromColumns[0]: sqltypes.ValueBindVariable(id), } - result, err := vcursor.Execute("VindexLookup", lkp.sel, bindVars, false /* isDML */) + var err error + var result *sqltypes.Result + if lkp.Autocommit { + result, err = vcursor.ExecuteAutocommit("VindexLookup", lkp.sel, bindVars, false /* isDML */) + } else { + result, err = vcursor.Execute("VindexLookup", lkp.sel, bindVars, false /* isDML */) + } if err != nil { return nil, fmt.Errorf("lookup.Map: %v", err) } @@ -76,7 +88,13 @@ func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value) lkp.FromColumns[0]: sqltypes.ValueBindVariable(id), lkp.To: sqltypes.ValueBindVariable(values[i]), } - result, err := vcursor.Execute("VindexVerify", lkp.ver, bindVars, true /* isDML */) + var err error + var result *sqltypes.Result + if lkp.Autocommit { + result, err = vcursor.ExecuteAutocommit("VindexVerify", lkp.ver, bindVars, true /* isDML */) + } else { + result, err = vcursor.Execute("VindexVerify", lkp.ver, bindVars, true /* isDML */) + } if err != nil { return nil, fmt.Errorf("lookup.Verify: %v", err) } @@ -91,43 +109,56 @@ func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value) // toValues contains the keyspace_id of each row being inserted. 
// Given a vindex with two columns and the following insert: // -// INSERT INTO table_a (colum_a, column_b, column_c) VALUES (value_a1, value_b1, value_c1), (value_a2, value_b2, value_c2); +// INSERT INTO table_a (column_a, column_b, column_c) VALUES (value_a0, value_b0, value_c0), (value_a1, value_b1, value_c1); // If we assume that the primary vindex is on column_c. The call to create will look like this: -// Create(vcursor, [[value_a1, value_b1,], [value_a2, value_b2]], [binary(value_c1), binary(value_c2)]) +// Create(vcursor, [[value_a0, value_b0,], [value_a1, value_b1]], [binary(value_c0), binary(value_c1)]) // Notice that toValues contains the computed binary value of the keyspace_id. func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool) error { - var insBuffer bytes.Buffer + buf := new(bytes.Buffer) if ignoreMode { - fmt.Fprintf(&insBuffer, "insert ignore into %s(", lkp.Table) + fmt.Fprintf(buf, "insert ignore into %s(", lkp.Table) } else { - fmt.Fprintf(&insBuffer, "insert into %s(", lkp.Table) + fmt.Fprintf(buf, "insert into %s(", lkp.Table) } for _, col := range lkp.FromColumns { - fmt.Fprintf(&insBuffer, "%s, ", col) - + fmt.Fprintf(buf, "%s, ", col) } + fmt.Fprintf(buf, "%s) values(", lkp.To) - fmt.Fprintf(&insBuffer, "%s) values(", lkp.To) bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues)) for rowIdx := range toValues { colIds := rowsColValues[rowIdx] if rowIdx != 0 { - insBuffer.WriteString(", (") + buf.WriteString(", (") } for colIdx, colID := range colIds { fromStr := lkp.FromColumns[colIdx] + strconv.Itoa(rowIdx) bindVars[fromStr] = sqltypes.ValueBindVariable(colID) - insBuffer.WriteString(":" + fromStr + ", ") + buf.WriteString(":" + fromStr + ", ") } toStr := lkp.To + strconv.Itoa(rowIdx) - insBuffer.WriteString(":" + toStr + ")") + buf.WriteString(":" + toStr + ")") bindVars[toStr] = sqltypes.ValueBindVariable(toValues[rowIdx]) } - _, err :=
vcursor.Execute("VindexCreate", insBuffer.String(), bindVars, true /* isDML */) + + if lkp.Upsert { + fmt.Fprintf(buf, " on duplicate key update ") + for _, col := range lkp.FromColumns { + fmt.Fprintf(buf, "%s=values(%s), ", col, col) + } + fmt.Fprintf(buf, "%s=values(%s)", lkp.To, lkp.To) + } + + var err error + if lkp.Autocommit { + _, err = vcursor.ExecuteAutocommit("VindexCreate", buf.String(), bindVars, true /* isDML */) + } else { + _, err = vcursor.Execute("VindexCreate", buf.String(), bindVars, true /* isDML */) + } if err != nil { return fmt.Errorf("lookup.Create: %v", err) } - return err + return nil } // Delete deletes the association between ids and value. @@ -137,7 +168,7 @@ func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Va // // Given the following information in a vindex table with two columns: // -// +------------------+-----------+--------+ +// +------------------+-----------+--------+ // | hex(keyspace_id) | a | b | // +------------------+-----------+--------+ // | 52CB7B1B31B2222E | valuea | valueb | @@ -146,6 +177,10 @@ func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Va // A call to Delete would look like this: // Delete(vcursor, [[valuea, valueb]], 52CB7B1B31B2222E) func (lkp *lookupInternal) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Value, value sqltypes.Value) error { + // In autocommit mode, it's not safe to delete. So, it's a no-op. + if lkp.Autocommit { + return nil + } for _, column := range rowsColValues { bindVars := make(map[string]*querypb.BindVariable, len(rowsColValues)) for colIdx, columnValue := range column { @@ -160,7 +195,15 @@ func (lkp *lookupInternal) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Va return nil } -func (lkp *lookupInternal) initDelStm() string { +// Update implements the update functionality. 
+func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid sqltypes.Value, newValues []sqltypes.Value) error { + if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, ksid); err != nil { + return err + } + return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, []sqltypes.Value{ksid}, false /* ignoreMode */) +} + +func (lkp *lookupInternal) initDelStmt() string { var delBuffer bytes.Buffer fmt.Fprintf(&delBuffer, "delete from %s where ", lkp.Table) for colIdx, column := range lkp.FromColumns { @@ -172,3 +215,18 @@ func (lkp *lookupInternal) initDelStm() string { delBuffer.WriteString(" and " + lkp.To + " = :" + lkp.To) return delBuffer.String() } + +func boolFromMap(m map[string]string, key string) (bool, error) { + val, ok := m[key] + if !ok { + return false, nil + } + switch val { + case "true": + return true, nil + case "false": + return false, nil + default: + return false, fmt.Errorf("%s value must be 'true' or 'false': '%s'", key, val) + } +} diff --git a/go/vt/vtgate/vindexes/lookup_test.go b/go/vt/vtgate/vindexes/lookup_test.go index 3f9fca1baeb..4b72862fbf1 100644 --- a/go/vt/vtgate/vindexes/lookup_test.go +++ b/go/vt/vtgate/vindexes/lookup_test.go @@ -26,19 +26,30 @@ import ( "github.com/youtube/vitess/go/sqltypes" querypb "github.com/youtube/vitess/go/vt/proto/query" + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) // LookupNonUnique tests are more comprehensive than others. // They also test lookupInternal functionality. 
type vcursor struct { - mustFail bool - numRows int - result *sqltypes.Result - queries []*querypb.BoundQuery + mustFail bool + numRows int + result *sqltypes.Result + queries []*querypb.BoundQuery + autocommits int } func (vc *vcursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { + return vc.execute(method, query, bindvars, isDML) +} + +func (vc *vcursor) ExecuteAutocommit(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { + vc.autocommits++ + return vc.execute(method, query, bindvars, isDML) +} + +func (vc *vcursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { vc.queries = append(vc.queries, &querypb.BoundQuery{ Sql: query, BindVariables: bindvars, @@ -69,47 +80,61 @@ func (vc *vcursor) Execute(method string, query string, bindvars map[string]*que panic("unexpected") } -var lookupUnique Vindex -var lookupNonUnique Vindex - -func init() { - lkpunique, err := CreateVindex("lookup_unique", "lookupUnique", map[string]string{"table": "t", "from": "fromc", "to": "toc"}) - if err != nil { - panic(err) +func TestLookupNonUniqueNew(t *testing.T) { + l := createLookup(t, "lookup", false) + if want, got := l.(*LookupNonUnique).writeOnly, false; got != want { + t.Errorf("Create(lookup, false): %v, want %v", got, want) } - lkpnonunique, err := CreateVindex("lookup", "lookupNonUnique", map[string]string{"table": "t", "from": "fromc", "to": "toc"}) - if err != nil { - panic(err) + + l = createLookup(t, "lookup", true) + if want, got := l.(*LookupNonUnique).writeOnly, true; got != want { + t.Errorf("Create(lookup, false): %v, want %v", got, want) } - lookupUnique = lkpunique - lookupNonUnique = lkpnonunique + l, err := CreateVindex("lookup", "lookup", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "write_only": "invalid", + }) + want := "write_only 
value must be 'true' or 'false': 'invalid'" + if err == nil || err.Error() != want { + t.Errorf("Create(bad_scatter): %v, want %s", err, want) + } } func TestLookupNonUniqueCost(t *testing.T) { + lookupNonUnique := createLookup(t, "lookup", false) if lookupNonUnique.Cost() != 20 { - t.Errorf("Cost(): %d, want 20", lookupUnique.Cost()) + t.Errorf("Cost(): %d, want 20", lookupNonUnique.Cost()) } } func TestLookupNonUniqueString(t *testing.T) { - if strings.Compare("lookupNonUnique", lookupNonUnique.String()) != 0 { - t.Errorf("String(): %s, want lookupNonUnique", lookupNonUnique.String()) + lookupNonUnique := createLookup(t, "lookup", false) + if strings.Compare("lookup", lookupNonUnique.String()) != 0 { + t.Errorf("String(): %s, want lookup", lookupNonUnique.String()) } } func TestLookupNonUniqueMap(t *testing.T) { + lookupNonUnique := createLookup(t, "lookup", false) vc := &vcursor{numRows: 2} + got, err := lookupNonUnique.(NonUnique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) if err != nil { t.Error(err) } - want := [][][]byte{{ - []byte("1"), - []byte("2"), + want := []Ksids{{ + IDs: [][]byte{ + []byte("1"), + []byte("2"), + }, }, { - []byte("1"), - []byte("2"), + IDs: [][]byte{ + []byte("1"), + []byte("2"), + }, }} if !reflect.DeepEqual(got, want) { t.Errorf("Map(): %#v, want %+v", got, want) @@ -140,8 +165,90 @@ func TestLookupNonUniqueMap(t *testing.T) { vc.mustFail = false } +func TestLookupNonUniqueMapAutocommit(t *testing.T) { + lookupNonUnique, err := CreateVindex("lookup", "lookup", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "autocommit": "true", + }) + if err != nil { + t.Fatal(err) + } + vc := &vcursor{numRows: 2} + + got, err := lookupNonUnique.(NonUnique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + want := []Ksids{{ + IDs: [][]byte{ + []byte("1"), + []byte("2"), + }, + }, { + IDs: [][]byte{ + []byte("1"), + []byte("2"), + }, + }} + 
if !reflect.DeepEqual(got, want) { + t.Errorf("Map(): %#v, want %+v", got, want) + } + + wantqueries := []*querypb.BoundQuery{{ + Sql: "select toc from t where fromc = :fromc", + BindVariables: map[string]*querypb.BindVariable{ + "fromc": sqltypes.Int64BindVariable(1), + }, + }, { + Sql: "select toc from t where fromc = :fromc", + BindVariables: map[string]*querypb.BindVariable{ + "fromc": sqltypes.Int64BindVariable(2), + }, + }} + if !reflect.DeepEqual(vc.queries, wantqueries) { + t.Errorf("lookup.Map queries:\n%v, want\n%v", vc.queries, wantqueries) + } + + if got, want := vc.autocommits, 2; got != want { + t.Errorf("Create(autocommit) count: %d, want %d", got, want) + } +} + +func TestLookupNonUniqueMapAbsent(t *testing.T) { + lookupNonUnique := createLookup(t, "lookup", false) + vc := &vcursor{numRows: 0} + + got, err := lookupNonUnique.(NonUnique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + want := []Ksids{{}, {}} + if !reflect.DeepEqual(got, want) { + t.Errorf("Map(): %#v, want %+v", got, want) + } + + // writeOnly true should return full keyranges. 
+ lookupNonUnique = createLookup(t, "lookup", true) + got, err = lookupNonUnique.(NonUnique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + want = []Ksids{{ + Range: &topodatapb.KeyRange{}, + }, { + Range: &topodatapb.KeyRange{}, + }} + if !reflect.DeepEqual(got, want) { + t.Errorf("Map(): %#v, want %+v", got, want) + } +} + func TestLookupNonUniqueVerify(t *testing.T) { + lookupNonUnique := createLookup(t, "lookup", false) vc := &vcursor{numRows: 1} + _, err := lookupNonUnique.Verify(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")}) if err != nil { t.Error(err) @@ -172,11 +279,68 @@ func TestLookupNonUniqueVerify(t *testing.T) { t.Errorf("lookupNonUnique(query fail) err: %v, want %s", err, want) } vc.mustFail = false + + // writeOnly true should always yield true. + lookupNonUnique = createLookup(t, "lookup", true) + vc.queries = nil + + got, err := lookupNonUnique.Verify(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")}) + if err != nil { + t.Error(err) + } + if vc.queries != nil { + t.Errorf("lookup.Verify(writeOnly), queries: %v, want nil", vc.queries) + } + wantBools := []bool{true, true} + if !reflect.DeepEqual(got, wantBools) { + t.Errorf("lookup.Verify(writeOnly): %v, want %v", got, wantBools) + } +} + +func TestLookupNonUniqueVerifyAutocommit(t *testing.T) { + lookupNonUnique, err := CreateVindex("lookup", "lookup", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "autocommit": "true", + }) + if err != nil { + t.Fatal(err) + } + vc := &vcursor{numRows: 1} + + _, err = lookupNonUnique.Verify(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")}) + if err != nil { + t.Error(err) + } + + wantqueries := []*querypb.BoundQuery{{ + Sql: "select fromc from t where fromc = :fromc and toc = :toc", + BindVariables: 
map[string]*querypb.BindVariable{ + "fromc": sqltypes.Int64BindVariable(1), + "toc": sqltypes.BytesBindVariable([]byte("test1")), + }, + }, { + Sql: "select fromc from t where fromc = :fromc and toc = :toc", + BindVariables: map[string]*querypb.BindVariable{ + "fromc": sqltypes.Int64BindVariable(2), + "toc": sqltypes.BytesBindVariable([]byte("test2")), + }, + }} + if !reflect.DeepEqual(vc.queries, wantqueries) { + t.Errorf("lookup.Verify queries:\n%v, want\n%v", vc.queries, wantqueries) + } + + if got, want := vc.autocommits, 2; got != want { + t.Errorf("Create(autocommit) count: %d, want %d", got, want) + } } func TestLookupNonUniqueCreate(t *testing.T) { + lookupNonUnique := createLookup(t, "lookup", false) vc := &vcursor{} - err := lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}, []sqltypes.Value{sqltypes.NewInt64(2)}}, [][]byte{[]byte("test1"), []byte("test2")}, false /* ignoreMode */) + + err := lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}, [][]byte{[]byte("test1"), []byte("test2")}, false /* ignoreMode */) if err != nil { t.Error(err) } @@ -196,7 +360,7 @@ func TestLookupNonUniqueCreate(t *testing.T) { // With ignore. vc.queries = nil - err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}, []sqltypes.Value{sqltypes.NewInt64(2)}}, [][]byte{[]byte("test1"), []byte("test2")}, true /* ignoreMode */) + err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}, [][]byte{[]byte("test1"), []byte("test2")}, true /* ignoreMode */) if err != nil { t.Error(err) } @@ -208,7 +372,7 @@ func TestLookupNonUniqueCreate(t *testing.T) { // Test query fail. 
vc.mustFail = true - err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) + err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) want := "lookup.Create: execute failed" if err == nil || err.Error() != want { t.Errorf("lookupNonUnique(query fail) err: %v, want %s", err, want) @@ -216,9 +380,56 @@ func TestLookupNonUniqueCreate(t *testing.T) { vc.mustFail = false } +func TestLookupNonUniqueCreateAutocommit(t *testing.T) { + lookupNonUnique, err := CreateVindex("lookup", "lookup", map[string]string{ + "table": "t", + "from": "from1,from2", + "to": "toc", + "autocommit": "true", + }) + if err != nil { + t.Fatal(err) + } + vc := &vcursor{} + + err = lookupNonUnique.(Lookup).Create( + vc, + [][]sqltypes.Value{{ + sqltypes.NewInt64(1), sqltypes.NewInt64(2), + }, { + sqltypes.NewInt64(3), sqltypes.NewInt64(4), + }}, + [][]byte{[]byte("test1"), []byte("test2")}, + false /* ignoreMode */) + if err != nil { + t.Error(err) + } + + wantqueries := []*querypb.BoundQuery{{ + Sql: "insert into t(from1, from2, toc) values(:from10, :from20, :toc0), (:from11, :from21, :toc1) on duplicate key update from1=values(from1), from2=values(from2), toc=values(toc)", + BindVariables: map[string]*querypb.BindVariable{ + "from10": sqltypes.Int64BindVariable(1), + "from20": sqltypes.Int64BindVariable(2), + "toc0": sqltypes.BytesBindVariable([]byte("test1")), + "from11": sqltypes.Int64BindVariable(3), + "from21": sqltypes.Int64BindVariable(4), + "toc1": sqltypes.BytesBindVariable([]byte("test2")), + }, + }} + if !reflect.DeepEqual(vc.queries, wantqueries) { + t.Errorf("lookup.Create queries:\n%v, want\n%v", vc.queries, wantqueries) + } + + if got, want := vc.autocommits, 1; got != want { + t.Errorf("Create(autocommit) count: %d, want %d", got, want) + } +} + func 
TestLookupNonUniqueDelete(t *testing.T) { + lookupNonUnique := createLookup(t, "lookup", false) vc := &vcursor{} - err := lookupNonUnique.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}, []sqltypes.Value{sqltypes.NewInt64(2)}}, []byte("test")) + + err := lookupNonUnique.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}, []byte("test")) if err != nil { t.Error(err) } @@ -242,10 +453,75 @@ func TestLookupNonUniqueDelete(t *testing.T) { // Test query fail. vc.mustFail = true - err = lookupNonUnique.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, []byte("\x16k@\xb4J\xbaK\xd6")) + err = lookupNonUnique.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, []byte("\x16k@\xb4J\xbaK\xd6")) want := "lookup.Delete: execute failed" if err == nil || err.Error() != want { t.Errorf("lookupNonUnique(query fail) err: %v, want %s", err, want) } vc.mustFail = false } + +func TestLookupNonUniqueDeleteAutocommit(t *testing.T) { + lookupNonUnique, err := CreateVindex("lookup", "lookup", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "autocommit": "true", + }) + vc := &vcursor{} + + err = lookupNonUnique.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}, []byte("test")) + if err != nil { + t.Error(err) + } + + wantqueries := []*querypb.BoundQuery(nil) + if !reflect.DeepEqual(vc.queries, wantqueries) { + t.Errorf("lookup.Delete queries:\n%v, want\n%v", vc.queries, wantqueries) + } +} + +func TestLookupNonUniqueUpdate(t *testing.T) { + lookupNonUnique := createLookup(t, "lookup", false) + vc := &vcursor{} + + err := lookupNonUnique.(Lookup).Update(vc, []sqltypes.Value{sqltypes.NewInt64(1)}, []byte("test"), []sqltypes.Value{sqltypes.NewInt64(2)}) + if err != nil { + t.Error(err) + } + + wantqueries := []*querypb.BoundQuery{{ + Sql: "delete from t where fromc = :fromc and toc = :toc", + BindVariables: 
map[string]*querypb.BindVariable{ + "fromc": sqltypes.Int64BindVariable(1), + "toc": sqltypes.BytesBindVariable([]byte("test")), + }, + }, { + Sql: "insert into t(fromc, toc) values(:fromc0, :toc0)", + BindVariables: map[string]*querypb.BindVariable{ + "fromc0": sqltypes.Int64BindVariable(2), + "toc0": sqltypes.BytesBindVariable([]byte("test")), + }, + }} + if !reflect.DeepEqual(vc.queries, wantqueries) { + t.Errorf("lookup.Update queries:\n%v, want\n%v", vc.queries, wantqueries) + } +} + +func createLookup(t *testing.T, name string, writeOnly bool) Vindex { + t.Helper() + write := "false" + if writeOnly { + write = "true" + } + l, err := CreateVindex(name, name, map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "write_only": write, + }) + if err != nil { + t.Fatal(err) + } + return l +} diff --git a/go/vt/vtgate/vindexes/lookup_unique_test.go b/go/vt/vtgate/vindexes/lookup_unique_test.go index a42ca51d0a2..6a81cc8a1b0 100644 --- a/go/vt/vtgate/vindexes/lookup_unique_test.go +++ b/go/vt/vtgate/vindexes/lookup_unique_test.go @@ -22,22 +22,53 @@ import ( "testing" "github.com/youtube/vitess/go/sqltypes" + querypb "github.com/youtube/vitess/go/vt/proto/query" ) +func TestLookupUniqueNew(t *testing.T) { + _ = createLookup(t, "lookup_unique", false) + + _, err := CreateVindex("lookup_unique", "lookup_unique", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "write_only": "true", + }) + want := "write_only cannot be true for a unique lookup vindex" + if err == nil || err.Error() != want { + t.Errorf("Create(bad_scatter): %v, want %s", err, want) + } + + _, err = CreateVindex("lookup_unique", "lookup_unique", map[string]string{ + "table": "t", + "from": "fromc", + "to": "toc", + "write_only": "invalid", + }) + want = "write_only value must be 'true' or 'false': 'invalid'" + if err == nil || err.Error() != want { + t.Errorf("Create(bad_scatter): %v, want %s", err, want) + } +} + func TestLookupUniqueCost(t *testing.T) { + 
lookupUnique := createLookup(t, "lookup_unique", false) if lookupUnique.Cost() != 10 { t.Errorf("Cost(): %d, want 10", lookupUnique.Cost()) } } func TestLookupUniqueString(t *testing.T) { - if strings.Compare("lookupUnique", lookupUnique.String()) != 0 { - t.Errorf("String(): %s, want lookupUnique", lookupUnique.String()) + lookupUnique := createLookup(t, "lookup_unique", false) + if strings.Compare("lookup_unique", lookupUnique.String()) != 0 { + t.Errorf("String(): %s, want lookup_unique", lookupUnique.String()) } } func TestLookupUniqueMap(t *testing.T) { + lookupUnique := createLookup(t, "lookup_unique", false) vc := &vcursor{numRows: 1} + got, err := lookupUnique.(Unique).Map(vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) if err != nil { t.Error(err) @@ -78,7 +109,9 @@ } func TestLookupUniqueVerify(t *testing.T) { + lookupUnique := createLookup(t, "lookup_unique", false) vc := &vcursor{numRows: 1} + _, err := lookupUnique.Verify(vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("test")}) if err != nil { t.Error(err) @@ -86,45 +119,76 @@ if got, want := len(vc.queries), 1; got != want { t.Errorf("vc.queries length: %v, want %v", got, want) } +} + +func TestLookupUniqueCreateAutocommit(t *testing.T) { + lookupUnique, err := CreateVindex("lookup_unique", "lookup_unique", map[string]string{ + "table": "t", + "from": "from", + "to": "toc", + "autocommit": "true", + }) + if err != nil { + t.Fatal(err) + } + vc := &vcursor{} - _, err = lookuphashunique.Verify(nil, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("test1test23")}) - want := "lookup.Verify.vunhash: invalid keyspace id: 7465737431746573743233" - if err.Error() != want { + err = lookupUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("test")}, false /* ignoreMode */) + if err != nil { t.Error(err) } + + wantqueries := []*querypb.BoundQuery{{ + Sql: "insert 
into t(from, toc) values(:from0, :toc0)", + BindVariables: map[string]*querypb.BindVariable{ + "from0": sqltypes.Int64BindVariable(1), + "toc0": sqltypes.BytesBindVariable([]byte("test")), + }, + }} + if !reflect.DeepEqual(vc.queries, wantqueries) { + t.Errorf("lookup.Create queries:\n%v, want\n%v", vc.queries, wantqueries) + } + + if got, want := vc.autocommits, 1; got != want { + t.Errorf("Create(autocommit) count: %d, want %d", got, want) + } } -func TestLookupUniqueCreate(t *testing.T) { +func TestLookupUniqueCreate(t *testing.T) { + lookupUnique := createLookup(t, "lookup_unique", false) vc := &vcursor{} - err := lookupUnique.(Lookup).Create(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, [][]byte{[]byte("test")}, false /* ignoreMode */) + + err := lookupUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("test")}, false /* ignoreMode */) if err != nil { t.Error(err) } if got, want := len(vc.queries), 1; got != want { t.Errorf("vc.queries length: %v, want %v", got, want) } - - err = lookuphashunique.(Lookup).Create(nil, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, [][]byte{[]byte("test1test23")}, false /* ignoreMode */) - want := "lookup.Create.vunhash: invalid keyspace id: 7465737431746573743233" - if err.Error() != want { - t.Error(err) - } } func TestLookupUniqueDelete(t *testing.T) { + lookupUnique := createLookup(t, "lookup_unique", false) vc := &vcursor{} - err := lookupUnique.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, []byte("test")) + + err := lookupUnique.(Lookup).Delete(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, []byte("test")) if err != nil { t.Error(err) } if got, want := len(vc.queries), 1; got != want { t.Errorf("vc.queries length: %v, want %v", got, want) } +} + +func TestLookupUniqueUpdate(t *testing.T) { + lookupUnique := createLookup(t, "lookup_unique", false) + vc := &vcursor{} - //Negative Test - err = 
lookuphashunique.(Lookup).Delete(vc, [][]sqltypes.Value{[]sqltypes.Value{sqltypes.NewInt64(1)}}, []byte("test1test23")) - want := "lookup.Delete.vunhash: invalid keyspace id: 7465737431746573743233" - if err.Error() != want { + err := lookupUnique.(Lookup).Update(vc, []sqltypes.Value{sqltypes.NewInt64(1)}, []byte("test"), []sqltypes.Value{sqltypes.NewInt64(2)}) + if err != nil { t.Error(err) } + if got, want := len(vc.queries), 2; got != want { + t.Errorf("vc.queries length: %v, want %v", got, want) + } } diff --git a/go/vt/vtgate/vindexes/vindex.go b/go/vt/vtgate/vindexes/vindex.go index d7e5a627ec4..39efc85d2b9 100644 --- a/go/vt/vtgate/vindexes/vindex.go +++ b/go/vt/vtgate/vindexes/vindex.go @@ -22,6 +22,7 @@ import ( "github.com/youtube/vitess/go/sqltypes" querypb "github.com/youtube/vitess/go/vt/proto/query" + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) // This file defines interfaces and registration for vindexes. @@ -31,6 +32,7 @@ import ( // can use this interface to execute lookup queries. type VCursor interface { Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) + ExecuteAutocommit(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) } // Vindex defines the interface required to register a vindex. @@ -62,10 +64,17 @@ type Unique interface { Map(cursor VCursor, ids []sqltypes.Value) ([][]byte, error) } +// Ksids represents keyspace ids. It's either a list of keyspace ids +// or a keyrange. +type Ksids struct { + Range *topodatapb.KeyRange + IDs [][]byte +} + // NonUnique defines the interface for a non-unique vindex. // This means that an id can map to multiple keyspace ids. type NonUnique interface { - Map(cursor VCursor, ids []sqltypes.Value) ([][][]byte, error) + Map(cursor VCursor, ids []sqltypes.Value) ([]Ksids, error) } // IsUnique returns true if the Vindex is Unique. 
@@ -103,7 +112,11 @@ type Lookup interface { // Create creates an association between ids and ksids. If ignoreMode // is true, then the Create should ignore dup key errors. Create(vc VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error - Delete(VCursor, [][]sqltypes.Value, []byte) error + + Delete(vc VCursor, rowsColValues [][]sqltypes.Value, ksid []byte) error + + // Update replaces the mapping of old values with new values for a keyspace id. + Update(vc VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error } // A NewVindexFunc is a function that creates a Vindex based on the diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go index d3fb2a3ee3d..d4654f3c566 100644 --- a/go/vt/vtgate/vindexes/vschema_test.go +++ b/go/vt/vtgate/vindexes/vschema_test.go @@ -47,6 +47,8 @@ func NewSTFU(name string, params map[string]string) (Vindex, error) { return &stFU{name: name, Params: params}, nil } +var _ Unique = (*stFU)(nil) + // stF satisfies Functional, but no Map. Invalid vindex. 
type stF struct { name string @@ -67,34 +69,42 @@ type stLN struct { Params map[string]string } -func (v *stLN) String() string { return v.name } -func (*stLN) Cost() int { return 0 } -func (*stLN) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } -func (*stLN) Map(VCursor, []sqltypes.Value) ([][][]byte, error) { return nil, nil } -func (*stLN) Create(VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } -func (*stLN) Delete(VCursor, [][]sqltypes.Value, []byte) error { return nil } +func (v *stLN) String() string { return v.name } +func (*stLN) Cost() int { return 0 } +func (*stLN) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } +func (*stLN) Map(VCursor, []sqltypes.Value) ([]Ksids, error) { return nil, nil } +func (*stLN) Create(VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } +func (*stLN) Delete(VCursor, [][]sqltypes.Value, []byte) error { return nil } +func (*stLN) Update(VCursor, []sqltypes.Value, []byte, []sqltypes.Value) error { return nil } func NewSTLN(name string, params map[string]string) (Vindex, error) { return &stLN{name: name, Params: params}, nil } +var _ NonUnique = (*stLN)(nil) +var _ Lookup = (*stLN)(nil) + // stLU satisfies Lookup, Unique. 
type stLU struct { name string Params map[string]string } -func (v *stLU) String() string { return v.name } -func (*stLU) Cost() int { return 2 } -func (*stLU) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } -func (*stLU) Map(VCursor, []sqltypes.Value) ([][]byte, error) { return nil, nil } -func (*stLU) Create(VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } -func (*stLU) Delete(VCursor, [][]sqltypes.Value, []byte) error { return nil } +func (v *stLU) String() string { return v.name } +func (*stLU) Cost() int { return 2 } +func (*stLU) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } +func (*stLU) Map(VCursor, []sqltypes.Value) ([][]byte, error) { return nil, nil } +func (*stLU) Create(VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } +func (*stLU) Delete(VCursor, [][]sqltypes.Value, []byte) error { return nil } +func (*stLU) Update(VCursor, []sqltypes.Value, []byte, []sqltypes.Value) error { return nil } func NewSTLU(name string, params map[string]string) (Vindex, error) { return &stLU{name: name, Params: params}, nil } +var _ Unique = (*stLU)(nil) +var _ Lookup = (*stLU)(nil) + func init() { Register("stfu", NewSTFU) Register("stf", NewSTF)