Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/test/endtoend/vtgate/concurrentdml/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ func TestUpdateLookupUniqueVindex(t *testing.T) {
exec(t, conn, `update t1 set c3 = 400 where c2 = 200`)
// changed - same vindex
exec(t, conn, `update t1 set c4 = 'abc' where c1 = 999`)
// not changed - same vindex - not yet supported bcoz of varchar field
// exec(t, conn, `update t1 set c4 = 'abc' where c4 = 'abc'`)
// not changed - same vindex
exec(t, conn, `update t1 set c4 = 'abc' where c4 = 'abc'`)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ update user set pet='fido' where id=1
update user set name='alicia' where id=1

1 ks_sharded/-40: begin
1 ks_sharded/-40: select id, name from user where id = 1 limit 10001 for update
1 ks_sharded/-40: select id, name, name = 'alicia' from user where id = 1 limit 10001 for update
2 ks_sharded/40-80: begin
2 ks_sharded/40-80: delete from name_user_map where name = 'name_val_2' and user_id = 1 limit 10001
3 ks_sharded/c0-: begin
Expand All @@ -42,7 +42,7 @@ update user set name='alicia' where name='alice'
1 ks_sharded/40-80: begin
1 ks_sharded/40-80: select name, user_id from name_user_map where name in ('alice') limit 10001 for update
2 ks_sharded/-40: begin
2 ks_sharded/-40: select id, name from user where name = 'alice' limit 10001 for update
2 ks_sharded/-40: select id, name, name = 'alicia' from user where name = 'alice' limit 10001 for update
3 ks_sharded/40-80: delete from name_user_map where name = 'name_val_2' and user_id = 1 limit 10001
4 ks_sharded/c0-: begin
4 ks_sharded/c0-: insert into name_user_map(name, user_id) values ('alicia', 1)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ func inferColTypeFromExpr(node sqlparser.Expr, colTypeMap map[string]querypb.Typ
case *sqlparser.NullVal:
colNames = append(colNames, sqlparser.String(node))
colTypes = append(colTypes, querypb.Type_NULL_TYPE)
case *sqlparser.ComparisonExpr:
colNames = append(colNames, sqlparser.String(node))
colTypes = append(colTypes, querypb.Type_INT64)
default:
log.Errorf("vtexplain: unsupported select expression type +%v node %s", reflect.TypeOf(node), sqlparser.String(node))
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestAutocommitUpdateLookup(t *testing.T) {
func TestAutocommitUpdateVindexChange(t *testing.T) {
executor, sbc, _, sbclookup := createLegacyExecutorEnv()
sbc.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|name|lastname", "int64|int32|varchar"),
"1|1|foo",
sqltypes.MakeTestFields("id|name|lastname|name_lastname_keyspace_id_map", "int64|int32|varchar|int64"),
"1|1|foo|0",
),
})

Expand All @@ -110,7 +110,7 @@ func TestAutocommitUpdateVindexChange(t *testing.T) {
testCommitCount(t, "sbclookup", sbclookup, 1)

testQueries(t, "sbc", sbc, []*querypb.BoundQuery{{
Sql: "select id, name, lastname from user2 where id = 1 for update",
Sql: "select id, name, lastname, name = 'myname' and lastname = 'mylastname' from user2 where id = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1",
Expand Down
25 changes: 20 additions & 5 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sort"
"time"

"vitess.io/vitess/go/vt/vtgate/evalengine"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/sqltypes"
Expand All @@ -36,14 +38,17 @@ import (
var _ Primitive = (*Update)(nil)

// VindexValues contains changed values for a vindex.
type VindexValues map[string]sqltypes.PlanValue
type VindexValues struct {
PvMap map[string]sqltypes.PlanValue
Offset int // Offset from ownedVindexQuery to provide input decision for vindex update.
}

// Update represents the instructions to perform an update.
type Update struct {
DML

// ChangedVindexValues contains values for updated Vindexes during an update statement.
ChangedVindexValues map[string]VindexValues
ChangedVindexValues map[string]*VindexValues

// Update does not take inputs
noInputs
Expand Down Expand Up @@ -227,13 +232,23 @@ func (upd *Update) updateVindexEntries(vcursor VCursor, bindVars map[string]*que
for _, colVindex := range upd.Table.Owned {
// Update columns only if they're being changed.
if updColValues, ok := upd.ChangedVindexValues[colVindex.Name]; ok {
offset := updColValues.Offset
if !row[offset].IsNull() {
val, err := evalengine.ToInt64(row[offset])
if err != nil {
return err
}
if val == int64(1) { // 1 means that the old and new value are same and vindex update is not required.
continue
}
}
fromIds := make([]sqltypes.Value, 0, len(colVindex.Columns))
var vindexColumnKeys []sqltypes.Value
for _, vCol := range colVindex.Columns {
// Fetch the column values.
origColValue := row[fieldColNumMap[vCol.String()]]
fromIds = append(fromIds, origColValue)
if colValue, exists := updColValues[vCol.String()]; exists {
if colValue, exists := updColValues.PvMap[vCol.String()]; exists {
resolvedVal, err := colValue.ResolveValue(bindVars)
if err != nil {
return err
Expand Down Expand Up @@ -266,8 +281,8 @@ func (upd *Update) description() PrimitiveDescription {
addFieldsIfNotEmpty(upd.DML, other)

var changedVindexes []string
for vindex := range upd.ChangedVindexValues {
changedVindexes = append(changedVindexes, vindex)
for k, v := range upd.ChangedVindexValues {
changedVindexes = append(changedVindexes, fmt.Sprintf("%s:%d", k, v.Offset))
}
sort.Strings(changedVindexes) // We sort these so random changes in the map order does not affect output
if len(changedVindexes) > 0 {
Expand Down
124 changes: 87 additions & 37 deletions go/vt/vtgate/engine/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,29 @@ func TestUpdateEqualChangedVindex(t *testing.T) {
OwnedVindexQuery: "dummy_subquery",
KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
},
ChangedVindexValues: map[string]VindexValues{
ChangedVindexValues: map[string]*VindexValues{
"twocol": {
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
PvMap: map[string]sqltypes.PlanValue{
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
},
Offset: 4,
},
"onecol": {
"c3": {Value: sqltypes.NewInt64(3)},
PvMap: map[string]sqltypes.PlanValue{
"c3": {Value: sqltypes.NewInt64(3)},
},
Offset: 5,
},
},
}

results := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|4|5|6|0|0",
)}
vc := newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -258,14 +264,14 @@ func TestUpdateEqualChangedVindex(t *testing.T) {
`ExecuteMultiShard sharded.-20: dummy_update {} true true`,
})

// Failure case: multiple rows changing.
// multiple rows changing.
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|7|8|9",
"1|4|5|6|0|0",
"1|7|8|9|0|0",
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results
Expand All @@ -284,8 +290,40 @@ func TestUpdateEqualChangedVindex(t *testing.T) {
// 6 has to be replaced by 3.
`Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"6" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// 7,8 have to be replaced by 1,2 (the new values).
`Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"7" from2: type:INT64 value:"8" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp2(from1, from2, toc) values(:from1_0, :from2_0, :toc_0) from1_0: type:INT64 value:"1" from2_0: type:INT64 value:"2" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// 9 has to be replaced by 3.
`Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"9" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// Finally, the actual update, which is also sent to -20, same route as the subquery.
`ExecuteMultiShard sharded.-20: dummy_update {} true true`,
})

// multiple rows changing, but only some vindex actually changes
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6|0|1", // twocol changes
"1|7|8|9|1|0", // onecol changes
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results

_, err = upd.Execute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
// ResolveDestinations is hard-coded to return -20.
// It gets used to perform the subquery to fetch the changing column values.
`ExecuteMultiShard sharded.-20: dummy_subquery {} false false`,
// Those values are returned as 4,5 for twocol and 6 for onecol.
// 4,5 have to be replaced by 1,2 (the new values).
`Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"4" from2: type:INT64 value:"5" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp2(from1, from2, toc) values(:from1_0, :from2_0, :toc_0) from1_0: type:INT64 value:"1" from2_0: type:INT64 value:"2" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// 9 has to be replaced by 3.
`Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"9" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// Finally, the actual update, which is also sent to -20, same route as the subquery.
Expand All @@ -306,23 +344,29 @@ func TestUpdateScatterChangedVindex(t *testing.T) {
OwnedVindexQuery: "dummy_subquery",
KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
},
ChangedVindexValues: map[string]VindexValues{
ChangedVindexValues: map[string]*VindexValues{
"twocol": {
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
PvMap: map[string]sqltypes.PlanValue{
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
},
Offset: 4,
},
"onecol": {
"c3": {Value: sqltypes.NewInt64(3)},
PvMap: map[string]sqltypes.PlanValue{
"c3": {Value: sqltypes.NewInt64(3)},
},
Offset: 5,
},
},
}

results := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|4|5|6|0|0",
)}
vc := newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -362,11 +406,11 @@ func TestUpdateScatterChangedVindex(t *testing.T) {
// Update can affect multiple rows
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|7|8|9",
"1|4|5|6|0|0",
"1|7|8|9|0|0",
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -441,24 +485,30 @@ func TestUpdateInChangedVindex(t *testing.T) {
OwnedVindexQuery: "dummy_subquery",
KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
},
ChangedVindexValues: map[string]VindexValues{
ChangedVindexValues: map[string]*VindexValues{
"twocol": {
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
PvMap: map[string]sqltypes.PlanValue{
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
},
Offset: 4,
},
"onecol": {
"c3": {Value: sqltypes.NewInt64(3)},
PvMap: map[string]sqltypes.PlanValue{
"c3": {Value: sqltypes.NewInt64(3)},
},
Offset: 5,
},
},
}

results := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"2|21|22|23",
"1|4|5|6|0|0",
"2|21|22|23|0|0",
)}
vc := newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -501,15 +551,15 @@ func TestUpdateInChangedVindex(t *testing.T) {
`ExecuteMultiShard sharded.-20: dummy_update {} true true`,
})

// Failure case: multiple rows changing.
// multiple rows changing.
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|7|8|9",
"2|21|22|23",
"1|4|5|6|0|0",
"1|7|8|9|0|0",
"2|21|22|23|0|0",
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ func TestUpdateEqual(t *testing.T) {
sbc2.Queries = nil
sbclookup.Queries = nil
sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|name|lastname", "int64|int32|varchar"),
"1|1|foo",
sqltypes.MakeTestFields("id|name|lastname|name_lastname_keyspace_id_map", "int64|int32|varchar|int64"),
"1|1|foo|0",
),
})

_, err = executorExec(executor, "update user2 set name='myname', lastname='mylastname' where id = 1", nil)
require.NoError(t, err)
wantQueries = []*querypb.BoundQuery{
{
Sql: "select id, name, lastname from user2 where id = 1 for update",
Sql: "select id, name, lastname, name = 'myname' and lastname = 'mylastname' from user2 where id = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
},
{
Expand Down Expand Up @@ -213,16 +213,16 @@ func TestUpdateMultiOwned(t *testing.T) {

sbc1.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|a|b|c|d|e|f", "int64|int64|int64|int64|int64|int64|int64"),
"1|10|20|30|40|50|60",
sqltypes.MakeTestFields("id|a|b|c|d|e|f|lookup1|lookup3", "int64|int64|int64|int64|int64|int64|int64|int64|int64"),
"1|10|20|30|40|50|60|0|0",
),
})
_, err := executorExec(executor, "update user set a=1, b=2, f=4, e=3 where id=1", nil)
if err != nil {
t.Fatal(err)
}
wantQueries := []*querypb.BoundQuery{{
Sql: "select id, a, b, c, d, e, f from user where id = 1 for update",
Sql: "select id, a, b, c, d, e, f, a = 1 and b = 2, e = 3 and f = 4 from user where id = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update user set a = 1, b = 2, f = 4, e = 3 where id = 1",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func TestRowCount(t *testing.T) {
require.NoError(t, err)
testRowCount(t, executor, -1)

_, err = executorExec(executor, "update user set name = 'abc' where id in (42, 24)", map[string]*querypb.BindVariable{})
_, err = executorExec(executor, "delete from user where id in (42, 24)", map[string]*querypb.BindVariable{})
require.NoError(t, err)
testRowCount(t, executor, 2)
}
Expand Down
Loading