Skip to content

Commit 7e6b49b

Browse files
mjonssti-chi-bot
authored andcommitted
*: Allow Point_Get during DDL with Global Index (pingcap#56382)
ref pingcap#45133, ref pingcap#55819, close pingcap#56819
1 parent 27e78ae commit 7e6b49b

File tree

6 files changed

+147
-47
lines changed

6 files changed

+147
-47
lines changed

pkg/ddl/partition.go

+31-19
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
136136

137137
// move the adding definition into tableInfo.
138138
updateAddingPartitionInfo(partInfo, tblInfo)
139+
tblInfo.Partition.DDLState = model.StateReplicaOnly
140+
tblInfo.Partition.DDLAction = job.Type
139141
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
140142
if err != nil {
141143
return ver, errors.Trace(err)
@@ -222,6 +224,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
222224

223225
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions)
224226

227+
tblInfo.Partition.DDLState = model.StateNone
228+
tblInfo.Partition.DDLAction = model.ActionNone
225229
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
226230
if err != nil {
227231
return ver, errors.Trace(err)
@@ -2244,9 +2248,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
22442248
}
22452249

22462250
var physicalTableIDs []int64
2247-
// In order to skip maintaining the state check in partitionDefinition, TiDB use droppingDefinition instead of state field.
2248-
// So here using `job.SchemaState` to judge what the stage of this job is.
2249-
originalState := job.SchemaState
22502251
switch job.SchemaState {
22512252
case model.StatePublic:
22522253
// Here we mark the partitions to be dropped, so they are not read or written
@@ -2260,11 +2261,11 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
22602261
originalDefs := tblInfo.Partition.Definitions
22612262
physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
22622263
tblInfo.Partition.Definitions = originalDefs
2263-
tblInfo.Partition.DDLState = model.StateWriteOnly
2264-
tblInfo.Partition.DDLAction = model.ActionDropTablePartition
2265-
22662264
job.SchemaState = model.StateWriteOnly
2267-
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
2265+
tblInfo.Partition.DDLState = job.SchemaState
2266+
tblInfo.Partition.DDLAction = job.Type
2267+
2268+
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
22682269
case model.StateWriteOnly:
22692270
// Since the previous state do not use the dropping partitions,
22702271
// we can now actually remove them, allowing to write into the overlapping range
@@ -2308,16 +2309,16 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
23082309
return ver, err
23092310
}
23102311

2311-
tblInfo.Partition.DDLState = model.StateDeleteOnly
23122312
job.SchemaState = model.StateDeleteOnly
2313-
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
2313+
tblInfo.Partition.DDLState = job.SchemaState
2314+
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
23142315
case model.StateDeleteOnly:
23152316
// This state is not a real 'DeleteOnly' state, because tidb does not maintain the state check in partitionDefinition.
23162317
// Insert this state to confirm all servers can not see the old partitions when reorg is running,
23172318
// so that no new data will be inserted into old partitions when reorganizing.
2318-
tblInfo.Partition.DDLState = model.StateDeleteReorganization
23192319
job.SchemaState = model.StateDeleteReorganization
2320-
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
2320+
tblInfo.Partition.DDLState = job.SchemaState
2321+
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
23212322
case model.StateDeleteReorganization:
23222323
oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo)
23232324
physicalTableIDs = getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
@@ -2375,7 +2376,8 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
23752376
}
23762377
droppedDefs := tblInfo.Partition.DroppingDefinitions
23772378
tblInfo.Partition.DroppingDefinitions = nil
2378-
tblInfo.Partition.DDLState = model.StateNone
2379+
job.SchemaState = model.StateNone
2380+
tblInfo.Partition.DDLState = job.SchemaState
23792381
tblInfo.Partition.DDLAction = model.ActionNone
23802382
// used by ApplyDiff in updateSchemaVersion
23812383
job.CtxVars = []any{physicalTableIDs} // TODO remove it.
@@ -2511,14 +2513,16 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
25112513
pi.DroppingDefinitions = truncatingDefinitions
25122514
pi.NewPartitionIDs = newIDs[:]
25132515

2514-
tblInfo.Partition.DDLAction = model.ActionTruncateTablePartition
25152516
job.SchemaState = model.StateDeleteOnly
2517+
pi.DDLState = job.SchemaState
2518+
pi.DDLAction = job.Type
25162519
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
25172520
case model.StateDeleteOnly:
25182521
// This state is not a real 'DeleteOnly' state, because tidb does not maintaining the state check in partitionDefinition.
25192522
// Insert this state to confirm all servers can not see the old partitions when reorg is running,
25202523
// so that no new data will be inserted into old partitions when reorganizing.
25212524
job.SchemaState = model.StateDeleteReorganization
2525+
pi.DDLState = job.SchemaState
25222526
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
25232527
case model.StateDeleteReorganization:
25242528
// Step2: clear global index rows.
@@ -2609,6 +2613,8 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
26092613
// Step4: clear DroppingDefinitions and finish job.
26102614
tblInfo.Partition.DroppingDefinitions = nil
26112615
tblInfo.Partition.NewPartitionIDs = nil
2616+
tblInfo.Partition.DDLAction = model.ActionNone
2617+
tblInfo.Partition.DDLState = model.StateNone
26122618

