Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions go/test/endtoend/transaction/single/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,45 @@ func TestNoRecordInTableNotFail(t *testing.T) {
}
}

func TestOnlyMultiShardWriteFail(t *testing.T) {
conn, cleanup := setup(t)
defer func() {
cleanup()
conn.Close()
}()

// basic test to check that multi-shard transaction is not allowed.
t.Run("insert-select-insert fail", func(t *testing.T) {
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `INSERT INTO t1(id, txn_id) VALUES (1, "a")`)
// read is ok on another shard.
utils.Exec(t, conn, `select * from t1 where txn_id = "b"`)
// write on it will fail.
_, err := utils.ExecAllowError(t, conn, `INSERT INTO t1(id, txn_id) VALUES (2, "b")`)
require.ErrorContains(t, err, "multi-db transaction attempted")
utils.Exec(t, conn, `rollback`)
})

// test with select query on different shards and one write query.
t.Run("select-select-insert pass", func(t *testing.T) {
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `select * from t1 where txn_id = "a"`)
utils.Exec(t, conn, `select * from t1 where txn_id = "b"`)
utils.Exec(t, conn, `INSERT INTO t1(id, txn_id) VALUES (1, "a")`)
utils.Exec(t, conn, `commit`)
})

// test with one write query and multiple select.
t.Run("insert-select-select pass", func(t *testing.T) {
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `INSERT INTO t1(id, txn_id) VALUES (2, "b")`)
utils.Exec(t, conn, `select * from t1 where txn_id in ("a", "b", "c")`)
utils.Exec(t, conn, `select * from t1 where txn_id in ("d", "e", "f")`)
utils.Exec(t, conn, `commit`)
})

}

