Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into s13-add-warning-t…
Browse files Browse the repository at this point in the history
…o-slow-query
  • Loading branch information
time-and-fate committed Jan 4, 2023
2 parents 70760b8 + 73c8cc7 commit 9730c56
Show file tree
Hide file tree
Showing 76 changed files with 2,997 additions and 254 deletions.
18 changes: 17 additions & 1 deletion autoid_service/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "autoid_service",
Expand All @@ -9,6 +9,7 @@ go_library(
"//config",
"//kv",
"//meta",
"//meta/autoid",
"//metrics",
"//owner",
"//parser/model",
Expand All @@ -23,3 +24,18 @@ go_library(
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "autoid_service_test",
srcs = ["autoid_test.go"],
embed = [":autoid_service"],
deps = [
"//parser/model",
"//testkit",
"@com_github_pingcap_kvproto//pkg/autoid",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
],
)
13 changes: 11 additions & 2 deletions autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
autoid1 "github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -253,6 +254,7 @@ type Service struct {
func New(selfAddr string, etcdAddr []string, store kv.Storage, tlsConfig *tls.Config) *Service {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()

cli, err := clientv3.New(clientv3.Config{
LogConfig: &etcdLogCfg,
Endpoints: etcdAddr,
Expand All @@ -270,9 +272,12 @@ func New(selfAddr string, etcdAddr []string, store kv.Storage, tlsConfig *tls.Co
if err != nil {
panic(err)
}
return newWithCli(selfAddr, cli, store)
}

func newWithCli(selfAddr string, cli *clientv3.Client, store kv.Storage) *Service {
l := owner.NewOwnerManager(context.Background(), cli, "autoid", selfAddr, autoIDLeaderPath)
err = l.CampaignOwner()
err := l.CampaignOwner()
if err != nil {
panic(err)
}
Expand All @@ -299,7 +304,7 @@ func (m *mockClient) Rebase(ctx context.Context, in *autoid.RebaseRequest, opts
var global = make(map[string]*mockClient)

// MockForTest is used for testing, the UT test and unistore use this.
func MockForTest(store kv.Storage) *mockClient {
func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {
uuid := store.UUID()
ret, ok := global[uuid]
if !ok {
Expand Down Expand Up @@ -515,3 +520,7 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
}
return &autoid.RebaseResponse{}, nil
}

func init() {
autoid1.MockForTest = MockForTest
}
202 changes: 202 additions & 0 deletions autoid_service/autoid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package autoid

import (
"context"
"fmt"
"math"
"net"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type autoIDResp struct {
*autoid.AutoIDResponse
error
*testing.T
}

func (resp autoIDResp) check(min, max int64) {
require.NoError(resp.T, resp.error)
require.Equal(resp.T, resp.AutoIDResponse, &autoid.AutoIDResponse{Min: min, Max: max})
}

func (resp autoIDResp) checkErrmsg() {
require.NoError(resp.T, resp.error)
require.True(resp.T, len(resp.GetErrmsg()) > 0)
}

type rebaseResp struct {
*autoid.RebaseResponse
error
*testing.T
}

func (resp rebaseResp) check(msg string) {
require.NoError(resp.T, resp.error)
require.Equal(resp.T, string(resp.RebaseResponse.GetErrmsg()), msg)
}

func TestAPI(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
cli := MockForTest(store)
tk.MustExec("use test")
tk.MustExec("create table t (id int key auto_increment);")
is := dom.InfoSchema()
dbInfo, ok := is.SchemaByName(model.NewCIStr("test"))
require.True(t, ok)

tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tbInfo := tbl.Meta()

ctx := context.Background()
checkCurrValue := func(t *testing.T, cli autoid.AutoIDAllocClient, min, max int64) {
req := &autoid.AutoIDRequest{DbID: dbInfo.ID, TblID: tbInfo.ID, N: 0}
resp, err := cli.AllocAutoID(ctx, req)
require.NoError(t, err)
require.Equal(t, resp, &autoid.AutoIDResponse{Min: min, Max: max})
}
autoIDRequest := func(t *testing.T, cli autoid.AutoIDAllocClient, unsigned bool, n uint64, more ...int64) autoIDResp {
increment := int64(1)
offset := int64(1)
if len(more) >= 1 {
increment = more[0]
}
if len(more) >= 2 {
offset = more[1]
}
req := &autoid.AutoIDRequest{DbID: dbInfo.ID, TblID: tbInfo.ID, IsUnsigned: unsigned, N: n, Increment: increment, Offset: offset}
resp, err := cli.AllocAutoID(ctx, req)
return autoIDResp{resp, err, t}
}
rebaseRequest := func(t *testing.T, cli autoid.AutoIDAllocClient, unsigned bool, n int64, force ...struct{}) rebaseResp {
req := &autoid.RebaseRequest{
DbID: dbInfo.ID,
TblID: tbInfo.ID,
Base: n,
IsUnsigned: unsigned,
Force: len(force) > 0,
}
resp, err := cli.Rebase(ctx, req)
return rebaseResp{resp, err, t}
}
var force = struct{}{}

// basic auto id operation
autoIDRequest(t, cli, false, 1).check(0, 1)
autoIDRequest(t, cli, false, 10).check(1, 11)
checkCurrValue(t, cli, 11, 11)
autoIDRequest(t, cli, false, 128).check(11, 139)
autoIDRequest(t, cli, false, 1, 10, 5).check(139, 145)

// basic rebase operation
rebaseRequest(t, cli, false, 666).check("")
autoIDRequest(t, cli, false, 1).check(666, 667)

rebaseRequest(t, cli, false, 6666).check("")
autoIDRequest(t, cli, false, 1).check(6666, 6667)

// rebase will not decrease the value without 'force'
rebaseRequest(t, cli, false, 44).check("")
checkCurrValue(t, cli, 6667, 6667)
rebaseRequest(t, cli, false, 44, force).check("")
checkCurrValue(t, cli, 44, 44)

// max increase 1
rebaseRequest(t, cli, false, math.MaxInt64, force).check("")
checkCurrValue(t, cli, math.MaxInt64, math.MaxInt64)
autoIDRequest(t, cli, false, 1).checkErrmsg()

rebaseRequest(t, cli, true, 0, force).check("")
checkCurrValue(t, cli, 0, 0)
autoIDRequest(t, cli, true, 1).check(0, 1)
autoIDRequest(t, cli, true, 10).check(1, 11)
autoIDRequest(t, cli, true, 128).check(11, 139)
autoIDRequest(t, cli, true, 1, 10, 5).check(139, 145)

// max increase 1
rebaseRequest(t, cli, true, math.MaxInt64).check("")
checkCurrValue(t, cli, math.MaxInt64, math.MaxInt64)
autoIDRequest(t, cli, true, 1).check(math.MaxInt64, math.MinInt64)
autoIDRequest(t, cli, true, 1).check(math.MinInt64, math.MinInt64+1)

rebaseRequest(t, cli, true, -1).check("")
checkCurrValue(t, cli, -1, -1)
autoIDRequest(t, cli, true, 1).check(-1, 0)
}

func TestGRPC(t *testing.T) {
integration.BeforeTestExternal(t)
store := testkit.CreateMockStore(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
etcdCli := cluster.RandClient()

var addr string
var listener net.Listener
for port := 10080; ; port++ {
var err error
addr = fmt.Sprintf("127.0.0.1:%d", port)
listener, err = net.Listen("tcp", addr)
if err == nil {
break
}
}
defer listener.Close()

service := newWithCli(addr, etcdCli, store)
defer service.Close()

var i int
for !service.leaderShip.IsOwner() {
time.Sleep(100 * time.Millisecond)
i++
if i >= 20 {
break
}
}
require.Less(t, i, 20)

grpcServer := grpc.NewServer()
autoid.RegisterAutoIDAllocServer(grpcServer, service)
go func() {
grpcServer.Serve(listener)
}()
defer grpcServer.Stop()

grpcConn, err := grpc.Dial("127.0.0.1:10080", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
cli := autoid.NewAutoIDAllocClient(grpcConn)
_, err = cli.AllocAutoID(context.Background(), &autoid.AutoIDRequest{
DbID: 0,
TblID: 0,
N: 1,
Increment: 1,
Offset: 1,
IsUnsigned: false,
})
require.NoError(t, err)
}
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//autoid_service",
"//config",
"//ddl/ingest",
"//ddl/placement",
Expand Down
5 changes: 5 additions & 0 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
}
dom.DDL().SetHook(hook)

time.Sleep(10 * time.Millisecond)
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

Expand All @@ -144,6 +145,7 @@ func TestAddDDLDuringFlashback(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("create table t(a int)")

time.Sleep(10 * time.Millisecond)
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

Expand Down Expand Up @@ -182,6 +184,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("create table t(a int)")

time.Sleep(10 * time.Millisecond)
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

Expand Down Expand Up @@ -260,6 +263,8 @@ func TestCancelFlashbackCluster(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originHook := dom.DDL().GetHook()
tk := testkit.NewTestKit(t, store)

time.Sleep(10 * time.Millisecond)
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/errors"
_ "github.com/pingcap/tidb/autoid_service"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/schematracker"
Expand Down
3 changes: 2 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,8 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
return false, ver, err
}
indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index.
return true, ver, nil
ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
return true, ver, err
default:
return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState)
}
Expand Down
11 changes: 11 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,17 @@ func TestMultiSchemaChangeSchemaVersion(t *testing.T) {
dom.DDL().SetHook(originHook)
}

func TestMultiSchemaChangeAddIndexChangeColumn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("CREATE TABLE t (a SMALLINT DEFAULT '30219', b TIME NULL DEFAULT '02:45:06', PRIMARY KEY (a));")
tk.MustExec("ALTER TABLE t ADD unique INDEX idx4 (b), change column a e MEDIUMINT DEFAULT '5280454' FIRST;")
tk.MustExec("insert ignore into t (e) values (5586359),(501788),(-5961048),(220083),(-4917129),(-7267211),(7750448);")
tk.MustQuery("select * from t;").Check(testkit.Rows("5586359 02:45:06"))
tk.MustExec("admin check table t;")
}

func TestMultiSchemaChangeMixedWithUpdate(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
8 changes: 5 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2187,7 +2187,7 @@ func (do *Domain) NotifyUpdatePrivilege() error {
// NotifyUpdateSysVarCache updates the sysvar cache key in etcd, which other TiDB
// clients are subscribed to for updates. For the caller, the cache is also built
// synchronously so that the effect is immediate.
func (do *Domain) NotifyUpdateSysVarCache() {
func (do *Domain) NotifyUpdateSysVarCache(updateLocal bool) {
if do.etcdClient != nil {
row := do.etcdClient.KV
_, err := row.Put(context.Background(), sysVarCacheKey, "")
Expand All @@ -2196,8 +2196,10 @@ func (do *Domain) NotifyUpdateSysVarCache() {
}
}
// update locally
if err := do.rebuildSysVarCache(nil); err != nil {
logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err))
if updateLocal {
if err := do.rebuildSysVarCache(nil); err != nil {
logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err))
}
}
}

Expand Down
Loading

0 comments on commit 9730c56

Please sign in to comment.