26132619
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)
26142620

@@ -2816,6 +2822,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
28162822
// into the table using the schema version
28172823
// before the exchange is made.
28182824
job.SchemaState = model.StateWriteOnly
2825+
pt.Partition.DDLState = job.SchemaState
2826+
pt.Partition.DDLAction = job.Type
28192827
return updateVersionAndTableInfoWithCheck(jobCtx, job, nt, true, ptInfo...)
28202828
}
28212829
// From now on, nt (the non-partitioned table) has
@@ -2890,6 +2898,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
28902898
originalPartitionDef := partDef.Clone()
28912899
originalNt := nt.Clone()
28922900
partDef.ID, nt.ID = nt.ID, partDef.ID
2901+
pt.Partition.DDLState = model.StateNone
2902+
pt.Partition.DDLAction = model.ActionNone
28932903

28942904
err = metaMut.UpdateTable(ptSchemaID, pt)
28952905
if err != nil {
@@ -3324,7 +3334,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
33243334
// Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that.
33253335
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
33263336
job.SchemaState = model.StateDeleteOnly
3327-
tblInfo.Partition.DDLState = model.StateDeleteOnly
3337+
tblInfo.Partition.DDLState = job.SchemaState
33283338
tblInfo.Partition.DDLAction = job.Type
33293339
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
33303340
if err != nil {
@@ -3394,8 +3404,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
33943404
failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, job, err))
33953405
}
33963406
})
3397-
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
33983407
job.SchemaState = model.StateWriteOnly
3408+
tblInfo.Partition.DDLState = job.SchemaState
3409+
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
33993410
case model.StateWriteOnly:
34003411
// Insert this state to confirm all servers can see the new partitions when reorg is running,
34013412
// so that new data will be updated in both old and new partitions when reorganizing.
@@ -3405,10 +3416,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
34053416
tblInfo.Indices[i].State = model.StateWriteReorganization
34063417
}
34073418
}
3408-
tblInfo.Partition.DDLState = model.StateWriteReorganization
3419+
job.SchemaState = model.StateWriteReorganization
3420+
tblInfo.Partition.DDLState = job.SchemaState
34093421
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64))
34103422
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
3411-
job.SchemaState = model.StateWriteReorganization
34123423
case model.StateWriteReorganization:
34133424
physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
34143425
tbl, err2 := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
@@ -3493,9 +3504,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
34933504
// Now all the data copying is done, but we cannot simply remove the droppingDefinitions
34943505
// since they are a part of the normal Definitions that other nodes with
34953506
// the current schema version. So we need to double write for one more schema version
3496-
tblInfo.Partition.DDLState = model.StateDeleteReorganization
3497-
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
34983507
job.SchemaState = model.StateDeleteReorganization
3508+
tblInfo.Partition.DDLState = job.SchemaState
3509+
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
34993510

35003511
case model.StateDeleteReorganization:
35013512
// Drop the droppingDefinitions and finish the DDL
@@ -3517,6 +3528,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
35173528
tblInfo.Partition.DroppingDefinitions = nil
35183529
tblInfo.Partition.AddingDefinitions = nil
35193530
tblInfo.Partition.DDLState = model.StateNone
3531+
tblInfo.Partition.DDLAction = model.ActionNone
35203532
tblInfo.Partition.OriginalPartitionIDsOrder = nil
35213533

35223534
var dropIndices []*model.IndexInfo

pkg/ddl/tests/partition/db_partition_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -1433,10 +1433,8 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
14331433
tk3 := testkit.NewTestKit(t, store)
14341434
tk3.MustExec(`begin`)
14351435
tk3.MustExec(`use test`)
1436-
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("IndexRangeScan")
1437-
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Selection")
1438-
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("IndexRangeScan")
1439-
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Selection")
1436+
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Point_Get")
1437+
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get")
14401438
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
14411439
tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
14421440
// Here it will fail with

pkg/ddl/tests/partition/multi_domain_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,28 @@ import (
2727
"go.uber.org/zap"
2828
)
2929

