Skip to content
This repository was archived by the owner on Dec 16, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ gotools=" \
golang.org/x/tools/cmd/cover \
golang.org/x/tools/cmd/goimports \
golang.org/x/tools/cmd/goyacc \
honnef.co/go/tools/cmd/unused \
"
echo "Installing dev tools with 'go get'..."
# shellcheck disable=SC2086
Expand Down
13 changes: 13 additions & 0 deletions go/streamlog/streamlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"

Expand All @@ -40,6 +41,9 @@ var (
// QueryLogFormat controls the format of the query log (either text or json)
QueryLogFormat = flag.String("querylog-format", "text", "format for query logs (\"text\" or \"json\")")

// QueryLogFilterTag contains an optional string that must be present in the query for it to be logged
QueryLogFilterTag = flag.String("querylog-filter-tag", "", "string that must be present in the query for it to be logged")

sendCount = stats.NewCountersWithSingleLabel("StreamlogSend", "stream log send count", "logger_names")
deliveredCount = stats.NewCountersWithMultiLabels(
"StreamlogDelivered",
Expand Down Expand Up @@ -201,3 +205,12 @@ func GetFormatter(logger *StreamLogger) LogFormatter {
return fmter.Logf(w, params)
}
}

// ShouldEmitLog returns whether the log with the given SQL query
// should be emitted or filtered
func ShouldEmitLog(sql string) bool {
if *QueryLogFilterTag == "" {
return true
}
return strings.Contains(sql, *QueryLogFilterTag)
}
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"encoding/json"
"fmt"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -63,6 +64,9 @@ type Delete struct {
// Option to override the standard behavior and allow a multi-shard delete
// to use single round trip autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// MarshalJSON serializes the Delete into a JSON representation.
Expand All @@ -84,6 +88,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) {
Table string `json:",omitempty"`
OwnedVindexQuery string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: del.Opcode,
Keyspace: del.Keyspace,
Expand All @@ -93,6 +98,7 @@ func (del *Delete) MarshalJSON() ([]byte, error) {
Table: tname,
OwnedVindexQuery: del.OwnedVindexQuery,
MultiShardAutocommit: del.MultiShardAutocommit,
QueryTimeout: del.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalDelete)
}
Expand Down Expand Up @@ -141,6 +147,11 @@ func (del *Delete) RouteType() string {

// Execute performs a non-streaming exec.
func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if del.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch del.Opcode {
case DeleteUnsharded:
return del.execDeleteUnsharded(vcursor, bindVars)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -76,6 +77,9 @@ type Insert struct {
// However some application use cases would prefer that the statement partially
// succeed in order to get the performance benefits of autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// NewQueryInsert creates an Insert with a query string.
Expand Down Expand Up @@ -127,6 +131,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) {
Mid []string `json:",omitempty"`
Suffix string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: ins.Opcode,
Keyspace: ins.Keyspace,
Expand All @@ -138,6 +143,7 @@ func (ins *Insert) MarshalJSON() ([]byte, error) {
Mid: ins.Mid,
Suffix: ins.Suffix,
MultiShardAutocommit: ins.MultiShardAutocommit,
QueryTimeout: ins.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalInsert)
}
Expand Down Expand Up @@ -191,6 +197,11 @@ func (ins *Insert) RouteType() string {

// Execute performs a non-streaming exec.
func (ins *Insert) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if ins.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(ins.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch ins.Opcode {
case InsertUnsharded:
return ins.execInsertUnsharded(vcursor, bindVars)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"encoding/json"
"fmt"
"time"

"vitess.io/vitess/go/jsonutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -66,6 +67,9 @@ type Update struct {
// Option to override the standard behavior and allow a multi-shard update
// to use single round trip autocommit.
MultiShardAutocommit bool

// QueryTimeout contains the optional timeout (in milliseconds) to apply to this query
QueryTimeout int
}

// MarshalJSON serializes the Update into a JSON representation.
Expand All @@ -88,6 +92,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) {
Table string `json:",omitempty"`
OwnedVindexQuery string `json:",omitempty"`
MultiShardAutocommit bool `json:",omitempty"`
QueryTimeout int `json:",omitempty"`
}{
Opcode: upd.Opcode,
Keyspace: upd.Keyspace,
Expand All @@ -98,6 +103,7 @@ func (upd *Update) MarshalJSON() ([]byte, error) {
Table: tname,
OwnedVindexQuery: upd.OwnedVindexQuery,
MultiShardAutocommit: upd.MultiShardAutocommit,
QueryTimeout: upd.QueryTimeout,
}
return jsonutil.MarshalNoEscape(marshalUpdate)
}
Expand Down Expand Up @@ -145,6 +151,11 @@ func (upd *Update) RouteType() string {

// Execute performs a non-streaming exec.
func (upd *Update) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if upd.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(upd.QueryTimeout) * time.Millisecond)
defer cancel()
}

switch upd.Opcode {
case UpdateUnsharded:
return upd.execUpdateUnsharded(vcursor, bindVars)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/logstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (stats *LogStats) RemoteAddrUsername() (string, string) {
// Logf formats the log record to the given writer, either as
// tab-separated list of logged fields or as JSON.
func (stats *LogStats) Logf(w io.Writer, params url.Values) error {
if !streamlog.ShouldEmitLog(stats.SQL) {
return nil
}

formattedBindVars := "\"[REDACTED]\""
if !*streamlog.RedactDebugUIQueries {
_, fullBindParams := params["full"]
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vtgate/logstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,35 @@ func TestLogStatsFormat(t *testing.T) {
*streamlog.QueryLogFormat = "text"
}

func TestLogStatsFilter(t *testing.T) {
defer func() { *streamlog.QueryLogFilterTag = "" }()

logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)})
logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC)
logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC)
params := map[string][]string{"full": {}}

got := testFormat(logStats, url.Values(params))
want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n"
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}

*streamlog.QueryLogFilterTag = "LOG_THIS_QUERY"
got = testFormat(logStats, url.Values(params))
want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n"
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}

*streamlog.QueryLogFilterTag = "NOT_THIS_QUERY"
got = testFormat(logStats, url.Values(params))
want = ""
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}
}

