diff --git a/pkg/sql/drop_test.go b/pkg/sql/drop_test.go index b86ca0abaa1f..7395645c59ba 100644 --- a/pkg/sql/drop_test.go +++ b/pkg/sql/drop_test.go @@ -497,42 +497,6 @@ func TestDropTableInterleaved(t *testing.T) { } } -func TestDropTableInTxn(t *testing.T) { - defer leaktest.AfterTest(t)() - params, _ := createTestServerParams() - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop() - - if _, err := sqlDB.Exec(` -CREATE DATABASE t; -CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR); -`); err != nil { - t.Fatal(err) - } - - tx, err := sqlDB.Begin() - if err != nil { - t.Fatal(err) - } - - if _, err := tx.Exec(`DROP TABLE t.kv`); err != nil { - t.Fatal(err) - } - - // We might still be able to read/write in the table inside this transaction - // until the schema changer runs, but we shouldn't be able to ALTER it. - if _, err := tx.Exec(`ALTER TABLE t.kv ADD COLUMN w CHAR`); !testutils.IsError(err, - `table "kv" is being dropped`) { - t.Fatalf("different error than expected: %v", err) - } - - // Can't commit after ALTER errored, so we ROLLBACK. - if err := tx.Rollback(); err != nil { - t.Fatal(err) - } - -} - func TestDropAndCreateTable(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := createTestServerParams() diff --git a/pkg/sql/executor.go b/pkg/sql/executor.go index 3a24e3702f5d..cc182abf5b30 100644 --- a/pkg/sql/executor.go +++ b/pkg/sql/executor.go @@ -56,6 +56,7 @@ var errNoTransactionToPipeline = errors.New("statement pipelining is only allowe var errStaleMetadata = errors.New("metadata is still stale") var errTransactionInProgress = errors.New("there is already a transaction in progress") var errNotRetriable = errors.New("the transaction is not in a retriable state") +var errStmtFollowsSchemaChange = errors.New("statement cannot follow a schema change in a transaction") const sqlTxnName string = "sql txn" const sqlImplicitTxnName string = "sql txn implicit" @@ -667,6 +668,7 @@ func (e *Executor) execRequest( // Track if we are retrying this query, so that we do not double count. automaticRetryCount := 0 + schemaChangerCount := len(txnState.schemaChangers.schemaChangers) txnClosure := func(ctx context.Context, txn *client.Txn, opt *client.TxnExecOptions) error { defer func() { automaticRetryCount++ }() if txnState.State == Open && txnState.txn != txn { @@ -675,6 +677,12 @@ func (e *Executor) execRequest( } txnState.txn = txn + // Remove all schema changers added by the closure. + if automaticRetryCount > 0 && len(txnState.schemaChangers.schemaChangers) > 0 { + txnState.schemaChangers.schemaChangers = + txnState.schemaChangers.schemaChangers[:schemaChangerCount] + } + if protoTS != nil { SetTxnTimestamps(txnState.txn, *protoTS) } @@ -917,6 +925,7 @@ func (e *Executor) execStmtsInCurrentTxn( if log.V(2) || log.HasSpanOrEvent(session.Ctx()) { log.VEventf(session.Ctx(), 2, "executing %d/%d: %s", i+1, len(stmts), stmt) } + txnState.schemaChangers.curStatementIdx = i var stmtStrBefore string @@ -1219,6 +1228,10 @@ func (e *Executor) execStmtInOpenTxn( return Result{PGTag: s.StatementTag()}, nil } + if len(txnState.schemaChangers.schemaChangers) > 0 { + return Result{}, errStmtFollowsSchemaChange + } + // Create a new planner from the Session to execute the statement. planner := session.newPlanner(e, txnState.txn) planner.evalCtx.SetTxnTimestamp(txnState.sqlTimestamp) @@ -1234,6 +1247,7 @@ func (e *Executor) execStmtInOpenTxn( autoCommit := implicitTxn && !e.cfg.TestingKnobs.DisableAutoCommit result, err = e.execStmt(stmt, planner, autoCommit, automaticRetryCount) } + if err != nil { if independentFromPipelined { // If the statement run was independent from pipelined execution, it diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index fe03d9990fd6..c3c986d62f26 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -267,7 +267,8 @@ func (p *planner) newPlan( // refreshes, but that's expected to be quite rare in practice. if stmt.StatementType() == parser.DDL { if err := p.txn.SetSystemConfigTrigger(); err != nil { - return nil, err + return nil, errors.Wrap(err, + "schema change statement cannot follow a statement that has written in the same transaction") } } diff --git a/pkg/sql/rename_test.go b/pkg/sql/rename_test.go index ceb2652b6277..3e33a70569d1 100644 --- a/pkg/sql/rename_test.go +++ b/pkg/sql/rename_test.go @@ -238,91 +238,3 @@ CREATE TABLE test.t (a INT PRIMARY KEY); // up. It needed to wait for the transaction to release its lease. <-threadDone } - -// Test that a txn doing a rename can use the new name immediately. -// It can also use the old name if it took a lease on it before the rename, for -// better or worse. -func TestTxnCanUseNewNameAfterRename(t *testing.T) { - defer leaktest.AfterTest(t)() - s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop() - - sql := ` -CREATE DATABASE test; -CREATE TABLE test.t (a INT PRIMARY KEY); -` - _, err := db.Exec(sql) - if err != nil { - t.Fatal(err) - } - - txn, err := db.Begin() - if err != nil { - t.Fatal(err) - } - - // Make sure we take a lease on the version called "t". - if _, err := txn.Exec("SELECT * FROM test.t"); err != nil { - t.Fatal(err) - } - if _, err := txn.Exec("ALTER TABLE test.t RENAME TO test.t2"); err != nil { - t.Fatal(err) - } - // Check that we can use the new name. - if _, err := txn.Exec("SELECT * FROM test.t2"); err != nil { - t.Fatal(err) - } - // Check that we can also use the old name, since we have a lease on it. - if _, err := txn.Exec("SELECT * FROM test.t"); err != nil { - t.Fatal(err) - } - if err := txn.Commit(); err != nil { - t.Fatal(err) - } -} - -// Check that we properly cleanup all the temporary names when performing a -// series of renames in a transaction. -func TestSeriesOfRenames(t *testing.T) { - defer leaktest.AfterTest(t)() - s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop() - - sql := ` -CREATE DATABASE test; -CREATE TABLE test.t (a INT PRIMARY KEY); -` - _, err := db.Exec(sql) - if err != nil { - t.Fatal(err) - } - - txn, err := db.Begin() - if err != nil { - t.Fatal(err) - } - if _, err := txn.Exec("ALTER TABLE test.t RENAME TO test.t2"); err != nil { - t.Fatal(err) - } - if _, err := txn.Exec("ALTER TABLE test.t2 RENAME TO test.t3"); err != nil { - t.Fatal(err) - } - if _, err := txn.Exec("ALTER TABLE test.t3 RENAME TO test.t4"); err != nil { - t.Fatal(err) - } - if err := txn.Commit(); err != nil { - t.Fatal(err) - } - - // Check that the temp names have been properly cleaned up by creating tables - // with those names. - if _, err := db.Exec("CREATE TABLE test.t (a INT PRIMARY KEY)"); err != nil { - t.Fatal(err) - } - if _, err := db.Exec("CREATE TABLE test.t2 (a INT PRIMARY KEY)"); err != nil { - t.Fatal(err) - } - if _, err := db.Exec("CREATE TABLE test.t3 (a INT PRIMARY KEY)"); err != nil { - t.Fatal(err) - } -} diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index b789f44bcd90..6d3dace6cbc4 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1830,3 +1830,70 @@ func TestBackfillCompletesOnChunkBoundary(t *testing.T) { }) } } + +func TestSchemaChangeInTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + params, _ := createTestServerParams() + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop() + + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR); +INSERT INTO t.kv VALUES ('a', 'b'); +`); err != nil { + t.Fatal(err) + } + + testCases := []struct { + name string + firstStmt string + secondStmt string + expectedErr string + }{ + // drop table followed by create table case. + {`drop-create`, `DROP TABLE t.kv`, `CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR)`, + `statement cannot follow a schema change in a transaction`}, + // schema change followed by another statement. + {`create-insert`, `CREATE INDEX foo ON t.kv (v)`, `INSERT INTO t.kv VALUES ('c', 'd')`, + `statement cannot follow a schema change in a transaction`}, + // schema change at the end of a transaction that has written. + {`insert-create`, `INSERT INTO t.kv VALUES ('c', 'd')`, `CREATE INDEX foo ON t.kv (v)`, + `schema change statement cannot follow a statement that has written in the same transaction`}, + // schema change at the end of a read only transaction. + {`select-create`, `SELECT * FROM t.kv`, `CREATE INDEX foo ON t.kv (v)`, ``}, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + tx, err := sqlDB.Begin() + if err != nil { + t.Fatal(err) + } + + if _, err := tx.Exec(testCase.firstStmt); err != nil { + t.Fatal(err) + } + + _, err = tx.Exec(testCase.secondStmt) + + if testCase.expectedErr != "" { + if !testutils.IsError(err, testCase.expectedErr) { + t.Fatalf("different error than expected: %v", err) + } + + // Can't commit after ALTER errored, so we ROLLBACK. + if err := tx.Rollback(); err != nil { + t.Fatal(err) + } + } else { + if err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + } + }) + } +}