diff --git a/go/test/endtoend/vtgate/concurrentdml/main_test.go b/go/test/endtoend/vtgate/concurrentdml/main_test.go index b447ba8a720..c613b348b5f 100644 --- a/go/test/endtoend/vtgate/concurrentdml/main_test.go +++ b/go/test/endtoend/vtgate/concurrentdml/main_test.go @@ -364,8 +364,8 @@ func TestUpdateLookupUniqueVindex(t *testing.T) { exec(t, conn, `update t1 set c3 = 400 where c2 = 200`) // changed - same vindex exec(t, conn, `update t1 set c4 = 'abc' where c1 = 999`) - // not changed - same vindex - not yet supported bcoz of varchar field - // exec(t, conn, `update t1 set c4 = 'abc' where c4 = 'abc'`) + // not changed - same vindex + exec(t, conn, `update t1 set c4 = 'abc' where c4 = 'abc'`) } diff --git a/go/vt/vtexplain/testdata/multi-output/updatesharded-output.txt b/go/vt/vtexplain/testdata/multi-output/updatesharded-output.txt index 2bd0b638686..9df3b3f352a 100644 --- a/go/vt/vtexplain/testdata/multi-output/updatesharded-output.txt +++ b/go/vt/vtexplain/testdata/multi-output/updatesharded-output.txt @@ -26,7 +26,7 @@ update user set pet='fido' where id=1 update user set name='alicia' where id=1 1 ks_sharded/-40: begin -1 ks_sharded/-40: select id, name from user where id = 1 limit 10001 for update +1 ks_sharded/-40: select id, name, name = 'alicia' from user where id = 1 limit 10001 for update 2 ks_sharded/40-80: begin 2 ks_sharded/40-80: delete from name_user_map where name = 'name_val_2' and user_id = 1 limit 10001 3 ks_sharded/c0-: begin @@ -42,7 +42,7 @@ update user set name='alicia' where name='alice' 1 ks_sharded/40-80: begin 1 ks_sharded/40-80: select name, user_id from name_user_map where name in ('alice') limit 10001 for update 2 ks_sharded/-40: begin -2 ks_sharded/-40: select id, name from user where name = 'alice' limit 10001 for update +2 ks_sharded/-40: select id, name, name = 'alicia' from user where name = 'alice' limit 10001 for update 3 ks_sharded/40-80: delete from name_user_map where name = 'name_val_2' and user_id = 1 limit 10001 4 ks_sharded/c0-: begin 4 ks_sharded/c0-: insert into name_user_map(name, user_id) values ('alicia', 1) diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index 5fcf2039d7b..51095f43396 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -614,6 +614,9 @@ func inferColTypeFromExpr(node sqlparser.Expr, colTypeMap map[string]querypb.Typ case *sqlparser.NullVal: colNames = append(colNames, sqlparser.String(node)) colTypes = append(colTypes, querypb.Type_NULL_TYPE) + case *sqlparser.ComparisonExpr: + colNames = append(colNames, sqlparser.String(node)) + colTypes = append(colTypes, querypb.Type_INT64) default: log.Errorf("vtexplain: unsupported select expression type +%v node %s", reflect.TypeOf(node), sqlparser.String(node)) } diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go index f84fa89edb7..b5576adb3ab 100644 --- a/go/vt/vtgate/autocommit_test.go +++ b/go/vt/vtgate/autocommit_test.go @@ -84,8 +84,8 @@ func TestAutocommitUpdateLookup(t *testing.T) { func TestAutocommitUpdateVindexChange(t *testing.T) { executor, sbc, _, sbclookup := createLegacyExecutorEnv() sbc.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult( - sqltypes.MakeTestFields("id|name|lastname", "int64|int32|varchar"), - "1|1|foo", + sqltypes.MakeTestFields("id|name|lastname|name_lastname_keyspace_id_map", "int64|int32|varchar|int64"), + "1|1|foo|0", ), }) @@ -110,7 +110,7 @@ func TestAutocommitUpdateVindexChange(t *testing.T) { testCommitCount(t, "sbclookup", sbclookup, 1) testQueries(t, "sbc", sbc, []*querypb.BoundQuery{{ - Sql: "select id, name, lastname from user2 where id = 1 for update", + Sql: "select id, name, lastname, name = 'myname' and lastname = 'mylastname' from user2 where id = 1 for update", BindVariables: map[string]*querypb.BindVariable{}, }, { Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1", diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 14b0a2682fb..a8710e1e324 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -21,6 +21,8 @@ import ( "sort" "time" + "vitess.io/vitess/go/vt/vtgate/evalengine" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/sqltypes" @@ -36,14 +38,17 @@ import ( var _ Primitive = (*Update)(nil) // VindexValues contains changed values for a vindex. -type VindexValues map[string]sqltypes.PlanValue +type VindexValues struct { + PvMap map[string]sqltypes.PlanValue + Offset int // Offset from ownedVindexQuery to provide input decision for vindex update. +} // Update represents the instructions to perform an update. type Update struct { DML // ChangedVindexValues contains values for updated Vindexes during an update statement. - ChangedVindexValues map[string]VindexValues + ChangedVindexValues map[string]*VindexValues // Update does not take inputs noInputs @@ -227,13 +232,23 @@ func (upd *Update) updateVindexEntries(vcursor VCursor, bindVars map[string]*que for _, colVindex := range upd.Table.Owned { // Update columns only if they're being changed. if updColValues, ok := upd.ChangedVindexValues[colVindex.Name]; ok { + offset := updColValues.Offset + if !row[offset].IsNull() { + val, err := evalengine.ToInt64(row[offset]) + if err != nil { + return err + } + if val == int64(1) { // 1 means that the old and new value are same and vindex update is not required. + continue + } + } fromIds := make([]sqltypes.Value, 0, len(colVindex.Columns)) var vindexColumnKeys []sqltypes.Value for _, vCol := range colVindex.Columns { // Fetch the column values. origColValue := row[fieldColNumMap[vCol.String()]] fromIds = append(fromIds, origColValue) - if colValue, exists := updColValues[vCol.String()]; exists { + if colValue, exists := updColValues.PvMap[vCol.String()]; exists { resolvedVal, err := colValue.ResolveValue(bindVars) if err != nil { return err @@ -266,8 +281,8 @@ func (upd *Update) description() PrimitiveDescription { addFieldsIfNotEmpty(upd.DML, other) var changedVindexes []string - for vindex := range upd.ChangedVindexValues { - changedVindexes = append(changedVindexes, vindex) + for k, v := range upd.ChangedVindexValues { + changedVindexes = append(changedVindexes, fmt.Sprintf("%s:%d", k, v.Offset)) } sort.Strings(changedVindexes) // We sort these so random changes in the map order does not affect output if len(changedVindexes) > 0 { diff --git a/go/vt/vtgate/engine/update_test.go b/go/vt/vtgate/engine/update_test.go index 76fb07fdc71..5e64062bdac 100644 --- a/go/vt/vtgate/engine/update_test.go +++ b/go/vt/vtgate/engine/update_test.go @@ -205,23 +205,29 @@ func TestUpdateEqualChangedVindex(t *testing.T) { OwnedVindexQuery: "dummy_subquery", KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn), }, - ChangedVindexValues: map[string]VindexValues{ + ChangedVindexValues: map[string]*VindexValues{ "twocol": { - "c1": {Value: sqltypes.NewInt64(1)}, - "c2": {Value: sqltypes.NewInt64(2)}, + PvMap: map[string]sqltypes.PlanValue{ + "c1": {Value: sqltypes.NewInt64(1)}, + "c2": {Value: sqltypes.NewInt64(2)}, + }, + Offset: 4, }, "onecol": { - "c3": {Value: sqltypes.NewInt64(3)}, + PvMap: map[string]sqltypes.PlanValue{ + "c3": {Value: sqltypes.NewInt64(3)}, + }, + Offset: 5, }, }, } results := []*sqltypes.Result{sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "id|c1|c2|c3", - "int64|int64|int64|int64", + "id|c1|c2|c3|twocol|onecol", + "int64|int64|int64|int64|int64|int64", ), - "1|4|5|6", + "1|4|5|6|0|0", )} vc := newDMLTestVCursor("-20", "20-") vc.results = results @@ -258,14 +264,14 @@ func TestUpdateEqualChangedVindex(t *testing.T) { `ExecuteMultiShard sharded.-20: dummy_update {} true true`, }) - // Failure case: multiple rows changing. + // multiple rows changing. results = []*sqltypes.Result{sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "id|c1|c2|c3", - "int64|int64|int64|int64", + "id|c1|c2|c3|twocol|onecol", + "int64|int64|int64|int64|int64|int64", ), - "1|4|5|6", - "1|7|8|9", + "1|4|5|6|0|0", + "1|7|8|9|0|0", )} vc = newDMLTestVCursor("-20", "20-") vc.results = results @@ -284,8 +290,40 @@ func TestUpdateEqualChangedVindex(t *testing.T) { // 6 has to be replaced by 3. `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"6" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, `Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 7,8 have to be replaced by 1,2 (the new values). `Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"7" from2: type:INT64 value:"8" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, `Execute insert into lkp2(from1, from2, toc) values(:from1_0, :from2_0, :toc_0) from1_0: type:INT64 value:"1" from2_0: type:INT64 value:"2" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 9 has to be replaced by 3. + `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"9" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // Finally, the actual update, which is also sent to -20, same route as the subquery. + `ExecuteMultiShard sharded.-20: dummy_update {} true true`, + }) + + // multiple rows changing, but only some vindex actually changes + results = []*sqltypes.Result{sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|c1|c2|c3|twocol|onecol", + "int64|int64|int64|int64|int64|int64", + ), + "1|4|5|6|0|1", // twocol changes + "1|7|8|9|1|0", // onecol changes + )} + vc = newDMLTestVCursor("-20", "20-") + vc.results = results + + _, err = upd.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, + // ResolveDestinations is hard-coded to return -20. + // It gets used to perform the subquery to fetch the changing column values. + `ExecuteMultiShard sharded.-20: dummy_subquery {} false false`, + // Those values are returned as 4,5 for twocol and 6 for onecol. + // 4,5 have to be replaced by 1,2 (the new values). + `Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"4" from2: type:INT64 value:"5" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp2(from1, from2, toc) values(:from1_0, :from2_0, :toc_0) from1_0: type:INT64 value:"1" from2_0: type:INT64 value:"2" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 9 has to be replaced by 3. `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"9" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, `Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`, // Finally, the actual update, which is also sent to -20, same route as the subquery. @@ -306,23 +344,29 @@ func TestUpdateScatterChangedVindex(t *testing.T) { OwnedVindexQuery: "dummy_subquery", KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn), }, - ChangedVindexValues: map[string]VindexValues{ + ChangedVindexValues: map[string]*VindexValues{ "twocol": { - "c1": {Value: sqltypes.NewInt64(1)}, - "c2": {Value: sqltypes.NewInt64(2)}, + PvMap: map[string]sqltypes.PlanValue{ + "c1": {Value: sqltypes.NewInt64(1)}, + "c2": {Value: sqltypes.NewInt64(2)}, + }, + Offset: 4, }, "onecol": { - "c3": {Value: sqltypes.NewInt64(3)}, + PvMap: map[string]sqltypes.PlanValue{ + "c3": {Value: sqltypes.NewInt64(3)}, + }, + Offset: 5, }, }, } results := []*sqltypes.Result{sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "id|c1|c2|c3", - "int64|int64|int64|int64", + "id|c1|c2|c3|twocol|onecol", + "int64|int64|int64|int64|int64|int64", ), - "1|4|5|6", + "1|4|5|6|0|0", )} vc := newDMLTestVCursor("-20", "20-") vc.results = results @@ -362,11 +406,11 @@ func TestUpdateScatterChangedVindex(t *testing.T) { // Update can affect multiple rows results = []*sqltypes.Result{sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "id|c1|c2|c3", - "int64|int64|int64|int64", + "id|c1|c2|c3|twocol|onecol", + "int64|int64|int64|int64|int64|int64", ), - "1|4|5|6", - "1|7|8|9", + "1|4|5|6|0|0", + "1|7|8|9|0|0", )} vc = newDMLTestVCursor("-20", "20-") vc.results = results @@ -441,24 +485,30 @@ func TestUpdateInChangedVindex(t *testing.T) { OwnedVindexQuery: "dummy_subquery", KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn), }, - ChangedVindexValues: map[string]VindexValues{ + ChangedVindexValues: map[string]*VindexValues{ "twocol": { - "c1": {Value: sqltypes.NewInt64(1)}, - "c2": {Value: sqltypes.NewInt64(2)}, + PvMap: map[string]sqltypes.PlanValue{ + "c1": {Value: sqltypes.NewInt64(1)}, + "c2": {Value: sqltypes.NewInt64(2)}, + }, + Offset: 4, }, "onecol": { - "c3": {Value: sqltypes.NewInt64(3)}, + PvMap: map[string]sqltypes.PlanValue{ + "c3": {Value: sqltypes.NewInt64(3)}, + }, + Offset: 5, }, }, } results := []*sqltypes.Result{sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "id|c1|c2|c3", - "int64|int64|int64|int64", + "id|c1|c2|c3|twocol|onecol", + "int64|int64|int64|int64|int64|int64", ), - "1|4|5|6", - "2|21|22|23", + "1|4|5|6|0|0", + "2|21|22|23|0|0", )} vc := newDMLTestVCursor("-20", "20-") vc.results = results @@ -501,15 +551,15 @@ func TestUpdateInChangedVindex(t *testing.T) { `ExecuteMultiShard sharded.-20: dummy_update {} true true`, }) - // Failure case: multiple rows changing. + // multiple rows changing. results = []*sqltypes.Result{sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "id|c1|c2|c3", - "int64|int64|int64|int64", + "id|c1|c2|c3|twocol|onecol", + "int64|int64|int64|int64|int64|int64", ), - "1|4|5|6", - "1|7|8|9", - "2|21|22|23", + "1|4|5|6|0|0", + "1|7|8|9|0|0", + "2|21|22|23|0|0", )} vc = newDMLTestVCursor("-20", "20-") vc.results = results diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index b5ea0d29640..65054118020 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -101,8 +101,8 @@ func TestUpdateEqual(t *testing.T) { sbc2.Queries = nil sbclookup.Queries = nil sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult( - sqltypes.MakeTestFields("id|name|lastname", "int64|int32|varchar"), - "1|1|foo", + sqltypes.MakeTestFields("id|name|lastname|name_lastname_keyspace_id_map", "int64|int32|varchar|int64"), + "1|1|foo|0", ), }) @@ -110,7 +110,7 @@ func TestUpdateEqual(t *testing.T) { require.NoError(t, err) wantQueries = []*querypb.BoundQuery{ { - Sql: "select id, name, lastname from user2 where id = 1 for update", + Sql: "select id, name, lastname, name = 'myname' and lastname = 'mylastname' from user2 where id = 1 for update", BindVariables: map[string]*querypb.BindVariable{}, }, { @@ -213,8 +213,8 @@ func TestUpdateMultiOwned(t *testing.T) { sbc1.SetResults([]*sqltypes.Result{ sqltypes.MakeTestResult( - sqltypes.MakeTestFields("id|a|b|c|d|e|f", "int64|int64|int64|int64|int64|int64|int64"), - "1|10|20|30|40|50|60", + sqltypes.MakeTestFields("id|a|b|c|d|e|f|lookup1|lookup3", "int64|int64|int64|int64|int64|int64|int64|int64|int64"), + "1|10|20|30|40|50|60|0|0", ), }) _, err := executorExec(executor, "update user set a=1, b=2, f=4, e=3 where id=1", nil) @@ -222,7 +222,7 @@ func TestUpdateMultiOwned(t *testing.T) { t.Fatal(err) } wantQueries := []*querypb.BoundQuery{{ - Sql: "select id, a, b, c, d, e, f from user where id = 1 for update", + Sql: "select id, a, b, c, d, e, f, a = 1 and b = 2, e = 3 and f = 4 from user where id = 1 for update", BindVariables: map[string]*querypb.BindVariable{}, }, { Sql: "update user set a = 1, b = 2, f = 4, e = 3 where id = 1", diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 88a2f808c68..40eab7b6211 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -418,7 +418,7 @@ func TestRowCount(t *testing.T) { require.NoError(t, err) testRowCount(t, executor, -1) - _, err = executorExec(executor, "update user set name = 'abc' where id in (42, 24)", map[string]*querypb.BindVariable{}) + _, err = executorExec(executor, "delete from user where id in (42, 24)", map[string]*querypb.BindVariable{}) require.NoError(t, err) testRowCount(t, executor, 2) } diff --git a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt index 4531dc693c7..67a5bc0b262 100644 --- a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt @@ -279,11 +279,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "email_user_map" + "email_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select user_id, email, address from user_metadata where user_id = 1 for update", + "OwnedVindexQuery": "select user_id, email, address, email = 'juan@vitess.io' from user_metadata where user_id = 1 for update", "Query": "update user_metadata set email = 'juan@vitess.io' where user_id = 1", "Table": "user_metadata", "Values": [ @@ -311,12 +311,12 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "address_user_map", - "email_user_map" + "address_user_map:4", + "email_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select user_id, email, address from user_metadata where user_id = 1 for update", + "OwnedVindexQuery": "select user_id, email, address, email = 'juan@vitess.io', address = '155 5th street' from user_metadata where user_id = 1 for update", "Query": "update user_metadata set email = 'juan@vitess.io', address = '155 5th street' where user_id = 1", "Table": "user_metadata", "Values": [ @@ -340,11 +340,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "email_user_map" + "email_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select user_id, email, address from user_metadata where user_id = 1 order by user_id asc limit 10 for update", + "OwnedVindexQuery": "select user_id, email, address, email = 'juan@vitess.io' from user_metadata where user_id = 1 order by user_id asc limit 10 for update", "Query": "update user_metadata set email = 'juan@vitess.io' where user_id = 1 order by user_id asc limit 10", "Table": "user_metadata", "Values": [ @@ -1412,11 +1412,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "colb_colc_map" + "colb_colc_map:4" ], "KsidVindex": "kid_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select kid, column_a, column_b, column_c from multicolvin where kid = 1 for update", + "OwnedVindexQuery": "select kid, column_a, column_b, column_c, column_b = 1 and column_c = 2 from multicolvin where kid = 1 for update", "Query": "update multicolvin set column_b = 1, column_c = 2 where kid = 1", "Table": "multicolvin", "Values": [ @@ -1440,12 +1440,12 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "cola_map", - "colb_colc_map" + "cola_map:4", + "colb_colc_map:5" ], "KsidVindex": "kid_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select kid, column_a, column_b, column_c from multicolvin where kid = 1 for update", + "OwnedVindexQuery": "select kid, column_a, column_b, column_c, column_a = 0, column_b = 1 and column_c = 2 from multicolvin where kid = 1 for update", "Query": "update multicolvin set column_a = 0, column_b = 1, column_c = 2 where kid = 1", "Table": "multicolvin", "Values": [ @@ -1825,11 +1825,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "name_user_map" + "name_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select Id, Name, Costly from user where id = 1 for update", + "OwnedVindexQuery": "select Id, Name, Costly, name = null from user where id = 1 for update", "Query": "update user set name = null where id = 1", "Table": "user", "Values": [ @@ -1872,11 +1872,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "name_user_map" + "name_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select Id, Name, Costly from user where id in (1, 2, 3) for update", + "OwnedVindexQuery": "select Id, Name, Costly, name = null from user where id in (1, 2, 3) for update", "Query": "update user set name = null where id in (1, 2, 3)", "Table": "user", "Values": [ @@ -1904,11 +1904,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "name_user_map" + "name_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select Id, Name, Costly from user for update", + "OwnedVindexQuery": "select Id, Name, Costly, name = null from user for update", "Query": "update user set name = null", "Table": "user" } @@ -1928,11 +1928,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "name_user_map" + "name_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select Id, Name, Costly from user where id + 1 = 2 for update", + "OwnedVindexQuery": "select Id, Name, Costly, name = null from user where id + 1 = 2 for update", "Query": "update user set name = null where id + 1 = 2", "Table": "user" } @@ -2088,11 +2088,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "colb_colc_map" + "colb_colc_map:4" ], "KsidVindex": "kid_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select kid, column_a, column_b, column_c from multicolvin where kid = 1 for update", + "OwnedVindexQuery": "select kid, column_a, column_b, column_c, column_c = 2 from multicolvin where kid = 1 for update", "Query": "update multicolvin set column_c = 2 where kid = 1", "Table": "multicolvin", "Values": [ @@ -2116,11 +2116,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "name_user_map" + "name_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select Id, Name, Costly from user where id = 1 for update", + "OwnedVindexQuery": "select Id, Name, Costly, name = _binary 'abc' from user where id = 1 for update", "Query": "update user set name = _binary 'abc' where id = 1", "Table": "user", "Values": [ @@ -2186,11 +2186,11 @@ }, "TargetTabletType": "MASTER", "ChangedVindexValues": [ - "name_user_map" + "name_user_map:3" ], "KsidVindex": "user_index", "MultiShardAutocommit": false, - "OwnedVindexQuery": "select Id, Name, Costly from user for update", + "OwnedVindexQuery": "select Id, Name, Costly, name = 'myname' from user for update", "Query": "update user set name = 'myname'", "Table": "user" } diff --git a/go/vt/vtgate/planbuilder/update.go b/go/vt/vtgate/planbuilder/update.go index dfbf18fa01f..79e4060dc8b 100644 --- a/go/vt/vtgate/planbuilder/update.go +++ b/go/vt/vtgate/planbuilder/update.go @@ -34,19 +34,20 @@ func buildUpdatePlan(stmt sqlparser.Statement, vschema ContextVSchema) (engine.P return nil, err } eupd := &engine.Update{ - DML: *dml, - ChangedVindexValues: make(map[string]engine.VindexValues), + DML: *dml, } if dml.Opcode == engine.Unsharded { return eupd, nil } - if eupd.ChangedVindexValues, err = buildChangedVindexesValues(upd, eupd.Table.ColumnVindexes); err != nil { + cvv, ovq, err := buildChangedVindexesValues(upd, eupd.Table, ksidCol) + if err != nil { return nil, err } + eupd.ChangedVindexValues = cvv + eupd.OwnedVindexQuery = ovq if len(eupd.ChangedVindexValues) != 0 { - eupd.OwnedVindexQuery = generateDMLSubquery(upd.Where, upd.OrderBy, upd.Limit, eupd.Table, ksidCol) eupd.KsidVindex = ksidVindex } return eupd, nil @@ -55,10 +56,12 @@ func buildUpdatePlan(stmt sqlparser.Statement, vschema ContextVSchema) (engine.P // buildChangedVindexesValues adds to the plan all the lookup vindexes that are changing. // Updates can only be performed to secondary lookup vindexes with no complex expressions // in the set clause. -func buildChangedVindexesValues(update *sqlparser.Update, colVindexes []*vindexes.ColumnVindex) (map[string]engine.VindexValues, error) { - changedVindexes := make(map[string]engine.VindexValues) - for i, vindex := range colVindexes { - vindexValueMap := make(engine.VindexValues) +func buildChangedVindexesValues(update *sqlparser.Update, table *vindexes.Table, ksidCol string) (map[string]*engine.VindexValues, string, error) { + changedVindexes := make(map[string]*engine.VindexValues) + buf, offset := initialQuery(ksidCol, table) + for i, vindex := range table.ColumnVindexes { + vindexValueMap := make(map[string]sqltypes.PlanValue) + first := true for _, vcol := range vindex.Columns { // Searching in order of columns in colvindex. found := false @@ -67,14 +70,20 @@ func buildChangedVindexesValues(update *sqlparser.Update, colVindexes []*vindexe continue } if found { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "column has duplicate set values: '%v'", assignment.Name.Name) + return nil, "", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "column has duplicate set values: '%v'", assignment.Name.Name) } found = true pv, err := extractValueFromUpdate(assignment) if err != nil { - return nil, err + return nil, "", err } vindexValueMap[vcol.String()] = pv + if first { + buf.Myprintf(", %v", assignment) + first = false + } else { + buf.Myprintf(" and %v", assignment) + } } } if len(vindexValueMap) == 0 { @@ -83,21 +92,42 @@ func buildChangedVindexesValues(update *sqlparser.Update, colVindexes []*vindexe } if update.Limit != nil && len(update.OrderBy) == 0 { - return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: Need to provide order by clause when using limit. Invalid update on vindex: %v", vindex.Name) + return nil, "", vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: Need to provide order by clause when using limit. Invalid update on vindex: %v", vindex.Name) } if i == 0 { - return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can't update primary vindex columns. Invalid update on vindex: %v", vindex.Name) + return nil, "", vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can't update primary vindex columns. Invalid update on vindex: %v", vindex.Name) } if _, ok := vindex.Vindex.(vindexes.Lookup); !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update lookup vindexes. Invalid update on vindex: %v", vindex.Name) + return nil, "", vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update lookup vindexes. Invalid update on vindex: %v", vindex.Name) } if !vindex.Owned { - return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update owned vindexes. Invalid update on vindex: %v", vindex.Name) + return nil, "", vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: You can only update owned vindexes. Invalid update on vindex: %v", vindex.Name) } - changedVindexes[vindex.Name] = vindexValueMap + changedVindexes[vindex.Name] = &engine.VindexValues{ + PvMap: vindexValueMap, + Offset: offset, + } + offset++ + } + if len(changedVindexes) == 0 { + return nil, "", nil } + // generate rest of the owned vindex query. + buf.Myprintf(" from %v%v%v%v for update", table.Name, update.Where, update.OrderBy, update.Limit) + return changedVindexes, buf.String(), nil +} - return changedVindexes, nil +func initialQuery(ksidCol string, table *vindexes.Table) (*sqlparser.TrackedBuffer, int) { + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select %s", ksidCol) + offset := 1 + for _, cv := range table.Owned { + for _, column := range cv.Columns { + buf.Myprintf(", %v", column) + offset++ + } + } + return buf, offset } // extractValueFromUpdate given an UpdateExpr attempts to extracts the Value