Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"encoding/json"
"fmt"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -63,6 +64,9 @@ type Delete struct {
// Option to override the standard behavior and allow a multi-shard delete
// to use single round trip autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// MarshalJSON serializes the Delete into a JSON representation.
Expand All @@ -84,6 +88,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) {
Table string `json:",omitempty"`
OwnedVindexQuery string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: del.Opcode,
Keyspace: del.Keyspace,
Expand All @@ -93,6 +98,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) {
Table: tname,
OwnedVindexQuery: del.OwnedVindexQuery,
MultiShardAutocommit: del.MultiShardAutocommit,
QueryTimeout: del.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalDelete)
}
Expand Down Expand Up @@ -141,6 +147,11 @@ func (del *Delete) RouteType() string {

// Execute performs a non-streaming exec.
func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if del.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch del.Opcode {
case DeleteUnsharded:
return del.execDeleteUnsharded(vcursor, bindVars)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -76,6 +77,9 @@ type Insert struct {
// However some application use cases would prefer that the statement partially
// succeed in order to get the performance benefits of autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// NewQueryInsert creates an Insert with a query string.
Expand Down Expand Up @@ -127,6 +131,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) {
Mid []string `json:",omitempty"`
Suffix string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: ins.Opcode,
Keyspace: ins.Keyspace,
Expand All @@ -138,6 +143,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) {
Mid: ins.Mid,
Suffix: ins.Suffix,
MultiShardAutocommit: ins.MultiShardAutocommit,
QueryTimeout: ins.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalInsert)
}
Expand Down Expand Up @@ -191,6 +197,11 @@ func (ins *Insert) RouteType() string {

// Execute performs a non-streaming exec.
func (ins *Insert) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if ins.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(ins.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch ins.Opcode {
case InsertUnsharded:
return ins.execInsertUnsharded(vcursor, bindVars)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"encoding/json"
"fmt"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -66,6 +67,9 @@ type Update struct {
// Option to override the standard behavior and allow a multi-shard update
// to use single round trip autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// MarshalJSON serializes the Update into a JSON representation.
Expand All @@ -88,6 +92,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) {
Table string `json:",omitempty"`
OwnedVindexQuery string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: upd.Opcode,
Keyspace: upd.Keyspace,
Expand All @@ -98,6 +103,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) {
Table: tname,
OwnedVindexQuery: upd.OwnedVindexQuery,
MultiShardAutocommit: upd.MultiShardAutocommit,
QueryTimeout: upd.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalUpdate)
}
Expand Down Expand Up @@ -145,6 +151,11 @@ func (upd *Update) RouteType() string {

// Execute performs a non-streaming exec.
func (upd *Update) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if upd.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(upd.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch upd.Opcode {
case UpdateUnsharded:
return upd.execUpdateUnsharded(vcursor, bindVars)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func buildDeletePlan(del *sqlparser.Delete, vschema ContextVSchema) (*engine.Del
edel.MultiShardAutocommit = true
}

edel.QueryTimeout = queryTimeout(directives)

if rb.ERoute.TargetDestination != nil {
if rb.ERoute.TargetTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported: DELETE statement with a replica target")
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func buildInsertShardedPlan(ins *sqlparser.Insert, table *vindexes.Table) (*engi
eins.MultiShardAutocommit = true
}

eins.QueryTimeout = queryTimeout(directives)

var rows sqlparser.Values
switch insertValues := ins.Rows.(type) {
case *sqlparser.Select, *sqlparser.Union:
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/planbuilder/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,3 +680,21 @@ func (rb *route) SetOpcode(code engine.RouteOpcode) error {
rb.ERoute.Opcode = code
return nil
}

// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0.
func queryTimeout(d sqlparser.CommentDirectives) int {
if d == nil {
return 0
}

val, ok := d[sqlparser.DirectiveQueryTimeout]
if !ok {
return 0
}

intVal, ok := val.(int)
if ok {
return intVal
}
return 0
}
18 changes: 0 additions & 18 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,3 @@ func (pb *primitiveBuilder) expandStar(inrcs []*resultColumn, expr *sqlparser.St
}
return inrcs, true, nil
}

// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0.
func queryTimeout(d sqlparser.CommentDirectives) int {
if d == nil {
return 0
}

val, ok := d[sqlparser.DirectiveQueryTimeout]
if !ok {
return 0
}

intVal, ok := val.(int)
if ok {
return intVal
}
return 0
}
59 changes: 59 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/dml_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,33 @@
}
}

# insert with query timeout
"insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)"
{
"Original": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)",
"Instructions": {
"Opcode": "InsertSharded",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values (:_Id0, :_Name0, :_Costly0), (:_Id1, :_Name1, :_Costly1)",
"Values": [[[":__seq0",":__seq1"]],[[null,null]],[[null,null]]],
"Table": "user",
"Generate": {
"Keyspace": {
"Name": "main",
"Sharded": false
},
"Query": "select next :n values from seq",
"Values": [1,2]
},
"Prefix": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values ",
"Mid": ["(:_Id0, :_Name0, :_Costly0)","(:_Id1, :_Name1, :_Costly1)"],
"QueryTimeout": 1
}
}

# insert with multiple rows - multi-shard autocommit
"insert /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ into user(id) values (1), (2)"
{
Expand Down Expand Up @@ -1337,6 +1364,22 @@
}
}

# update with no primary vindex on where clause (scatter update) - query timeout
"update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1"
{
"Original": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1",
"Instructions": {
"Opcode": "UpdateScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1",
"Table": "user_extra",
"QueryTimeout": 1
}
}

# update with non-comparison expr
"update user_extra set val = 1 where id between 1 and 2"
{
Expand Down Expand Up @@ -1489,6 +1532,22 @@
}
}

# delete from with no index match - query timeout
"delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'"
{
"Original": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'",
"Instructions": {
"Opcode": "DeleteScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'",
"Table": "user_extra",
"QueryTimeout": 1
}
}

# delete from with primary id in through IN clause
"delete from user_extra where user_id in (1, 2)"
{
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func buildUpdatePlan(upd *sqlparser.Update, vschema ContextVSchema) (*engine.Upd
eupd.MultiShardAutocommit = true
}

eupd.QueryTimeout = queryTimeout(directives)

var vindexTable *vindexes.Table
for _, tval := range pb.st.tables {
vindexTable = tval.vindexTable
Expand Down