Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error {
}

func (dml *DML) execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || dml.MultiShardAutocommit) && vcursor.AutocommitApproval()
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || dml.MultiShardAutocommit) && vcursor.AutocommitApproval()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dml.MultiShardAutocommit should be authoritative over the default, so if we want to override the default let's say true with a false we have to consider that.

Copy link
Member

@harshit-gangal harshit-gangal Mar 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to store like a enum or a pointer bool in the plan to know if this is set or not.

result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /*rollbackOnError*/, autocommit, dml.FetchLastInsertID)
return result, vterrors.Aggregate(errs)
}
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) {
func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
}

func (t *noopVCursor) DefaultMultiShardAutocommit() bool {
return false
}

var (
_ VCursor = (*loggingVCursor)(nil)
_ SessionActions = (*loggingVCursor)(nil)
Expand Down Expand Up @@ -845,6 +849,10 @@ func (f *loggingVCursor) SetPriority(string) {
panic("implement me")
}

func (f *loggingVCursor) DefaultMultiShardAutocommit() bool {
return false
}

func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) {
f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl)))
return f.tableRoutes.tbl, nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (ins *Insert) executeInsertQueries(
queries []*querypb.BoundQuery,
insertID uint64,
) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
err := allowOnlyPrimary(rss...)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/insert_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (ins *InsertSelect) executeInsertQueries(
queries []*querypb.BoundQuery,
insertID uint64,
) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
err := allowOnlyPrimary(rss...)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ type (
RecordMirrorStats(time.Duration, time.Duration, error)

SetLastInsertID(uint64)

// DefaultMultiShardAutocommit returns true if multi shard autocommit semantics are enabled by default
DefaultMultiShardAutocommit() bool
}

// SessionActions gives primitives ability to interact with the session state
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*sr

func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool {
if s.IsDML {
return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval()
return (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || s.MultishardAutocommit) && vcursor.AutocommitApproval()
}
return false
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ type (
AllowScatter bool
WarmingReadsPercent int
QueryLogToFile string

// DefaultMultiShardAutocommit will opt into autocommit semantics even for multi shard DMLs
DefaultMultiShardAutocommit bool
Comment on lines +118 to +120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is not used by executor, it is only passed to vcusor. We can set on the vcursor config directly.

}

Executor struct {
Expand Down Expand Up @@ -1424,6 +1427,8 @@ func (e *Executor) initVConfig(warnOnShardedOnly bool, pv plancontext.PlannerVer
WarmingReadsPercent: e.config.WarmingReadsPercent,
WarmingReadsTimeout: warmingReadsQueryTimeout,
WarmingReadsChannel: e.warmingReadsChannel,

DefaultMultiShardAutocommit: e.config.DefaultMultiShardAutocommit,
}
}

Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type (
WarmingReadsPercent int
WarmingReadsTimeout time.Duration
WarmingReadsChannel chan bool

DefaultMultiShardAutocommit bool
}

// vcursor_impl needs these facilities to be able to be able to execute queries for vindexes
Expand Down Expand Up @@ -1619,3 +1621,7 @@ func (vc *VCursorImpl) SetLastInsertID(id uint64) {
defer vc.SafeSession.mu.Unlock()
vc.SafeSession.LastInsertId = id
}

func (vc *VCursorImpl) DefaultMultiShardAutocommit() bool {
return vc.config.DefaultMultiShardAutocommit
}
5 changes: 5 additions & 0 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ var (
noScatter bool
enableShardRouting bool

defaultMultiShardAutocommit bool

// healthCheckRetryDelay is the time to wait before retrying healthcheck
healthCheckRetryDelay = 2 * time.Millisecond
// healthCheckTimeout is the timeout on the RPC call to tablets
Expand Down Expand Up @@ -199,6 +201,7 @@ func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&warmingReadsPercent, "warming-reads-percent", 0, "Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm")
fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed")
fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries")
fs.BoolVar(&defaultMultiShardAutocommit, "default-multi-shard-autocommit", defaultMultiShardAutocommit, "By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive")

viperutil.BindFlags(fs,
enableOnlineDDL,
Expand Down Expand Up @@ -358,6 +361,8 @@ func Init(
AllowScatter: !noScatter,
WarmingReadsPercent: warmingReadsPercent,
QueryLogToFile: queryLogToFile,

DefaultMultiShardAutocommit: defaultMultiShardAutocommit,
}

executor := NewExecutor(ctx, env, serv, cell, resolver, eConfig, warnShardedOnly, plans, si, pv, dynamicConfig)
Expand Down
Loading