func TestLogStatsContextHTML(t *testing.T) {
html := "HtmlContext"
callInfo := &fakecallinfo.FakeCallInfo{
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func buildDeletePlan(del *sqlparser.Delete, vschema ContextVSchema) (*engine.Del
edel.MultiShardAutocommit = true
}

edel.QueryTimeout = queryTimeout(directives)

if rb.ERoute.TargetDestination != nil {
if rb.ERoute.TargetTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported: DELETE statement with a replica target")
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func buildInsertShardedPlan(ins *sqlparser.Insert, table *vindexes.Table) (*engi
eins.MultiShardAutocommit = true
}

eins.QueryTimeout = queryTimeout(directives)

var rows sqlparser.Values
switch insertValues := ins.Rows.(type) {
case *sqlparser.Select, *sqlparser.Union:
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/planbuilder/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,3 +680,21 @@ func (rb *route) SetOpcode(code engine.RouteOpcode) error {
rb.ERoute.Opcode = code
return nil
}

// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0.
func queryTimeout(d sqlparser.CommentDirectives) int {
if d == nil {
return 0
}

val, ok := d[sqlparser.DirectiveQueryTimeout]
if !ok {
return 0
}

intVal, ok := val.(int)
if ok {
return intVal
}
return 0
}
18 changes: 0 additions & 18 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,3 @@ func (pb *primitiveBuilder) expandStar(inrcs []*resultColumn, expr *sqlparser.St
}
return inrcs, true, nil
}

// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0.
func queryTimeout(d sqlparser.CommentDirectives) int {
if d == nil {
return 0
}

val, ok := d[sqlparser.DirectiveQueryTimeout]
if !ok {
return 0
}

intVal, ok := val.(int)
if ok {
return intVal
}
return 0
}
59 changes: 59 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/dml_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,33 @@
}
}

# insert with query timeout
"insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)"
{
"Original": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id) values (1), (2)",
"Instructions": {
"Opcode": "InsertSharded",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values (:_Id0, :_Name0, :_Costly0), (:_Id1, :_Name1, :_Costly1)",
"Values": [[[":__seq0",":__seq1"]],[[null,null]],[[null,null]]],
"Table": "user",
"Generate": {
"Keyspace": {
"Name": "main",
"Sharded": false
},
"Query": "select next :n values from seq",
"Values": [1,2]
},
"Prefix": "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into user(id, Name, Costly) values ",
"Mid": ["(:_Id0, :_Name0, :_Costly0)","(:_Id1, :_Name1, :_Costly1)"],
"QueryTimeout": 1
}
}

# insert with multiple rows - multi-shard autocommit
"insert /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ into user(id) values (1), (2)"
{
Expand Down Expand Up @@ -1337,6 +1364,22 @@
}
}

# update with no primary vindex on where clause (scatter update) - query timeout
"update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1"
{
"Original": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1",
"Instructions": {
"Opcode": "UpdateScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "update /*vt+ QUERY_TIMEOUT_MS=1 */ user_extra set val = 1",
"Table": "user_extra",
"QueryTimeout": 1
}
}

# update with non-comparison expr
"update user_extra set val = 1 where id between 1 and 2"
{
Expand Down Expand Up @@ -1489,6 +1532,22 @@
}
}

# delete from with no index match - query timeout
"delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'"
{
"Original": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'",
"Instructions": {
"Opcode": "DeleteScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from user_extra where name = 'jose'",
"Table": "user_extra",
"QueryTimeout": 1
}
}

# delete from with primary id in through IN clause
"delete from user_extra where user_id in (1, 2)"
{
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func buildUpdatePlan(upd *sqlparser.Update, vschema ContextVSchema) (*engine.Upd
eupd.MultiShardAutocommit = true
}

eupd.QueryTimeout = queryTimeout(directives)

var vindexTable *vindexes.Table
for _, tval := range pb.st.tables {
vindexTable = tval.vindexTable
Expand Down
Loading