Skip to content

Commit

Permalink
executor: get the right result for stmt `select ... for update union …
Browse files Browse the repository at this point in the history
…select …` (#31956)

close #31530
  • Loading branch information
lcwangchao authored Jan 27, 2022
1 parent d9b0c3a commit 07b0c7c
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 106 deletions.
52 changes: 37 additions & 15 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (tc indexJoinTestCase) getMockDataSourceOptByRows(rows int) mockDataSourceP
}
}

func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) (Executor, error) {
outerCols, innerCols := tc.columns(), tc.columns()
joinSchema := expression.NewSchema(outerCols...)
joinSchema.Append(innerCols...)
Expand All @@ -1317,6 +1317,13 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
for i := range keyOff2IdxOff {
keyOff2IdxOff[i] = i
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
}

e := &IndexLookUpJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 1, outerDS),
outerCtx: outerCtx{
Expand All @@ -1325,7 +1332,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
hashCols: tc.outerHashKeyIdx,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope)},
readerBuilder: readerBuilder,
rowTypes: rightTypes,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
Expand All @@ -1338,21 +1345,24 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
lastColHelper: nil,
}
e.joinResult = newFirstChunk(e)
return e
return e, nil
}

func prepare4IndexOuterHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
e := prepare4IndexInnerHashJoin(tc, outerDS, innerDS).(*IndexLookUpJoin)
idxHash := &IndexNestedLoopHashJoin{IndexLookUpJoin: *e}
func prepare4IndexOuterHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) (Executor, error) {
e, err := prepare4IndexInnerHashJoin(tc, outerDS, innerDS)
if err != nil {
return nil, err
}
idxHash := &IndexNestedLoopHashJoin{IndexLookUpJoin: *e.(*IndexLookUpJoin)}
concurrency := tc.concurrency
idxHash.joiners = make([]joiner, concurrency)
for i := 0; i < concurrency; i++ {
idxHash.joiners[i] = e.joiner.Clone()
idxHash.joiners[i] = e.(*IndexLookUpJoin).joiner.Clone()
}
return idxHash
return idxHash, nil
}

func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) (Executor, error) {
outerCols, innerCols := tc.columns(), tc.columns()
joinSchema := expression.NewSchema(outerCols...)
joinSchema.Append(innerCols...)
Expand Down Expand Up @@ -1381,6 +1391,13 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
compareFuncs = append(compareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], innerJoinKeys[i]))
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
}

e := &IndexLookUpMergeJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 2, outerDS),
outerMergeCtx: outerMergeCtx{
Expand All @@ -1391,7 +1408,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
compareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope)},
readerBuilder: readerBuilder,
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
Expand All @@ -1409,7 +1426,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
joiners[i] = newJoiner(tc.ctx, 0, false, defaultValues, nil, leftTypes, rightTypes, nil)
}
e.joiners = joiners
return e
return e, nil
}