30+
func TestMultiSchemaReorganizePartitionIssue56819(t *testing.T) {
31+
createSQL := `create table t (a int primary key, b varchar(255), unique index idx_b_global (b) global) partition by range (a) (partition p1 values less than (200), partition pMax values less than (maxvalue))`
32+
initFn := func(tkO *testkit.TestKit) {
33+
tkO.MustExec(`insert into t values (1,1),(2,2)`)
34+
}
35+
alterSQL := `alter table t reorganize partition p1 into (partition p0 values less than (100), partition p1 values less than (200))`
36+
loopFn := func(tkO, tkNO *testkit.TestKit) {
37+
res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`)
38+
schemaState := res.Rows()[0][0].(string)
39+
switch schemaState {
40+
case model.StateDeleteOnly.String():
41+
tkNO.MustExec(`insert into t values (4,4)`)
42+
tkNO.MustQuery(`select * from t where b = "4"`).Sort().Check(testkit.Rows("4 4"))
43+
tkO.MustQuery(`select * from t where b = "4"`).Sort().Check(testkit.Rows("4 4"))
44+
}
45+
}
46+
postFn := func(tkO *testkit.TestKit) {
47+
// nothing
48+
}
49+
runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn)
50+
}
51+
3052
func TestMultiSchemaDropRangePartition(t *testing.T) {
3153
createSQL := `create table t (a int primary key, b varchar(255)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200))`
3254
initFn := func(tkO *testkit.TestKit) {

pkg/executor/point_get.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -371,9 +371,15 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
371371
return err
372372
}
373373
tblID = pid
374-
if !matchPartitionNames(tblID, e.partitionNames, e.tblInfo.GetPartitionInfo()) {
374+
pi := e.tblInfo.GetPartitionInfo()
375+
if !matchPartitionNames(tblID, e.partitionNames, pi) {
375376
return nil
376377
}
378+
for _, id := range pi.IDsInDDLToIgnore() {
379+
if id == pid {
380+
return nil
381+
}
382+
}
377383
}
378384
}
379385
}

pkg/meta/model/table.go

+64-1
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,69 @@ func (pi *PartitionInfo) SetOriginalPartitionIDs() {
10071007
pi.OriginalPartitionIDsOrder = ids
10081008
}
10091009

1010+
// IDsInDDLToIgnore returns a list of IDs that the current
1011+
// session should not see (may be duplicate errors on insert/update though)
1012+
// For example during truncate or drop partition.
1013+
func (pi *PartitionInfo) IDsInDDLToIgnore() []int64 {
1014+
// TODO:
1015+
// Truncate partition:
1016+
// write only => should not see NewPartitionIDs
1017+
// delete only => should not see DroppingPartitions
1018+
// Drop partition:
1019+
// TODO: Make similar changes as in Truncate Partition:
1020+
// Add a state blocking read and write in the partitions to be dropped,
1021+
// to avoid situations like https://github.com/pingcap/tidb/issues/55888
1022+
// Add partition:
1023+
// TODO: Add tests!
1024+
// Exchange Partition:
1025+
// Currently blocked for GlobalIndex
1026+
// Reorganize Partition:
1027+
// Nothing, since it will create a new copy of the global index.
1028+
// This is due to the global index needs to have two partitions for the same index key
1029+
// TODO: Should we extend the GlobalIndex to have multiple partitions?
1030+
// Maybe from PK/_tidb_rowid + Partition ID
1031+
// to PK/_tidb_rowid + Partition ID + Valid from Schema Version,
1032+
// with support for two entries?
1033+
// Then we could avoid having two copies of the same Global Index
1034+
// just for handling a single SchemaState.
1035+
// If so, could we then replace this?
1036+
switch pi.DDLAction {
1037+
case ActionTruncateTablePartition:
1038+
switch pi.DDLState {
1039+
case StateWriteOnly:
1040+
return pi.NewPartitionIDs
1041+
case StateDeleteOnly, StateDeleteReorganization:
1042+
if len(pi.DroppingDefinitions) == 0 {
1043+
return nil
1044+
}
1045+
ids := make([]int64, 0, len(pi.DroppingDefinitions))
1046+
for _, definition := range pi.DroppingDefinitions {
1047+
ids = append(ids, definition.ID)
1048+
}
1049+
return ids
1050+
}
1051+
case ActionDropTablePartition:
1052+
if len(pi.DroppingDefinitions) > 0 && pi.DDLState == StateDeleteOnly {
1053+
ids := make([]int64, 0, len(pi.DroppingDefinitions))
1054+
for _, def := range pi.DroppingDefinitions {
1055+
ids = append(ids, def.ID)
1056+
}
1057+
return ids
1058+
}
1059+
case ActionAddTablePartition:
1060+
// TODO: Add tests for ADD PARTITION multi-domain with Global Index!
1061+
if len(pi.AddingDefinitions) > 0 {
1062+
ids := make([]int64, 0, len(pi.DroppingDefinitions))
1063+
for _, def := range pi.AddingDefinitions {
1064+
ids = append(ids, def.ID)
1065+
}
1066+
return ids
1067+
}
1068+
// Not supporting Global Indexes: case ActionExchangeTablePartition
1069+
}
1070+
return nil
1071+
}
1072+
10101073
// PartitionState is the state of the partition.
10111074
type PartitionState struct {
10121075
ID int64 `json:"id"`
@@ -1023,7 +1086,7 @@ type PartitionDefinition struct {
10231086
Comment string `json:"comment,omitempty"`
10241087
}
10251088

1026-
// Clone clones ConstraintInfo.
1089+
// Clone clones PartitionDefinition.
10271090
func (ci *PartitionDefinition) Clone() PartitionDefinition {
10281091
nci := *ci
10291092
nci.LessThan = make([]string, len(ci.LessThan))

pkg/planner/core/find_best_task.go

+21-22
Original file line numberDiff line numberDiff line change
@@ -1480,16 +1480,6 @@ func findBestTask4LogicalDataSource(lp base.LogicalPlan, prop *property.Physical
14801480
if canConvertPointGet && path.IsIntHandlePath && !ds.Table.Meta().PKIsHandle && len(ds.PartitionNames) != 1 {
14811481
canConvertPointGet = false
14821482
}
1483-
if canConvertPointGet {
1484-
if path != nil && path.Index != nil && path.Index.Global {
1485-
// Don't convert to point get during ddl
1486-
// TODO: Revisit truncate partition and global index
1487-
if len(ds.TableInfo.GetPartitionInfo().DroppingDefinitions) > 0 ||
1488-
len(ds.TableInfo.GetPartitionInfo().AddingDefinitions) > 0 {
1489-
canConvertPointGet = false
1490-
}
1491-
}
1492-
}
14931483
}
14941484
if canConvertPointGet {
14951485
allRangeIsPoint := true
@@ -2278,26 +2268,35 @@ func (is *PhysicalIndexScan) addSelectionConditionForGlobalIndex(p *logicalop.Da
22782268
needNot := false
22792269
pInfo := p.TableInfo.GetPartitionInfo()
22802270
if len(idxArr) == 1 && idxArr[0] == FullRange {
2281-
// Only filter adding and dropping partitions.
2282-
if len(pInfo.AddingDefinitions) == 0 && len(pInfo.DroppingDefinitions) == 0 {
2283-
return conditions, nil
2284-
}
2271+
// Filter away partitions that may exists in Global Index,
2272+
// but should not be seen.
22852273
needNot = true
2286-
for _, p := range pInfo.AddingDefinitions {
2287-
args = append(args, expression.NewInt64Const(p.ID))
2288-
}
2289-
for _, p := range pInfo.DroppingDefinitions {
2290-
args = append(args, expression.NewInt64Const(p.ID))
2274+
for _, id := range pInfo.IDsInDDLToIgnore() {
2275+
args = append(args, expression.NewInt64Const(id))
22912276
}
22922277
} else if len(idxArr) == 0 {
2293-
// add an invalid pid as param for `IN` function
2278+
// TODO: Can we change to Table Dual somehow?
2279+
// Add an invalid pid as param for `IN` function
22942280
args = append(args, expression.NewInt64Const(-1))
22952281
} else {
2296-
// `PartitionPruning`` func does not return adding and dropping partitions
2282+
// TODO: When PartitionPruning is guaranteed to not
2283+
// return old/blocked partition ids then ignoreMap can be removed.
2284+
ignoreMap := make(map[int64]struct{})
2285+
for _, id := range pInfo.IDsInDDLToIgnore() {
2286+
ignoreMap[id] = struct{}{}
2287+
}
22972288
for _, idx := range idxArr {
2298-
args = append(args, expression.NewInt64Const(pInfo.Definitions[idx].ID))
2289+
id := pInfo.Definitions[idx].ID
2290+
_, ok := ignoreMap[id]
2291+
if !ok {
2292+
args = append(args, expression.NewInt64Const(id))
2293+
}
2294+
intest.Assert(!ok, "PartitionPruning returns partitions which should be ignored!")
22992295
}
23002296
}
2297+
if len(args) == 1 {
2298+
return conditions, nil
2299+
}
23012300
condition, err := expression.NewFunction(p.SCtx().GetExprCtx(), ast.In, types.NewFieldType(mysql.TypeLonglong), args...)
23022301
if err != nil {
23032302
return nil, err

0 commit comments

Comments
 (0)