diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index cd6479b1654..d7b468ef7bb 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -108,6 +108,7 @@ Flags: --dba_pool_size int Size of the connection pool for dba connections (default 20) --dbddl_plugin string controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service (default "fail") --ddl_strategy string Set default strategy for DDL statements. Override with @@ddl_strategy session variable (default "direct") + --default-multi-shard-autocommit By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive --default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY) --degraded_threshold duration replication lag after which a replica is considered degraded (default 30s) --disable_active_reparents if set, do not allow active reparents. Use this to protect a cluster using external reparents. diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index a56a6893934..e271b523a59 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -51,6 +51,7 @@ Flags: --datadog-agent-port string port to send spans to. if empty, no tracing will be done --dbddl_plugin string controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service (default "fail") --ddl_strategy string Set default strategy for DDL statements. Override with @@ddl_strategy session variable (default "direct") + --default-multi-shard-autocommit By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive --default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY) --discovery_high_replication_lag_minimum_serving duration Threshold above which replication lag is considered too high when applying the min_number_serving_vttablets flag. (default 2h0m0s) --discovery_low_replication_lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s) diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index a5d9677f02a..c5d8c7104b3 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -72,7 +72,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) - vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) + vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0, false) vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) return nil diff --git a/go/vt/vtgate/engine/dml.go b/go/vt/vtgate/engine/dml.go index 463008a4433..a711e0c0817 100644 --- a/go/vt/vtgate/engine/dml.go +++ b/go/vt/vtgate/engine/dml.go @@ -130,7 +130,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error { } func execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, multiShardAutoCommit bool) (*sqltypes.Result, error) { - autocommit := (len(rss) == 1 || multiShardAutoCommit) && vcursor.AutocommitApproval() + autocommit := (len(rss) == 1 || multiShardAutoCommit || vcursor.DefaultMultiShardAutocommit()) && vcursor.AutocommitApproval() result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /* rollbackOnError */, autocommit) return result, vterrors.Aggregate(errs) } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 08a40c0d835..d75c1f8bec3 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -383,6 +383,10 @@ func (t *noopVCursor) GetDBDDLPluginName() string { var _ VCursor = (*loggingVCursor)(nil) var _ SessionActions = (*loggingVCursor)(nil) +func (t *noopVCursor) DefaultMultiShardAutocommit() bool { + return false +} + // loggingVCursor logs requests and allows you to verify // that the correct requests were made. type loggingVCursor struct { @@ -784,6 +788,10 @@ func (f *loggingVCursor) SetPriority(string) { panic("implement me") } +func (f *loggingVCursor) DefaultMultiShardAutocommit() bool { + return false +} + func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) { f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl))) return f.tableRoutes.tbl, nil diff --git a/go/vt/vtgate/engine/insert.go b/go/vt/vtgate/engine/insert.go index be0bb889083..f00e998d68c 100644 --- a/go/vt/vtgate/engine/insert.go +++ b/go/vt/vtgate/engine/insert.go @@ -164,7 +164,7 @@ func (ins *Insert) executeInsertQueries( queries []*querypb.BoundQuery, insertID uint64, ) (*sqltypes.Result, error) { - autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() + autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() err := allowOnlyPrimary(rss...) if err != nil { return nil, err diff --git a/go/vt/vtgate/engine/insert_select.go b/go/vt/vtgate/engine/insert_select.go index 88767420508..0f7a83c259a 100644 --- a/go/vt/vtgate/engine/insert_select.go +++ b/go/vt/vtgate/engine/insert_select.go @@ -201,7 +201,7 @@ func (ins *InsertSelect) executeInsertQueries( queries []*querypb.BoundQuery, insertID uint64, ) (*sqltypes.Result, error) { - autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() + autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() err := allowOnlyPrimary(rss...) if err != nil { return nil, err diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 1c0e7de7a19..2abce115692 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -131,6 +131,9 @@ type ( // CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas CloneForReplicaWarming(ctx context.Context) VCursor + + // DefaultMultiShardAutocommit returns true if multi shard autocommit semantics are enabled by default + DefaultMultiShardAutocommit() bool } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 53f12cd3549..0f25c91fa5f 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -140,7 +140,7 @@ func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*sr func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool { if s.IsDML { - return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval() + return (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || s.MultishardAutocommit) && vcursor.AutocommitApproval() } return false } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index b99873ced02..530b5c0ebd9 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -124,6 +124,9 @@ type Executor struct { warmingReadsPercent int warmingReadsChannel chan bool + + // defaultMultiShardAutocommit will opt into autocommit semantics even for multi shard DMLs + defaultMultiShardAutocommit bool } var executorOnce sync.Once @@ -155,23 +158,25 @@ func NewExecutor( noScatter bool, pv plancontext.PlannerVersion, warmingReadsPercent int, + defaultMultiShardAutocommit bool, ) *Executor { e := &Executor{ - env: env, - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - normalize: normalize, - warnShardedOnly: warnOnShardedOnly, - streamSize: streamSize, - schemaTracker: schemaTracker, - allowScatter: !noScatter, - pv: pv, - plans: plans, - warmingReadsPercent: warmingReadsPercent, - warmingReadsChannel: make(chan bool, warmingReadsConcurrency), + env: env, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + normalize: normalize, + warnShardedOnly: warnOnShardedOnly, + streamSize: streamSize, + schemaTracker: schemaTracker, + allowScatter: !noScatter, + pv: pv, + plans: plans, + warmingReadsPercent: warmingReadsPercent, + warmingReadsChannel: make(chan bool, warmingReadsConcurrency), + defaultMultiShardAutocommit: defaultMultiShardAutocommit, } vschemaacl.Init() diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 332139c4a78..31cf8e8c4f0 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -181,7 +181,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta // one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness. plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false) executor.SetQueryLogger(queryLogger) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} @@ -230,7 +230,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex plans := DefaultPlanCache() env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion}) require.NoError(t, err) - executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -267,7 +267,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -292,7 +292,7 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context, replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, false) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 90118603f46..e5bf834fd69 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1622,7 +1622,7 @@ func TestSelectListArg(t *testing.T) { func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false) ex.SetQueryLogger(queryLogger) return ex } @@ -3313,7 +3313,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index b8cfeaf3cd5..066c30e667c 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -67,7 +67,7 @@ func TestStreamSQLSharded(t *testing.T) { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false) executor.SetQueryLogger(queryLogger) defer executor.Close() diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 67093046ded..a6d00f1bb96 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -122,8 +122,9 @@ type vcursorImpl struct { warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here pv plancontext.PlannerVersion - warmingReadsPercent int - warmingReadsChannel chan bool + warmingReadsPercent int + warmingReadsChannel chan bool + defaultMultiShardAutocommit bool } // newVcursorImpl creates a vcursorImpl. Before creating this object, you have to separate out any marginComments that came with @@ -173,24 +174,26 @@ func newVCursorImpl( if executor != nil { warmingReadsPct = executor.warmingReadsPercent warmingReadsChan = executor.warmingReadsChannel + defaultMultiShardAutocommit = executor.defaultMultiShardAutocommit } return &vcursorImpl{ - safeSession: safeSession, - keyspace: keyspace, - tabletType: tabletType, - destination: destination, - marginComments: marginComments, - executor: executor, - logStats: logStats, - collation: connCollation, - resolver: resolver, - vschema: vschema, - vm: vm, - topoServer: ts, - warnShardedOnly: warnShardedOnly, - pv: pv, - warmingReadsPercent: warmingReadsPct, - warmingReadsChannel: warmingReadsChan, + safeSession: safeSession, + keyspace: keyspace, + tabletType: tabletType, + destination: destination, + marginComments: marginComments, + executor: executor, + logStats: logStats, + collation: connCollation, + resolver: resolver, + vschema: vschema, + vm: vm, + topoServer: ts, + warnShardedOnly: warnShardedOnly, + pv: pv, + warmingReadsPercent: warmingReadsPct, + warmingReadsChannel: warmingReadsChan, + defaultMultiShardAutocommit: defaultMultiShardAutocommit, }, nil } @@ -1369,3 +1372,7 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) { func (vc *vcursorImpl) GetForeignKeyChecksState() *bool { return vc.fkChecksState } + +func (vc *vcursorImpl) DefaultMultiShardAutocommit() bool { + return vc.defaultMultiShardAutocommit +} diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 6f2823521fc..637ea19b98f 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -76,6 +76,8 @@ var ( noScatter bool enableShardRouting bool + defaultMultiShardAutocommit bool + // healthCheckRetryDelay is the time to wait before retrying healthcheck healthCheckRetryDelay = 2 * time.Millisecond // healthCheckTimeout is the timeout on the RPC call to tablets @@ -153,6 +155,7 @@ func registerFlags(fs *pflag.FlagSet) { fs.IntVar(&warmingReadsPercent, "warming-reads-percent", 0, "Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm") fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed") fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries") + fs.BoolVar(&defaultMultiShardAutocommit, "default-multi-shard-autocommit", defaultMultiShardAutocommit, "By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive") } func init() { @@ -329,6 +332,7 @@ func Init( noScatter, pv, warmingReadsPercent, + defaultMultiShardAutocommit, ) if err := executor.defaultQueryLogger(); err != nil {