Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: allow CREATE TABLE after DROP TABLE in the same txn #19112

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions pkg/sql/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,15 @@ func (n *createViewNode) Start(params runParams) error {
viewName := n.n.Name.TableName().Table()
tKey := tableKey{parentID: n.dbDesc.ID, name: viewName}
key := tKey.Key()
if exists, err := descExists(params.ctx, n.p.txn, key); err == nil && exists {
// TODO(a-robinson): Support CREATE OR REPLACE commands.
return sqlbase.NewRelationAlreadyExistsError(tKey.Name())
if id, err := getDescID(params.ctx, n.p.txn, key); err == nil && id != sqlbase.InvalidID {
found, err := params.p.session.maybeDeleteDescriptorName(params.ctx, n.p.txn, key, id)
if err != nil {
return err
}
if !found {
// TODO(a-robinson): Support CREATE OR REPLACE commands.
return sqlbase.NewRelationAlreadyExistsError(tKey.Name())
}
} else if err != nil {
return err
}
Expand Down Expand Up @@ -710,13 +716,21 @@ func HoistConstraints(n *parser.CreateTable) {
}

func (n *createTableNode) Start(params runParams) error {
ctx := params.ctx
txn := params.p.txn
tKey := tableKey{parentID: n.dbDesc.ID, name: n.n.Table.TableName().Table()}
key := tKey.Key()
if exists, err := descExists(params.ctx, params.p.txn, key); err == nil && exists {
if n.n.IfNotExists {
return nil
if id, err := getDescID(ctx, txn, key); err == nil && id != sqlbase.InvalidID {
found, err := params.p.session.maybeDeleteDescriptorName(ctx, txn, key, id)
if err != nil {
return err
}
if !found {
if n.n.IfNotExists {
return nil
}
return sqlbase.NewRelationAlreadyExistsError(tKey.Name())
}
return sqlbase.NewRelationAlreadyExistsError(tKey.Name())
} else if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (p *planner) createDescriptor(
) (bool, error) {
idKey := plainKey.Key()

if exists, err := descExists(ctx, p.txn, idKey); err == nil && exists {
if id, err := getDescID(ctx, p.txn, idKey); err == nil && id != sqlbase.InvalidID {
if ifNotExists {
// Noop.
return false, nil
Expand All @@ -109,13 +109,13 @@ func (p *planner) createDescriptor(
return true, p.createDescriptorWithID(ctx, idKey, id, descriptor)
}

func descExists(ctx context.Context, txn *client.Txn, idKey roachpb.Key) (bool, error) {
func getDescID(ctx context.Context, txn *client.Txn, idKey roachpb.Key) (sqlbase.ID, error) {
// Check whether idKey exists.
gr, err := txn.Get(ctx, idKey)
if err != nil {
return false, err
if err != nil || !gr.Exists() {
return sqlbase.InvalidID, err
}
return gr.Exists(), nil
return sqlbase.ID(gr.ValueInt()), nil
}

func (p *planner) createDescriptorWithID(
Expand Down
82 changes: 79 additions & 3 deletions pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func TestCommandsWhileTableBeingDropped(t *testing.T) {

params, _ := createTestServerParams()
// Block schema changers so that the table we're about to DROP is not
// actually dropped; it will be left in the "deleted" state.
// actually dropped; it will be left in the "dropped" state.
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
SyncFilter: func(tscc sql.TestingSchemaChangerCollection) {
Expand All @@ -774,8 +774,7 @@ CREATE TABLE test.t(a INT PRIMARY KEY);
t.Fatal(err)
}

// Check that SHOW TABLES marks a dropped table with the " (dropped)"
// suffix.
// Check that SHOW TABLES is unable to see the table.
rows, err := db.Query(`SHOW TABLES FROM test`)
if err != nil {
t.Fatal(err)
Expand All @@ -795,3 +794,80 @@ CREATE TABLE test.t(a INT PRIMARY KEY);
t.Fatal(err)
}
}

// Test INSERT after a CREATE following a DROP in the same transaction.
// This test recreates an INSERT running on a different node than the
// one running the DROP-CREATE transaction, with the INSERT running
// after the DROP-CREATE but before the schema change execution for
// the DROP-CREATE. The INSERT can write data into the table that was
// dropped. We also test that an INSERT using a timestamp before the
// DROP-CREATE transaction will write to the old table.
func TestInsertWhileTableBeingDropCreated(t *testing.T) {
defer leaktest.AfterTest(t)()

gossipSem := make(chan struct{}, 1)
params, _ := createTestServerParams()
// Block schema changers so that the table we're about to DROP is not
// actually dropped; it will be left in the "dropped" state.
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
SyncFilter: func(tscc sql.TestingSchemaChangerCollection) {
tscc.ClearSchemaChangers()
},
AsyncExecNotification: asyncSchemaChangerDisabled,
},
// We're going to block gossip so it doesn't clear up the leases
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
GossipUpdateEvent: func(cfg config.SystemConfig) {
gossipSem <- struct{}{}
<-gossipSem
},
},
}
s, db, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

// Block gossip.
gossipSem <- struct{}{}
defer func() {
// Unblock gossip.
<-gossipSem
}()

if _, err := db.Exec(`
CREATE DATABASE test;
CREATE TABLE test.t(a INT PRIMARY KEY);
`); err != nil {
t.Fatal(err)
}

if _, err := db.Exec(`INSERT INTO test.t VALUES (1), (2), (3)`); err != nil {
t.Fatal(err)
}

// Pick a timestamp before the execution of the DROP-CREATE transaction.
tx, err := db.Begin()
if err != nil {
t.Fatal(err)
}

// DROP and then CREATE the table.
if _, err := db.Exec(`BEGIN; DROP TABLE test.t; CREATE TABLE test.t(a INT PRIMARY KEY); COMMIT`); err != nil {
t.Fatal(err)
}

// INSERT tries to inserts an entry into the old table.
if _, err := db.Exec(`INSERT INTO test.t VALUES (1), (2), (3)`); !testutils.IsError(
err, `duplicate key value \(a\)=\(1\) violates unique constraint "primary"`) {
t.Fatal(err)
}

if _, err := tx.Exec(`INSERT INTO test.t VALUES (1), (2), (3)`); !testutils.IsError(
err, `duplicate key value \(a\)=\(1\) violates unique constraint "primary"`) {
t.Fatal(err)
}

if err := tx.Rollback(); err != nil {
t.Fatal(err)
}
}
82 changes: 82 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/drop_table
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,88 @@ DROP TABLE IF EXISTS a
statement ok
CREATE TABLE a (id INT PRIMARY KEY)

statement ok
INSERT INTO a VALUES (5),(6),(8)

query I
SELECT * FROM a
----
5
6
8

# DROP followed by CREATE in the same transaction.
statement ok
BEGIN

statement ok
DROP TABLE a

statement ok
CREATE TABLE a (id INT PRIMARY KEY)

statement ok
INSERT INTO a VALUES (1)

statement ok
COMMIT

statement ok
INSERT INTO a VALUES (2)

query I
SELECT * FROM a
----
1
2

statement ok
INSERT INTO b VALUES (5),(6),(8)

# DROP followed by RENAME in the same transaction.
statement ok
BEGIN

statement ok
DROP TABLE a

statement ok
ALTER TABLE b RENAME TO a

statement ok
INSERT INTO a VALUES (4)

statement ok
COMMIT

query I
SELECT * FROM a
----
4
5
6
8

statement ok
CREATE VIEW v AS select id from a

# DROP VIEW followed by CREATE in the same transaction.
statement ok
BEGIN

statement ok
DROP VIEW v

statement ok
CREATE VIEW v AS select id from a

statement ok
COMMIT

query I
SELECT * FROM v
----
4
5
6
8
10 changes: 8 additions & 2 deletions pkg/sql/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,16 @@ func (p *planner) RenameTable(ctx context.Context, n *parser.RenameTable) (planN
// has made sure it's not in use any more.
b := &client.Batch{}
if p.session.Tracing.KVTracingEnabled() {
log.VEventf(ctx, 2, "Put %s -> %s", descKey, descDesc)
log.VEventf(ctx, 2, "CPut %s -> %d", newTbKey, descID)
}
b.Put(descKey, descDesc)

if id, err := getDescID(ctx, p.txn, newTbKey); err == nil && id != sqlbase.InvalidID {
if _, err := p.session.maybeDeleteDescriptorName(ctx, p.txn, newTbKey, id); err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
b.CPut(newTbKey, descID, nil)

if err := p.txn.Run(ctx, b); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ INSERT INTO t.kv VALUES ('a', 'b');
}{
// DROP TABLE followed by CREATE TABLE case.
{`drop-create`, `DROP TABLE t.kv`, `CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR)`,
`relation "kv" already exists`},
``},
// schema change followed by another statement works.
{`createindex-insert`, `CREATE INDEX foo ON t.kv (v)`, `INSERT INTO t.kv VALUES ('c', 'd')`,
``},
Expand Down
62 changes: 62 additions & 0 deletions pkg/sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,68 @@ func (s *Session) maybeRecover(action, stmt string) {
}
}

// A descriptor dropped in a transaction has its name to id map
// deleted after the transaction has committed, when the schema
// change executes. This delayed map deletion follows the name being
// deleted from all the descriptor caches across the entire cluster to
// prevent the name from being reused and used illegally.
//
// But if the name needs to be reused during the transaction itself, the
// name has to be deleted before the reuse during the transaction. That
// is done here. One of the ramifications of this is that a new name to id
// is created once the transaction is committed even though the old name
// to id map can be present in a descriptor cache on a node.
// The schema changes that run after the transaction commits purge the
// old name to id map, but before that the name can be used illegally.
//
// Note: statements following the name reuse in the same transaction
// use the new name and descriptor thanks to the use of uncommittedTables
// in TableCollection.
//
// A node using the old mapping sees the name map to the old dropped
// descriptor and can potentially add data into a dropped table.
// This is okay because the dropped table GC is started only once all
// nodes are not holding leases on the old descriptor. A node using the
// old descriptor is notified that the old descriptor is no longer
// valid (dropped), and will release the lease on the old descriptor.
// While acquiring a new lease on the descriptor it will reevaluate
// the name to id map and will start using the new descriptor.
//
// Furthermore, If a table is dropped-created on node A through a
// transaction and a user is running another transaction inserting data
// through node B in a coordinated manner such that the insert is executed
// after the create is complete, the insert will normally add data into
// the new table as long as it also uses a timestamp greater than the
// timestamp of the new descriptor creation transaction. If for some
// reason node B picks a timestamp from the past for the INSERT
// (before the DROP+CREATE transaction), the INSERT will write to the
// old table.
func (s *Session) maybeDeleteDescriptorName(
ctx context.Context, txn *client.Txn, key roachpb.Key, id sqlbase.ID,
) (bool, error) {
// CHECK if the existing descriptor has been dropped in the
// current transaction.
for _, sc := range s.TxnState.schemaChangers.schemaChangers {
if sc.sc.tableID == id {
// the transaction touched upon the descriptor being created.
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, id)
if err != nil {
return false, err
}
if tableDesc.Dropped() {
// Use CPut because we want to remove a specific name -> id map.
if s.Tracing.KVTracingEnabled() {
log.VEventf(ctx, 2, "CPut %s -> nil", key)
}
err := txn.CPut(ctx, key, nil, id)
return true, err
}
break
}
}
return false, nil
}

// SessionTracing holds the state used by SET TRACING {ON,OFF,LOCAL} statements in
// the context of one SQL session.
// It holds the current trace being collected (or the last trace collected, if
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,9 @@ func (tc *TableCollection) getUncommittedDatabaseID(tn *parser.TableName) (sqlba
func (tc *TableCollection) getUncommittedTable(
dbID sqlbase.ID, tn *parser.TableName,
) (*sqlbase.TableDescriptor, error) {
for _, table := range tc.uncommittedTables {
// Walk latest to earliest.
for i := len(tc.uncommittedTables) - 1; i >= 0; i-- {
table := tc.uncommittedTables[i]
if table.Name == string(tn.TableName) &&
table.ParentID == dbID {
if err := filterTableState(table); err != nil {
Expand Down