Skip to content
4 changes: 4 additions & 0 deletions changelog/17.0/17.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ Prior to v17, this asynchronous process could run indefinitely in the background
this behavior was changed to use a context with a timeout of `action_timeout`. If you are using VtctldClient to initiate a restore, make sure you provide an appropriate value for action_timeout to give enough
time for the restore process to complete. Otherwise, the restore will throw an error if the context expires before it completes.

### <a id="Vttablet-TxThrottler"> Vttablet's transaction throttler now also throttles DML outside of `BEGIN; ...; COMMIT;` blocks
Prior to v17, `vttablet`'s transaction throttler (enabled with `--enable-tx-throttler`) would only throttle requests done inside an explicit transaction, i.e., a `BEGIN; ...; COMMIT;` block.
In v17 [PR#13040](https://github.com/vitessio/vitess/issues/13037), this behavior was being changed so that it also throttles work outside of explicit transactions for `INSERT/UPDATE/DELETE/LOAD` queries.

### <a id="new-flag"/> New command line flags and behavior

#### <a id="builtin-backup-read-buffering-flags" /> Backup --builtinbackup-file-read-buffer-size and --builtinbackup-file-write-buffer-size
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
Type: sqltypes.Int64,
},
}
errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
)

func returnStreamResult(result *sqltypes.Result) error {
Expand Down Expand Up @@ -220,6 +221,10 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
return nil, errTxThrottled
}

conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)

if err != nil {
Expand All @@ -231,6 +236,9 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
return nil, errTxThrottled
}
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
if err != nil {
return nil, err
Expand Down
98 changes: 74 additions & 24 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
"strings"
"testing"

"vitess.io/vitess/go/vt/sidecardb"

"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -38,13 +34,16 @@ import (
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/callinfo/fakecallinfo"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/tableacl"
"vitess.io/vitess/go/vt/tableacl/simpleacl"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"

querypb "vitess.io/vitess/go/vt/proto/query"
tableaclpb "vitess.io/vitess/go/vt/proto/tableacl"
Expand Down Expand Up @@ -82,6 +81,10 @@ func TestQueryExecutorPlans(t *testing.T) {
// inTxWant is the query log we expect if we're in a transation.
// If empty, then we should expect the same as logWant.
inTxWant string
// errorWant is the error we expect to get, if any, and should be nil if no error should be returned
errorWant error
// TxThrottler allows the test case to override the transaction throttler
txThrottler txthrottler.TxThrottler
}{{
input: "select * from t",
dbResponses: []dbResponse{{
Expand Down Expand Up @@ -268,7 +271,25 @@ func TestQueryExecutorPlans(t *testing.T) {
resultWant: emptyResult,
planWant: "Show",
logWant: "show create table mysql.`user`",
}}
}, {
input: "update test_table set a=1",
dbResponses: []dbResponse{{
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
txThrottler: &mockTxThrottler{true},
}, {
input: "update test_table set a=1",
passThrough: true,
dbResponses: []dbResponse{{
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
txThrottler: &mockTxThrottler{true},
},
}
for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
db := setUpQueryExecutorTest(t)
Expand All @@ -278,6 +299,9 @@ func TestQueryExecutorPlans(t *testing.T) {
}
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
if tcase.txThrottler != nil {
tsv.txThrottler = tcase.txThrottler
}
tsv.config.DB.DBName = "ks"
defer tsv.StopService()

Expand All @@ -286,32 +310,39 @@ func TestQueryExecutorPlans(t *testing.T) {
// Test outside a transaction.
qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0)
got, err := qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)

if tcase.errorWant == nil {
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
// Wait for the existing query to be processed by the cache
tsv.QueryPlanCacheWait()

// Test inside a transaction.
target := tsv.sm.Target()
state, err := tsv.Begin(ctx, target, nil)
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
if tcase.errorWant == nil {
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
})
}
}
Expand Down Expand Up @@ -1759,3 +1790,22 @@ func TestQueryExecSchemaReloadCount(t *testing.T) {
})
}
}

type mockTxThrottler struct {
throttle bool
}

func (m mockTxThrottler) InitDBConfig(target *querypb.Target) {
panic("implement me")
}

func (m mockTxThrottler) Open() (err error) {
return nil
}

func (m mockTxThrottler) Close() {
}

func (m mockTxThrottler) Throttle(priority int) (result bool) {
return m.throttle
}
44 changes: 27 additions & 17 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type TabletServer struct {
tracker *schema.Tracker
watcher *BinlogWatcher
qe *QueryEngine
txThrottler *txthrottler.TxThrottler
txThrottler txthrottler.TxThrottler
te *TxEngine
messager *messager.Engine
hs *healthStreamer
Expand Down Expand Up @@ -493,22 +493,8 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
priority := tsv.config.TxThrottlerDefaultPriority
if options != nil && options.Priority != "" {
optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)
} else {
priority = optionsPriority
}
}
if tsv.txThrottler.Throttle(priority) {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
return errTxThrottled
}
var connSetting *pools.Setting
if len(settings) > 0 {
Expand Down Expand Up @@ -539,6 +525,30 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
return state, err
}

func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) int {
priority := tsv.config.TxThrottlerDefaultPriority
if options == nil {
return priority
}
if options.Priority == "" {
return priority
}

optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)

return priority
}

return optionsPriority
}

// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) {
err = tsv.execRequest(
Expand Down
Loading