Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ Flags:
--dba_pool_size int Size of the connection pool for dba connections (default 20)
--dbddl_plugin string controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service (default "fail")
--ddl_strategy string Set default strategy for DDL statements. Override with @@ddl_strategy session variable (default "direct")
--default-multi-shard-autocommit By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive
--default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY)
--degraded_threshold duration replication lag after which a replica is considered degraded (default 30s)
--disable_active_reparents if set, do not allow active reparents. Use this to protect a cluster using external reparents.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Flags:
--datadog-agent-port string port to send spans to. if empty, no tracing will be done
--dbddl_plugin string controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service (default "fail")
--ddl_strategy string Set default strategy for DDL statements. Override with @@ddl_strategy session variable (default "direct")
--default-multi-shard-autocommit By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive
--default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY)
--discovery_high_replication_lag_minimum_serving duration Threshold above which replication lag is considered too high when applying the min_number_serving_vttablets flag. (default 2h0m0s)
--discovery_low_replication_lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
queryLogBufferSize := 10
plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0, false)
vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))

return nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error {
}

func execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, multiShardAutoCommit bool) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || multiShardAutoCommit) && vcursor.AutocommitApproval()
autocommit := (len(rss) == 1 || multiShardAutoCommit || vcursor.DefaultMultiShardAutocommit()) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /* rollbackOnError */, autocommit)
return result, vterrors.Aggregate(errs)
}
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ func (t *noopVCursor) GetDBDDLPluginName() string {
var _ VCursor = (*loggingVCursor)(nil)
var _ SessionActions = (*loggingVCursor)(nil)

func (t *noopVCursor) DefaultMultiShardAutocommit() bool {
return false
}

// loggingVCursor logs requests and allows you to verify
// that the correct requests were made.
type loggingVCursor struct {
Expand Down Expand Up @@ -784,6 +788,10 @@ func (f *loggingVCursor) SetPriority(string) {
panic("implement me")
}

func (f *loggingVCursor) DefaultMultiShardAutocommit() bool {
return false
}

func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) {
f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl)))
return f.tableRoutes.tbl, nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (ins *Insert) executeInsertQueries(
queries []*querypb.BoundQuery,
insertID uint64,
) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
err := allowOnlyPrimary(rss...)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/insert_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (ins *InsertSelect) executeInsertQueries(
queries []*querypb.BoundQuery,
insertID uint64,
) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
err := allowOnlyPrimary(rss...)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type (

// CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas
CloneForReplicaWarming(ctx context.Context) VCursor

// DefaultMultiShardAutocommit returns true if multi shard autocommit semantics are enabled by default
DefaultMultiShardAutocommit() bool
}

// SessionActions gives primitives ability to interact with the session state
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*sr

func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool {
if s.IsDML {
return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval()
return (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || s.MultishardAutocommit) && vcursor.AutocommitApproval()
}
return false
}
Expand Down
35 changes: 20 additions & 15 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type Executor struct {

warmingReadsPercent int
warmingReadsChannel chan bool

// defaultMultiShardAutocommit will opt into autocommit semantics even for multi shard DMLs
defaultMultiShardAutocommit bool
}

var executorOnce sync.Once
Expand Down Expand Up @@ -155,23 +158,25 @@ func NewExecutor(
noScatter bool,
pv plancontext.PlannerVersion,
warmingReadsPercent int,
defaultMultiShardAutocommit bool,
) *Executor {
e := &Executor{
env: env,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
plans: plans,
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
env: env,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
plans: plans,
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
defaultMultiShardAutocommit: defaultMultiShardAutocommit,
}

vschemaacl.Init()
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta
// one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness.
plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false)

executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
Expand Down Expand Up @@ -230,7 +230,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex
plans := DefaultPlanCache()
env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion})
require.NoError(t, err)
executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down Expand Up @@ -267,7 +267,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand All @@ -292,7 +292,7 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context,
replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil)

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, false)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,7 @@ func TestSelectListArg(t *testing.T) {
func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
ex.SetQueryLogger(queryLogger)
return ex
}
Expand Down Expand Up @@ -3313,7 +3313,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
}
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)
defer executor.Close()
// some sleep for all goroutines to start
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestStreamSQLSharded(t *testing.T) {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()

executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

defer executor.Close()
Expand Down
43 changes: 25 additions & 18 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ type vcursorImpl struct {
warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here
pv plancontext.PlannerVersion

warmingReadsPercent int
warmingReadsChannel chan bool
warmingReadsPercent int
warmingReadsChannel chan bool
defaultMultiShardAutocommit bool
}

// newVcursorImpl creates a vcursorImpl. Before creating this object, you have to separate out any marginComments that came with
Expand Down Expand Up @@ -173,24 +174,26 @@ func newVCursorImpl(
if executor != nil {
warmingReadsPct = executor.warmingReadsPercent
warmingReadsChan = executor.warmingReadsChannel
defaultMultiShardAutocommit = executor.defaultMultiShardAutocommit
}
return &vcursorImpl{
safeSession: safeSession,
keyspace: keyspace,
tabletType: tabletType,
destination: destination,
marginComments: marginComments,
executor: executor,
logStats: logStats,
collation: connCollation,
resolver: resolver,
vschema: vschema,
vm: vm,
topoServer: ts,
warnShardedOnly: warnShardedOnly,
pv: pv,
warmingReadsPercent: warmingReadsPct,
warmingReadsChannel: warmingReadsChan,
safeSession: safeSession,
keyspace: keyspace,
tabletType: tabletType,
destination: destination,
marginComments: marginComments,
executor: executor,
logStats: logStats,
collation: connCollation,
resolver: resolver,
vschema: vschema,
vm: vm,
topoServer: ts,
warnShardedOnly: warnShardedOnly,
pv: pv,
warmingReadsPercent: warmingReadsPct,
warmingReadsChannel: warmingReadsChan,
defaultMultiShardAutocommit: defaultMultiShardAutocommit,
}, nil
}

Expand Down Expand Up @@ -1369,3 +1372,7 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) {
func (vc *vcursorImpl) GetForeignKeyChecksState() *bool {
return vc.fkChecksState
}

func (vc *vcursorImpl) DefaultMultiShardAutocommit() bool {
return vc.defaultMultiShardAutocommit
}
4 changes: 4 additions & 0 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ var (
noScatter bool
enableShardRouting bool

defaultMultiShardAutocommit bool

// healthCheckRetryDelay is the time to wait before retrying healthcheck
healthCheckRetryDelay = 2 * time.Millisecond
// healthCheckTimeout is the timeout on the RPC call to tablets
Expand Down Expand Up @@ -153,6 +155,7 @@ func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&warmingReadsPercent, "warming-reads-percent", 0, "Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm")
fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed")
fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries")
fs.BoolVar(&defaultMultiShardAutocommit, "default-multi-shard-autocommit", defaultMultiShardAutocommit, "By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive")
}

func init() {
Expand Down Expand Up @@ -329,6 +332,7 @@ func Init(
noScatter,
pv,
warmingReadsPercent,
defaultMultiShardAutocommit,
)

if err := executor.defaultQueryLogger(); err != nil {
Expand Down
Loading