func setup(t *testing.T) (*mysql.Conn, func()) {
t.Helper()
conn, err := mysql.Connect(context.Background(), &vtParams)
Expand Down
16 changes: 8 additions & 8 deletions go/vt/proto/vtgate/vtgate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions go/vt/proto/vtgate/vtgate_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type StatementType int
// These constants are used to identify the SQL statement type.
// Changing this list will require reviewing all calls to Preview.
const (
StmtSelect StatementType = iota
StmtUnknown StatementType = iota
StmtSelect
StmtStream
StmtInsert
StmtReplace
Expand All @@ -45,7 +46,6 @@ const (
StmtUse
StmtOther
StmtAnalyze
StmtUnknown
StmtComment
StmtPriv
StmtExplain
Expand Down Expand Up @@ -421,9 +421,10 @@ func IsSimpleTuple(node Expr) bool {
return false
}

func SupportsOptimizerHint(stmt StatementType) bool {
// IsReadStatement returns true if the statement is a read statement.
func (stmt StatementType) IsReadStatement() bool {
switch stmt {
case StmtSelect, StmtInsert, StmtUpdate, StmtDelete, StmtStream, StmtVStream:
case StmtSelect, StmtShow:
return true
default:
return false
Expand Down
42 changes: 15 additions & 27 deletions go/vt/vtgate/executorcontext/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ type (
// and doesn't have to be updated by the executor
foundRowsHandled bool

// queryFromVindex is used to avoid erroring out on multi-db transaction
// execReadQuery is used to avoid erroring out on multi-db transaction
// as the query that started a new transaction on the shard belong to a vindex.
queryFromVindex bool
execReadQuery bool

logging *ExecuteLogger

Expand Down Expand Up @@ -241,18 +241,11 @@ func (session *SafeSession) GetRollbackOnPartialExec() string {
return session.rollbackOnPartialExec
}

// SetQueryFromVindex set the queryFromVindex value.
func (session *SafeSession) SetQueryFromVindex(value bool) {
// SetExecReadQuery set the execReadQuery value.
func (session *SafeSession) SetExecReadQuery(value bool) {
session.mu.Lock()
defer session.mu.Unlock()
session.queryFromVindex = value
}

// GetQueryFromVindex returns the queryFromVindex value.
func (session *SafeSession) GetQueryFromVindex() bool {
session.mu.Lock()
defer session.mu.Unlock()
return session.queryFromVindex
session.execReadQuery = value
}

// SetQueryTimeout sets the query timeout
Expand Down Expand Up @@ -427,8 +420,8 @@ func (session *SafeSession) InTransaction() bool {
// Key behavior:
// 1. Retrieves the appropriate list of sessions (PreSessions, PostSessions, or default ShardSessions) based on the commit order.
// 2. Identifies a matching session by keyspace, shard, and tablet type.
// 3. If the session meets specific conditions (e.g., non-vindex-only, single transaction mode), it updates the session state:
// - Converts a vindex-only session to a standard session if required by the transaction type.
// 3. If the session meets specific conditions (e.g., dml, single transaction mode), it updates the session state:
// - Converts a non-dml session to a standard session if required by the transaction type.
// - If a multi-shard transaction is detected in Single mode, marks the session for rollback and returns an error.
//
// Parameters:
Expand All @@ -446,21 +439,16 @@ func (session *SafeSession) FindAndChangeSessionIfInSingleTxMode(keyspace, shard

shardSession := session.findSessionLocked(keyspace, shard, tabletType)

if shardSession == nil {
return nil, nil
}

if !shardSession.VindexOnly {
if shardSession == nil || !shardSession.ReadOnly || session.execReadQuery {
return shardSession, nil
}

if err := session.singleModeErrorOnCrossShard(txMode, 0); err != nil {
return nil, err
}

// the shard session is now used by non-vindex query as well,
// so it is not an exclusive vindex only shard session anymore.
shardSession.VindexOnly = false
// the shard session is now used by dml query as well.
shardSession.ReadOnly = false
return shardSession, nil
}

Expand Down Expand Up @@ -518,8 +506,8 @@ func (session *SafeSession) AppendOrUpdate(target *querypb.Target, info ShardAct
if !existingSession.RowsAffected {
existingSession.RowsAffected = info.RowsAffected()
}
if existingSession.VindexOnly {
existingSession.VindexOnly = session.queryFromVindex
if existingSession.ReadOnly {
existingSession.ReadOnly = session.execReadQuery
}
if err := session.singleModeErrorOnCrossShard(txMode, 1); err != nil {
return err
Expand All @@ -532,7 +520,7 @@ func (session *SafeSession) AppendOrUpdate(target *querypb.Target, info ShardAct
TransactionId: info.TransactionID(),
ReservedId: info.ReservedID(),
RowsAffected: info.RowsAffected(),
VindexOnly: session.queryFromVindex,
ReadOnly: session.execReadQuery,
}

// Always append, in order for rollback to succeed.
Expand Down Expand Up @@ -560,7 +548,7 @@ func (session *SafeSession) singleModeErrorOnCrossShard(txMode vtgatepb.Transact
// 1. The query comes from a lookup vindex.
// 2. The transaction mode is not Single.
// 3. The transaction is not in the normal shard session.
if session.queryFromVindex || session.commitOrder != vtgatepb.CommitOrder_NORMAL || !session.isSingleDB(txMode) {
if session.execReadQuery || session.commitOrder != vtgatepb.CommitOrder_NORMAL || !session.isSingleDB(txMode) {
return nil
}

Expand All @@ -576,7 +564,7 @@ func (session *SafeSession) singleModeErrorOnCrossShard(txMode vtgatepb.Transact
func actualNoOfShardSession(sessions []*vtgatepb.Session_ShardSession) int {
actualSS := 0
for _, ss := range sessions {
if ss.VindexOnly {
if ss.ReadOnly {
continue
}
actualSS++
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/executorcontext/safe_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ func TestSingleDbUpdateToMultiShard(t *testing.T) {
})

// shard session s0 due to a vindex query
session.queryFromVindex = true
session.execReadQuery = true
err := session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "0"},
info(1, 0),
nil,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
session.queryFromVindex = false
session.execReadQuery = false

// shard session s1
err = session.AppendOrUpdate(
Expand All @@ -115,14 +115,14 @@ func TestSingleDbPreFailOnFind(t *testing.T) {
})

// shard session s0 due to a vindex query
session.queryFromVindex = true
session.execReadQuery = true
err := session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "0"},
info(1, 0),
nil,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
session.queryFromVindex = false
session.execReadQuery = false

// shard session s1
err = session.AppendOrUpdate(
Expand All @@ -140,7 +140,7 @@ func TestSingleDbPreFailOnFind(t *testing.T) {
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
require.NotNil(t, ss)
require.False(t, ss.VindexOnly)
require.False(t, ss.ReadOnly)
require.EqualValues(t, 1, ss.TabletAlias.Uid)

// shard session s0 for normal query
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,11 +840,13 @@ func (vc *VCursorImpl) markSavepoint(ctx context.Context, needsRollbackOnParialE
}
uID := fmt.Sprintf("_vt%s", strings.ReplaceAll(uuid.NewString(), "-", "_"))
spQuery := fmt.Sprintf("%ssavepoint %s%s", vc.marginComments.Leading, uID, vc.marginComments.Trailing)
vc.SafeSession.SetExecReadQuery(true)
_, err := vc.executor.Execute(ctx, nil, "MarkSavepoint", vc.SafeSession, spQuery, bindVars, false)
if err != nil {
return err
}
vc.SafeSession.SetSavepoint(uID)
vc.SafeSession.SetExecReadQuery(false)
return nil
}

Expand Down Expand Up @@ -924,11 +926,11 @@ func (vc *VCursorImpl) ExecuteKeyspaceID(ctx context.Context, keyspace string, k
// This function is only called from consistent_lookup vindex when the lookup row getting inserting finds a duplicate.
// In such scenario, original row needs to be locked to check if it already exists or no other transaction is working on it or does not write to it.
// This creates a transaction but that transaction is for locking purpose only and should not cause multi-db transaction error.
// This fields helps in to ignore multi-db transaction error when it states `queryFromVindex`.
// This fields helps in to ignore multi-db transaction error when it states `execReadQuery`.
if !rollbackOnError {
vc.SafeSession.SetQueryFromVindex(true)
vc.SafeSession.SetExecReadQuery(true)
defer func() {
vc.SafeSession.SetQueryFromVindex(false)
vc.SafeSession.SetExecReadQuery(false)
}()
}
qr, errs := vc.ExecuteMultiShard(ctx, nil, rss, queries, rollbackOnError, autocommit, false)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (e *Executor) newExecute(
return err
}

// Set the session variable to indicate if the query is a read query or not.
safeSession.SetExecReadQuery(plan.QueryType.IsReadStatement())

// Execute the plan.
if plan.Instructions.NeedsTransaction() {
err = e.insideTransaction(ctx, safeSession, logStats,
Expand Down
3 changes: 2 additions & 1 deletion proto/vtgate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ message Session {
topodata.TabletAlias tablet_alias = 3;
// reserved connection if a dedicated connection is needed
int64 reserved_id = 4;
bool vindex_only = 5;
// read_only is true if the session has only executed read queries.
bool read_only = 5;
// rows_affected tracks if any query has modified the rows.
bool rows_affected = 6;
}
Expand Down
Loading