Skip to content

Commit

Permalink
sql: disallow some schema changes in transactions
Browse files Browse the repository at this point in the history
Not all schema changes require extra work after executing
in a transaction. For example CREATE TABLE without foreign
keys goes into effect immediately.

This change affects schema changes that require extra work
after the transaction they are in is committed.
e.g. CREATE INDEX, DROP TABLE, etc. This change affects these
types of schema changes in the following way:

1. any statement in the same txn following a schema change
that requires further work, is disallowed. Since the schema
change is not really complete, running a statement after
it makes no sense.
2. A schema change that requires further work is allowed at
the end of a transaction IIF the preceeding statements in
the transaction are READ_ONLY. This is because the schema
change needs to anchor the transaction on the system range,
and it is unable to do so thanks to #7570

fixes #14280
  • Loading branch information
vivekmenezes committed Apr 4, 2017
1 parent 1ff15ce commit 45614b6
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 125 deletions.
36 changes: 0 additions & 36 deletions pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,42 +497,6 @@ func TestDropTableInterleaved(t *testing.T) {
}
}

func TestDropTableInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := createTestServerParams()
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop()

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

tx, err := sqlDB.Begin()
if err != nil {
t.Fatal(err)
}

if _, err := tx.Exec(`DROP TABLE t.kv`); err != nil {
t.Fatal(err)
}

// We might still be able to read/write in the table inside this transaction
// until the schema changer runs, but we shouldn't be able to ALTER it.
if _, err := tx.Exec(`ALTER TABLE t.kv ADD COLUMN w CHAR`); !testutils.IsError(err,
`table "kv" is being dropped`) {
t.Fatalf("different error than expected: %v", err)
}

// Can't commit after ALTER errored, so we ROLLBACK.
if err := tx.Rollback(); err != nil {
t.Fatal(err)
}

}

