diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go index 47febbea773a..51225caa164f 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backup_metadata_test.go @@ -25,7 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -88,7 +88,7 @@ func checkMetadata( blobs.TestEmptyBlobClientFactory, username.RootUserName(), tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), - tc.Servers[0].CollectionFactory().(*descs.CollectionFactory), + tc.Servers[0].InternalExecutorFactory().(sqlutil.InternalExecutorFactory), tc.Servers[0].DB(), nil, /* limiters */ ) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 0d37b2b3d58b..52910f9c7980 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -573,7 +573,7 @@ func TestBackupRestoreAppend(t *testing.T) { blobs.TestEmptyBlobClientFactory, username.RootUserName(), tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), - tc.Servers[0].CollectionFactory().(*descs.CollectionFactory), + tc.Servers[0].InternalExecutorFactory().(sqlutil.InternalExecutorFactory), tc.Servers[0].DB(), nil, /* limiters */ ) @@ -8027,7 +8027,7 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) { blobs.TestBlobServiceClient(dir), username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 529a493e934d..306ba223b37b 100644 --- 
a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -255,7 +255,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ opts...) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 6f91d17a07c7..2a360bfaf42e 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -2223,7 +2223,7 @@ func (r *restoreResumer) OnFailOrCancel( logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr) execCfg := execCtx.(sql.JobExecContext).ExecCfg() - if err := execCfg.CollectionFactory.TxnWithExecutor(ctx, execCfg.DB, p.SessionData(), func( + if err := execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, p.SessionData(), func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { for _, tenant := range details.Tenants { diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 061afb7c38c7..2c38c27f71ec 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -706,7 +706,7 @@ func getDatabaseIDAndDesc( // as regular databases, we drop them before restoring them again in the // restore. 
func dropDefaultUserDBs(ctx context.Context, execCfg *sql.ExecutorConfig) error { - return execCfg.CollectionFactory.TxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func( + return execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func( ctx context.Context, txn *kv.Txn, _ *descs.Collection, ie sqlutil.InternalExecutor, ) error { _, err := ie.Exec(ctx, "drop-defaultdb", txn, "DROP DATABASE IF EXISTS defaultdb") diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index e1dcce9884c6..29a8078adabe 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -84,15 +84,16 @@ func New( tolerances changefeedbase.CanHandle, ) SchemaFeed { m := &schemaFeed{ - filter: schemaChangeEventFilters[events], - db: cfg.DB, - clock: cfg.DB.Clock(), - settings: cfg.Settings, - targets: targets, - leaseMgr: cfg.LeaseManager.(*lease.Manager), - collectionFactory: cfg.CollectionFactory, - metrics: metrics, - tolerances: tolerances, + filter: schemaChangeEventFilters[events], + db: cfg.DB, + clock: cfg.DB.Clock(), + settings: cfg.Settings, + targets: targets, + leaseMgr: cfg.LeaseManager.(*lease.Manager), + collectionFactory: cfg.CollectionFactory, + internalExecutorFactory: cfg.InternalExecutorFactory, + metrics: metrics, + tolerances: tolerances, } m.mu.previousTableVersion = make(map[descpb.ID]catalog.TableDescriptor) m.mu.highWater = initialHighwater @@ -122,8 +123,9 @@ type schemaFeed struct { // TODO(ajwerner): Should this live underneath the FilterFunc? // Should there be another function to decide whether to update the // lease manager? 
- leaseMgr *lease.Manager - collectionFactory *descs.CollectionFactory + leaseMgr *lease.Manager + collectionFactory *descs.CollectionFactory + internalExecutorFactory descs.TxnManager mu struct { syncutil.Mutex @@ -291,7 +293,7 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error { }) } - if err := tf.collectionFactory.Txn(ctx, tf.db, initialTableDescsFn); err != nil { + if err := tf.internalExecutorFactory.DescsTxn(ctx, tf.db, initialTableDescsFn); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index dbad01d047c4..e925476068ca 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -176,7 +176,7 @@ func TestCloudStorageSink(t *testing.T) { clientFactory, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ opts...) diff --git a/pkg/ccl/storageccl/BUILD.bazel b/pkg/ccl/storageccl/BUILD.bazel index 6ed03b91393f..29df0c0170da 100644 --- a/pkg/ccl/storageccl/BUILD.bazel +++ b/pkg/ccl/storageccl/BUILD.bazel @@ -46,7 +46,7 @@ go_test( "//pkg/security/username", "//pkg/server", "//pkg/sql", - "//pkg/sql/catalog/descs", + "//pkg/sql/sqlutil", "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/storageccl/external_sst_reader_test.go b/pkg/ccl/storageccl/external_sst_reader_test.go index 9525ad84fd75..76b0fae120d0 100644 --- a/pkg/ccl/storageccl/external_sst_reader_test.go +++ b/pkg/ccl/storageccl/external_sst_reader_test.go @@ -20,7 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" 
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils" @@ -124,7 +124,7 @@ func TestNewExternalSSTReader(t *testing.T) { blobs.TestBlobServiceClient(tempDir), username.RootUserName(), tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), - tc.Servers[0].CollectionFactory().(*descs.CollectionFactory), + tc.Servers[0].InternalExecutorFactory().(sqlutil.InternalExecutorFactory), tc.Servers[0].DB(), nil, /* limiters */ ) diff --git a/pkg/ccl/workloadccl/storage.go b/pkg/ccl/workloadccl/storage.go index 6856a6e10f0d..3d34a7d936e3 100644 --- a/pkg/ccl/workloadccl/storage.go +++ b/pkg/ccl/workloadccl/storage.go @@ -43,7 +43,7 @@ func GetStorage(ctx context.Context, cfg FixtureConfig) (cloud.ExternalStorage, nil, /* blobClientFactory */ username.SQLUsername{}, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) diff --git a/pkg/cloud/BUILD.bazel b/pkg/cloud/BUILD.bazel index d80a2d38922a..130caa7f945e 100644 --- a/pkg/cloud/BUILD.bazel +++ b/pkg/cloud/BUILD.bazel @@ -23,7 +23,6 @@ go_library( "//pkg/security/username", "//pkg/settings", "//pkg/settings/cluster", - "//pkg/sql/catalog/descs", "//pkg/sql/sqlutil", "//pkg/util/ctxgroup", "//pkg/util/ioctx", diff --git a/pkg/cloud/amazon/s3_storage_test.go b/pkg/cloud/amazon/s3_storage_test.go index ac983b9ef0ea..397168657fa8 100644 --- a/pkg/cloud/amazon/s3_storage_test.go +++ b/pkg/cloud/amazon/s3_storage_test.go @@ -48,7 +48,7 @@ func makeS3Storage( s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, clientFactory, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -83,7 +83,7 @@ func TestPutS3(t *testing.T) { "backup-test-default"), base.ExternalIODirConfig{}, testSettings, blobs.TestEmptyBlobClientFactory, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -112,7 +112,7 @@ func TestPutS3(t *testing.T) { cloud.AuthParam, cloud.AuthParamImplicit, ), false, user, nil, 
/* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings) }) @@ -122,11 +122,15 @@ func TestPutS3(t *testing.T) { ) cloudtestutils.CheckExportStore(t, uri, false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) - cloudtestutils.CheckListFiles(t, uri, user, nil, nil, nil, testSettings) + cloudtestutils.CheckListFiles(t, uri, user, + nil, /* ie */ + nil, /* ief */ + nil, /* kvDB */ + testSettings) }) // Tests that we can put an object with server side encryption specified. @@ -151,7 +155,7 @@ func TestPutS3(t *testing.T) { false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -169,7 +173,7 @@ func TestPutS3(t *testing.T) { false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings) }) @@ -242,13 +246,13 @@ func TestPutS3AssumeRole(t *testing.T) { ) cloudtestutils.CheckExportStore(t, uri, false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) cloudtestutils.CheckListFiles(t, uri, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -260,13 +264,13 @@ func TestPutS3AssumeRole(t *testing.T) { ) cloudtestutils.CheckExportStore(t, uri, false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) cloudtestutils.CheckListFiles(t, uri, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -301,7 +305,7 @@ func TestPutS3AssumeRole(t *testing.T) { ) cloudtestutils.CheckNoPermission(t, roleURI, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -321,7 +325,7 @@ func TestPutS3AssumeRole(t *testing.T) { cloudtestutils.CheckExportStore(t, uri, false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -365,7 +369,7 @@ func TestPutS3Endpoint(t *testing.T) { cloudtestutils.CheckExportStore(t, u.String(), false, user, nil, /* ie */ - nil, /* cf */ + nil, /* 
ief */ nil, /* kvDB */ testSettings, ) @@ -441,7 +445,7 @@ func TestS3BucketDoesNotExist(t *testing.T) { s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, clientFactory, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) diff --git a/pkg/cloud/azure/azure_storage_test.go b/pkg/cloud/azure/azure_storage_test.go index 9517afa25e16..40cc14e7a59a 100644 --- a/pkg/cloud/azure/azure_storage_test.go +++ b/pkg/cloud/azure/azure_storage_test.go @@ -69,13 +69,13 @@ func TestAzure(t *testing.T) { cloudtestutils.CheckExportStore(t, cfg.filePath("backup-test"), false, username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) cloudtestutils.CheckListFiles(t, cfg.filePath("listing-test"), username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) diff --git a/pkg/cloud/cloudtestutils/BUILD.bazel b/pkg/cloud/cloudtestutils/BUILD.bazel index d99cbebc2797..db24a23ab80b 100644 --- a/pkg/cloud/cloudtestutils/BUILD.bazel +++ b/pkg/cloud/cloudtestutils/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/kv", "//pkg/security/username", "//pkg/settings/cluster", - "//pkg/sql/catalog/descs", "//pkg/sql/sqlutil", "//pkg/util/ioctx", "//pkg/util/randutil", diff --git a/pkg/cloud/cloudtestutils/cloud_test_helpers.go b/pkg/cloud/cloudtestutils/cloud_test_helpers.go index 6a7028418569..ea3f1cc14957 100644 --- a/pkg/cloud/cloudtestutils/cloud_test_helpers.go +++ b/pkg/cloud/cloudtestutils/cloud_test_helpers.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -109,7 +108,7 @@ func storeFromURI( 
clientFactory blobs.BlobClientFactory, user username.SQLUsername, ie sqlutil.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) cloud.ExternalStorage { @@ -119,7 +118,7 @@ func storeFromURI( } // Setup a sink for the given args. s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, - clientFactory, ie, cf, kvDB, nil) + clientFactory, ie, ief, kvDB, nil) if err != nil { t.Fatal(err) } @@ -133,7 +132,7 @@ func CheckExportStore( skipSingleFile bool, user username.SQLUsername, ie sqlutil.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { @@ -148,7 +147,7 @@ func CheckExportStore( // Setup a sink for the given args. clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, - ie, cf, kvDB, nil) + ie, ief, kvDB, nil) if err != nil { t.Fatal(err) } @@ -256,7 +255,7 @@ func CheckExportStore( t.Fatal(err) } singleFile := storeFromURI(ctx, t, appendPath(t, storeURI, testingFilename), clientFactory, - user, ie, cf, kvDB, testSettings) + user, ie, ief, kvDB, testSettings) defer singleFile.Close() res, err := singleFile.ReadFile(ctx, "") @@ -277,7 +276,7 @@ func CheckExportStore( t.Run("write-single-file-by-uri", func(t *testing.T) { const testingFilename = "B" singleFile := storeFromURI(ctx, t, appendPath(t, storeURI, testingFilename), clientFactory, - user, ie, cf, kvDB, testSettings) + user, ie, ief, kvDB, testSettings) defer singleFile.Close() if err := cloud.WriteFile(ctx, singleFile, "", bytes.NewReader([]byte("bbb"))); err != nil { @@ -308,7 +307,7 @@ func CheckExportStore( if err := cloud.WriteFile(ctx, s, testingFilename, bytes.NewReader([]byte("aaa"))); err != nil { t.Fatal(err) } - singleFile := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, cf, kvDB, 
testSettings) + singleFile := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, ief, kvDB, testSettings) defer singleFile.Close() // Read a valid file. @@ -350,11 +349,11 @@ func CheckListFiles( storeURI string, user username.SQLUsername, ie sqlutil.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { - CheckListFilesCanonical(t, storeURI, "", user, ie, cf, kvDB, testSettings) + CheckListFilesCanonical(t, storeURI, "", user, ie, ief, kvDB, testSettings) } // CheckListFilesCanonical is like CheckListFiles but takes a canonical prefix @@ -366,7 +365,7 @@ func CheckListFilesCanonical( canonical string, user username.SQLUsername, ie sqlutil.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { @@ -380,7 +379,7 @@ func CheckListFilesCanonical( clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) for _, fileName := range fileNames { - file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, cf, kvDB, testSettings) + file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, ief, kvDB, testSettings) if err := cloud.WriteFile(ctx, file, fileName, bytes.NewReader([]byte("bbb"))); err != nil { t.Fatal(err) } @@ -468,7 +467,7 @@ func CheckListFilesCanonical( }, } { t.Run(tc.name, func(t *testing.T) { - s := storeFromURI(ctx, t, tc.uri, clientFactory, user, ie, cf, kvDB, testSettings) + s := storeFromURI(ctx, t, tc.uri, clientFactory, user, ie, ief, kvDB, testSettings) var actual []string require.NoError(t, s.List(ctx, tc.prefix, tc.delimiter, func(f string) error { actual = append(actual, f) @@ -481,7 +480,7 @@ func CheckListFilesCanonical( }) for _, fileName := range fileNames { - file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, cf, kvDB, testSettings) + file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, ief, kvDB, testSettings) if err := 
file.Delete(ctx, fileName); err != nil { t.Fatal(err) } @@ -502,7 +501,7 @@ func uploadData( s, err := cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, testSettings, nil, /* blobClientFactory */ nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -547,7 +546,7 @@ func CheckAntagonisticRead( s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, nil, /* blobClientFactory */ nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -569,7 +568,7 @@ func CheckNoPermission( storeURI string, user username.SQLUsername, ie sqlutil.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { @@ -582,7 +581,7 @@ func CheckNoPermission( } clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) - s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, ie, cf, kvDB, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, ie, ief, kvDB, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go index 901118ce0a03..8ab8d2230eff 100644 --- a/pkg/cloud/external_storage.go +++ b/pkg/cloud/external_storage.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/errors" @@ -150,14 +149,14 @@ type ExternalStorageURIParser func(ExternalStorageURIContext, *url.URL) (cloudpb // ExternalStorageContext contains the dependencies passed to external storage // implementations during creation. 
type ExternalStorageContext struct { - IOConf base.ExternalIODirConfig - Settings *cluster.Settings - BlobClientFactory blobs.BlobClientFactory - InternalExecutor sqlutil.InternalExecutor - CollectionFactory *descs.CollectionFactory - DB *kv.DB - Options []ExternalStorageOption - Limiters Limiters + IOConf base.ExternalIODirConfig + Settings *cluster.Settings + BlobClientFactory blobs.BlobClientFactory + InternalExecutor sqlutil.InternalExecutor + InternalExecutorFactory sqlutil.InternalExecutorFactory + DB *kv.DB + Options []ExternalStorageOption + Limiters Limiters } // ExternalStorageOptions holds dependencies and values that can be diff --git a/pkg/cloud/externalconn/connection_storage.go b/pkg/cloud/externalconn/connection_storage.go index e98078e6e2eb..96309cdf75a9 100644 --- a/pkg/cloud/externalconn/connection_storage.go +++ b/pkg/cloud/externalconn/connection_storage.go @@ -92,7 +92,8 @@ func makeExternalConnectionStorage( uri.Path = path.Join(uri.Path, cfg.Path) return cloud.ExternalStorageFromURI(ctx, uri.String(), args.IOConf, args.Settings, args.BlobClientFactory, username.MakeSQLUsernameFromPreNormalizedString(cfg.User), - args.InternalExecutor, args.CollectionFactory, args.DB, args.Limiters, args.Options...) + args.InternalExecutor, args.InternalExecutorFactory, + args.DB, args.Limiters, args.Options...) 
default: return nil, errors.Newf("cannot connect to %T; unsupported resource for an ExternalStorage connection", d) } diff --git a/pkg/cloud/gcp/gcs_storage_test.go b/pkg/cloud/gcp/gcs_storage_test.go index cb0e302c451d..29e8456e33af 100644 --- a/pkg/cloud/gcp/gcs_storage_test.go +++ b/pkg/cloud/gcp/gcs_storage_test.go @@ -60,7 +60,16 @@ func TestPutGoogleCloud(t *testing.T) { if specified { uri += fmt.Sprintf("&%s=%s", cloud.AuthParam, cloud.AuthParamSpecified) } - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckExportStore( + t, + uri, + false, + user, + nil, /* ie */ + nil, /* ief */ + nil, /* kvDB */ + testSettings, + ) cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", bucket, "backup-test-specified", @@ -71,7 +80,7 @@ func TestPutGoogleCloud(t *testing.T) { url.QueryEscape(encoded), ), username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -81,8 +90,16 @@ func TestPutGoogleCloud(t *testing.T) { skip.IgnoreLint(t, "implicit auth is not configured") } - cloudtestutils.CheckExportStore(t, fmt.Sprintf("gs://%s/%s?%s=%s", bucket, "backup-test-implicit", - cloud.AuthParam, cloud.AuthParamImplicit), false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckExportStore( + t, + fmt.Sprintf("gs://%s/%s?%s=%s", bucket, "backup-test-implicit", + cloud.AuthParam, cloud.AuthParamImplicit), + false, + user, + nil, /* ie */ + nil, /* ief */ + nil, /* kvDB */ + testSettings) cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s", bucket, "backup-test-implicit", @@ -91,7 +108,7 @@ func TestPutGoogleCloud(t *testing.T) { cloud.AuthParamImplicit, ), username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -118,7 +135,15 @@ func TestPutGoogleCloud(t *testing.T) { token.AccessToken, ) uri += fmt.Sprintf("&%s=%s", cloud.AuthParam, cloud.AuthParamSpecified) - 
cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckExportStore( + t, + uri, + false, + user, + nil, /* ie */ + nil, /* ief */ + nil, /* kvDB */ + testSettings) cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", bucket, "backup-test-specified", @@ -129,7 +154,7 @@ func TestPutGoogleCloud(t *testing.T) { token.AccessToken, ), username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -161,20 +186,27 @@ func TestGCSAssumeRole(t *testing.T) { cloudtestutils.CheckNoPermission(t, fmt.Sprintf("gs://%s/%s?%s=%s", limitedBucket, "backup-test-assume-role", CredentialsParam, url.QueryEscape(encoded)), user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) - cloudtestutils.CheckExportStore(t, fmt.Sprintf("gs://%s/%s?%s=%s&%s=%s&%s=%s", - limitedBucket, - "backup-test-assume-role", - cloud.AuthParam, - cloud.AuthParamSpecified, - AssumeRoleParam, - assumedAccount, CredentialsParam, - url.QueryEscape(encoded), - ), false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckExportStore( + t, + fmt.Sprintf("gs://%s/%s?%s=%s&%s=%s&%s=%s", + limitedBucket, + "backup-test-assume-role", + cloud.AuthParam, + cloud.AuthParamSpecified, + AssumeRoleParam, + assumedAccount, CredentialsParam, + url.QueryEscape(encoded), + ), false, user, + nil, /* ie */ + nil, /* ief */ + nil, /* kvDB */ + testSettings, + ) cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s&%s=%s", limitedBucket, "backup-test-assume-role", @@ -187,7 +219,7 @@ func TestGCSAssumeRole(t *testing.T) { url.QueryEscape(encoded), ), username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -203,7 +235,7 @@ func TestGCSAssumeRole(t *testing.T) { cloudtestutils.CheckNoPermission(t, fmt.Sprintf("gs://%s/%s?%s=%s", limitedBucket, "backup-test-assume-role", cloud.AuthParam, cloud.AuthParamImplicit), user, nil, 
/* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -211,7 +243,7 @@ func TestGCSAssumeRole(t *testing.T) { cloudtestutils.CheckExportStore(t, fmt.Sprintf("gs://%s/%s?%s=%s&%s=%s", limitedBucket, "backup-test-assume-role", cloud.AuthParam, cloud.AuthParamImplicit, AssumeRoleParam, assumedAccount), false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -225,7 +257,7 @@ func TestGCSAssumeRole(t *testing.T) { assumedAccount, ), username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -269,7 +301,7 @@ func TestGCSAssumeRole(t *testing.T) { ) cloudtestutils.CheckNoPermission(t, roleURI, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -285,13 +317,13 @@ func TestGCSAssumeRole(t *testing.T) { ) cloudtestutils.CheckExportStore(t, uri, false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) cloudtestutils.CheckListFiles(t, uri, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -339,7 +371,7 @@ func TestFileDoesNotExist(t *testing.T) { s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, /* blobClientFactory */ nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -358,7 +390,7 @@ func TestFileDoesNotExist(t *testing.T) { s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, /* blobClientFactory */ nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -395,7 +427,7 @@ func TestCompressedGCS(t *testing.T) { s1, err := cloud.MakeExternalStorage(ctx, conf1, base.ExternalIODirConfig{}, testSettings, nil, /* blobClientFactory */ nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -403,7 +435,7 @@ func TestCompressedGCS(t *testing.T) { s2, 
err := cloud.MakeExternalStorage(ctx, conf2, base.ExternalIODirConfig{}, testSettings, nil, /* blobClientFactory */ nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) diff --git a/pkg/cloud/httpsink/http_storage_test.go b/pkg/cloud/httpsink/http_storage_test.go index 6e8823be4b3c..de84d4e7617b 100644 --- a/pkg/cloud/httpsink/http_storage_test.go +++ b/pkg/cloud/httpsink/http_storage_test.go @@ -123,7 +123,7 @@ func TestPutHttp(t *testing.T) { defer cleanup() cloudtestutils.CheckExportStore(t, srv.String(), false, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -145,7 +145,7 @@ func TestPutHttp(t *testing.T) { cloudtestutils.CheckExportStore(t, combined.String(), true, user, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) @@ -173,7 +173,7 @@ func TestPutHttp(t *testing.T) { } s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, blobs.TestEmptyBlobClientFactory, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -331,7 +331,7 @@ func TestCanDisableHttp(t *testing.T) { s, err := cloud.MakeExternalStorage(context.Background(), cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_http}, conf, testSettings, blobs.TestEmptyBlobClientFactory, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -354,7 +354,7 @@ func TestCanDisableOutbound(t *testing.T) { } { s, err := cloud.MakeExternalStorage(context.Background(), cloudpb.ExternalStorage{Provider: provider}, conf, testSettings, blobs.TestEmptyBlobClientFactory, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) @@ -388,7 +388,7 @@ func TestExternalStorageCanUseHTTPProxy(t *testing.T) { require.NoError(t, err) s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, nil, /* ie */ - nil, /* cf */ + nil, /* ief */ 
nil, /* kvDB */ nil, /* limiters */ ) diff --git a/pkg/cloud/impl_registry.go b/pkg/cloud/impl_registry.go index 065258c0bd85..83139747d245 100644 --- a/pkg/cloud/impl_registry.go +++ b/pkg/cloud/impl_registry.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -152,7 +151,7 @@ func ExternalStorageFromURI( blobClientFactory blobs.BlobClientFactory, user username.SQLUsername, ie sqlutil.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, kvDB *kv.DB, limiters Limiters, opts ...ExternalStorageOption, @@ -162,7 +161,7 @@ func ExternalStorageFromURI( return nil, err } return MakeExternalStorage(ctx, conf, externalConfig, settings, blobClientFactory, - ie, cf, kvDB, limiters, opts...) + ie, ief, kvDB, limiters, opts...) 
} // SanitizeExternalStorageURI returns the external storage URI with with some @@ -207,20 +206,20 @@ func MakeExternalStorage( settings *cluster.Settings, blobClientFactory blobs.BlobClientFactory, ie sqlutil.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, kvDB *kv.DB, limiters Limiters, opts ...ExternalStorageOption, ) (ExternalStorage, error) { args := ExternalStorageContext{ - IOConf: conf, - Settings: settings, - BlobClientFactory: blobClientFactory, - InternalExecutor: ie, - CollectionFactory: cf, - DB: kvDB, - Options: opts, - Limiters: limiters, + IOConf: conf, + Settings: settings, + BlobClientFactory: blobClientFactory, + InternalExecutor: ie, + InternalExecutorFactory: ief, + DB: kvDB, + Options: opts, + Limiters: limiters, } if conf.DisableOutbound && dest.Provider != cloudpb.ExternalStorageProvider_userfile { return nil, errors.New("external network access is disabled") diff --git a/pkg/cloud/nodelocal/nodelocal_storage_test.go b/pkg/cloud/nodelocal/nodelocal_storage_test.go index 4792c601b8e9..cfc62c526c3f 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage_test.go +++ b/pkg/cloud/nodelocal/nodelocal_storage_test.go @@ -33,7 +33,7 @@ func TestPutLocal(t *testing.T) { cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), nil, nil, nil, testSettings) cloudtestutils.CheckListFiles(t, "nodelocal://0/listing-test/basepath", username.RootUserName(), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ testSettings, ) diff --git a/pkg/cloud/nullsink/nullsink_storage_test.go b/pkg/cloud/nullsink/nullsink_storage_test.go index 04f898ab33d6..cb2d2a0ac316 100644 --- a/pkg/cloud/nullsink/nullsink_storage_test.go +++ b/pkg/cloud/nullsink/nullsink_storage_test.go @@ -40,7 +40,7 @@ func TestNullSinkReadAndWrite(t *testing.T) { nil, /* Cluster Settings */ nil, /* blobClientFactory */ nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) diff --git 
a/pkg/cloud/userfile/BUILD.bazel b/pkg/cloud/userfile/BUILD.bazel index da46209f30b1..05ca28916e74 100644 --- a/pkg/cloud/userfile/BUILD.bazel +++ b/pkg/cloud/userfile/BUILD.bazel @@ -46,7 +46,6 @@ go_test( "//pkg/security/username", "//pkg/server", "//pkg/settings/cluster", - "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", "//pkg/sql/sqlutil", "//pkg/sql/tests", diff --git a/pkg/cloud/userfile/file_table_storage.go b/pkg/cloud/userfile/file_table_storage.go index c30b9fe7c06a..96dbcd8d98f2 100644 --- a/pkg/cloud/userfile/file_table_storage.go +++ b/pkg/cloud/userfile/file_table_storage.go @@ -122,7 +122,7 @@ func makeFileTableStorage( // cfg.User is already a normalized SQL username. user := username.MakeSQLUsernameFromPreNormalizedString(cfg.User) - executor := filetable.MakeInternalFileToTableExecutor(args.InternalExecutor, args.CollectionFactory, args.DB) + executor := filetable.MakeInternalFileToTableExecutor(args.InternalExecutor, args.InternalExecutorFactory, args.DB) fileToTableSystem, err := filetable.NewFileToTableSystem(ctx, cfg.QualifiedTableName, executor, user) diff --git a/pkg/cloud/userfile/file_table_storage_test.go b/pkg/cloud/userfile/file_table_storage_test.go index 940d009a61b0..e7dfcc03e581 100644 --- a/pkg/cloud/userfile/file_table_storage_test.go +++ b/pkg/cloud/userfile/file_table_storage_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudtestutils" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" @@ -49,21 +48,20 @@ func TestPutUserFileTable(t *testing.T) { dest := MakeUserFileStorageURI(qualifiedTableName, filename) ie := s.InternalExecutor().(sqlutil.InternalExecutor) - cf := s.CollectionFactory().(*descs.CollectionFactory) - 
cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, cf, kvDB, testSettings) + ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) + cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, ief, kvDB, testSettings) cloudtestutils.CheckListFiles(t, "userfile://defaultdb.public.file_list_table/listing-test/basepath", - username.RootUserName(), ie, cf, kvDB, testSettings) + username.RootUserName(), ie, ief, kvDB, testSettings) t.Run("empty-qualified-table-name", func(t *testing.T) { dest := MakeUserFileStorageURI("", filename) ie := s.InternalExecutor().(sqlutil.InternalExecutor) - cf := s.CollectionFactory().(*descs.CollectionFactory) - cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, cf, kvDB, testSettings) + cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, ief, kvDB, testSettings) cloudtestutils.CheckListFilesCanonical(t, "userfile:///listing-test/basepath", "userfile://defaultdb.public.userfiles_root/listing-test/basepath", - username.RootUserName(), ie, cf, kvDB, testSettings) + username.RootUserName(), ie, ief, kvDB, testSettings) }) t.Run("reject-normalized-basename", func(t *testing.T) { @@ -71,7 +69,7 @@ func TestPutUserFileTable(t *testing.T) { userfileURL := url.URL{Scheme: "userfile", Host: qualifiedTableName, Path: ""} store, err := cloud.ExternalStorageFromURI(ctx, userfileURL.String()+"/", - base.ExternalIODirConfig{}, cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.RootUserName(), ie, cf, kvDB, nil) + base.ExternalIODirConfig{}, cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.RootUserName(), ie, ief, kvDB, nil) require.NoError(t, err) defer store.Close() @@ -109,7 +107,7 @@ func TestUserScoping(t *testing.T) { dest := MakeUserFileStorageURI(qualifiedTableName, "") ie := s.InternalExecutor().(sqlutil.InternalExecutor) - cf := s.CollectionFactory().(*descs.CollectionFactory) + ief := 
s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory) // Create two users and grant them all privileges on defaultdb. user1 := username.MakeSQLUsernameFromPreNormalizedString("foo") @@ -119,13 +117,13 @@ func TestUserScoping(t *testing.T) { // Write file as user1. fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user1, ie, cf, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user1, ie, ief, kvDB, nil) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte("aaa")))) // Attempt to read/write file as user2 and expect to fail. fileTableSystem2, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user2, ie, cf, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user2, ie, ief, kvDB, nil) require.NoError(t, err) _, err = fileTableSystem2.ReadFile(ctx, filename) require.Error(t, err) @@ -133,7 +131,7 @@ func TestUserScoping(t *testing.T) { // Read file as root and expect to succeed. 
fileTableSystem3, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.RootUserName(), ie, cf, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.RootUserName(), ie, ief, kvDB, nil) require.NoError(t, err) _, err = fileTableSystem3.ReadFile(ctx, filename) require.NoError(t, err) diff --git a/pkg/cloud/userfile/filetable/file_table_read_writer.go b/pkg/cloud/userfile/filetable/file_table_read_writer.go index 182c6720112f..8c80b27d850f 100644 --- a/pkg/cloud/userfile/filetable/file_table_read_writer.go +++ b/pkg/cloud/userfile/filetable/file_table_read_writer.go @@ -60,9 +60,9 @@ type FileToTableSystemExecutor interface { // InternalFileToTableExecutor is the SQL query executor which uses an internal // SQL connection to interact with the database. type InternalFileToTableExecutor struct { - ie sqlutil.InternalExecutor - cf *descs.CollectionFactory - db *kv.DB + ie sqlutil.InternalExecutor + ief sqlutil.InternalExecutorFactory + db *kv.DB } var _ FileToTableSystemExecutor = &InternalFileToTableExecutor{} @@ -70,9 +70,9 @@ var _ FileToTableSystemExecutor = &InternalFileToTableExecutor{} // MakeInternalFileToTableExecutor returns an instance of a // InternalFileToTableExecutor. func MakeInternalFileToTableExecutor( - ie sqlutil.InternalExecutor, cf *descs.CollectionFactory, db *kv.DB, + ie sqlutil.InternalExecutor, ief sqlutil.InternalExecutorFactory, db *kv.DB, ) *InternalFileToTableExecutor { - return &InternalFileToTableExecutor{ie, cf, db} + return &InternalFileToTableExecutor{ie, ief, db} } // Query implements the FileToTableSystemExecutor interface. 
@@ -245,32 +245,33 @@ func NewFileToTableSystem( if err != nil { return nil, err } - if err := e.cf.TxnWithExecutor(ctx, e.db, nil /* SessionData */, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, - ) error { - // TODO(adityamaru): Handle scenario where the user has already created - // tables with the same names not via the FileToTableSystem - // object. Not sure if we want to error out or work around it. - tablesExist, err := f.checkIfFileAndPayloadTableExist(ctx, txn, ie) - if err != nil { - return err - } - - if !tablesExist { - if err := f.createFileAndPayloadTables(ctx, txn, ie); err != nil { + if err := e.ief.(descs.TxnManager).DescsTxnWithExecutor( + ctx, e.db, nil /* SessionData */, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, + ) error { + // TODO(adityamaru): Handle scenario where the user has already created + // tables with the same names not via the FileToTableSystem + // object. Not sure if we want to error out or work around it. 
+ tablesExist, err := f.checkIfFileAndPayloadTableExist(ctx, txn, ie) + if err != nil { return err } - if err := f.grantCurrentUserTablePrivileges(ctx, txn, ie); err != nil { - return err - } + if !tablesExist { + if err := f.createFileAndPayloadTables(ctx, txn, ie); err != nil { + return err + } - if err := f.revokeOtherUserTablePrivileges(ctx, txn, ie); err != nil { - return err + if err := f.grantCurrentUserTablePrivileges(ctx, txn, ie); err != nil { + return err + } + + if err := f.revokeOtherUserTablePrivileges(ctx, txn, ie); err != nil { + return err + } } - } - return nil - }); err != nil { + return nil + }); err != nil { return nil, err } @@ -366,27 +367,28 @@ func DestroyUserFileSystem(ctx context.Context, f *FileToTableSystem) error { return err } - if err := e.cf.TxnWithExecutor(ctx, e.db, nil, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, - ) error { - dropPayloadTableQuery := fmt.Sprintf(`DROP TABLE %s`, f.GetFQPayloadTableName()) - _, err := ie.ExecEx(ctx, "drop-payload-table", txn, - sessiondata.InternalExecutorOverride{User: f.username}, - dropPayloadTableQuery) - if err != nil { - return errors.Wrap(err, "failed to drop payload table") - } + if err := e.ief.(descs.TxnManager).DescsTxnWithExecutor( + ctx, e.db, nil /* sd */, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, + ) error { + dropPayloadTableQuery := fmt.Sprintf(`DROP TABLE %s`, f.GetFQPayloadTableName()) + _, err := ie.ExecEx(ctx, "drop-payload-table", txn, + sessiondata.InternalExecutorOverride{User: f.username}, + dropPayloadTableQuery) + if err != nil { + return errors.Wrap(err, "failed to drop payload table") + } - dropFileTableQuery := fmt.Sprintf(`DROP TABLE %s CASCADE`, f.GetFQFileTableName()) - _, err = ie.ExecEx(ctx, "drop-file-table", txn, - sessiondata.InternalExecutorOverride{User: f.username}, - dropFileTableQuery) - if err != nil { - return errors.Wrap(err, "failed 
to drop file table") - } + dropFileTableQuery := fmt.Sprintf(`DROP TABLE %s CASCADE`, f.GetFQFileTableName()) + _, err = ie.ExecEx(ctx, "drop-file-table", txn, + sessiondata.InternalExecutorOverride{User: f.username}, + dropFileTableQuery) + if err != nil { + return errors.Wrap(err, "failed to drop file table") + } - return nil - }); err != nil { + return nil + }); err != nil { return err } diff --git a/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel b/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel index e99f83fa5def..8dab3835fc1d 100644 --- a/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel +++ b/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel @@ -17,7 +17,7 @@ go_test( "//pkg/security/username", "//pkg/server", "//pkg/sql", - "//pkg/sql/catalog/descs", + "//pkg/sql/sqlutil", "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", diff --git a/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go b/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go index d5ad2777b2b6..f368af7abe1a 100644 --- a/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go +++ b/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go @@ -22,7 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -107,8 +107,11 @@ func TestListAndDeleteFiles(t *testing.T) { s, _, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) - executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. 
- InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) + executor := filetable.MakeInternalFileToTableExecutor( + s.InternalExecutor().(*sql.InternalExecutor), + s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory), + kvDB, + ) fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, username.RootUserName()) require.NoError(t, err) @@ -158,8 +161,11 @@ func TestReadWriteFile(t *testing.T) { s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) - executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) + executor := filetable.MakeInternalFileToTableExecutor( + s.InternalExecutor().(*sql.InternalExecutor), + s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory), + kvDB, + ) fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, username.RootUserName()) require.NoError(t, err) @@ -341,8 +347,11 @@ func TestUserGrants(t *testing.T) { require.NoError(t, err) // Operate under non-admin user. - executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) + executor := filetable.MakeInternalFileToTableExecutor( + s.InternalExecutor().(*sql.InternalExecutor), + s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory), + kvDB, + ) johnUser := username.MakeSQLUsernameFromPreNormalizedString("john") fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, johnUser) @@ -425,8 +434,11 @@ func TestDifferentUserDisallowed(t *testing.T) { require.NoError(t, err) // Operate under non-admin user john. - executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. 
- InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) + executor := filetable.MakeInternalFileToTableExecutor( + s.InternalExecutor().(*sql.InternalExecutor), + s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory), + kvDB, + ) johnUser := username.MakeSQLUsernameFromPreNormalizedString("john") fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, johnUser) @@ -483,8 +495,11 @@ func TestDifferentRoleDisallowed(t *testing.T) { require.NoError(t, err) // Operate under non-admin user john. - executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) + executor := filetable.MakeInternalFileToTableExecutor( + s.InternalExecutor().(*sql.InternalExecutor), + s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory), + kvDB, + ) johnUser := username.MakeSQLUsernameFromPreNormalizedString("john") fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, johnUser) @@ -518,8 +533,11 @@ func TestDatabaseScope(t *testing.T) { s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) - executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. 
- InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) + executor := filetable.MakeInternalFileToTableExecutor( + s.InternalExecutor().(*sql.InternalExecutor), + s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory), + kvDB, + ) fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, username.RootUserName()) require.NoError(t, err) diff --git a/pkg/server/api_v2_sql.go b/pkg/server/api_v2_sql.go index 7e57d076a9c6..93f69d9a5589 100644 --- a/pkg/server/api_v2_sql.go +++ b/pkg/server/api_v2_sql.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -373,13 +372,13 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { // We need a transaction to group the statements together. // We use TxnWithSteppingEnabled here even though we don't // use stepping below, because that buys us admission control. 
- cf := a.admin.server.sqlServer.execCfg.CollectionFactory + ief := a.admin.server.sqlServer.execCfg.InternalExecutorFactory runner = func(ctx context.Context, fn txnFunc) error { - return cf.TxnWithExecutor(ctx, a.admin.server.db, nil, func( - ctx context.Context, txn *kv.Txn, _ *descs.Collection, ie sqlutil.InternalExecutor, + return ief.TxnWithExecutor(ctx, a.admin.server.db, nil /* sessionData */, func( + ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, ) error { return fn(ctx, txn, ie) - }, descs.SteppingEnabled()) + }, sqlutil.SteppingEnabled()) } } else { runner = func(ctx context.Context, fn func(context.Context, *kv.Txn, sqlutil.InternalExecutor) error) error { diff --git a/pkg/server/external_storage_builder.go b/pkg/server/external_storage_builder.go index d2da4d4da08f..70b758dc00de 100644 --- a/pkg/server/external_storage_builder.go +++ b/pkg/server/external_storage_builder.go @@ -24,7 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -39,7 +39,7 @@ type externalStorageBuilder struct { blobClientFactory blobs.BlobClientFactory initCalled bool ie *sql.InternalExecutor - cf *descs.CollectionFactory + ief sqlutil.InternalExecutorFactory db *kv.DB limiters cloud.Limiters recorder multitenant.TenantSideExternalIORecorder @@ -53,7 +53,7 @@ func (e *externalStorageBuilder) init( nodeDialer *nodedialer.Dialer, testingKnobs base.TestingKnobs, ie *sql.InternalExecutor, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, db *kv.DB, recorder multitenant.TenantSideExternalIORecorder, ) { @@ -69,7 +69,7 @@ func (e *externalStorageBuilder) init( e.blobClientFactory = blobClientFactory e.initCalled = true e.ie = ie - e.cf = cf + e.ief = ief e.db = db e.limiters = 
cloud.MakeLimiters(ctx, &settings.SV) e.recorder = recorder @@ -81,8 +81,8 @@ func (e *externalStorageBuilder) makeExternalStorage( if !e.initCalled { return nil, errors.New("cannot create external storage before init") } - return cloud.MakeExternalStorage(ctx, dest, e.conf, e.settings, e.blobClientFactory, e.ie, - e.cf, e.db, e.limiters, append(e.defaultOptions(), opts...)...) + return cloud.MakeExternalStorage(ctx, dest, e.conf, e.settings, e.blobClientFactory, e.ie, e.ief, + e.db, e.limiters, append(e.defaultOptions(), opts...)...) } func (e *externalStorageBuilder) makeExternalStorageFromURI( @@ -92,7 +92,7 @@ func (e *externalStorageBuilder) makeExternalStorageFromURI( return nil, errors.New("cannot create external storage before init") } return cloud.ExternalStorageFromURI(ctx, uri, e.conf, e.settings, e.blobClientFactory, - user, e.ie, e.cf, e.db, e.limiters, append(e.defaultOptions(), opts...)...) + user, e.ie, e.ief, e.db, e.limiters, append(e.defaultOptions(), opts...)...) } func (e *externalStorageBuilder) defaultOptions() []cloud.ExternalStorageOption { diff --git a/pkg/server/server.go b/pkg/server/server.go index 9aa0414e1980..8f0acf1c0519 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,7 +64,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" _ "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry" // register schedules declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" @@ -483,8 +482,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // which in turn needs many things. That's why everybody that needs an // InternalExecutor uses this one instance. 
internalExecutor := &sql.InternalExecutor{} + internalExecutorFactory := &sql.InternalExecutorFactory{} jobRegistry := &jobs.Registry{} // ditto - collectionFactory := &descs.CollectionFactory{} // Create an ExternalStorageBuilder. This is only usable after Start() where // we initialize all the configuration params. @@ -834,8 +833,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { closedSessionCache: closedSessionCache, flowScheduler: flowScheduler, circularInternalExecutor: internalExecutor, - collectionFactory: collectionFactory, - internalExecutorFactory: nil, // will be initialized in server.newSQLServer. + internalExecutorFactory: internalExecutorFactory, circularJobRegistry: jobRegistry, jobAdoptionStopFile: jobAdoptionStopFile, protectedtsProvider: protectedtsProvider, @@ -1082,7 +1080,7 @@ func (s *Server) PreStart(ctx context.Context) error { s.nodeDialer, s.cfg.TestingKnobs, &fileTableInternalExecutor, - s.sqlServer.execCfg.CollectionFactory, + s.sqlServer.execCfg.InternalExecutorFactory, s.db, nil, /* TenantExternalIORecorder */ ) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 3ca860a2d729..86a4d4322856 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -94,7 +94,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/startupmigrations" @@ -140,7 +139,7 @@ type SQLServer struct { execCfg *sql.ExecutorConfig cfg *BaseConfig internalExecutor *sql.InternalExecutor - internalExecutorFactory sqlutil.InternalExecutorFactory + internalExecutorFactory descs.TxnManager leaseMgr *lease.Manager blobService *blobs.Service tracingService *service.Service @@ -306,10 +305,8 @@ 
type sqlServerArgs struct { // TODO(tbg): make this less hacky. circularInternalExecutor *sql.InternalExecutor // empty initially - collectionFactory *descs.CollectionFactory - // internalExecutorFactory is to initialize an internal executor. - internalExecutorFactory sqlutil.InternalExecutorFactory + internalExecutorFactory *sql.InternalExecutorFactory // Stores and deletes expired liveness sessions. sqlLivenessProvider sqlliveness.Provider @@ -820,7 +817,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.db, cfg.circularInternalExecutor, cfg.Settings, - collectionFactory, + cfg.internalExecutorFactory, ), QueryCache: querycache.New(cfg.QueryCacheSize), @@ -831,7 +828,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { GCJobNotifier: gcJobNotifier, RangeFeedFactory: cfg.rangeFeedFactory, CollectionFactory: collectionFactory, - SystemTableIDResolver: descs.MakeSystemTableIDResolver(collectionFactory, cfg.db), + SystemTableIDResolver: descs.MakeSystemTableIDResolver(collectionFactory, cfg.internalExecutorFactory, cfg.db), ConsistencyChecker: consistencychecker.NewConsistencyChecker(cfg.db), RangeProber: rangeprober.NewRangeProber(cfg.db), DescIDGenerator: descidgen.NewGenerator(codec, cfg.db), @@ -967,8 +964,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ieFactoryMonitor, ) - collectionFactory.SetInternalExecutorWithTxn(ieFactory) - distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory jobRegistry.SetInternalExecutorFactory(ieFactory) execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg) @@ -990,8 +985,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.registry.AddMetricStruct(m) } *cfg.circularInternalExecutor = sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor) - *cfg.collectionFactory = *collectionFactory - cfg.internalExecutorFactory = ieFactory + 
*cfg.internalExecutorFactory = *ieFactory execCfg.InternalExecutor = cfg.circularInternalExecutor stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( @@ -1088,6 +1082,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.sqlStatusServer, cfg.isMeta1Leaseholder, sqlExecutorTestingKnobs, + ieFactory, collectionFactory, ) diff --git a/pkg/server/status.go b/pkg/server/status.go index fbedc6c9ea15..0a8f8b2c71b3 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -2428,7 +2428,7 @@ func (s *statusServer) HotRangesV2( schemaName = meta.(tableMeta).schemaName indexName = meta.(tableMeta).indexName } else { - if err = s.sqlServer.distSQLServer.CollectionFactory.TxnWithExecutor( + if err = s.sqlServer.distSQLServer.InternalExecutorFactory.DescsTxnWithExecutor( ctx, s.db, nil, func(ctx context.Context, txn *kv.Txn, col *descs.Collection, ie sqlutil.InternalExecutor) error { commonLookupFlags := tree.CommonLookupFlags{ Required: false, diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index e83e31883b9d..08230213edf0 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -42,7 +42,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -500,8 +499,8 @@ func makeTenantSQLServerArgs( ) circularInternalExecutor := &sql.InternalExecutor{} + internalExecutorFactory := &sql.InternalExecutorFactory{} circularJobRegistry := &jobs.Registry{} - collectionFactory := &descs.CollectionFactory{} // Initialize the protectedts subsystem in multi-tenant clusters. 
var protectedTSProvider protectedts.Provider @@ -541,7 +540,7 @@ func makeTenantSQLServerArgs( nodeDialer, baseCfg.TestingKnobs, circularInternalExecutor, - collectionFactory, + internalExecutorFactory, db, costController, ) @@ -610,7 +609,7 @@ func makeTenantSQLServerArgs( sessionRegistry: sessionRegistry, flowScheduler: flowScheduler, circularInternalExecutor: circularInternalExecutor, - collectionFactory: collectionFactory, + internalExecutorFactory: internalExecutorFactory, circularJobRegistry: circularJobRegistry, protectedtsProvider: protectedTSProvider, rangeFeedFactory: rangeFeedFactory, diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 07df983ad22c..646a69c1bc80 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -1107,6 +1107,11 @@ func (ts *TestServer) InternalExecutor() interface{} { return ts.sqlServer.internalExecutor } +// InternalExecutorFactory is part of TestServerInterface. +func (ts *TestServer) InternalExecutorFactory() interface{} { + return ts.sqlServer.internalExecutorFactory +} + // GetNode exposes the Server's Node. 
func (ts *TestServer) GetNode() *Node { return ts.node diff --git a/pkg/sql/authorization_test.go b/pkg/sql/authorization_test.go index c628a982382f..383725a14052 100644 --- a/pkg/sql/authorization_test.go +++ b/pkg/sql/authorization_test.go @@ -40,9 +40,9 @@ func TestCheckAnyPrivilegeForNodeUser(t *testing.T) { require.NotNil(t, ts.InternalExecutor()) - cf := ts.CollectionFactory().(*descs.CollectionFactory) + ief := ts.InternalExecutorFactory().(descs.TxnManager) - if err := cf.TxnWithExecutor(ctx, s.DB(), nil, func( + if err := ief.DescsTxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, ) error { row, err := ie.QueryRowEx( diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 52d4a1e9d4ec..4f4fdb62fa01 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -55,7 +55,6 @@ go_library( "//pkg/sql/sem/catconstants", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", - "//pkg/sql/sessiondatapb", "//pkg/sql/sqlerrors", "//pkg/sql/sqlliveness", "//pkg/sql/sqlutil", diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index f37a2b5c7cd0..6eb7c02d20ef 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -138,6 +138,11 @@ type Collection struct { var _ catalog.Accessor = (*Collection)(nil) +// GetDeletedDescs returns the deleted descriptors of the collection. +func (tc *Collection) GetDeletedDescs() catalog.DescriptorIDSet { + return tc.deletedDescs +} + // MaybeUpdateDeadline updates the deadline in a given transaction // based on the leased descriptors in this collection. This update is // only done when a deadline exists. 
diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index e75b9afd560d..3a7df5677c33 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -1036,7 +1036,7 @@ SELECT id ec := s.ExecutorConfig().(sql.ExecutorConfig) codec := ec.Codec descIDGen := ec.DescIDGenerator - require.NoError(t, s.CollectionFactory().(*descs.CollectionFactory).TxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func( + require.NoError(t, ec.InternalExecutorFactory.DescsTxnWithExecutor(ctx, s.DB(), nil /* sessionData */, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, ) error { checkImmutableDescriptor := func(id descpb.ID, expName string, f func(t *testing.T, desc catalog.Descriptor)) error { diff --git a/pkg/sql/catalog/descs/factory.go b/pkg/sql/catalog/descs/factory.go index aa8b1460445e..32fb3bf1040e 100644 --- a/pkg/sql/catalog/descs/factory.go +++ b/pkg/sql/catalog/descs/factory.go @@ -15,7 +15,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -38,21 +37,42 @@ type CollectionFactory struct { spanConfigSplitter spanconfig.Splitter spanConfigLimiter spanconfig.Limiter defaultMonitor *mon.BytesMonitor - ieFactoryWithTxn InternalExecutorFactoryWithTxn } -// InternalExecutorFactoryWithTxn is used to create an internal executor -// with associated extra txn state information. -// It should only be used as a field hanging off CollectionFactory. -type InternalExecutorFactoryWithTxn interface { - MemoryMonitor() *mon.BytesMonitor +// GetClusterSettings returns the cluster setting from the collection factory. 
+func (cf *CollectionFactory) GetClusterSettings() *cluster.Settings { + return cf.settings +} + +// TxnManager is used to enable running multiple queries with an internal +// executor in a transactional manner. +type TxnManager interface { + sqlutil.InternalExecutorFactory - NewInternalExecutorWithTxn( + // DescsTxnWithExecutor enables using an internal executor to run sql + // statements in a transactional manner. It creates a descriptor collection + // that lives within the scope of the passed in TxnWithExecutorFunc, and + // also ensures that the internal executor also share the same descriptor + // collection. Please use this interface if you want to run multiple sql + // statement with an internal executor in a txn. + DescsTxnWithExecutor( + ctx context.Context, + db *kv.DB, sd *sessiondata.SessionData, - sv *settings.Values, - txn *kv.Txn, - descCol *Collection, - ) (sqlutil.InternalExecutor, InternalExecutorCommitTxnFunc) + f TxnWithExecutorFunc, + opts ...sqlutil.TxnOption, + ) error + + // DescsTxn is similar to DescsTxnWithExecutor but without an internal executor. + // It creates a descriptor collection that lives within the scope of the given + // function, and is a convenient method for running a transaction on + // them. + DescsTxn( + ctx context.Context, + db *kv.DB, + f func(context.Context, *kv.Txn, *Collection) error, + opts ...sqlutil.TxnOption, + ) error } // InternalExecutorCommitTxnFunc is to commit the txn associated with an @@ -108,11 +128,3 @@ func (cf *CollectionFactory) NewCollection( return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase, cf.virtualSchemas, temporarySchemaProvider, monitor) } - -// SetInternalExecutorWithTxn is to set the internal executor factory hanging -// off the collection factory. 
-func (cf *CollectionFactory) SetInternalExecutorWithTxn( - ieFactoryWithTxn InternalExecutorFactoryWithTxn, -) { - cf.ieFactoryWithTxn = ieFactoryWithTxn -} diff --git a/pkg/sql/catalog/descs/system_table.go b/pkg/sql/catalog/descs/system_table.go index 0191ea1c1149..b97b32d16afa 100644 --- a/pkg/sql/catalog/descs/system_table.go +++ b/pkg/sql/catalog/descs/system_table.go @@ -21,19 +21,21 @@ import ( // systemTableIDResolver is the implementation for catalog.SystemTableIDResolver. type systemTableIDResolver struct { - collectionFactory *CollectionFactory - db *kv.DB + collectionFactory *CollectionFactory + internalExecutorFactory TxnManager + db *kv.DB } var _ catalog.SystemTableIDResolver = (*systemTableIDResolver)(nil) // MakeSystemTableIDResolver creates an object that implements catalog.SystemTableIDResolver. func MakeSystemTableIDResolver( - collectionFactory *CollectionFactory, db *kv.DB, + collectionFactory *CollectionFactory, internalExecutorFactory TxnManager, db *kv.DB, ) catalog.SystemTableIDResolver { return &systemTableIDResolver{ - collectionFactory: collectionFactory, - db: db, + collectionFactory: collectionFactory, + internalExecutorFactory: internalExecutorFactory, + db: db, } } @@ -43,7 +45,7 @@ func (r *systemTableIDResolver) LookupSystemTableID( ) (descpb.ID, error) { var id descpb.ID - if err := r.collectionFactory.Txn(ctx, r.db, func( + if err := r.internalExecutorFactory.DescsTxn(ctx, r.db, func( ctx context.Context, txn *kv.Txn, descriptors *Collection, ) (err error) { id, err = descriptors.stored.LookupDescriptorID( diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index 0b3bba3168ef..559ba6524d8f 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -12,74 +12,19 @@ package descs import ( "context" - "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/spanconfig" - 
"github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" ) -// Txn enables callers to run transactions with a *Collection such that all -// retrieved immutable descriptors are properly leased and all mutable -// descriptors are handled. The function deals with verifying the two version -// invariant and retrying when it is violated. Callers need not worry that they -// write mutable descriptors multiple times. The call will explicitly wait for -// the leases to drain on old versions of descriptors modified or deleted in the -// transaction; callers do not need to call lease.WaitForOneVersion. -// -// The passed transaction is pre-emptively anchored to the system config key on -// the system tenant. -// Deprecated: Use cf.TxnWithExecutor(). -func (cf *CollectionFactory) Txn( - ctx context.Context, - db *kv.DB, - f func(ctx context.Context, txn *kv.Txn, descriptors *Collection) error, - opts ...TxnOption, -) error { - return cf.TxnWithExecutor(ctx, db, nil /* sessionData */, func( - ctx context.Context, txn *kv.Txn, descriptors *Collection, _ sqlutil.InternalExecutor, - ) error { - return f(ctx, txn, descriptors) - }, opts...) -} - -// TxnOption is used to configure a Txn or TxnWithExecutor. 
-type TxnOption interface { - apply(*txnConfig) -} - -type txnConfig struct { - steppingEnabled bool -} - -type txnOptionFn func(options *txnConfig) - -func (f txnOptionFn) apply(options *txnConfig) { f(options) } - -var steppingEnabled = txnOptionFn(func(o *txnConfig) { - o.steppingEnabled = true -}) - -// SteppingEnabled creates a TxnOption to determine whether the underlying -// transaction should have stepping enabled. If stepping is enabled, the -// transaction will implicitly use lower admission priority. However, the -// user will need to remember to Step the Txn to make writes visible. The -// InternalExecutor will automatically (for better or for worse) step the -// transaction when executing each statement. -func SteppingEnabled() TxnOption { - return steppingEnabled -} - // TxnWithExecutorFunc is used to run a transaction in the context of a // Collection and an InternalExecutor. type TxnWithExecutorFunc = func( @@ -89,97 +34,6 @@ type TxnWithExecutorFunc = func( ie sqlutil.InternalExecutor, ) error -// TxnWithExecutor enables callers to run transactions with a *Collection such that all -// retrieved immutable descriptors are properly leased and all mutable -// descriptors are handled. The function deals with verifying the two version -// invariant and retrying when it is violated. Callers need not worry that they -// write mutable descriptors multiple times. The call will explicitly wait for -// the leases to drain on old versions of descriptors modified or deleted in the -// transaction; callers do not need to call lease.WaitForOneVersion. -// It also enables using internal executor to run sql queries in a txn manner. -// -// The passed transaction is pre-emptively anchored to the system config key on -// the system tenant. 
-func (cf *CollectionFactory) TxnWithExecutor( - ctx context.Context, - db *kv.DB, - sd *sessiondata.SessionData, - f TxnWithExecutorFunc, - opts ...TxnOption, -) error { - var config txnConfig - for _, opt := range opts { - opt.apply(&config) - } - run := db.Txn - if config.steppingEnabled { - type kvTxnFunc = func(context.Context, *kv.Txn) error - run = func(ctx context.Context, f kvTxnFunc) error { - return db.TxnWithSteppingEnabled(ctx, sessiondatapb.Normal, f) - } - } - - // Waits for descriptors that were modified, skipping - // over ones that had their descriptor wiped. - waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error { - // Wait for a single version on leased descriptors. - for _, ld := range modifiedDescriptors { - waitForNoVersion := deletedDescs.Contains(ld.ID) - retryOpts := retry.Options{ - InitialBackoff: time.Millisecond, - Multiplier: 1.5, - MaxBackoff: time.Second, - } - // Detect unpublished ones. - if waitForNoVersion { - err := cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts) - if err != nil { - return err - } - } else { - _, err := cf.leaseMgr.WaitForOneVersion(ctx, ld.ID, retryOpts) - // If the descriptor has been deleted, just wait for leases to drain. 
- if errors.Is(err, catalog.ErrDescriptorNotFound) { - err = cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts) - } - if err != nil { - return err - } - } - } - return nil - } - for { - var withNewVersion []lease.IDVersion - var deletedDescs catalog.DescriptorIDSet - if err := run(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - withNewVersion, deletedDescs = nil, catalog.DescriptorIDSet{} - descsCol := cf.NewCollection( - ctx, nil, /* temporarySchemaProvider */ - cf.ieFactoryWithTxn.MemoryMonitor(), - ) - defer descsCol.ReleaseAll(ctx) - ie, commitTxnFn := cf.ieFactoryWithTxn.NewInternalExecutorWithTxn(sd, &cf.settings.SV, txn, descsCol) - if err := f(ctx, txn, descsCol, ie); err != nil { - return err - } - deletedDescs = descsCol.deletedDescs - withNewVersion, err = descsCol.GetOriginalPreviousIDVersionsForUncommitted() - if err != nil { - return err - } - return commitTxnFn(ctx) - }); IsTwoVersionInvariantViolationError(err) { - continue - } else { - if err == nil { - err = waitForDescriptors(withNewVersion, deletedDescs) - } - return err - } - } -} - // CheckTwoVersionInvariant checks whether any new schema being modified written // at a version V has only valid leases at version = V - 1. A transaction retry // error as well as a boolean is returned whenever the invariant is violated. 
diff --git a/pkg/sql/catalog/descs/txn_external_test.go b/pkg/sql/catalog/descs/txn_external_test.go index 941610269f51..f9cc296d50aa 100644 --- a/pkg/sql/catalog/descs/txn_external_test.go +++ b/pkg/sql/catalog/descs/txn_external_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -34,12 +35,12 @@ func TestTxnWithStepping(t *testing.T) { s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) - cf := s.CollectionFactory().(*descs.CollectionFactory) + ief := s.InternalExecutorFactory().(descs.TxnManager) scratchKey, err := s.ScratchRange() require.NoError(t, err) // Write a key, read in the transaction without stepping, ensure we // do not see the value, step the transaction, then ensure that we do. 
- require.NoError(t, cf.Txn(ctx, kvDB, func( + require.NoError(t, ief.DescsTxn(ctx, kvDB, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { if err := txn.Put(ctx, scratchKey, 1); err != nil { @@ -67,5 +68,5 @@ func TestTxnWithStepping(t *testing.T) { } } return nil - }, descs.SteppingEnabled())) + }, sqlutil.SteppingEnabled())) } diff --git a/pkg/sql/catalog/descs/txn_with_executor_datadriven_test.go b/pkg/sql/catalog/descs/txn_with_executor_datadriven_test.go index 3a7748368f24..b631ec73c498 100644 --- a/pkg/sql/catalog/descs/txn_with_executor_datadriven_test.go +++ b/pkg/sql/catalog/descs/txn_with_executor_datadriven_test.go @@ -51,7 +51,6 @@ func TestTxnWithExecutorDataDriven(t *testing.T) { datadriven.Walk(t, testutils.TestDataPath(t, ""), func(t *testing.T, path string) { s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) - cf := s.CollectionFactory().(*descs.CollectionFactory) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { stmts, err := parser.Parse(d.Input) require.NoError(t, err) @@ -70,7 +69,8 @@ func TestTxnWithExecutorDataDriven(t *testing.T) { searchPath = sessiondata.MakeSearchPath(strings.Split(sp, ",")) } sd.SearchPath = &searchPath - err = cf.TxnWithExecutor(ctx, kvDB, nil /* sessionData */, func( + ief := s.InternalExecutorFactory().(descs.TxnManager) + err = ief.DescsTxnWithExecutor(ctx, kvDB, nil /* sessionData */, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, ) error { for _, stmt := range stmts { diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 91ead9cc1415..ec023733f6ec 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1119,7 +1119,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) { err := cleanupSessionTempObjects( ctx, ex.server.cfg.Settings, - ex.server.cfg.CollectionFactory, + 
ex.server.cfg.InternalExecutorFactory, ex.server.cfg.DB, ex.server.cfg.Codec, ex.sessionID, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 977c75e18a8c..e51f477fc506 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -91,7 +91,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -1311,10 +1310,9 @@ type ExecutorConfig struct { // records. SpanConfigKVAccessor spanconfig.KVAccessor - // InternalExecutorFactory is used to create an InternalExecutor binded with + // InternalExecutorFactory is used to create an InternalExecutor bound with // SessionData and other ExtraTxnState. - // This is currently only for builtin functions where we need to execute sql. - InternalExecutorFactory sqlutil.InternalExecutorFactory + InternalExecutorFactory descs.TxnManager // ConsistencyChecker is to generate the results in calls to // crdb_internal.check_consistency. @@ -3452,7 +3450,7 @@ func DescsTxn( execCfg *ExecutorConfig, f func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error, ) error { - return execCfg.CollectionFactory.Txn(ctx, execCfg.DB, f) + return execCfg.InternalExecutorFactory.DescsTxn(ctx, execCfg.DB, f) } // TestingDescsTxn is a convenience function for running a transaction on diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 957fb11c3da1..c74c3ca46442 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -149,7 +149,7 @@ type ServerConfig struct { // InternalExecutorFactory is used to construct session-bound // executors. 
The idea is that a higher-layer binds some of the arguments // required, so that users of ServerConfig don't have to care about them. - InternalExecutorFactory sqlutil.InternalExecutorFactory + InternalExecutorFactory descs.TxnManager ExternalStorage cloud.ExternalStorageFactory ExternalStorageFromURI cloud.ExternalStorageFromURIFactory diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 096007ca1f44..c15bf93ea286 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -202,6 +202,7 @@ go_test( "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/sqlutil", "//pkg/sql/stats", "//pkg/sql/tests", "//pkg/sql/types", diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index eade356105f7..e0b15e3230a6 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -881,7 +881,7 @@ func externalStorageFactory( return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, nil, blobs.TestBlobServiceClient(workdir), nil, /* ie */ - nil, /* cf */ + nil, /* ief */ nil, /* kvDB */ nil, /* limiters */ ) diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 5196278dab04..66bf29231c61 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -55,6 +55,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/storage" @@ -2702,9 +2703,9 @@ func TestImportObjectLevelRBAC(t *testing.T) { writeToUserfile := func(filename, data string) { // Write to userfile storage now that testuser has CREATE privileges. 
ie := tc.Server(0).InternalExecutor().(*sql.InternalExecutor) - cf := tc.Server(0).CollectionFactory().(*descs.CollectionFactory) + ief := tc.Server(0).InternalExecutorFactory().(sqlutil.InternalExecutorFactory) fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.TestUserName(), ie, cf, tc.Server(0).DB(), nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.TestUserName(), ie, ief, tc.Server(0).DB(), nil) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte(data)))) } @@ -5852,7 +5853,7 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { blobs.TestEmptyBlobClientFactory, username.RootUserName(), tc.Server(0).InternalExecutor().(*sql.InternalExecutor), - tc.Server(0).CollectionFactory().(*descs.CollectionFactory), + tc.Server(0).InternalExecutorFactory().(sqlutil.InternalExecutorFactory), tc.Server(0).DB(), nil, ) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index a52786c7c818..32c813211e37 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" @@ -40,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -111,6 +113,8 @@ func (ie *InternalExecutor) 
WithSyntheticDescriptors( } // MakeInternalExecutor creates an InternalExecutor. +// TODO (janexing): usage of it should be deprecated with `DescsTxnWithExecutor()` +// or `RunWithoutTxn()`. func MakeInternalExecutor( s *Server, memMetrics MemoryMetrics, monitor *mon.BytesMonitor, ) InternalExecutor { @@ -121,55 +125,6 @@ func MakeInternalExecutor( } } -// newInternalExecutorWithTxn creates an Internal Executor with txn related -// information, and also a function that can be called to commit the txn. -// This function should only be used in the implementation of -// descs.CollectionFactory's InternalExecutorFactoryWithTxn. -// TODO (janexing): This function will be soon refactored after we change -// the internal executor infrastructure with a single conn executor for all -// sql statement executions within a txn. -func newInternalExecutorWithTxn( - s *Server, - sd *sessiondata.SessionData, - txn *kv.Txn, - memMetrics MemoryMetrics, - monitor *mon.BytesMonitor, - descCol *descs.Collection, -) (*InternalExecutor, descs.InternalExecutorCommitTxnFunc) { - schemaChangerState := &SchemaChangerState{ - mode: sd.NewSchemaChangerMode, - } - ie := InternalExecutor{ - s: s, - mon: monitor, - memMetrics: memMetrics, - extraTxnState: &extraTxnState{ - txn: txn, - descCollection: descCol, - jobs: new(jobsCollection), - schemaChangeJobRecords: make(map[descpb.ID]*jobs.Record), - schemaChangerState: schemaChangerState, - }, - } - ie.s.populateMinimalSessionData(sd) - ie.sessionDataStack = sessiondata.NewStack(sd) - - commitTxnFunc := func(ctx context.Context) error { - defer func() { - ie.extraTxnState.jobs.reset() - ie.releaseSchemaChangeJobRecords() - }() - if err := ie.commitTxn(ctx); err != nil { - return err - } - return ie.s.cfg.JobRegistry.Run( - ctx, ie.s.cfg.InternalExecutor, *ie.extraTxnState.jobs, - ) - } - - return &ie, commitTxnFunc -} - // MakeInternalExecutorMemMonitor creates and starts memory monitor for an // InternalExecutor. 
func MakeInternalExecutorMemMonitor( @@ -1286,12 +1241,6 @@ type InternalExecutorFactory struct { monitor *mon.BytesMonitor } -// MemoryMonitor returns the monitor which should be used when constructing -// things in the context of an internal executor created by this factory. -func (ief *InternalExecutorFactory) MemoryMonitor() *mon.BytesMonitor { - return ief.monitor -} - // NewInternalExecutorFactory returns a new internal executor factory. func NewInternalExecutorFactory( s *Server, memMetrics MemoryMetrics, monitor *mon.BytesMonitor, @@ -1304,10 +1253,11 @@ func NewInternalExecutorFactory( } var _ sqlutil.InternalExecutorFactory = &InternalExecutorFactory{} -var _ descs.InternalExecutorFactoryWithTxn = &InternalExecutorFactory{} +var _ descs.TxnManager = &InternalExecutorFactory{} // NewInternalExecutor constructs a new internal executor. -// TODO (janexing): this should be deprecated soon. +// TODO (janexing): usage of it should be deprecated with `DescsTxnWithExecutor()` +// or `RunWithoutTxn()`. func (ief *InternalExecutorFactory) NewInternalExecutor( sd *sessiondata.SessionData, ) sqlutil.InternalExecutor { @@ -1316,12 +1266,14 @@ func (ief *InternalExecutorFactory) NewInternalExecutor( return &ie } -// NewInternalExecutorWithTxn creates an internal executor with txn-related info, -// such as descriptor collection and schema change job records, etc. It should -// be called only after InternalExecutorFactory.NewInternalExecutor is already -// called to construct the InternalExecutorFactory with required server info. -// This function should only be used under CollectionFactory.TxnWithExecutor(). -func (ief *InternalExecutorFactory) NewInternalExecutorWithTxn( +// newInternalExecutorWithTxn creates an internal executor with txn-related info, +// such as descriptor collection and schema change job records, etc. +// This function should only be used under +// InternalExecutorFactory.DescsTxnWithExecutor(). 
+// TODO (janexing): This function will be soon refactored after we change +// the internal executor infrastructure with a single conn executor for all +// sql statement executions within a txn. +func (ief *InternalExecutorFactory) newInternalExecutorWithTxn( sd *sessiondata.SessionData, sv *settings.Values, txn *kv.Txn, descCol *descs.Collection, ) (sqlutil.InternalExecutor, descs.InternalExecutorCommitTxnFunc) { // By default, if not given session data, we initialize a sessionData that @@ -1335,16 +1287,39 @@ func (ief *InternalExecutorFactory) NewInternalExecutorWithTxn( sd = NewFakeSessionData(sv) sd.UserProto = username.RootUserName().EncodeProto() } - ie, commitTxnFunc := newInternalExecutorWithTxn( - ief.server, - sd, - txn, - ief.memMetrics, - ief.monitor, - descCol, - ) - return ie, commitTxnFunc + schemaChangerState := &SchemaChangerState{ + mode: sd.NewSchemaChangerMode, + } + ie := InternalExecutor{ + s: ief.server, + mon: ief.monitor, + memMetrics: ief.memMetrics, + extraTxnState: &extraTxnState{ + txn: txn, + descCollection: descCol, + jobs: new(jobsCollection), + schemaChangeJobRecords: make(map[descpb.ID]*jobs.Record), + schemaChangerState: schemaChangerState, + }, + } + ie.s.populateMinimalSessionData(sd) + ie.sessionDataStack = sessiondata.NewStack(sd) + + commitTxnFunc := func(ctx context.Context) error { + defer func() { + ie.extraTxnState.jobs.reset() + ie.releaseSchemaChangeJobRecords() + }() + if err := ie.commitTxn(ctx); err != nil { + return err + } + return ie.s.cfg.JobRegistry.Run( + ctx, ie.s.cfg.InternalExecutor, *ie.extraTxnState.jobs, + ) + } + + return &ie, commitTxnFunc } // RunWithoutTxn is to create an internal executor without binding to a txn, @@ -1355,3 +1330,147 @@ func (ief *InternalExecutorFactory) RunWithoutTxn( ie := ief.NewInternalExecutor(nil /* sessionData */) return run(ctx, ie) } + +type kvTxnFunc = func(context.Context, *kv.Txn) error + +// ApplyTxnOptions is to apply the txn options and returns the txn generator +// 
function. +func ApplyTxnOptions( + db *kv.DB, opts ...sqlutil.TxnOption, +) func(ctx context.Context, f kvTxnFunc) error { + var config sqlutil.TxnConfig + for _, opt := range opts { + opt.Apply(&config) + } + run := db.Txn + + if config.GetSteppingEnabled() { + + run = func(ctx context.Context, f kvTxnFunc) error { + return db.TxnWithSteppingEnabled(ctx, sessiondatapb.Normal, f) + } + } + return run +} + +// DescsTxnWithExecutor enables callers to run transactions with a *Collection +// such that all retrieved immutable descriptors are properly leased and all mutable +// descriptors are handled. The function deals with verifying the two version +// invariant and retrying when it is violated. Callers need not worry that they +// write mutable descriptors multiple times. The call will explicitly wait for +// the leases to drain on old versions of descriptors modified or deleted in the +// transaction; callers do not need to call lease.WaitForOneVersion. +// It also enables using internal executor to run sql queries in a txn manner. +// +// The passed transaction is pre-emptively anchored to the system config key on +// the system tenant. +func (ief *InternalExecutorFactory) DescsTxnWithExecutor( + ctx context.Context, + db *kv.DB, + sd *sessiondata.SessionData, + f descs.TxnWithExecutorFunc, + opts ...sqlutil.TxnOption, +) error { + run := ApplyTxnOptions(db, opts...) + + // Waits for descriptors that were modified, skipping + // over ones that had their descriptor wiped. + waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error { + // Wait for a single version on leased descriptors. + for _, ld := range modifiedDescriptors { + waitForNoVersion := deletedDescs.Contains(ld.ID) + retryOpts := retry.Options{ + InitialBackoff: time.Millisecond, + Multiplier: 1.5, + MaxBackoff: time.Second, + } + // Detect unpublished ones. 
+ if waitForNoVersion { + err := ief.server.cfg.LeaseManager.WaitForNoVersion(ctx, ld.ID, retryOpts) + if err != nil { + return err + } + } else { + _, err := ief.server.cfg.LeaseManager.WaitForOneVersion(ctx, ld.ID, retryOpts) + // If the descriptor has been deleted, just wait for leases to drain. + if errors.Is(err, catalog.ErrDescriptorNotFound) { + err = ief.server.cfg.LeaseManager.WaitForNoVersion(ctx, ld.ID, retryOpts) + } + if err != nil { + return err + } + } + } + return nil + } + + cf := ief.server.cfg.CollectionFactory + for { + var withNewVersion []lease.IDVersion + var deletedDescs catalog.DescriptorIDSet + if err := run(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + withNewVersion, deletedDescs = nil, catalog.DescriptorIDSet{} + descsCol := cf.NewCollection( + ctx, nil, /* temporarySchemaProvider */ + ief.monitor, + ) + defer descsCol.ReleaseAll(ctx) + ie, commitTxnFn := ief.newInternalExecutorWithTxn(sd, &cf.GetClusterSettings().SV, txn, descsCol) + if err := f(ctx, txn, descsCol, ie); err != nil { + return err + } + deletedDescs = descsCol.GetDeletedDescs() + withNewVersion, err = descsCol.GetOriginalPreviousIDVersionsForUncommitted() + if err != nil { + return err + } + return commitTxnFn(ctx) + }); descs.IsTwoVersionInvariantViolationError(err) { + continue + } else { + if err == nil { + err = waitForDescriptors(withNewVersion, deletedDescs) + } + return err + } + } +} + +// DescsTxn is similar to DescsTxnWithExecutor, but without an internal executor +// involved. 
+func (ief *InternalExecutorFactory) DescsTxn( + ctx context.Context, + db *kv.DB, + f func(context.Context, *kv.Txn, *descs.Collection) error, + opts ...sqlutil.TxnOption, +) error { + return ief.DescsTxnWithExecutor( + ctx, + db, + nil, /* sessionData */ + func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, _ sqlutil.InternalExecutor) error { + return f(ctx, txn, descriptors) + }, + opts..., + ) +} + +// TxnWithExecutor is to run queries with internal executor in a transactional +// manner. +func (ief *InternalExecutorFactory) TxnWithExecutor( + ctx context.Context, + db *kv.DB, + sd *sessiondata.SessionData, + f func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error, + opts ...sqlutil.TxnOption, +) error { + return ief.DescsTxnWithExecutor( + ctx, + db, + sd, + func(ctx context.Context, txn *kv.Txn, _ *descs.Collection, ie sqlutil.InternalExecutor) error { + return f(ctx, txn, ie) + }, + opts..., + ) +} diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index b21f45741654..fa0eb642cd2e 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -132,7 +132,7 @@ func (p *planner) waitForDescriptorSchemaChanges( ) } blocked := false - if err := p.ExecCfg().CollectionFactory.Txn(ctx, p.ExecCfg().DB, func( + if err := p.ExecCfg().InternalExecutorFactory.DescsTxn(ctx, p.ExecCfg().DB, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { if err := txn.SetFixedTimestamp(ctx, now); err != nil { diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index ec0f00448719..86322418e13a 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -1345,7 +1345,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { var didUpdate bool var depMutationJobs []jobspb.JobID var otherJobIDs []jobspb.JobID - err := sc.execCfg.CollectionFactory.Txn(ctx, sc.db, func( + err := sc.execCfg.InternalExecutorFactory.DescsTxn(ctx, 
sc.db, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ) error { depMutationJobs = depMutationJobs[:0] @@ -2461,7 +2461,7 @@ func (sc *SchemaChanger) txn( return err } } - return sc.execCfg.CollectionFactory.Txn(ctx, sc.db, f) + return sc.execCfg.InternalExecutorFactory.DescsTxn(ctx, sc.db, f) } // txnWithExecutor is to run internal executor within a txn. @@ -2475,7 +2475,7 @@ func (sc *SchemaChanger) txnWithExecutor( return err } } - return sc.execCfg.CollectionFactory.TxnWithExecutor(ctx, sc.db, sd, f) + return sc.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, sc.db, sd, f) } // createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills. diff --git a/pkg/sql/schemachanger/scdeps/run_deps.go b/pkg/sql/schemachanger/scdeps/run_deps.go index 47c4bb7a119e..c88db6115f4b 100644 --- a/pkg/sql/schemachanger/scdeps/run_deps.go +++ b/pkg/sql/schemachanger/scdeps/run_deps.go @@ -33,6 +33,7 @@ import ( // given arguments. func NewJobRunDependencies( collectionFactory *descs.CollectionFactory, + ieFactory descs.TxnManager, db *kv.DB, backfiller scexec.Backfiller, merger scexec.Merger, @@ -51,38 +52,40 @@ func NewJobRunDependencies( kvTrace bool, ) scrun.JobRunDependencies { return &jobExecutionDeps{ - collectionFactory: collectionFactory, - db: db, - backfiller: backfiller, - merger: merger, - rangeCounter: rangeCounter, - eventLoggerFactory: eventLoggerFactory, - jobRegistry: jobRegistry, - job: job, - codec: codec, - settings: settings, - testingKnobs: testingKnobs, - statements: statements, - indexValidator: indexValidator, - commentUpdaterFactory: metadataUpdaterFactory, - sessionData: sessionData, - kvTrace: kvTrace, - statsRefresher: statsRefresher, + collectionFactory: collectionFactory, + internalExecutorFactory: ieFactory, + db: db, + backfiller: backfiller, + merger: merger, + rangeCounter: rangeCounter, + eventLoggerFactory: eventLoggerFactory, + jobRegistry: jobRegistry, + job: job, + codec: codec, + 
settings: settings, + testingKnobs: testingKnobs, + statements: statements, + indexValidator: indexValidator, + commentUpdaterFactory: metadataUpdaterFactory, + sessionData: sessionData, + kvTrace: kvTrace, + statsRefresher: statsRefresher, } } type jobExecutionDeps struct { - collectionFactory *descs.CollectionFactory - db *kv.DB - eventLoggerFactory func(txn *kv.Txn) scexec.EventLogger - statsRefresher scexec.StatsRefresher - backfiller scexec.Backfiller - merger scexec.Merger - commentUpdaterFactory MetadataUpdaterFactory - rangeCounter backfiller.RangeCounter - jobRegistry *jobs.Registry - job *jobs.Job - kvTrace bool + collectionFactory *descs.CollectionFactory + internalExecutorFactory descs.TxnManager + db *kv.DB + eventLoggerFactory func(txn *kv.Txn) scexec.EventLogger + statsRefresher scexec.StatsRefresher + backfiller scexec.Backfiller + merger scexec.Merger + commentUpdaterFactory MetadataUpdaterFactory + rangeCounter backfiller.RangeCounter + jobRegistry *jobs.Registry + job *jobs.Job + kvTrace bool indexValidator scexec.IndexValidator @@ -104,7 +107,7 @@ func (d *jobExecutionDeps) ClusterSettings() *cluster.Settings { func (d *jobExecutionDeps) WithTxnInJob(ctx context.Context, fn scrun.JobTxnFunc) error { var createdJobs []jobspb.JobID var tableStatsToRefresh []descpb.ID - err := d.collectionFactory.Txn(ctx, d.db, func( + err := d.internalExecutorFactory.DescsTxn(ctx, d.db, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { pl := d.job.Payload() @@ -152,7 +155,7 @@ func (d *jobExecutionDeps) WithTxnInJob(ctx context.Context, fn scrun.JobTxnFunc d.jobRegistry.NotifyToResume(ctx, createdJobs...) 
} if len(tableStatsToRefresh) > 0 { - err := d.collectionFactory.Txn(ctx, d.db, func( + err := d.internalExecutorFactory.DescsTxn(ctx, d.db, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { for _, id := range tableStatsToRefresh { diff --git a/pkg/sql/schemachanger/scexec/executor_external_test.go b/pkg/sql/schemachanger/scexec/executor_external_test.go index 305749f1581e..8fe363b87b26 100644 --- a/pkg/sql/schemachanger/scexec/executor_external_test.go +++ b/pkg/sql/schemachanger/scexec/executor_external_test.go @@ -55,6 +55,7 @@ type testInfra struct { lm *lease.Manager tsql *sqlutils.SQLRunner cf *descs.CollectionFactory + ief descs.TxnManager } func (ti testInfra) newExecDeps( @@ -94,6 +95,7 @@ func setupTestInfra(t testing.TB) *testInfra { db: tc.Server(0).DB(), lm: tc.Server(0).LeaseManager().(*lease.Manager), cf: tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory, + ief: tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory, tsql: sqlutils.MakeSQLRunner(tc.ServerConn(0)), } } @@ -102,7 +104,7 @@ func (ti *testInfra) txn( ctx context.Context, f func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error, ) error { - return ti.cf.Txn(ctx, ti.db, f) + return ti.ief.DescsTxn(ctx, ti.db, f) } func TestExecutorDescriptorMutationOps(t *testing.T) { diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index 95b067fd8fc8..23beb8f01369 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -69,6 +69,7 @@ func (n *newSchemaChangeResumer) run(ctx context.Context, execCtxI interface{}) payload := n.job.Payload() deps := scdeps.NewJobRunDependencies( execCfg.CollectionFactory, + execCfg.InternalExecutorFactory, execCfg.DB, execCfg.IndexBackfiller, execCfg.IndexMerger, diff --git a/pkg/sql/sessioninit/BUILD.bazel b/pkg/sql/sessioninit/BUILD.bazel index bdf05954d87f..d2a379ba1a74 100644 --- 
a/pkg/sql/sessioninit/BUILD.bazel +++ b/pkg/sql/sessioninit/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", - "//pkg/sql/sqlutil", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/stop", @@ -48,7 +47,6 @@ go_test( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/sessiondatapb", - "//pkg/sql/sqlutil", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", diff --git a/pkg/sql/sessioninit/cache.go b/pkg/sql/sessioninit/cache.go index a0b61d314fe8..7c0d48f4fc1d 100644 --- a/pkg/sql/sessioninit/cache.go +++ b/pkg/sql/sessioninit/cache.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -106,13 +105,12 @@ func NewCache(account mon.BoundAccount, stopper *stop.Stopper) *Cache { func (a *Cache) GetAuthInfo( ctx context.Context, settings *cluster.Settings, - ie sqlutil.InternalExecutor, db *kv.DB, - f *descs.CollectionFactory, + f descs.TxnManager, username username.SQLUsername, readFromSystemTables func( ctx context.Context, - ie sqlutil.InternalExecutor, + f descs.TxnManager, username username.SQLUsername, makePlanner func(opName string) (interface{}, func()), settings *cluster.Settings, @@ -120,12 +118,12 @@ func (a *Cache) GetAuthInfo( makePlanner func(opName string) (interface{}, func()), ) (aInfo AuthInfo, err error) { if !CacheEnabled.Get(&settings.SV) { - return readFromSystemTables(ctx, ie, username, makePlanner, settings) + return readFromSystemTables(ctx, f, username, makePlanner, settings) } var usersTableDesc catalog.TableDescriptor var roleOptionsTableDesc catalog.TableDescriptor - err = f.Txn(ctx, 
db, func( + err = f.DescsTxn(ctx, db, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { _, usersTableDesc, err = descriptors.GetImmutableTableByName( @@ -167,7 +165,7 @@ func (a *Cache) GetAuthInfo( val, err := a.loadValueOutsideOfCache( ctx, fmt.Sprintf("authinfo-%s-%d-%d", username.Normalized(), usersTableVersion, roleOptionsTableVersion), func(loadCtx context.Context) (interface{}, error) { - return readFromSystemTables(loadCtx, ie, username, makePlanner, settings) + return readFromSystemTables(loadCtx, f, username, makePlanner, settings) }) if err != nil { return aInfo, err @@ -283,21 +281,20 @@ func (a *Cache) maybeWriteAuthInfoBackToCache( func (a *Cache) GetDefaultSettings( ctx context.Context, settings *cluster.Settings, - ie sqlutil.InternalExecutor, db *kv.DB, - f *descs.CollectionFactory, + f descs.TxnManager, userName username.SQLUsername, databaseName string, readFromSystemTables func( ctx context.Context, - ie sqlutil.InternalExecutor, + f descs.TxnManager, userName username.SQLUsername, databaseID descpb.ID, ) ([]SettingsCacheEntry, error), ) (settingsEntries []SettingsCacheEntry, err error) { var dbRoleSettingsTableDesc catalog.TableDescriptor var databaseID descpb.ID - err = f.Txn(ctx, db, func( + err = f.DescsTxn(ctx, db, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { _, dbRoleSettingsTableDesc, err = descriptors.GetImmutableTableByName( @@ -333,7 +330,7 @@ func (a *Cache) GetDefaultSettings( if !CacheEnabled.Get(&settings.SV) { settingsEntries, err = readFromSystemTables( ctx, - ie, + f, userName, databaseID, ) @@ -357,7 +354,7 @@ func (a *Cache) GetDefaultSettings( val, err := a.loadValueOutsideOfCache( ctx, fmt.Sprintf("defaultsettings-%s-%d-%d", userName.Normalized(), databaseID, dbRoleSettingsTableVersion), func(loadCtx context.Context) (interface{}, error) { - return readFromSystemTables(loadCtx, ie, userName, databaseID) + return readFromSystemTables(loadCtx, f, userName, 
databaseID) }, ) if err != nil { diff --git a/pkg/sql/sessioninit/cache_test.go b/pkg/sql/sessioninit/cache_test.go index 337b5095f5ae..c9d344fa9882 100644 --- a/pkg/sql/sessioninit/cache_test.go +++ b/pkg/sql/sessioninit/cache_test.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -64,12 +63,11 @@ func TestCacheInvalidation(t *testing.T) { settings, err := execCfg.SessionInitCache.GetDefaultSettings( ctx, s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(*sql.InternalExecutorFactory), username.TestUserName(), "defaultdb", - func(ctx context.Context, ie sqlutil.InternalExecutor, userName username.SQLUsername, databaseID descpb.ID) ([]sessioninit.SettingsCacheEntry, error) { + func(ctx context.Context, ief descs.TxnManager, userName username.SQLUsername, databaseID descpb.ID) ([]sessioninit.SettingsCacheEntry, error) { didReadFromSystemTable = true return nil, nil }) @@ -91,11 +89,10 @@ func TestCacheInvalidation(t *testing.T) { aInfo, err := execCfg.SessionInitCache.GetAuthInfo( ctx, settings, - s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(*sql.InternalExecutorFactory), username.TestUserName(), - func(ctx context.Context, ie sqlutil.InternalExecutor, userName username.SQLUsername, makePlanner func(opName string) (interface{}, func()), settings *cluster.Settings) (sessioninit.AuthInfo, error) { + func(ctx context.Context, f descs.TxnManager, userName username.SQLUsername, makePlanner 
func(opName string) (interface{}, func()), settings *cluster.Settings) (sessioninit.AuthInfo, error) { didReadFromSystemTable = true return sessioninit.AuthInfo{}, nil }, @@ -218,7 +215,6 @@ func TestCacheSingleFlight(t *testing.T) { defer s.Stopper().Stop(ctx) execCfg := s.ExecutorConfig().(sql.ExecutorConfig) settings := s.ExecutorConfig().(sql.ExecutorConfig).Settings - ie := s.InternalExecutor().(sqlutil.InternalExecutor) c := s.ExecutorConfig().(sql.ExecutorConfig).SessionInitCache testuser := username.MakeSQLUsernameFromPreNormalizedString("test") @@ -247,9 +243,9 @@ func TestCacheSingleFlight(t *testing.T) { go func() { didReadFromSystemTable := false - _, err := c.GetAuthInfo(ctx, settings, ie, s.DB(), s.ExecutorConfig().(sql.ExecutorConfig).CollectionFactory, testuser, func( + _, err := c.GetAuthInfo(ctx, settings, s.DB(), s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory, testuser, func( ctx context.Context, - ie sqlutil.InternalExecutor, + f descs.TxnManager, userName username.SQLUsername, makePlanner func(opName string) (interface{}, func()), settings *cluster.Settings, @@ -274,9 +270,9 @@ func TestCacheSingleFlight(t *testing.T) { for i := 0; i < 2; i++ { go func() { didReadFromSystemTable := false - _, err := c.GetAuthInfo(ctx, settings, ie, s.DB(), s.ExecutorConfig().(sql.ExecutorConfig).CollectionFactory, testuser, func( + _, err := c.GetAuthInfo(ctx, settings, s.DB(), s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory, testuser, func( ctx context.Context, - ie sqlutil.InternalExecutor, + f descs.TxnManager, userName username.SQLUsername, makePlanner func(opName string) (interface{}, func()), settings *cluster.Settings, @@ -298,9 +294,9 @@ func TestCacheSingleFlight(t *testing.T) { // GetAuthInfo should not be using the cache since it is outdated. 
didReadFromSystemTable := false - _, err = c.GetAuthInfo(ctx, settings, ie, s.DB(), s.ExecutorConfig().(sql.ExecutorConfig).CollectionFactory, testuser, func( + _, err = c.GetAuthInfo(ctx, settings, s.DB(), s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory, testuser, func( ctx context.Context, - ie sqlutil.InternalExecutor, + f descs.TxnManager, userName username.SQLUsername, makePlanner func(opName string) (interface{}, func()), settings *cluster.Settings, diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index 06f334b55072..8e41f04e3e45 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -217,6 +217,19 @@ type InternalExecutorFactory interface { // RunWithoutTxn is to create an internal executor without binding to a txn, // and run the passed function with this internal executor. RunWithoutTxn(ctx context.Context, run func(ctx context.Context, ie InternalExecutor) error) error + + // TxnWithExecutor enables callers to run transactions with a *Collection such that all + // retrieved immutable descriptors are properly leased and all mutable + // descriptors are handled. The function deals with verifying the two version + // invariant and retrying when it is violated. Callers need not worry that they + // write mutable descriptors multiple times. The call will explicitly wait for + // the leases to drain on old versions of descriptors modified or deleted in the + // transaction; callers do not need to call lease.WaitForOneVersion. + // It also enables using internal executor to run sql queries in a txn manner. + // + // The passed transaction is pre-emptively anchored to the system config key on + // the system tenant. + TxnWithExecutor(context.Context, *kv.DB, *sessiondata.SessionData, func(context.Context, *kv.Txn, InternalExecutor) error, ...TxnOption) error } // InternalExecFn is the type of functions that operates using an internalExecutor. 
@@ -226,3 +239,37 @@ type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) // passes the fn the exported InternalExecutor instead of the whole unexported // extendedEvalContenxt, so it can be implemented outside pkg/sql. type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error + +// TxnOption is used to configure a Txn or TxnWithExecutor. +type TxnOption interface { + Apply(*TxnConfig) +} + +// TxnConfig is the config to be set for txn. +type TxnConfig struct { + steppingEnabled bool +} + +// GetSteppingEnabled returns the steppingEnabled setting from the txn config. +func (tc *TxnConfig) GetSteppingEnabled() bool { + return tc.steppingEnabled +} + +type txnOptionFn func(options *TxnConfig) + +// Apply is to apply the txn config. +func (f txnOptionFn) Apply(options *TxnConfig) { f(options) } + +var steppingEnabled = txnOptionFn(func(o *TxnConfig) { + o.steppingEnabled = true +}) + +// SteppingEnabled creates a TxnOption to determine whether the underlying +// transaction should have stepping enabled. If stepping is enabled, the +// transaction will implicitly use lower admission priority. However, the +// user will need to remember to Step the Txn to make writes visible. The +// InternalExecutor will automatically (for better or for worse) step the +// transaction when executing each statement.
+func SteppingEnabled() TxnOption { + return steppingEnabled +} diff --git a/pkg/sql/stats/automatic_stats.go b/pkg/sql/stats/automatic_stats.go index 3df4bc30cf51..fa8105453f5f 100644 --- a/pkg/sql/stats/automatic_stats.go +++ b/pkg/sql/stats/automatic_stats.go @@ -351,7 +351,7 @@ func (r *Refresher) autoStatsFractionStaleRows(explicitSettings *catpb.AutoStats func (r *Refresher) getTableDescriptor( ctx context.Context, tableID descpb.ID, ) (desc catalog.TableDescriptor) { - if err := r.cache.collectionFactory.Txn(ctx, r.cache.ClientDB, func( + if err := r.cache.internalExecutorFactory.DescsTxn(ctx, r.cache.ClientDB, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) (err error) { flags := tree.ObjectLookupFlagsWithRequired() diff --git a/pkg/sql/stats/automatic_stats_test.go b/pkg/sql/stats/automatic_stats_test.go index 1a2265559168..58d7c883666b 100644 --- a/pkg/sql/stats/automatic_stats_test.go +++ b/pkg/sql/stats/automatic_stats_test.go @@ -73,7 +73,7 @@ func TestMaybeRefreshStats(t *testing.T) { kvDB, executor, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) @@ -200,7 +200,7 @@ func TestEnsureAllTablesQueries(t *testing.T) { kvDB, executor, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) r := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) @@ -306,7 +306,7 @@ func TestAverageRefreshTime(t *testing.T) { kvDB, executor, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) 
require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) @@ -554,7 +554,7 @@ func TestAutoStatsReadOnlyTables(t *testing.T) { kvDB, executor, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) @@ -611,7 +611,7 @@ func TestAutoStatsOnStartupClusterSettingOff(t *testing.T) { kvDB, executor, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) refresher := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) @@ -659,7 +659,7 @@ func TestNoRetryOnFailure(t *testing.T) { kvDB, executor, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) r := MakeRefresher(s.AmbientCtx(), st, executor, cache, time.Microsecond /* asOfTime */) @@ -776,7 +776,7 @@ func TestAnalyzeSystemTables(t *testing.T) { kvDB, executor, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) var tableNames []string diff --git a/pkg/sql/stats/delete_stats_test.go b/pkg/sql/stats/delete_stats_test.go index 6dc2955e9de4..2925ad405df1 100644 --- a/pkg/sql/stats/delete_stats_test.go +++ b/pkg/sql/stats/delete_stats_test.go @@ -46,7 +46,7 @@ func 
TestDeleteOldStatsForColumns(t *testing.T) { db, ex, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) @@ -343,7 +343,7 @@ func TestDeleteOldStatsForOtherColumns(t *testing.T) { db, ex, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, cache.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) testData := []TableStatisticProto{ diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 74e66d38c6b9..85cce35b958b 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -72,8 +72,7 @@ type TableStatisticsCache struct { SQLExecutor sqlutil.InternalExecutor Settings *cluster.Settings - // Used to resolve descriptors. - collectionFactory *descs.CollectionFactory + internalExecutorFactory descs.TxnManager // Used when decoding KV from the range feed. datumAlloc tree.DatumAlloc @@ -121,13 +120,13 @@ func NewTableStatisticsCache( db *kv.DB, sqlExecutor sqlutil.InternalExecutor, settings *cluster.Settings, - cf *descs.CollectionFactory, + ief descs.TxnManager, ) *TableStatisticsCache { tableStatsCache := &TableStatisticsCache{ - ClientDB: db, - SQLExecutor: sqlExecutor, - Settings: settings, - collectionFactory: cf, + ClientDB: db, + SQLExecutor: sqlExecutor, + Settings: settings, + internalExecutorFactory: ief, } tableStatsCache.mu.cache = cache.NewUnorderedCache(cache.Config{ Policy: cache.CacheLRU, @@ -602,7 +601,7 @@ func (sc *TableStatisticsCache) parseStats( // TypeDescriptor's with the timestamp that the stats were recorded with. // // TODO(ajwerner): We now do delete members from enum types. See #67050. 
- if err := sc.collectionFactory.Txn(ctx, sc.ClientDB, func( + if err := sc.internalExecutorFactory.DescsTxn(ctx, sc.ClientDB, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { resolver := descs.NewDistSQLTypeResolver(descriptors, txn) diff --git a/pkg/sql/stats/stats_cache_test.go b/pkg/sql/stats/stats_cache_test.go index d0a272b800e1..ddc0577b87a0 100644 --- a/pkg/sql/stats/stats_cache_test.go +++ b/pkg/sql/stats/stats_cache_test.go @@ -245,7 +245,7 @@ func TestCacheBasic(t *testing.T) { db, ex, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) for _, tableID := range tableIDs { @@ -351,7 +351,7 @@ func TestCacheUserDefinedTypes(t *testing.T) { kvDB, s.InternalExecutor().(sqlutil.InternalExecutor), s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "tt") @@ -409,7 +409,7 @@ func TestCacheWait(t *testing.T) { db, ex, s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) for _, tableID := range tableIDs { @@ -464,7 +464,7 @@ func TestCacheAutoRefresh(t *testing.T) { s.DB(), s.InternalExecutor().(sqlutil.InternalExecutor), s.ClusterSettings(), - s.CollectionFactory().(*descs.CollectionFactory), + s.InternalExecutorFactory().(descs.TxnManager), ) require.NoError(t, sc.Start(ctx, keys.SystemSQLCodec, s.RangeFeedFactory().(*rangefeed.Factory))) diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 0bf2615bf43e..c80cb3332e1f 
100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -169,45 +169,46 @@ func temporarySchemaSessionID(scName string) (bool, clusterunique.ID, error) { func cleanupSessionTempObjects( ctx context.Context, settings *cluster.Settings, - cf *descs.CollectionFactory, + ief sqlutil.InternalExecutorFactory, db *kv.DB, codec keys.SQLCodec, sessionID clusterunique.ID, ) error { tempSchemaName := temporarySchemaName(sessionID) - return cf.TxnWithExecutor(ctx, db, nil /* sessionData */, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, - ie sqlutil.InternalExecutor, - ) error { - // We are going to read all database descriptor IDs, then for each database - // we will drop all the objects under the temporary schema. - allDbDescs, err := descsCol.GetAllDatabaseDescriptors(ctx, txn) - if err != nil { - return err - } - for _, dbDesc := range allDbDescs { - if err := cleanupSchemaObjects( - ctx, - txn, - descsCol, - codec, - ie, - dbDesc, - tempSchemaName, - ); err != nil { + return ief.(descs.TxnManager).DescsTxnWithExecutor( + ctx, db, nil /* sessionData */, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + ie sqlutil.InternalExecutor, + ) error { + // We are going to read all database descriptor IDs, then for each database + // we will drop all the objects under the temporary schema. + allDbDescs, err := descsCol.GetAllDatabaseDescriptors(ctx, txn) + if err != nil { return err } - // Even if no objects were found under the temporary schema, the schema - // itself may still exist (eg. a temporary table was created and then - // dropped). So we remove the namespace table entry of the temporary - // schema. 
- key := catalogkeys.MakeSchemaNameKey(codec, dbDesc.GetID(), tempSchemaName) - if _, err := txn.Del(ctx, key); err != nil { - return err + for _, dbDesc := range allDbDescs { + if err := cleanupSchemaObjects( + ctx, + txn, + descsCol, + codec, + ie, + dbDesc, + tempSchemaName, + ); err != nil { + return err + } + // Even if no objects were found under the temporary schema, the schema + // itself may still exist (eg. a temporary table was created and then + // dropped). So we remove the namespace table entry of the temporary + // schema. + key := catalogkeys.MakeSchemaNameKey(codec, dbDesc.GetID(), tempSchemaName) + if _, err := txn.Del(ctx, key); err != nil { + return err + } } - } - return nil - }) + return nil + }) } // cleanupSchemaObjects removes all objects that is located within a dbID and schema. @@ -400,11 +401,12 @@ type TemporaryObjectCleaner struct { db *kv.DB codec keys.SQLCodec // statusServer gives access to the SQLStatus service. - statusServer serverpb.SQLStatusServer - isMeta1LeaseholderFunc isMeta1LeaseholderFunc - testingKnobs ExecutorTestingKnobs - metrics *temporaryObjectCleanerMetrics - collectionFactory *descs.CollectionFactory + statusServer serverpb.SQLStatusServer + isMeta1LeaseholderFunc isMeta1LeaseholderFunc + testingKnobs ExecutorTestingKnobs + metrics *temporaryObjectCleanerMetrics + collectionFactory *descs.CollectionFactory + internalExecutorFactory sqlutil.InternalExecutorFactory } // temporaryObjectCleanerMetrics are the metrics for TemporaryObjectCleaner @@ -430,19 +432,21 @@ func NewTemporaryObjectCleaner( statusServer serverpb.SQLStatusServer, isMeta1LeaseholderFunc isMeta1LeaseholderFunc, testingKnobs ExecutorTestingKnobs, + ief sqlutil.InternalExecutorFactory, cf *descs.CollectionFactory, ) *TemporaryObjectCleaner { metrics := makeTemporaryObjectCleanerMetrics() registry.AddMetricStruct(metrics) return &TemporaryObjectCleaner{ - settings: settings, - db: db, - codec: codec, - statusServer: statusServer, - 
isMeta1LeaseholderFunc: isMeta1LeaseholderFunc, - testingKnobs: testingKnobs, - metrics: metrics, - collectionFactory: cf, + settings: settings, + db: db, + codec: codec, + statusServer: statusServer, + isMeta1LeaseholderFunc: isMeta1LeaseholderFunc, + testingKnobs: testingKnobs, + metrics: metrics, + internalExecutorFactory: ief, + collectionFactory: cf, } } @@ -590,7 +594,7 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( return cleanupSessionTempObjects( ctx, c.settings, - c.collectionFactory, + c.internalExecutorFactory, c.db, c.codec, sessionID, diff --git a/pkg/sql/temporary_schema_test.go b/pkg/sql/temporary_schema_test.go index fa45359fc465..f03c968bcc9d 100644 --- a/pkg/sql/temporary_schema_test.go +++ b/pkg/sql/temporary_schema_test.go @@ -96,8 +96,8 @@ INSERT INTO perm_table VALUES (DEFAULT, 1); require.NoError(t, rows.Close()) } execCfg := s.ExecutorConfig().(ExecutorConfig) - cf := execCfg.CollectionFactory - require.NoError(t, cf.TxnWithExecutor(ctx, kvDB, nil /* sessionData */, func( + ief := execCfg.InternalExecutorFactory + require.NoError(t, ief.DescsTxnWithExecutor(ctx, kvDB, nil /* sessionData */, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { diff --git a/pkg/sql/user.go b/pkg/sql/user.go index 1720839cfced..ec5b53105b7a 100644 --- a/pkg/sql/user.go +++ b/pkg/sql/user.go @@ -100,7 +100,7 @@ func GetUserSessionInitInfo( // necessary. rootFn := func(ctx context.Context) (expired bool, ret password.PasswordHash, err error) { err = runFn(ctx, func(ctx context.Context) error { - authInfo, _, err := retrieveSessionInitInfoWithCache(ctx, execCfg, ie, user, databaseName) + authInfo, _, err := retrieveSessionInitInfoWithCache(ctx, execCfg, user, databaseName) if err != nil { return err } @@ -126,14 +126,14 @@ func GetUserSessionInitInfo( // Other users must reach for system.users no matter what, because // only that contains the truth about whether the user exists. 
authInfo, settingsEntries, err = retrieveSessionInitInfoWithCache( - ctx, execCfg, ie, user, databaseName, + ctx, execCfg, user, databaseName, ) if err != nil { return err } // Find whether the user is an admin. - return execCfg.CollectionFactory.Txn(ctx, execCfg.DB, func( + return execCfg.InternalExecutorFactory.DescsTxn(ctx, execCfg.DB, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ) error { memberships, err := MemberOfWithAdminOption( @@ -203,11 +203,7 @@ func getUserInfoRunFn( } func retrieveSessionInitInfoWithCache( - ctx context.Context, - execCfg *ExecutorConfig, - ie *InternalExecutor, - userName username.SQLUsername, - databaseName string, + ctx context.Context, execCfg *ExecutorConfig, userName username.SQLUsername, databaseName string, ) (aInfo sessioninit.AuthInfo, settingsEntries []sessioninit.SettingsCacheEntry, err error) { if err = func() (retErr error) { makePlanner := func(opName string) (interface{}, func()) { @@ -223,9 +219,8 @@ func retrieveSessionInitInfoWithCache( aInfo, retErr = execCfg.SessionInitCache.GetAuthInfo( ctx, execCfg.Settings, - ie, execCfg.DB, - execCfg.CollectionFactory, + execCfg.InternalExecutorFactory, userName, retrieveAuthInfo, makePlanner, @@ -240,9 +235,8 @@ func retrieveSessionInitInfoWithCache( settingsEntries, retErr = execCfg.SessionInitCache.GetDefaultSettings( ctx, execCfg.Settings, - ie, execCfg.DB, - execCfg.CollectionFactory, + execCfg.InternalExecutorFactory, userName, databaseName, retrieveDefaultSettings, @@ -258,7 +252,7 @@ func retrieveSessionInitInfoWithCache( func retrieveAuthInfo( ctx context.Context, - ie sqlutil.InternalExecutor, + f descs.TxnManager, user username.SQLUsername, makePlanner func(opName string) (interface{}, func()), settings *cluster.Settings, @@ -268,10 +262,16 @@ func retrieveAuthInfo( // we should always look up the latest data. 
const getHashedPassword = `SELECT "hashedPassword" FROM system.public.users ` + `WHERE username=$1` - values, err := ie.QueryRowEx( - ctx, "get-hashed-pwd", nil, /* txn */ - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - getHashedPassword, user) + var values tree.Datums + var err error + _ = f.RunWithoutTxn(ctx, func(ctx context.Context, ie sqlutil.InternalExecutor) error { + values, err = ie.QueryRowEx( + ctx, "get-hashed-pwd", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + getHashedPassword, user) + return err + }) + if err != nil { return aInfo, errors.Wrapf(err, "error looking up user %s", user) } @@ -297,12 +297,17 @@ func retrieveAuthInfo( const getLoginDependencies = `SELECT option, value FROM system.public.role_options ` + `WHERE username=$1 AND option IN ('NOLOGIN', 'VALID UNTIL', 'NOSQLLOGIN')` - roleOptsIt, err := ie.QueryIteratorEx( - ctx, "get-login-dependencies", nil, /* txn */ - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - getLoginDependencies, - user, - ) + var roleOptsIt sqlutil.InternalRows + _ = f.RunWithoutTxn(ctx, func(ctx context.Context, ie sqlutil.InternalExecutor) error { + roleOptsIt, err = ie.QueryIteratorEx( + ctx, "get-login-dependencies", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + getLoginDependencies, + user, + ) + return err + }) + if err != nil { return aInfo, errors.Wrapf(err, "error looking up user %s", user) } @@ -366,7 +371,7 @@ func retrieveAuthInfo( } func retrieveDefaultSettings( - ctx context.Context, ie sqlutil.InternalExecutor, user username.SQLUsername, databaseID descpb.ID, + ctx context.Context, f descs.TxnManager, user username.SQLUsername, databaseID descpb.ID, ) (settingsEntries []sessioninit.SettingsCacheEntry, retErr error) { // Add an empty slice for all the keys so that something gets cached and // prevents a lookup for the same key from happening later. 
@@ -398,13 +403,19 @@ WHERE ` // We use a nil txn as role settings are not tied to any transaction state, // and we should always look up the latest data. - defaultSettingsIt, err := ie.QueryIteratorEx( - ctx, "get-default-settings", nil, /* txn */ - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - getDefaultSettings, - user, - databaseID, - ) + var defaultSettingsIt sqlutil.InternalRows + var err error + _ = f.RunWithoutTxn(ctx, func(ctx context.Context, ie sqlutil.InternalExecutor) error { + defaultSettingsIt, err = ie.QueryIteratorEx( + ctx, "get-default-settings", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + getDefaultSettings, + user, + databaseID, + ) + return err + }) + if err != nil { + return nil, errors.Wrapf(err, "error looking up user %s", user) + } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 72900afae110..9f7aa32f21f7 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -146,6 +146,10 @@ type TestServerInterface interface { // also implements sqlutil.InternalExecutor if the test cannot depend on sql). InternalExecutor() interface{} + // InternalExecutorFactory returns a + // sqlutil.InternalExecutorFactory as an interface{}. + InternalExecutorFactory() interface{} + // TracerI returns a *tracing.Tracer as an interface{}. TracerI() interface{} diff --git a/pkg/upgrade/tenant_upgrade.go b/pkg/upgrade/tenant_upgrade.go index f6ef9c1a80c7..ad9045314032 100644 --- a/pkg/upgrade/tenant_upgrade.go +++ b/pkg/upgrade/tenant_upgrade.go @@ -32,14 +32,15 @@ import ( // TenantDeps are the dependencies of upgrades which perform actions at the // SQL layer.
type TenantDeps struct { - DB *kv.DB - Codec keys.SQLCodec - Settings *cluster.Settings - CollectionFactory *descs.CollectionFactory - LeaseManager *lease.Manager - JobRegistry *jobs.Registry - InternalExecutor sqlutil.InternalExecutor - SessionData *sessiondata.SessionData + DB *kv.DB + Codec keys.SQLCodec + Settings *cluster.Settings + CollectionFactory *descs.CollectionFactory + InternalExecutorFactory descs.TxnManager + LeaseManager *lease.Manager + JobRegistry *jobs.Registry + InternalExecutor sqlutil.InternalExecutor + SessionData *sessiondata.SessionData SpanConfig struct { // deps for span config upgrades; can be removed accordingly spanconfig.KVAccessor diff --git a/pkg/upgrade/upgradejob/upgrade_job.go b/pkg/upgrade/upgradejob/upgrade_job.go index 902e6339ad69..bbf8291a06f1 100644 --- a/pkg/upgrade/upgradejob/upgrade_job.go +++ b/pkg/upgrade/upgradejob/upgrade_job.go @@ -89,15 +89,16 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { err = m.Run(ctx, cv, mc.SystemDeps(), r.j) case *upgrade.TenantUpgrade: tenantDeps := upgrade.TenantDeps{ - DB: execCtx.ExecCfg().DB, - Codec: execCtx.ExecCfg().Codec, - Settings: execCtx.ExecCfg().Settings, - CollectionFactory: execCtx.ExecCfg().CollectionFactory, - LeaseManager: execCtx.ExecCfg().LeaseManager, - InternalExecutor: execCtx.ExecCfg().InternalExecutor, - JobRegistry: execCtx.ExecCfg().JobRegistry, - TestingKnobs: execCtx.ExecCfg().UpgradeTestingKnobs, - SessionData: execCtx.SessionData(), + DB: execCtx.ExecCfg().DB, + Codec: execCtx.ExecCfg().Codec, + Settings: execCtx.ExecCfg().Settings, + CollectionFactory: execCtx.ExecCfg().CollectionFactory, + InternalExecutorFactory: execCtx.ExecCfg().InternalExecutorFactory, + LeaseManager: execCtx.ExecCfg().LeaseManager, + InternalExecutor: execCtx.ExecCfg().InternalExecutor, + JobRegistry: execCtx.ExecCfg().JobRegistry, + TestingKnobs: execCtx.ExecCfg().UpgradeTestingKnobs, + SessionData: execCtx.SessionData(), } 
tenantDeps.SpanConfig.KVAccessor = execCtx.ExecCfg().SpanConfigKVAccessor tenantDeps.SpanConfig.Splitter = execCtx.ExecCfg().SpanConfigSplitter diff --git a/pkg/upgrade/upgrades/descriptor_utils.go b/pkg/upgrade/upgrades/descriptor_utils.go index 77abcc951787..36cfb4767c5c 100644 --- a/pkg/upgrade/upgrades/descriptor_utils.go +++ b/pkg/upgrade/upgrades/descriptor_utils.go @@ -88,7 +88,7 @@ func runPostDeserializationChangesOnAllDescriptors( maybeUpgradeDescriptors := func( ctx context.Context, d upgrade.TenantDeps, toUpgrade []descpb.ID, ) error { - return d.CollectionFactory.Txn(ctx, d.DB, func( + return d.InternalExecutorFactory.DescsTxn(ctx, d.DB, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { descs, err := descriptors.GetMutableDescriptorsByID(ctx, txn, toUpgrade...) diff --git a/pkg/upgrade/upgrades/helpers_test.go b/pkg/upgrade/upgrades/helpers_test.go index d48e4e9ccd55..e3cc524f8562 100644 --- a/pkg/upgrade/upgrades/helpers_test.go +++ b/pkg/upgrade/upgrades/helpers_test.go @@ -75,7 +75,7 @@ func InjectLegacyTable( table catalog.TableDescriptor, getDeprecatedDescriptor func() *descpb.TableDescriptor, ) { - err := s.CollectionFactory().(*descs.CollectionFactory).Txn(ctx, s.DB(), func( + err := s.InternalExecutorFactory().(descs.TxnManager).DescsTxn(ctx, s.DB(), func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { id := table.GetID() @@ -140,7 +140,7 @@ func GetTable( ) catalog.TableDescriptor { var table catalog.TableDescriptor // Retrieve the table. 
- err := s.CollectionFactory().(*descs.CollectionFactory).Txn(ctx, s.DB(), func( + err := s.InternalExecutorFactory().(descs.TxnManager).DescsTxn(ctx, s.DB(), func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) (err error) { table, err = descriptors.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{ diff --git a/pkg/upgrade/upgrades/schema_changes.go b/pkg/upgrade/upgrades/schema_changes.go index a4d25773a4ab..427580b2d5aa 100644 --- a/pkg/upgrade/upgrades/schema_changes.go +++ b/pkg/upgrade/upgrades/schema_changes.go @@ -142,7 +142,7 @@ func readTableDescriptor( ) (catalog.TableDescriptor, error) { var t catalog.TableDescriptor - if err := d.CollectionFactory.Txn(ctx, d.DB, func( + if err := d.InternalExecutorFactory.DescsTxn(ctx, d.DB, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) (err error) { t, err = descriptors.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{ diff --git a/pkg/upgrade/upgrades/schema_changes_external_test.go b/pkg/upgrade/upgrades/schema_changes_external_test.go index 75b039208eb7..72829fd04787 100644 --- a/pkg/upgrade/upgrades/schema_changes_external_test.go +++ b/pkg/upgrade/upgrades/schema_changes_external_test.go @@ -306,7 +306,7 @@ CREATE TABLE test.test_table ( tdb.Exec(t, "CREATE DATABASE test") tdb.Exec(t, createTableAfter) var desc catalog.TableDescriptor - require.NoError(t, s.CollectionFactory().(*descs.CollectionFactory).Txn(ctx, s.DB(), func( + require.NoError(t, s.InternalExecutorFactory().(descs.TxnManager).DescsTxn(ctx, s.DB(), func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) (err error) { tn := tree.MakeTableNameWithSchema("test", "public", "test_table") diff --git a/pkg/upgrade/upgrades/update_invalid_column_ids_in_sequence_back_references.go b/pkg/upgrade/upgrades/update_invalid_column_ids_in_sequence_back_references.go index f7f6dc3b1e01..03a315e0f8cd 100644 --- 
a/pkg/upgrade/upgrades/update_invalid_column_ids_in_sequence_back_references.go +++ b/pkg/upgrade/upgrades/update_invalid_column_ids_in_sequence_back_references.go @@ -37,7 +37,7 @@ func updateInvalidColumnIDsInSequenceBackReferences( for { var currSeqID descpb.ID var done bool - if err := d.CollectionFactory.TxnWithExecutor(ctx, d.DB, d.SessionData, func( + if err := d.InternalExecutorFactory.DescsTxnWithExecutor(ctx, d.DB, d.SessionData, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, ) (err error) { currSeqID = lastSeqID diff --git a/pkg/upgrade/upgrades/upgrade_sequence_to_be_referenced_by_ID.go b/pkg/upgrade/upgrades/upgrade_sequence_to_be_referenced_by_ID.go index 9d0b2c0a943a..a51da4f916bd 100644 --- a/pkg/upgrade/upgrades/upgrade_sequence_to_be_referenced_by_ID.go +++ b/pkg/upgrade/upgrades/upgrade_sequence_to_be_referenced_by_ID.go @@ -35,7 +35,7 @@ import ( func upgradeSequenceToBeReferencedByID( ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps, _ *jobs.Job, ) error { - return d.CollectionFactory.TxnWithExecutor(ctx, d.DB, d.SessionData, func( + return d.InternalExecutorFactory.DescsTxnWithExecutor(ctx, d.DB, d.SessionData, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, ) (err error) { var lastUpgradedID descpb.ID @@ -112,7 +112,7 @@ func findNextTableToUpgrade( func maybeUpgradeSeqReferencesInTableOrView( ctx context.Context, idToUpgrade descpb.ID, d upgrade.TenantDeps, ) error { - return d.CollectionFactory.Txn(ctx, d.DB, func( + return d.InternalExecutorFactory.DescsTxn(ctx, d.DB, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { // Set up: retrieve table desc for `idToUpgrade` and a schema resolver