type indexJoinType int8
Expand All @@ -1431,13 +1448,18 @@ func benchmarkIndexJoinExecWithCase(
for i := 0; i < b.N; i++ {
b.StopTimer()
var exec Executor
var err error
switch execType {
case indexInnerHashJoin:
exec = prepare4IndexInnerHashJoin(tc, outerDS, innerDS)
exec, err = prepare4IndexInnerHashJoin(tc, outerDS, innerDS)
case indexOuterHashJoin:
exec = prepare4IndexOuterHashJoin(tc, outerDS, innerDS)
exec, err = prepare4IndexOuterHashJoin(tc, outerDS, innerDS)
case indexMergeJoin:
exec = prepare4IndexMergeJoin(tc, outerDS, innerDS)
exec, err = prepare4IndexMergeJoin(tc, outerDS, innerDS)
}

if err != nil {
b.Fatal(err)
}

tmpCtx := context.Background()
Expand All @@ -1446,7 +1468,7 @@ func benchmarkIndexJoinExecWithCase(
innerDS.prepareChunks()

b.StartTimer()
if err := exec.Open(tmpCtx); err != nil {
if err = exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
for {
Expand Down
107 changes: 85 additions & 22 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ var (
type executorBuilder struct {
ctx sessionctx.Context
is infoschema.InfoSchema
snapshotTS uint64 // The consistent snapshot timestamp for the executor to read data.
snapshotTS uint64 // The ts for snapshot-read. A select statement without for update will use this ts
forUpdateTS uint64 // The ts should be used by insert/update/delete/select-for-update statement
snapshotTSCached bool
err error // err is set when there is error happened during Executor building process.
hasLock bool
Expand All @@ -97,6 +98,14 @@ type executorBuilder struct {
inUpdateStmt bool
inDeleteStmt bool
inInsertStmt bool
inSelectLockStmt bool

// forDataReaderBuilder indicates whether the builder is used by a dataReaderBuilder.
// When forDataReader is true, the builder should use the dataReaderTS as the executor read ts. This is because
// dataReaderBuilder can be used in concurrent goroutines, so we must ensure that getting the ts should be thread safe and
// can return a correct value even if the session context has already been destroyed
forDataReaderBuilder bool
dataReaderTS uint64
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
Expand Down Expand Up @@ -630,12 +639,16 @@ func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
}

func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor {
if !b.inSelectLockStmt {
b.inSelectLockStmt = true
defer func() { b.inSelectLockStmt = false }()
}
b.hasLock = true
if b.err = b.updateForUpdateTSIfNeeded(v.Children()[0]); b.err != nil {
return nil
}
// Build 'select for update' using the 'for update' ts.
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

src := b.build(v.Children()[0])
if b.err != nil {
Expand Down Expand Up @@ -822,7 +835,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
return nil
}
}
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selectExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -1476,8 +1489,24 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
return e
}

// `getSnapshotTS` returns the timestamp of the snapshot that a reader should read.
// `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts
// Please notice that in RC isolation, the above two ts are the same
func (b *executorBuilder) getSnapshotTS() (uint64, error) {
if b.forDataReaderBuilder {
return b.dataReaderTS, nil
}

if (b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt) && b.forUpdateTS != 0 {
return b.forUpdateTS, nil
}

return b.getReadTS()
}

// getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level
// and some stale/historical read contexts. For example, it will return txn.StartTS in RR and return
// the current timestamp in RC isolation
func (b *executorBuilder) getReadTS() (uint64, error) {
// `refreshForUpdateTSForRC` should always be invoked before returning the cached value to
// ensure the correct value is returned even the `snapshotTS` field is already set by other
// logics. However for `IndexLookUpMergeJoin` and `IndexLookUpHashJoin`, it requires caching the
Expand Down Expand Up @@ -1903,17 +1932,6 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu
}

func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executor {
// A quick fix for avoiding a race mentioned in issue #30468.
// Fetch the snapshot ts to make the transaction's state ready. Otherwise, multiple threads in the Union executor
// may change the transaction's state concurrently, which causes race.
// This fix is a hack, but with minimal change to the current code and works. Actually, the usage of the transaction
// states and the logic to access the snapshot ts should all be refactored.
_, err := b.getSnapshotTS()
if err != nil {
b.err = err
return nil
}

childExecs := make([]Executor, len(v.Children()))
for i, child := range v.Children() {
childExecs[i] = b.build(child)
Expand Down Expand Up @@ -2011,7 +2029,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {
return nil
}
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -2068,7 +2086,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {
return nil
}
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -2774,6 +2792,22 @@ func (b *executorBuilder) corColInAccess(p plannercore.PhysicalPlan) bool {
return false
}

func (b *executorBuilder) newDataReaderBuilder(p plannercore.PhysicalPlan) (*dataReaderBuilder, error) {
ts, err := b.getSnapshotTS()
if err != nil {
return nil, err
}

builderForDataReader := *b
builderForDataReader.forDataReaderBuilder = true
builderForDataReader.dataReaderTS = ts

return &dataReaderBuilder{
Plan: p,
executorBuilder: &builderForDataReader,
}, nil
}

func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) Executor {
outerExec := b.build(v.Children()[1-v.InnerChildIdx])
if b.err != nil {
Expand Down Expand Up @@ -2845,6 +2879,13 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
break
}
}

readerBuilder, err := b.newDataReaderBuilder(innerPlan)
if err != nil {
b.err = err
return nil
}

e := &IndexLookUpJoin{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec),
outerCtx: outerCtx{
Expand All @@ -2853,7 +2894,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
filter: outerFilter,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b},
readerBuilder: readerBuilder,
rowTypes: innerTypes,
hashTypes: innerHashTypes,
colLens: v.IdxColLens,
Expand Down Expand Up @@ -2955,6 +2996,12 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex
}
executorCounterIndexLookUpJoin.Inc()

readerBuilder, err := b.newDataReaderBuilder(innerPlan)
if err != nil {
b.err = err
return nil
}

e := &IndexLookUpMergeJoin{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec),
outerMergeCtx: outerMergeCtx{
Expand All @@ -2966,7 +3013,7 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex
compareFuncs: v.OuterCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b},
readerBuilder: readerBuilder,
rowTypes: innerTypes,
joinKeys: v.InnerJoinKeys,
keyCols: innerKeyCols,
Expand Down Expand Up @@ -3469,6 +3516,12 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
if err != nil {
return nil, err
}

readerBuilder, err := b.newDataReaderBuilder(nil)
if err != nil {
return nil, err
}

e := &IndexLookUpExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: indexReq,
Expand All @@ -3482,7 +3535,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
indexStreaming: indexStreaming,
tableStreaming: tableStreaming,
indexPaging: indexPaging,
dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},
dataReaderBuilder: readerBuilder,
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
Expand Down Expand Up @@ -3624,6 +3677,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
if err != nil {
return nil, err
}

readerBuilder, err := b.newDataReaderBuilder(nil)
if err != nil {
return nil, err
}

e := &IndexMergeReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPBs: partialReqs,
Expand All @@ -3637,7 +3696,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
tableStreaming: tableStreaming,
partialPlans: v.PartialPlans,
tblPlans: v.TablePlans,
dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},
dataReaderBuilder: readerBuilder,
feedbacks: feedbacks,
handleCols: ts.HandleCols,
isCorColInPartialFilters: isCorColInPartialFilters,
Expand Down Expand Up @@ -3760,7 +3819,11 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.
func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int,
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder}
childBuilder, err := builder.newDataReaderBuilder(v.Children()[0])
if err != nil {
return nil, err
}

reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
if err != nil {
return nil, err
Expand Down
24 changes: 24 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9850,6 +9850,30 @@ func (s *testSerialSuite) TestFix31038(c *C) {
failpoint.Disable("github.com/pingcap/tidb/store/copr/disable-collect-execution")
}

func (s *testSerialSuite) TestFix31530(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk2 := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk2.MustExec("use test")
defer func() {
tk.MustExec("drop table if exists t1")
}()
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (id int primary key, v int)")
tk.MustExec("insert into t1 values(1, 10)")
tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t1").Check(testkit.Rows("1 10"))

// update t1 before session1 transaction not finished
tk2.MustExec("update t1 set v=11 where id=1")

tk.MustQuery("(select 'a' as c, id, v from t1 for update) union all (select 'b', id, v from t1) order by c").Check(testkit.Rows("a 1 11", "b 1 10"))
tk.MustQuery("(select 'a' as c, id, v from t1) union all (select 'b', id, v from t1 for update) order by c").Check(testkit.Rows("a 1 10", "b 1 11"))
tk.MustQuery("(select 'a' as c, id, v from t1 where id=1 for update) union all (select 'b', id, v from t1 where id=1) order by c").Check(testkit.Rows("a 1 11", "b 1 10"))
tk.MustQuery("(select 'a' as c, id, v from t1 where id=1) union all (select 'b', id, v from t1 where id=1 for update) order by c").Check(testkit.Rows("a 1 10", "b 1 11"))
tk.MustExec("rollback")
}

func (s *testSerialSuite) TestFix31537(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
Loading

0 comments on commit 07b0c7c

Please sign in to comment.