diff --git a/changelog/17.0/17.0.0/summary.md b/changelog/17.0/17.0.0/summary.md
index 1b2acf9cbea..5d6c5af8757 100644
--- a/changelog/17.0/17.0.0/summary.md
+++ b/changelog/17.0/17.0.0/summary.md
@@ -88,6 +88,10 @@ Prior to v17, this asynchronous process could run indefinitely in the background
this behavior was changed to use a context with a timeout of `action_timeout`. If you are using VtctldClient to initiate a restore, make sure you provide an appropriate value for action_timeout to give enough
time for the restore process to complete. Otherwise, the restore will throw an error if the context expires before it completes.
+### Vttablet's transaction throttler now also throttles DML outside of `BEGIN; ...; COMMIT;` blocks
+Prior to v17, `vttablet`'s transaction throttler (enabled with `--enable-tx-throttler`) would only throttle requests done inside an explicit transaction, i.e., a `BEGIN; ...; COMMIT;` block.
+In v17 [PR#13040](https://github.com/vitessio/vitess/issues/13037), this behavior was being changed so that it also throttles work outside of explicit transactions for `INSERT/UPDATE/DELETE/LOAD` queries.
+
### New command line flags and behavior
#### Backup --builtinbackup-file-read-buffer-size and --builtinbackup-file-write-buffer-size
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 6696a4d8268..180f13be54d 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -80,6 +80,7 @@ var (
Type: sqltypes.Int64,
},
}
+ errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
)
func returnStreamResult(result *sqltypes.Result) error {
@@ -220,6 +221,10 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT
+ if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
+ return nil, errTxThrottled
+ }
+
conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
if err != nil {
@@ -231,6 +236,9 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
+ if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
+ return nil, errTxThrottled
+ }
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
if err != nil {
return nil, err
diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go
index 7d2d28ecd09..3ab653bf50c 100644
--- a/go/vt/vttablet/tabletserver/query_executor_test.go
+++ b/go/vt/vttablet/tabletserver/query_executor_test.go
@@ -24,10 +24,6 @@ import (
"strings"
"testing"
- "vitess.io/vitess/go/vt/sidecardb"
-
- "vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
-
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -38,6 +34,7 @@ import (
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/callinfo/fakecallinfo"
+ "vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/tableacl"
"vitess.io/vitess/go/vt/tableacl/simpleacl"
"vitess.io/vitess/go/vt/topo/memorytopo"
@@ -45,6 +42,8 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
querypb "vitess.io/vitess/go/vt/proto/query"
tableaclpb "vitess.io/vitess/go/vt/proto/tableacl"
@@ -82,6 +81,10 @@ func TestQueryExecutorPlans(t *testing.T) {
// inTxWant is the query log we expect if we're in a transation.
// If empty, then we should expect the same as logWant.
inTxWant string
+ // errorWant is the error we expect to get, if any, and should be nil if no error should be returned
+ errorWant error
+ // TxThrottler allows the test case to override the transaction throttler
+ txThrottler txthrottler.TxThrottler
}{{
input: "select * from t",
dbResponses: []dbResponse{{
@@ -268,7 +271,25 @@ func TestQueryExecutorPlans(t *testing.T) {
resultWant: emptyResult,
planWant: "Show",
logWant: "show create table mysql.`user`",
- }}
+ }, {
+ input: "update test_table set a=1",
+ dbResponses: []dbResponse{{
+ query: "update test_table set a = 1 limit 10001",
+ result: dmlResult,
+ }},
+ errorWant: errTxThrottled,
+ txThrottler: &mockTxThrottler{true},
+ }, {
+ input: "update test_table set a=1",
+ passThrough: true,
+ dbResponses: []dbResponse{{
+ query: "update test_table set a = 1 limit 10001",
+ result: dmlResult,
+ }},
+ errorWant: errTxThrottled,
+ txThrottler: &mockTxThrottler{true},
+ },
+ }
for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
db := setUpQueryExecutorTest(t)
@@ -278,6 +299,9 @@ func TestQueryExecutorPlans(t *testing.T) {
}
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
+ if tcase.txThrottler != nil {
+ tsv.txThrottler = tcase.txThrottler
+ }
tsv.config.DB.DBName = "ks"
defer tsv.StopService()
@@ -286,32 +310,39 @@ func TestQueryExecutorPlans(t *testing.T) {
// Test outside a transaction.
qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0)
got, err := qre.Execute()
- require.NoError(t, err, tcase.input)
- assert.Equal(t, tcase.resultWant, got, tcase.input)
- assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
- assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
-
+ if tcase.errorWant == nil {
+ require.NoError(t, err, tcase.input)
+ assert.Equal(t, tcase.resultWant, got, tcase.input)
+ assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
+ assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
+ } else {
+ assert.True(t, vterrors.Equals(err, tcase.errorWant))
+ }
// Wait for the existing query to be processed by the cache
tsv.QueryPlanCacheWait()
// Test inside a transaction.
target := tsv.sm.Target()
state, err := tsv.Begin(ctx, target, nil)
- require.NoError(t, err)
- require.NotNil(t, state.TabletAlias, "alias should not be nil")
- assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
- defer tsv.Commit(ctx, target, state.TransactionID)
-
- qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
- got, err = qre.Execute()
- require.NoError(t, err, tcase.input)
- assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
- assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
- want := tcase.logWant
- if tcase.inTxWant != "" {
- want = tcase.inTxWant
+ if tcase.errorWant == nil {
+ require.NoError(t, err)
+ require.NotNil(t, state.TabletAlias, "alias should not be nil")
+ assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
+ defer tsv.Commit(ctx, target, state.TransactionID)
+
+ qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
+ got, err = qre.Execute()
+ require.NoError(t, err, tcase.input)
+ assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
+ assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
+ want := tcase.logWant
+ if tcase.inTxWant != "" {
+ want = tcase.inTxWant
+ }
+ assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
+ } else {
+ assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
- assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
})
}
}
@@ -1759,3 +1790,22 @@ func TestQueryExecSchemaReloadCount(t *testing.T) {
})
}
}
+
+type mockTxThrottler struct {
+ throttle bool
+}
+
+func (m mockTxThrottler) InitDBConfig(target *querypb.Target) {
+ panic("implement me")
+}
+
+func (m mockTxThrottler) Open() (err error) {
+ return nil
+}
+
+func (m mockTxThrottler) Close() {
+}
+
+func (m mockTxThrottler) Throttle(priority int) (result bool) {
+ return m.throttle
+}
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index cce32016da3..88d88eb8429 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -112,7 +112,7 @@ type TabletServer struct {
tracker *schema.Tracker
watcher *BinlogWatcher
qe *QueryEngine
- txThrottler *txthrottler.TxThrottler
+ txThrottler txthrottler.TxThrottler
te *TxEngine
messager *messager.Engine
hs *healthStreamer
@@ -493,22 +493,8 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
- priority := tsv.config.TxThrottlerDefaultPriority
- if options != nil && options.Priority != "" {
- optionsPriority, err := strconv.Atoi(options.Priority)
- // This should never error out, as the value for Priority has been validated in the vtgate already.
- // Still, handle it just to make sure.
- if err != nil {
- log.Errorf(
- "The value of the %s query directive could not be converted to integer, using the "+
- "default value. Error was: %s",
- sqlparser.DirectivePriority, priority, err)
- } else {
- priority = optionsPriority
- }
- }
- if tsv.txThrottler.Throttle(priority) {
- return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
+ if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
+ return errTxThrottled
}
var connSetting *pools.Setting
if len(settings) > 0 {
@@ -539,6 +525,30 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
return state, err
}
+func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) int {
+ priority := tsv.config.TxThrottlerDefaultPriority
+ if options == nil {
+ return priority
+ }
+ if options.Priority == "" {
+ return priority
+ }
+
+ optionsPriority, err := strconv.Atoi(options.Priority)
+ // This should never error out, as the value for Priority has been validated in the vtgate already.
+ // Still, handle it just to make sure.
+ if err != nil {
+ log.Errorf(
+ "The value of the %s query directive could not be converted to integer, using the "+
+ "default value. Error was: %s",
+ sqlparser.DirectivePriority, priority, err)
+
+ return priority
+ }
+
+ return optionsPriority
+}
+
// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) {
err = tsv.execRequest(
diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
index 64e7070a228..bc5235593ac 100644
--- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
+++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
@@ -65,6 +65,14 @@ func resetTxThrottlerFactories() {
}
}
+// TxThrottler defines the interface for the transaction throttler.
+type TxThrottler interface {
+ InitDBConfig(target *querypb.Target)
+ Open() (err error)
+ Close()
+ Throttle(priority int) (result bool)
+}
+
func init() {
resetTxThrottlerFactories()
}
@@ -95,7 +103,7 @@ type TopologyWatcherInterface interface {
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"
-// TxThrottler throttles transactions based on replication lag.
+// txThrottler implements TxThrottle for throttling transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
//
@@ -119,10 +127,10 @@ const TxThrottlerName = "TransactionThrottler"
// // To release the resources used by the throttler the caller should call Close().
// t.Close()
//
-// A TxThrottler object is generally not thread-safe: at any given time at most one goroutine should
+// A txThrottler object is generally not thread-safe: at any given time at most one goroutine should
// be executing a method. The only exception is the 'Throttle' method where multiple goroutines are
// allowed to execute it concurrently.
-type TxThrottler struct {
+type txThrottler struct {
// config stores the transaction throttler's configuration.
// It is populated in NewTxThrottler and is not modified
// since.
@@ -172,12 +180,12 @@ type txThrottlerState struct {
topologyWatchers []TopologyWatcherInterface
}
-// NewTxThrottler tries to construct a TxThrottler from the
+// NewTxThrottler tries to construct a txThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
-func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
+func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
txThrottler, err := tryCreateTxThrottler(env, topoServer)
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
@@ -191,11 +199,11 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
}
// InitDBConfig initializes the target parameters for the throttler.
-func (t *TxThrottler) InitDBConfig(target *querypb.Target) {
+func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}
-func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrottler, error) {
+func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
}
@@ -218,7 +226,7 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrott
})
}
-func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*TxThrottler, error) {
+func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*txThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
@@ -229,7 +237,7 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott
return nil, fmt.Errorf("empty healthCheckCells given. %+v", config)
}
}
- return &TxThrottler{
+ return &txThrottler{
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
@@ -239,23 +247,23 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott
}
// Open opens the transaction throttler. It must be called prior to 'Throttle'.
-func (t *TxThrottler) Open() (err error) {
+func (t *txThrottler) Open() (err error) {
if !t.config.enabled {
return nil
}
if t.state != nil {
return nil
}
- log.Info("TxThrottler: opening")
+ log.Info("txThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target)
return err
}
-// Close closes the TxThrottler object and releases resources.
+// Close closes the txThrottler object and releases resources.
// It should be called after the throttler is no longer needed.
// It's ok to call this method on a closed throttler--in which case the method does nothing.
-func (t *TxThrottler) Close() {
+func (t *txThrottler) Close() {
if !t.config.enabled {
return
}
@@ -265,14 +273,14 @@ func (t *TxThrottler) Close() {
t.state.deallocateResources()
t.state = nil
t.throttlerRunning.Set(0)
- log.Info("TxThrottler: closed")
+ log.Info("txThrottler: closed")
}
// Throttle should be called before a new transaction is started.
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
-func (t *TxThrottler) Throttle(priority int) (result bool) {
+func (t *txThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
return false
}
@@ -280,9 +288,10 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
return false
}
- // Throttle according to both what the throttle state says, and the priority. Workloads with lower priority value
+ // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority
+
t.requestsTotal.Add(1)
if result {
t.requestsThrottled.Add(1)
diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
index 523b45c6174..97138e3928c 100644
--- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
+++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
@@ -51,7 +51,8 @@ func TestDisabledThrottler(t *testing.T) {
})
assert.Nil(t, throttler.Open())
assert.False(t, throttler.Throttle(0))
- assert.Zero(t, throttler.throttlerRunning.Get())
+ throttlerImpl, _ := throttler.(*txThrottler)
+ assert.Zero(t, throttlerImpl.throttlerRunning.Get())
throttler.Close()
}
@@ -101,12 +102,13 @@ func TestEnabledThrottler(t *testing.T) {
call4 := mockThrottler.EXPECT().Throttle(0)
call4.Return(1 * time.Second)
- call6 := mockThrottler.EXPECT().Close()
+ calllast := mockThrottler.EXPECT().Close()
+
call1.After(call0)
call2.After(call1)
call3.After(call2)
call4.After(call3)
- call6.After(call4)
+ calllast.After(call4)
config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true