Skip to content

Commit

Permalink
sql: allow atomic name swaps
Browse files Browse the repository at this point in the history
This commit introduces a new cluster version, keyed as
AvoidDrainingNames, which allows atomic name swaps between two
tables/databases/schemas/types. This means we're no longer populating
the draining names of a descriptor, instead we're updating the namespace
table in the same transaction.

The price to pay for all of this is that there is a period of time
following a name swap in which the name may refer to either of the two
descriptors.

Fixes cockroachdb#54562.

Release note (sql change): It's now possible to swap names (for tables,
etc.) in the same transaction. For example:

  CREATE TABLE foo();
  BEGIN;
  ALTER TABLE foo RENAME TO bar;
  CREATE TABLE foo();
  COMMIT;

Previously, we'd be getting a "relation ... already exists" error.
  • Loading branch information
Marius Posta committed Nov 3, 2021
1 parent fc7a77a commit 1b7f602
Show file tree
Hide file tree
Showing 37 changed files with 286 additions and 701 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-4 set the active cluster version in the format '<major>.<minor>'
version version 21.2-6 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-4</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-6</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
24 changes: 12 additions & 12 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ exp,benchmark
20,CreateRole/create_role_with_3_options
18,CreateRole/create_role_with_no_options
19,DropDatabase/drop_database_0_tables
26,DropDatabase/drop_database_1_table
33,DropDatabase/drop_database_2_tables
40,DropDatabase/drop_database_3_tables
27,DropDatabase/drop_database_1_table
35,DropDatabase/drop_database_2_tables
43,DropDatabase/drop_database_3_tables
24,DropRole/drop_1_role
31,DropRole/drop_2_roles
38,DropRole/drop_3_roles
17,DropSequence/drop_1_sequence
24,DropSequence/drop_2_sequences
31,DropSequence/drop_3_sequences
19,DropTable/drop_1_table
27,DropTable/drop_2_tables
35,DropTable/drop_3_tables
20,DropView/drop_1_view
28,DropView/drop_2_views
36,DropView/drop_3_views
18,DropSequence/drop_1_sequence
26,DropSequence/drop_2_sequences
34,DropSequence/drop_3_sequences
20,DropTable/drop_1_table
29,DropTable/drop_2_tables
38,DropTable/drop_3_tables
21,DropView/drop_1_view
30,DropView/drop_2_views
39,DropView/drop_3_views
16,Grant/grant_all_on_1_table
18,Grant/grant_all_on_2_tables
20,Grant/grant_all_on_3_tables
Expand Down
19 changes: 9 additions & 10 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,20 +2109,19 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}
}
}

if err := r.dropDescriptors(ctx, execCfg.JobRegistry, execCfg.Codec, txn, descsCol); err != nil {
return err
}

if details.DescriptorCoverage == tree.AllDescriptors {
// The temporary system table descriptors should already have been dropped
// in `dropDescriptors` but we still need to drop the temporary system db.
return r.cleanupTempSystemTables(ctx, txn)
}
return nil
return r.dropDescriptors(ctx, execCfg.JobRegistry, execCfg.Codec, txn, descsCol)
}); err != nil {
return err
}

if details.DescriptorCoverage == tree.AllDescriptors {
// The temporary system table descriptors should already have been dropped
// in `dropDescriptors` but we still need to drop the temporary system db.
if err := execCfg.DB.Txn(ctx, r.cleanupTempSystemTables); err != nil {
return err
}
}

// Emit to the event log that the job has completed reverting.
emitRestoreJobEvent(ctx, p, jobs.StatusFailed, r.job)
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/clusterversion",
"//pkg/col/coldata",
"//pkg/featureflag",
"//pkg/jobs",
Expand Down
37 changes: 23 additions & 14 deletions pkg/ccl/importccl/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -1485,25 +1486,33 @@ func (r *importResumer) dropSchemas(
return nil, errors.Newf("unable to resolve schema desc with ID %d", schema.Desc.ID)
}

//lint:ignore SA1019 deprecated method call is OK
schemaDesc.AddDrainingName(descpb.NameInfo{
ParentID: details.ParentID,
ParentSchemaID: keys.RootNamespaceID,
Name: schemaDesc.Name,
})

// Update the parent database with information about the dropped schema.
if dbDesc.Schemas == nil {
dbDesc.Schemas = make(map[string]descpb.DatabaseDescriptor_SchemaInfo)
}
dbDesc.Schemas[schema.Desc.Name] = descpb.DatabaseDescriptor_SchemaInfo{ID: dbDesc.ID,
Dropped: true}

// Mark the descriptor as dropped and write it to the batch.
// Delete namespace entry or update draining names depending on version.

schemaDesc.SetDropped()
droppedSchemaIDs = append(droppedSchemaIDs, schemaDesc.GetID())