func TestDropAndCreateTable(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := createTestServerParams()
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var errNoTransactionToPipeline = errors.New("statement pipelining is only allowe
var errStaleMetadata = errors.New("metadata is still stale")
var errTransactionInProgress = errors.New("there is already a transaction in progress")
var errNotRetriable = errors.New("the transaction is not in a retriable state")
var errStmtFollowsSchemaChange = errors.New("statement cannot follow a schema change in a transaction")

const sqlTxnName string = "sql txn"
const sqlImplicitTxnName string = "sql txn implicit"
Expand Down Expand Up @@ -667,6 +668,7 @@ func (e *Executor) execRequest(

// Track if we are retrying this query, so that we do not double count.
automaticRetryCount := 0
schemaChangerCount := len(txnState.schemaChangers.schemaChangers)
txnClosure := func(ctx context.Context, txn *client.Txn, opt *client.TxnExecOptions) error {
defer func() { automaticRetryCount++ }()
if txnState.State == Open && txnState.txn != txn {
Expand All @@ -675,6 +677,12 @@ func (e *Executor) execRequest(
}
txnState.txn = txn

// Remove all schema changers added by the closure.
if automaticRetryCount > 0 && len(txnState.schemaChangers.schemaChangers) > 0 {
txnState.schemaChangers.schemaChangers =
txnState.schemaChangers.schemaChangers[:schemaChangerCount]
}

if protoTS != nil {
SetTxnTimestamps(txnState.txn, *protoTS)
}
Expand Down Expand Up @@ -917,6 +925,7 @@ func (e *Executor) execStmtsInCurrentTxn(
if log.V(2) || log.HasSpanOrEvent(session.Ctx()) {
log.VEventf(session.Ctx(), 2, "executing %d/%d: %s", i+1, len(stmts), stmt)
}

txnState.schemaChangers.curStatementIdx = i

var stmtStrBefore string
Expand Down Expand Up @@ -1219,6 +1228,10 @@ func (e *Executor) execStmtInOpenTxn(
return Result{PGTag: s.StatementTag()}, nil
}

if len(txnState.schemaChangers.schemaChangers) > 0 {
return Result{}, errStmtFollowsSchemaChange
}

// Create a new planner from the Session to execute the statement.
planner := session.newPlanner(e, txnState.txn)
planner.evalCtx.SetTxnTimestamp(txnState.sqlTimestamp)
Expand All @@ -1234,6 +1247,7 @@ func (e *Executor) execStmtInOpenTxn(
autoCommit := implicitTxn && !e.cfg.TestingKnobs.DisableAutoCommit
result, err = e.execStmt(stmt, planner, autoCommit, automaticRetryCount)
}

if err != nil {
if independentFromPipelined {
// If the statement run was independent from pipelined execution, it
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ func (p *planner) newPlan(
// refreshes, but that's expected to be quite rare in practice.
if stmt.StatementType() == parser.DDL {
if err := p.txn.SetSystemConfigTrigger(); err != nil {
return nil, err
return nil, errors.Wrap(err,
"schema change statement cannot follow a statement that has written in the same transaction")
}
}

Expand Down
88 changes: 0 additions & 88 deletions pkg/sql/rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,91 +238,3 @@ CREATE TABLE test.t (a INT PRIMARY KEY);
// up. It needed to wait for the transaction to release its lease.
<-threadDone
}

// Test that a txn doing a rename can use the new name immediately.
// It can also use the old name if it took a lease on it before the rename, for
// better or worse.
func TestTxnCanUseNewNameAfterRename(t *testing.T) {
defer leaktest.AfterTest(t)()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop()

sql := `
CREATE DATABASE test;
CREATE TABLE test.t (a INT PRIMARY KEY);
`
_, err := db.Exec(sql)
if err != nil {
t.Fatal(err)
}

txn, err := db.Begin()
if err != nil {
t.Fatal(err)
}

// Make sure we take a lease on the version called "t".
if _, err := txn.Exec("SELECT * FROM test.t"); err != nil {
t.Fatal(err)
}
if _, err := txn.Exec("ALTER TABLE test.t RENAME TO test.t2"); err != nil {
t.Fatal(err)
}
// Check that we can use the new name.
if _, err := txn.Exec("SELECT * FROM test.t2"); err != nil {
t.Fatal(err)
}
// Check that we can also use the old name, since we have a lease on it.
if _, err := txn.Exec("SELECT * FROM test.t"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(); err != nil {
t.Fatal(err)
}
}

// Check that we properly cleanup all the temporary names when performing a
// series of renames in a transaction.
func TestSeriesOfRenames(t *testing.T) {
defer leaktest.AfterTest(t)()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop()

sql := `
CREATE DATABASE test;
CREATE TABLE test.t (a INT PRIMARY KEY);
`
_, err := db.Exec(sql)
if err != nil {
t.Fatal(err)
}

txn, err := db.Begin()
if err != nil {
t.Fatal(err)
}
if _, err := txn.Exec("ALTER TABLE test.t RENAME TO test.t2"); err != nil {
t.Fatal(err)
}
if _, err := txn.Exec("ALTER TABLE test.t2 RENAME TO test.t3"); err != nil {
t.Fatal(err)
}
if _, err := txn.Exec("ALTER TABLE test.t3 RENAME TO test.t4"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(); err != nil {
t.Fatal(err)
}

// Check that the temp names have been properly cleaned up by creating tables
// with those names.
if _, err := db.Exec("CREATE TABLE test.t (a INT PRIMARY KEY)"); err != nil {
t.Fatal(err)
}
if _, err := db.Exec("CREATE TABLE test.t2 (a INT PRIMARY KEY)"); err != nil {
t.Fatal(err)
}
if _, err := db.Exec("CREATE TABLE test.t3 (a INT PRIMARY KEY)"); err != nil {
t.Fatal(err)
}
}
67 changes: 67 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1830,3 +1830,70 @@ func TestBackfillCompletesOnChunkBoundary(t *testing.T) {
})
}
}

func TestSchemaChangeInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := createTestServerParams()
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop()

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR);
INSERT INTO t.kv VALUES ('a', 'b');
`); err != nil {
t.Fatal(err)
}

testCases := []struct {
name string
firstStmt string
secondStmt string
expectedErr string
}{
// drop table followed by create table case.
{`drop-create`, `DROP TABLE t.kv`, `CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR)`,
`statement cannot follow a schema change in a transaction`},
// schema change followed by another statement.
{`create-insert`, `CREATE INDEX foo ON t.kv (v)`, `INSERT INTO t.kv VALUES ('c', 'd')`,
`statement cannot follow a schema change in a transaction`},
// schema change at the end of a transaction that has written.
{`insert-create`, `INSERT INTO t.kv VALUES ('c', 'd')`, `CREATE INDEX foo ON t.kv (v)`,
`schema change statement cannot follow a statement that has written in the same transaction`},
// schema change at the end of a read only transaction.
{`select-create`, `SELECT * FROM t.kv`, `CREATE INDEX foo ON t.kv (v)`, ``},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
tx, err := sqlDB.Begin()
if err != nil {
t.Fatal(err)
}

if _, err := tx.Exec(testCase.firstStmt); err != nil {
t.Fatal(err)
}

_, err = tx.Exec(testCase.secondStmt)

if testCase.expectedErr != "" {
if !testutils.IsError(err, testCase.expectedErr) {
t.Fatalf("different error than expected: %v", err)
}

// Can't commit after ALTER errored, so we ROLLBACK.
if err := tx.Rollback(); err != nil {
t.Fatal(err)
}
} else {
if err != nil {
t.Fatal(err)
}
if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}
})
}
}

0 comments on commit 45614b6

Please sign in to comment.