Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,16 @@ func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Tx

// checkIfTxnIsConsistent returns true if the given txn is not nil and is not
// the same txn that is used to construct the internal executor.
// TODO(janexing): this will be deprecated soon as we will only use
// ie.extraTxnState.txn, and the txn argument in query functions will be
// deprecated.
func (ie *InternalExecutor) checkIfTxnIsConsistent(txn *kv.Txn) error {
if txn == nil && ie.extraTxnState != nil {
return errors.New("the current internal executor was contructed with" +
"a txn. To use an internal executor without a txn, call " +
"sqlutil.InternalExecutorFactory.RunWithoutTxn()")
}

if txn != nil && ie.extraTxnState != nil && ie.extraTxnState.txn != txn {
return errors.New("txn is inconsistent with the one when " +
"constructing the internal executor")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func updateInvalidColumnIDsInSequenceBackReferences(
) (err error) {
currSeqID = lastSeqID
for {
done, currSeqID, err = findNextTableToUpgrade(ctx, ie, currSeqID,
done, currSeqID, err = findNextTableToUpgrade(ctx, ie, txn, currSeqID,
func(table *descpb.TableDescriptor) bool {
return table.IsSequence()
})
Expand Down
42 changes: 23 additions & 19 deletions pkg/upgrade/upgrades/upgrade_sequence_to_be_referenced_by_ID.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,29 @@ import (
func upgradeSequenceToBeReferencedByID(
ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps, _ *jobs.Job,
) error {
var lastUpgradedID descpb.ID
// Upgrade each table/view, one at a time, until we exhaust all of them.
for {
done, idToUpgrade, err := findNextTableToUpgrade(ctx, d.InternalExecutor, lastUpgradedID,
func(table *descpb.TableDescriptor) bool {
return table.IsTable() || table.IsView()
})
if err != nil || done {
return err
}
return d.CollectionFactory.TxnWithExecutor(ctx, d.DB, d.SessionData, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor,
) (err error) {
var lastUpgradedID descpb.ID
// Upgrade each table/view, one at a time, until we exhaust all of them.
for {
done, idToUpgrade, err := findNextTableToUpgrade(ctx, d.InternalExecutor, txn, lastUpgradedID,
func(table *descpb.TableDescriptor) bool {
return table.IsTable() || table.IsView()
})
if err != nil || done {
return err
}

// Table/View `idToUpgrade` might contain reference to sequences by name. If so, we need to upgrade
// those references to be by ID.
err = maybeUpgradeSeqReferencesInTableOrView(ctx, idToUpgrade, d)
if err != nil {
return err
// Table/View `idToUpgrade` might contain reference to sequences by name. If so, we need to upgrade
// those references to be by ID.
err = maybeUpgradeSeqReferencesInTableOrView(ctx, idToUpgrade, d)
if err != nil {
return err
}
lastUpgradedID = idToUpgrade
}

lastUpgradedID = idToUpgrade
}
})
}

// Find the next table descriptor ID that is > `lastUpgradedID`
Expand All @@ -62,11 +65,12 @@ func upgradeSequenceToBeReferencedByID(
func findNextTableToUpgrade(
ctx context.Context,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
lastUpgradedID descpb.ID,
tableSelector func(table *descpb.TableDescriptor) bool,
) (done bool, idToUpgrade descpb.ID, err error) {
var rows sqlutil.InternalRows
rows, err = ie.QueryIterator(ctx, "upgrade-seq-find-desc", nil,
rows, err = ie.QueryIterator(ctx, "upgrade-seq-find-desc", txn,
`SELECT id, descriptor, crdb_internal_mvcc_timestamp FROM system.descriptor WHERE id > $1 ORDER BY ID ASC`, lastUpgradedID)
if err != nil {
return false, 0, err
Expand Down