Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, session: Add Detach() method to detach the record set #54091

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ go_library(
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/sortexec",
"//pkg/executor/staticrecordset",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/expression/aggregation",
Expand Down
12 changes: 12 additions & 0 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics"
"github.com/pingcap/tidb/pkg/executor/staticrecordset"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/keyspace"
Expand Down Expand Up @@ -241,6 +242,17 @@ func (a *recordSet) OnFetchReturned() {
a.stmt.LogSlowQuery(a.txnStartTS, len(a.lastErrs) == 0, true)
}

// Detach creates a new `RecordSet` which doesn't depend on the current session context.
func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) {
// TODO: also detach the executor. Currently, the executor inside may contain the session context. Once
// the executor itself supports detach, we should also detach it here.
e, ok := a.executor.(*TableReaderExecutor)
if !ok {
return nil, false, nil
}
return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil
}

// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
Expand Down
31 changes: 31 additions & 0 deletions pkg/executor/staticrecordset/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "staticrecordset",
srcs = ["recordset.go"],
importpath = "github.com/pingcap/tidb/pkg/executor/staticrecordset",
visibility = ["//visibility:public"],
deps = [
"//pkg/executor/internal/exec",
"//pkg/parser/ast",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "staticrecordset_test",
timeout = "short",
srcs = ["integration_test.go"],
flaky = True,
shard_count = 4,
deps = [
"//pkg/testkit",
"//pkg/util/sqlexec",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
],
)
150 changes: 150 additions & 0 deletions pkg/executor/staticrecordset/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staticrecordset_test

import (
"context"
"testing"
"time"

"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

func TestStaticRecordSet(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

// check schema
require.Len(t, srs.Fields(), 1)
require.Equal(t, "id", srs.Fields()[0].Column.Name.O)

// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))

require.NoError(t, srs.Close())
}

func TestStaticRecordSetWithTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

// The transaction should have been committed.
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
require.False(t, txn.Valid())

// Now, it's fine to run another statement on the session
// remove all existing data in the table
tk.MustExec("truncate table t")

// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))

require.NoError(t, srs.Close())
}

func TestStaticRecordSetExceedGCTime(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
// Get the startTS
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
startTS := txn.StartTS()

// Detach the record set
drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

// Now, it's fine to run another statement on the session
// remove all existing data in the table
tk.MustExec("truncate table t")

// Update the safe point
store.(tikv.Storage).UpdateSPCache(startTS+1, time.Now())

// Check data, it'll get an error
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.Error(t, err)
require.NoError(t, srs.Close())
}

func TestDetachError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

// explicit transaction is not allowed
tk.MustExec("begin")
rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs2 := rs.(sqlexec.DetachableRecordSet)
_, ok, err := drs2.TryDetach()
require.False(t, ok)
require.NoError(t, err)
tk.MustExec("commit")
}
79 changes: 79 additions & 0 deletions pkg/executor/staticrecordset/recordset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staticrecordset

import (
"context"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

var _ sqlexec.RecordSet = &staticRecordSet{}

type staticRecordSet struct {
fields []*ast.ResultField
executor exec.Executor

sqlText string
}

// New creates a new staticRecordSet
func New(fields []*ast.ResultField, executor exec.Executor, sqlText string) sqlexec.RecordSet {
return &staticRecordSet{
fields: fields,
executor: executor,
sqlText: sqlText,
}
}

func (s *staticRecordSet) Fields() []*ast.ResultField {
return s.fields
}

func (s *staticRecordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
defer func() {
r := recover()
if r == nil {
return
}
err = util.GetRecoverError(r)
logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", s.sqlText), zap.Stack("stack"))
}()

return exec.Next(ctx, s.executor, req)
}

// NewChunk create a chunk base on top-level executor's exec.NewFirstChunk().
func (s *staticRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
if alloc == nil {
return exec.NewFirstChunk(s.executor)
}

return alloc.Alloc(s.executor.RetFieldTypes(), s.executor.InitCap(), s.executor.MaxChunkSize())
}

// Close closes the executor.
func (s *staticRecordSet) Close() error {
err := exec.Close(s.executor)
s.executor = nil

return err
}
34 changes: 34 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2379,6 +2379,40 @@ func (rs *execStmtResult) Close() error {
return err2
}

func (rs *execStmtResult) TryDetach() (sqlexec.RecordSet, bool, error) {
if !rs.sql.IsReadOnly(rs.se.GetSessionVars()) {
return nil, false, nil
}
if !plannercore.IsAutoCommitTxn(rs.se.GetSessionVars()) {
return nil, false, nil
}

drs, ok := rs.RecordSet.(sqlexec.DetachableRecordSet)
if !ok {
return nil, false, nil
}
detachedRS, ok, err := drs.TryDetach()
if !ok || err != nil {
return nil, ok, err
}

// FIXME: block the min-start-ts. Now the `min-start-ts` will advance and exceed the `startTS` used by
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This FIXME will be fixed in #54114

// this detached record set, and may cause an error if `GC` runs.

// Now, a transaction is not needed for the detached record set, so we commit the transaction and cleanup
// the session state.
err = finishStmt(context.Background(), rs.se, nil, rs.sql)
if err != nil {
err2 := detachedRS.Close()
if err2 != nil {
logutil.BgLogger().Error("close detached record set failed", zap.Error(err2))
}
return nil, false, err
}

return detachedRS, true, nil
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
func (s *session) rollbackOnError(ctx context.Context) {
if !s.sessionVars.InTxn() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/sqlexec/restricted_sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,22 @@ type RecordSet interface {
Close() error
}

// DetachableRecordSet extends the `RecordSet` to support detaching from current session context
type DetachableRecordSet interface {
RecordSet

// TryDetach detaches the record set from the current session context.
//
// The last two return value indicates whether the record set is suitable for detaching, and whether
// it detaches successfully. If it faces any error during detaching (and there is no way to rollback),
// it will return the error. If an error is returned, the record set (and session) will be left at
// an unknown state.
//
// If the caller receives `_, false, _`, it means the original record set can still be used. If the caller
// receives an error, it means the original record set (and the session) is dirty.
TryDetach() (RecordSet, bool, error)
}

// MultiQueryNoDelayResult is an interface for one no-delay result for one statement in multi-queries.
type MultiQueryNoDelayResult interface {
// AffectedRows return affected row for one statement in multi-queries.
Expand Down