b := txn.NewBatch()
// TODO(postamar): remove version gate and else-block in 22.2
if execCfg.Settings.Version.IsActive(ctx, clusterversion.AvoidDrainingNames) {
if dbDesc.Schemas != nil {
delete(dbDesc.Schemas, schemaDesc.GetName())
}
b.Del(catalogkeys.EncodeNameKey(p.ExecCfg().Codec, schemaDesc))
} else {
//lint:ignore SA1019 removal of deprecated method call scheduled for 22.2
schemaDesc.AddDrainingName(descpb.NameInfo{
ParentID: details.ParentID,
ParentSchemaID: keys.RootNamespaceID,
Name: schemaDesc.Name,
})
// Update the parent database with information about the dropped schema.
if dbDesc.Schemas == nil {
dbDesc.Schemas = make(map[string]descpb.DatabaseDescriptor_SchemaInfo)
}
dbDesc.Schemas[schema.Desc.Name] = descpb.DatabaseDescriptor_SchemaInfo{ID: dbDesc.ID, Dropped: true}
}

if err := descsCol.WriteDescToBatch(ctx, p.ExtendedEvalContext().Tracing.KVTracingEnabled(),
schemaDesc, b); err != nil {
return nil, err
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ const (
// requires the limit to always be overshot in order to properly enforce
// limits when splitting requests.
TargetBytesAvoidExcess
// AvoidDrainingNames avoids using the draining_names field when renaming or
// dropping descriptors.
AvoidDrainingNames

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -473,6 +476,10 @@ var versionsSingleton = keyedVersions{
Key: TargetBytesAvoidExcess,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 4},
},
{
Key: AvoidDrainingNames,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 6},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ go_library(
"lookup_join.go",
"max_one_row.go",
"mem_metrics.go",
"name_util.go",
"notice.go",
"opaque.go",
"opt_catalog.go",
Expand Down
38 changes: 19 additions & 19 deletions pkg/sql/alter_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -194,6 +192,12 @@ func (p *planner) setNewSchemaOwner(
func (p *planner) renameSchema(
ctx context.Context, db *dbdesc.Mutable, desc *schemadesc.Mutable, newName string, jobDesc string,
) error {
oldNameKey := descpb.NameInfo{
ParentID: desc.GetParentID(),
ParentSchemaID: desc.GetParentSchemaID(),
Name: desc.GetName(),
}

// Check that there isn't a name collision with the new name.
found, _, err := schemaExists(ctx, p.txn, p.ExecCfg().Codec, db.ID, newName)
if err != nil {
Expand All @@ -209,21 +213,12 @@ func (p *planner) renameSchema(
}

// Set the new name for the descriptor.
oldName := desc.Name
desc.AddDrainingName(descpb.NameInfo{
ParentID: desc.ParentID,
ParentSchemaID: keys.RootNamespaceID,
Name: desc.Name,
})
oldName := oldNameKey.GetName()
desc.SetName(newName)

// Write a new namespace entry for the new name.
nameKey := catalogkeys.MakeSchemaNameKey(p.execCfg.Codec, desc.ParentID, newName)
// Write the new name and remove the old name.
b := p.txn.NewBatch()
if p.ExtendedEvalContext().Tracing.KVTracingEnabled() {
log.VEventf(ctx, 2, "CPut %s -> %d", nameKey, desc.ID)
}
b.CPut(nameKey, desc.ID, nil)
p.renameNamespaceEntry(ctx, b, oldNameKey, desc)
if err := p.txn.Run(ctx, b); err != nil {
return err
}
Expand All @@ -247,11 +242,16 @@ func (p *planner) renameSchema(
)
}

// Mark the old schema name as dropped.
db.Schemas[oldName] = descpb.DatabaseDescriptor_SchemaInfo{
ID: desc.ID,
Dropped: true,
// Remove the old schema name or mark it as dropped, depending on version.
if p.execCfg.Settings.Version.IsActive(ctx, clusterversion.AvoidDrainingNames) {
delete(db.Schemas, oldName)
} else {
db.Schemas[oldName] = descpb.DatabaseDescriptor_SchemaInfo{
ID: desc.ID,
Dropped: true,
}
}

// Create an entry for the new schema name.
db.Schemas[newName] = descpb.DatabaseDescriptor_SchemaInfo{ID: desc.ID}
if err := p.writeNonDropDatabaseChange(
Expand Down
25 changes: 12 additions & 13 deletions pkg/sql/alter_table_set_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ func (n *alterTableSetSchemaNode) startExec(params runParams) error {
ctx := params.ctx
p := params.p
tableDesc := n.tableDesc
schemaID := tableDesc.GetParentSchemaID()
databaseID := tableDesc.GetParentID()
oldNameKey := descpb.NameInfo{
ParentID: tableDesc.GetParentID(),
ParentSchemaID: tableDesc.GetParentSchemaID(),
Name: tableDesc.GetName(),
}

kind := tree.GetTableType(tableDesc.IsSequence(), tableDesc.IsView(), tableDesc.GetIsMaterializedView())
oldName := tree.MakeTableNameFromPrefix(n.prefix.NamePrefix(), tree.Name(n.tableDesc.GetName()))
Expand All @@ -114,38 +117,34 @@ func (n *alterTableSetSchemaNode) startExec(params runParams) error {

// If the schema being changed to is the same as the current schema for the
// table, do a no-op.
if desiredSchemaID == schemaID {
if desiredSchemaID == oldNameKey.GetParentSchemaID() {
return nil
}

// TODO(ajwerner): Use the collection here.
exists, _, err := catalogkv.LookupObjectID(
ctx, p.txn, p.ExecCfg().Codec, databaseID, desiredSchemaID, tableDesc.Name,
ctx, p.txn, p.ExecCfg().Codec, tableDesc.GetParentID(), desiredSchemaID, tableDesc.GetName(),
)
if err == nil && exists {
return pgerror.Newf(pgcode.DuplicateRelation,
"relation %s already exists in schema %s", tableDesc.Name, n.newSchema)
"relation %s already exists in schema %s", tableDesc.GetName(), n.newSchema)
} else if err != nil {
return err
}

renameDetails := descpb.NameInfo{
ParentID: databaseID,
ParentSchemaID: schemaID,
Name: tableDesc.Name,
}
tableDesc.AddDrainingName(renameDetails)

// Set the tableDesc's new schema id to the desired schema's id.
tableDesc.SetParentSchemaID(desiredSchemaID)

b := p.txn.NewBatch()
p.renameNamespaceEntry(ctx, b, oldNameKey, tableDesc)

if err := p.writeSchemaChange(
ctx, tableDesc, descpb.InvalidMutationID, tree.AsStringWithFQNames(n.n, params.Ann()),
); err != nil {
return err
}

if err := p.writeNameKey(ctx, tableDesc, tableDesc.ID); err != nil {
if err := p.txn.Run(ctx, b); err != nil {
return err
}

Expand Down
26 changes: 15 additions & 11 deletions pkg/sql/alter_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,23 +275,27 @@ func (p *planner) performRenameTypeDesc(
newSchemaID descpb.ID,
jobDesc string,
) error {
// Record the rename details in the descriptor for draining.
name := descpb.NameInfo{
ParentID: desc.ParentID,
ParentSchemaID: desc.ParentSchemaID,
Name: desc.Name,
oldNameKey := descpb.NameInfo{
ParentID: desc.GetParentID(),
ParentSchemaID: desc.GetParentSchemaID(),
Name: desc.GetName(),
}
desc.AddDrainingName(name)

// Set the descriptor up with the new name.
desc.Name = newName
// Set the descriptor to the new schema ID.
// Update the type descriptor with the new name and new schema ID.
desc.SetName(newName)
desc.SetParentSchemaID(newSchemaID)

// Populate the namespace update batch.
b := p.txn.NewBatch()
p.renameNamespaceEntry(ctx, b, oldNameKey, desc)

// Write the updated type descriptor.
if err := p.writeTypeSchemaChange(ctx, desc, jobDesc); err != nil {
return err
}
// Write the new namespace key.
return p.writeNameKey(ctx, desc, desc.ID)

// Run the namespace update batch.
return p.txn.Run(ctx, b)
}

func (p *planner) renameTypeValue(
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,8 @@ CREATE TABLE test.t(a INT PRIMARY KEY);
t.Fatal(err)
}

tableDesc := catalogkv.TestingGetTableDescriptor(t.kvDB, keys.SystemSQLCodec, "test", "t")

// Block schema changers so that the table we're about to DROP is not actually
// dropped; it will be left in a "deleted" state.
mu.Lock()
Expand All @@ -576,10 +578,9 @@ CREATE TABLE test.t(a INT PRIMARY KEY);
}

// Make sure we can't get a lease on the descriptor.
tableDesc := catalogkv.TestingGetTableDescriptor(t.kvDB, keys.SystemSQLCodec, "test", "t")
// try to acquire at a bogus version to make sure we don't get back a lease we
// already had.
_, err = t.acquireMinVersion(1, tableDesc.GetID(), tableDesc.GetVersion()+1)
_, err = t.acquireMinVersion(1, tableDesc.GetID(), tableDesc.GetVersion()+123)
if !testutils.IsError(err, "descriptor is being dropped") {
t.Fatalf("got a different error than expected: %v", err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/typedesc/type_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ func (desc *Mutable) AddDrainingName(name descpb.NameInfo) {
desc.DrainingNames = append(desc.DrainingNames, name)
}

// SetName sets the TypeDescriptor's name.
func (desc *Mutable) SetName(name string) {
desc.Name = name
}

// EnumMembers is a sortable list of TypeDescriptor_EnumMember, sorted by the
// physical representation.
type EnumMembers []descpb.TypeDescriptor_EnumMember
Expand Down
Loading

0 comments on commit 1b7f602

Please sign in to comment.