diff --git a/go/vt/vtgate/engine/dml.go b/go/vt/vtgate/engine/dml.go index 9a0a044a3c4..09f781c835d 100644 --- a/go/vt/vtgate/engine/dml.go +++ b/go/vt/vtgate/engine/dml.go @@ -133,7 +133,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error { } func (dml *DML) execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) (*sqltypes.Result, error) { - autocommit := (len(rss) == 1 || dml.MultiShardAutocommit) && vcursor.AutocommitApproval() + autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || dml.MultiShardAutocommit) && vcursor.AutocommitApproval() result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /*rollbackOnError*/, autocommit, dml.FetchLastInsertID) return result, vterrors.Aggregate(errs) } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 060d2ebcfcb..e9b4bb2ae3f 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -414,6 +414,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) { func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) { } +func (t *noopVCursor) DefaultMultiShardAutocommit() bool { + return false +} + var ( _ VCursor = (*loggingVCursor)(nil) _ SessionActions = (*loggingVCursor)(nil) @@ -845,6 +849,10 @@ func (f *loggingVCursor) SetPriority(string) { panic("implement me") } +func (f *loggingVCursor) DefaultMultiShardAutocommit() bool { + return false +} + func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) { f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl))) return f.tableRoutes.tbl, nil diff --git a/go/vt/vtgate/engine/insert.go b/go/vt/vtgate/engine/insert.go index 5bc206f7465..c4f5eef409d 100644 --- a/go/vt/vtgate/engine/insert.go +++ b/go/vt/vtgate/engine/insert.go @@ -164,7 +164,7 @@ func (ins *Insert) executeInsertQueries( queries []*querypb.BoundQuery, insertID uint64, ) (*sqltypes.Result, error) { - autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() + autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() err := allowOnlyPrimary(rss...) if err != nil { return nil, err diff --git a/go/vt/vtgate/engine/insert_select.go b/go/vt/vtgate/engine/insert_select.go index af834858175..ff47ffe8c26 100644 --- a/go/vt/vtgate/engine/insert_select.go +++ b/go/vt/vtgate/engine/insert_select.go @@ -202,7 +202,7 @@ func (ins *InsertSelect) executeInsertQueries( queries []*querypb.BoundQuery, insertID uint64, ) (*sqltypes.Result, error) { - autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() + autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() err := allowOnlyPrimary(rss...) if err != nil { return nil, err diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 7734dd81a6b..94a8381d0cd 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -149,6 +149,9 @@ type ( RecordMirrorStats(time.Duration, time.Duration, error) SetLastInsertID(uint64) + + // DefaultMultiShardAutocommit returns true if multi shard autocommit semantics are enabled by default + DefaultMultiShardAutocommit() bool } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 4655f680675..58ed29fade3 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -146,7 +146,7 @@ func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*sr func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool { if s.IsDML { - return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval() + return (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || s.MultishardAutocommit) && vcursor.AutocommitApproval() } return false } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index c8dfd0b6269..4d7c43ba848 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -115,6 +115,9 @@ type ( AllowScatter bool WarmingReadsPercent int QueryLogToFile string + + // DefaultMultiShardAutocommit will opt into autocommit semantics even for multi shard DMLs + DefaultMultiShardAutocommit bool } Executor struct { @@ -1424,6 +1427,8 @@ func (e *Executor) initVConfig(warnOnShardedOnly bool, pv plancontext.PlannerVer WarmingReadsPercent: e.config.WarmingReadsPercent, WarmingReadsTimeout: warmingReadsQueryTimeout, WarmingReadsChannel: e.warmingReadsChannel, + + DefaultMultiShardAutocommit: e.config.DefaultMultiShardAutocommit, } } diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index 4d3f631bac5..087b5984477 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -91,6 +91,8 @@ type ( WarmingReadsPercent int WarmingReadsTimeout time.Duration WarmingReadsChannel chan bool + + DefaultMultiShardAutocommit bool } // vcursor_impl needs these facilities to be able to be able to execute queries for vindexes @@ -1619,3 +1621,7 @@ func (vc *VCursorImpl) SetLastInsertID(id uint64) { defer vc.SafeSession.mu.Unlock() vc.SafeSession.LastInsertId = id } + +func (vc *VCursorImpl) DefaultMultiShardAutocommit() bool { + return vc.config.DefaultMultiShardAutocommit +} diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 8b8302d77d4..afe81b4f92e 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -78,6 +78,8 @@ var ( noScatter bool enableShardRouting bool + defaultMultiShardAutocommit bool + // healthCheckRetryDelay is the time to wait before retrying healthcheck healthCheckRetryDelay = 2 * time.Millisecond // healthCheckTimeout is the timeout on the RPC call to tablets @@ -199,6 +201,7 @@ func registerFlags(fs *pflag.FlagSet) { fs.IntVar(&warmingReadsPercent, "warming-reads-percent", 0, "Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm") fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed") fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries") + fs.BoolVar(&defaultMultiShardAutocommit, "default-multi-shard-autocommit", defaultMultiShardAutocommit, "By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive") viperutil.BindFlags(fs, enableOnlineDDL, @@ -358,6 +361,8 @@ func Init( AllowScatter: !noScatter, WarmingReadsPercent: warmingReadsPercent, QueryLogToFile: queryLogToFile, + + DefaultMultiShardAutocommit: defaultMultiShardAutocommit, } executor := NewExecutor(ctx, env, serv, cell, resolver, eConfig, warnShardedOnly, plans, si, pv, dynamicConfig)