ddl: refine setSchemaDiff (#51468)
ref #50959
ywqzzy authored Apr 24, 2024
1 parent e349c7c commit 851e22d
Showing 3 changed files with 417 additions and 308 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -53,6 +53,7 @@ go_library(
"rollingback.go",
"sanity_check.go",
"schema.go",
"schema_version.go",
"sequence.go",
"split_region.go",
"stat.go",
308 changes: 0 additions & 308 deletions pkg/ddl/ddl_worker.go
@@ -43,7 +43,6 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/pingcap/tidb/pkg/util/resourcegrouptag"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
@@ -1402,68 +1401,12 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in
return checkAllVersions(d, job, latestSchemaVersion, timeStart)
}

func checkAllVersions(d *ddlCtx, job *model.Job, latestSchemaVersion int64, timeStart time.Time) error {
failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
panic("check down before update global version failed")
}
mockDDLErrOnce = -1
}
})

// OwnerCheckAllVersions returns only when all TiDB schemas are synced (excluding isolated TiDB instances).
err := d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("wait latest schema version encounter error", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion),
zap.Int64("jobID", job.ID), zap.Duration("take time", time.Since(timeStart)), zap.Error(err))
return err
}
logutil.Logger(d.ctx).Info("wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)", zap.String("category", "ddl"),
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}
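
The checkDownBeforeUpdateGlobalVersion failpoint above is driven through the pingcap/failpoint library. A minimal sketch of how a test might switch it on, assuming the standard "<package import path>/<failpoint name>" path convention; the helper below is hypothetical and not part of this commit:

package ddl_test

import "github.com/pingcap/failpoint"

// enableCheckDownFailpoint is a hypothetical helper showing how the
// failpoint above is typically activated in tests. "return(true)"
// makes failpoint.Inject invoke its callback with val == true.
func enableCheckDownFailpoint() (cleanup func(), err error) {
	fp := "github.com/pingcap/tidb/pkg/ddl/checkDownBeforeUpdateGlobalVersion"
	if err := failpoint.Enable(fp, "return(true)"); err != nil {
		return nil, err
	}
	return func() { _ = failpoint.Disable(fp) }, nil
}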

// waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits to acquire the metadata lock of the latest version of this DDL.
func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error {
timeStart := time.Now()
return checkAllVersions(d, job, latestSchemaVersion, timeStart)
}

// waitSchemaSynced handles the following situation:
// If the job enters a new state and the worker crashes while waiting for 2 * lease time,
// then the worker restarts quickly and may run the job again immediately,
// without having waited the full 2 * lease time for other servers to update the schema.
// So here we get the latest schema version to make sure every server in the cluster
// updates to that version, or we wait for 2 * lease time.
func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return nil
}

ver, _ := d.store.CurrentVersion(kv.GlobalTxnScope)
snapshot := d.store.GetSnapshot(ver)
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
logutil.Logger(d.ctx).Warn("get global version failed", zap.String("category", "ddl"), zap.Int64("jobID", job.ID), zap.Error(err))
return err
}

failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
panic("check down before update global version failed")
}
mockDDLErrOnce = -1
}
})

return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
if len(oldIDs) == 0 {
return nil
@@ -1478,254 +1421,3 @@ func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOpti
}
return affects
}
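
The body of buildPlacementAffects is collapsed in this diff. Judging from its call sites (the ReorganizePartition case below pads both ID slices to the same length before calling it), it pairs each old physical ID with the corresponding new one. A minimal illustrative sketch, not the verbatim body:

// Illustrative sketch only: pair oldIDs[i] with newIDs[i] into
// AffectedOption entries; callers guarantee equal slice lengths.
func buildPlacementAffectsSketch(oldIDs, newIDs []int64) []*model.AffectedOption {
	if len(oldIDs) == 0 {
		return nil
	}
	affects := make([]*model.AffectedOption, 0, len(oldIDs))
	for i, oldID := range oldIDs {
		affects = append(affects, &model.AffectedOption{
			OldTableID: oldID,
			TableID:    newIDs[i],
		})
	}
	return affects
}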

// updateSchemaVersion increments the schema version by 1 and sets SchemaDiff.
func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...schemaIDAndTableInfo) (int64, error) {
schemaVersion, err := d.setSchemaVersion(job, d.store)
if err != nil {
return 0, errors.Trace(err)
}
diff := &model.SchemaDiff{
Version: schemaVersion,
Type: job.Type,
SchemaID: job.SchemaID,
}
switch job.Type {
case model.ActionCreateTables:
var tableInfos []*model.TableInfo
err = job.DecodeArgs(&tableInfos)
if err != nil {
return 0, errors.Trace(err)
}
diff.AffectedOpts = make([]*model.AffectedOption, len(tableInfos))
for i := range tableInfos {
diff.AffectedOpts[i] = &model.AffectedOption{
SchemaID: job.SchemaID,
OldSchemaID: job.SchemaID,
TableID: tableInfos[i].ID,
OldTableID: tableInfos[i].ID,
}
}
case model.ActionTruncateTable:
// Truncate table has two table IDs, so it should be handled differently.
err = job.DecodeArgs(&diff.TableID)
if err != nil {
return 0, errors.Trace(err)
}
diff.OldTableID = job.TableID

// affects are used to update the placement rule cache
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
case model.ActionCreateView:
tbInfo := &model.TableInfo{}
var orReplace bool
var oldTbInfoID int64
if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil {
return 0, errors.Trace(err)
}
// When the statement is "create or replace view" and we need to drop the old view,
// it has two table IDs and should be handled differently.
if oldTbInfoID > 0 && orReplace {
diff.OldTableID = oldTbInfoID
}
diff.TableID = tbInfo.ID
case model.ActionRenameTable:
err = job.DecodeArgs(&diff.OldSchemaID)
if err != nil {
return 0, errors.Trace(err)
}
diff.TableID = job.TableID
case model.ActionRenameTables:
var (
oldSchemaIDs, newSchemaIDs, tableIDs []int64
tableNames, oldSchemaNames []*model.CIStr
)
err = job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &tableNames, &tableIDs, &oldSchemaNames)
if err != nil {
return 0, errors.Trace(err)
}
affects := make([]*model.AffectedOption, len(newSchemaIDs)-1)
for i, newSchemaID := range newSchemaIDs {
// Do not add the first table to AffectedOpts. Related issue tidb#47064.
if i == 0 {
continue
}
affects[i-1] = &model.AffectedOption{
SchemaID: newSchemaID,
TableID: tableIDs[i],
OldTableID: tableIDs[i],
OldSchemaID: oldSchemaIDs[i],
}
}
diff.TableID = tableIDs[0]
diff.SchemaID = newSchemaIDs[0]
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
// From start of function: diff.SchemaID = job.SchemaID
// Old is the original non-partitioned table
diff.OldTableID = job.TableID
diff.OldSchemaID = job.SchemaID
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
// This is needed to avoid crashing TiFlash!
// TODO: Update TiFlash to handle StateWriteOnly.
diff.AffectedOpts = []*model.AffectedOption{{
TableID: ptTableID,
}}
if job.SchemaState != model.StatePublic {
// No change, just to refresh the non-partitioned table
// with its new ExchangePartitionInfo.
diff.TableID = job.TableID
// Keep this as the schema ID of the non-partitioned table
// to avoid triggering an early rename in TiFlash.
diff.AffectedOpts[0].SchemaID = job.SchemaID
// The partitioned table needs to be reloaded; use diff.AffectedOpts[0].OldSchemaID to mark it.
if len(multiInfos) > 0 {
diff.AffectedOpts[0].OldSchemaID = ptSchemaID
}
} else {
// Swap
diff.TableID = ptDefID
// Also set the correct SchemaID in case the schemas differ.
diff.AffectedOpts[0].SchemaID = ptSchemaID
}
case model.ActionTruncateTablePartition:
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable:
// affects are used to update the placement rule cache
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
if oldIDs, ok := job.CtxVars[0].([]int64); ok {
diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs)
}
}
case model.ActionReorganizePartition:
diff.TableID = job.TableID
// TODO: should this be for every state of Reorganize?
if len(job.CtxVars) > 0 {
if droppedIDs, ok := job.CtxVars[0].([]int64); ok {
if addedIDs, ok := job.CtxVars[1].([]int64); ok {
// To use AffectedOpts, we need both new and old to have the same length.
maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs))
// Also initialize them to 0!
oldIDs := make([]int64, maxParts)
copy(oldIDs, droppedIDs)
newIDs := make([]int64, maxParts)
copy(newIDs, addedIDs)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
}
}
case model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
diff.TableID = job.TableID
diff.OldTableID = job.TableID
if job.SchemaState == model.StateDeleteReorganization {
partInfo := &model.PartitionInfo{}
var partNames []string
err = job.DecodeArgs(&partNames, &partInfo)
if err != nil {
return 0, errors.Trace(err)
}
// Final part: the new table ID is assigned.
diff.TableID = partInfo.NewTableID
if len(job.CtxVars) > 0 {
if droppedIDs, ok := job.CtxVars[0].([]int64); ok {
if addedIDs, ok := job.CtxVars[1].([]int64); ok {
// To use AffectedOpts, we need both new and old to have the same length.
maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs))
// Also initialize them to 0!
oldIDs := make([]int64, maxParts)
copy(oldIDs, droppedIDs)
newIDs := make([]int64, maxParts)
copy(newIDs, addedIDs)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
}
}
}
case model.ActionCreateTable:
diff.TableID = job.TableID
if len(job.Args) > 0 {
tbInfo, _ := job.Args[0].(*model.TableInfo)
// When creating a table with a foreign key, there are two schema state changes:
// 1. none -> write-only
// 2. write-only -> public
// In the second state change (write-only -> public), the infoschema loader should drop the old
// table first and then create the new table, so diff.OldTableID must be set here to ensure that.
if tbInfo != nil && tbInfo.State == model.StatePublic && len(tbInfo.ForeignKeys) > 0 {
diff.OldTableID = job.TableID
}
}
case model.ActionRecoverSchema:
var (
recoverSchemaInfo *RecoverSchemaInfo
recoverSchemaCheckFlag int64
)
err = job.DecodeArgs(&recoverSchemaInfo, &recoverSchemaCheckFlag)
if err != nil {
return 0, errors.Trace(err)
}
// The recoverSchemaCheckFlag value is reserved for the GC worker's judgment.
job.Args[checkFlagIndexInJobArgs] = recoverSchemaCheckFlag
recoverTabsInfo := recoverSchemaInfo.RecoverTabsInfo
diff.AffectedOpts = make([]*model.AffectedOption, len(recoverTabsInfo))
for i := range recoverTabsInfo {
diff.AffectedOpts[i] = &model.AffectedOption{
SchemaID: job.SchemaID,
OldSchemaID: job.SchemaID,
TableID: recoverTabsInfo[i].TableInfo.ID,
OldTableID: recoverTabsInfo[i].TableInfo.ID,
}
}
case model.ActionFlashbackCluster:
diff.TableID = -1
if job.SchemaState == model.StatePublic {
diff.RegenerateSchemaMap = true
}
default:
diff.TableID = job.TableID
}
if len(multiInfos) > 0 {
existsMap := make(map[int64]struct{})
existsMap[diff.TableID] = struct{}{}
for _, affect := range diff.AffectedOpts {
existsMap[affect.TableID] = struct{}{}
}
for _, info := range multiInfos {
_, exist := existsMap[info.tblInfo.ID]
if exist {
continue
}
existsMap[info.tblInfo.ID] = struct{}{}
diff.AffectedOpts = append(diff.AffectedOpts, &model.AffectedOption{
SchemaID: info.schemaID,
OldSchemaID: info.schemaID,
TableID: info.tblInfo.ID,
OldTableID: info.tblInfo.ID,
})
}
}
err = t.SetSchemaDiff(diff)
return schemaVersion, errors.Trace(err)
}
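
To make the ActionTruncateTable branch above concrete: truncation allocates a new physical table ID while keeping the logical table, so the resulting diff carries both IDs. A sketch with made-up values:

// Hypothetical values for illustration: table 100 in schema 1 is
// truncated and recreated as table 101 at schema version 42.
diff := &model.SchemaDiff{
	Version:    42, // newly allocated schema version
	Type:       model.ActionTruncateTable,
	SchemaID:   1,   // job.SchemaID
	TableID:    101, // new table ID decoded from the job args
	OldTableID: 100, // job.TableID, the truncated table
}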
