Skip to content

Commit

Permalink
Merge branch 'master' into close_recordset
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Dec 14, 2022
2 parents 0f2b00c + d2eca72 commit 5c168d9
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 39 deletions.
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.ClearStmtVars()
vars.PrevFoundInBinding = vars.FoundInBinding
vars.FoundInBinding = false
vars.DurationWaitTS = 0
return
}

Expand Down
86 changes: 50 additions & 36 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3055,22 +3055,21 @@ func loadCollationParameter(ctx context.Context, se *session) (bool, error) {
return false, nil
}

type tableBasicInfo struct {
SQL string
id int64
}

var (
errResultIsEmpty = dbterror.ClassExecutor.NewStd(errno.ErrResultIsEmpty)
// DDLJobTables is a list of tables definitions used in concurrent DDL.
DDLJobTables = []struct {
SQL string
id int64
}{
DDLJobTables = []tableBasicInfo{
{ddl.JobTableSQL, ddl.JobTableID},
{ddl.ReorgTableSQL, ddl.ReorgTableID},
{ddl.HistoryTableSQL, ddl.HistoryTableID},
}
// BackfillTables is a list of tables definitions used in dist reorg DDL.
BackfillTables = []struct {
SQL string
id int64
}{
BackfillTables = []tableBasicInfo{
{ddl.BackfillTableSQL, ddl.BackfillTableID},
{ddl.BackfillHistoryTableSQL, ddl.BackfillHistoryTableID},
}
Expand All @@ -3091,7 +3090,7 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) {
}
}

// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_ddl_backfill and tidb_ddl_backfill_history.
// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg, tidb_ddl_history, tidb_ddl_backfill and tidb_ddl_backfill_history.
func InitDDLJobTables(store kv.Storage) error {
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
Expand All @@ -3103,41 +3102,56 @@ func InitDDLJobTables(store kv.Storage) error {
if err != nil {
return err
}
tables := append(DDLJobTables, BackfillTables...)
if exists {
tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
if err != nil || tblExist {
return errors.Trace(err)
}
tables = BackfillTables
return initBackfillJobTables(store, t, dbID)
}
tableIDs := make([]int64, 0, len(tables))
for _, tbl := range tables {
tableIDs = append(tableIDs, tbl.id)

if err = createAndSplitTables(store, t, dbID, DDLJobTables); err != nil {
return err
}
splitAndScatterTable(store, tableIDs)
p := parser.New()
for _, tbl := range tables {
stmt, err := p.ParseOneStmt(tbl.SQL, "", "")
if err != nil {
return errors.Trace(err)
}
tblInfo, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
if err != nil {
return errors.Trace(err)
}
tblInfo.State = model.StatePublic
tblInfo.ID = tbl.id
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableOrView(dbID, tblInfo)
if err != nil {
return errors.Trace(err)
}
if err = initBackfillJobTables(store, t, dbID); err != nil {
return err
}
return t.SetDDLTables()
})
}

// initBackfillJobTables is to create tidb_ddl_backfill and tidb_ddl_backfill_history.
func initBackfillJobTables(store kv.Storage, t *meta.Meta, dbID int64) error {
tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
if err != nil || tblExist {
return errors.Trace(err)
}
return createAndSplitTables(store, t, dbID, BackfillTables)
}

func createAndSplitTables(store kv.Storage, t *meta.Meta, dbID int64, tables []tableBasicInfo) error {
tableIDs := make([]int64, 0, len(tables))
for _, tbl := range tables {
tableIDs = append(tableIDs, tbl.id)
}
splitAndScatterTable(store, tableIDs)
p := parser.New()
for _, tbl := range tables {
stmt, err := p.ParseOneStmt(tbl.SQL, "", "")
if err != nil {
return errors.Trace(err)
}
tblInfo, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
if err != nil {
return errors.Trace(err)
}
tblInfo.State = model.StatePublic
tblInfo.ID = tbl.id
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableOrView(dbID, tblInfo)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

// InitMDLTable is to create tidb_mdl_info, which is used for metadata lock.
func InitMDLTable(store kv.Storage) error {
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
Expand Down
3 changes: 3 additions & 0 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package isolation

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -186,9 +187,11 @@ func (p *PessimisticRCTxnContextProvider) getStmtTS() (ts uint64, err error) {
}

p.prepareStmtTS()
start := time.Now()
if ts, err = p.stmtTSFuture.Wait(); err != nil {
return 0, err
}
p.sctx.GetSessionVars().DurationWaitTS += time.Since(start)

txn.SetOption(kv.SnapshotTS, ts)
p.stmtTS = ts
Expand Down
3 changes: 3 additions & 0 deletions sessiontxn/isolation/repeatable_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package isolation

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -83,9 +84,11 @@ func (p *PessimisticRRTxnContextProvider) getForUpdateTs() (ts uint64, err error
txnCtx := p.sctx.GetSessionVars().TxnCtx
futureTS := newOracleFuture(p.ctx, p.sctx, txnCtx.TxnScope)

start := time.Now()
if ts, err = futureTS.Wait(); err != nil {
return 0, err
}
p.sctx.GetSessionVars().DurationWaitTS += time.Since(start)

txnCtx.SetForUpdateTS(ts)
txn.SetOption(kv.SnapshotTS, ts)
Expand Down
22 changes: 22 additions & 0 deletions sessiontxn/isolation/repeatable_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,3 +678,25 @@ func initializeRepeatableReadProvider(t *testing.T, tk *testkit.TestKit, active
require.NoError(t, tk.Session().PrepareTxnCtx(context.TODO()))
return assert.CheckAndGetProvider(t)
}

func TestRRWaitTSTimeInSlowLog(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
se := tk.Session()

tk.MustExec("use test")
tk.MustExec("create table t (id int primary key, v int)")
tk.MustExec("insert into t values (1, 1)")

tk.MustExec("begin pessimistic")
waitTS1 := se.GetSessionVars().DurationWaitTS
tk.MustExec("update t set v = v + 10 where id = 1")
waitTS2 := se.GetSessionVars().DurationWaitTS
tk.MustExec("delete from t")
waitTS3 := se.GetSessionVars().DurationWaitTS
tk.MustExec("commit")
require.NotEqual(t, waitTS1, waitTS2)
require.NotEqual(t, waitTS1, waitTS3)
require.NotEqual(t, waitTS2, waitTS3)
}
29 changes: 29 additions & 0 deletions sessiontxn/txn_rc_tso_optimize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,3 +790,32 @@ func TestConflictErrorsUseRcWriteCheckTs(t *testing.T) {

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr"))
}

func TestRcWaitTSInSlowLog(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set global transaction_isolation = 'READ-COMMITTED'")
tk.RefreshSession()
sctx := tk.Session()

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id1 int, id2 int, id3 int, PRIMARY KEY(id1), UNIQUE KEY udx_id2 (id2))")
tk.MustExec("insert into t1 values (1, 1, 1), (2, 2, 2), (3, 3, 3)")

res := tk.MustQuery("show variables like 'transaction_isolation'")
require.Equal(t, "READ-COMMITTED", res.Rows()[0][1])
sctx.SetValue(sessiontxn.TsoRequestCount, 0)

tk.MustExec("begin pessimistic")
waitTs1 := sctx.GetSessionVars().DurationWaitTS
tk.MustExec("update t1 set id3 = id3 + 10 where id1 = 1")
waitTs2 := sctx.GetSessionVars().DurationWaitTS
tk.MustExec("update t1 set id3 = id3 + 10 where id1 > 3 and id1 < 6")
waitTs3 := sctx.GetSessionVars().DurationWaitTS
tk.MustExec("commit")
require.NotEqual(t, waitTs1, waitTs2)
require.NotEqual(t, waitTs1, waitTs2)
require.NotEqual(t, waitTs2, waitTs3)
}
13 changes: 10 additions & 3 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,13 @@ func (m *JobManager) jobLoop() error {
}
cancel()
case <-updateScanTaskStateTicker:
m.updateTaskState()
if m.updateTaskState() {
m.rescheduleJobs(se, now)
}
case <-m.notifyStateCh:
m.updateTaskState()
if m.updateTaskState() {
m.rescheduleJobs(se, now)
}
case <-jobCheckTicker:
m.checkFinishedJob(se, now)
m.checkNotOwnJob()
Expand Down Expand Up @@ -263,7 +267,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
return workers, nil, nil
}

func (m *JobManager) updateTaskState() {
// updateTaskState polls the result from scan worker and returns whether there are result polled
func (m *JobManager) updateTaskState() bool {
results := m.pollScanWorkerResults()
for _, result := range results {
job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
Expand All @@ -276,6 +281,8 @@ func (m *JobManager) updateTaskState() {
job.finishedScanTaskCounter += 1
job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
}

return len(results) > 0
}

func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {
Expand Down

0 comments on commit 5c168d9

Please sign in to comment.