Skip to content

Commit

Permalink
Merge branch 'pingcap:master' into yzhan/authplugin
Browse files Browse the repository at this point in the history
  • Loading branch information
yzhan1 authored Jun 19, 2024
2 parents ac6e8c0 + e41a47c commit 6734ed3
Show file tree
Hide file tree
Showing 102 changed files with 2,868 additions and 575 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7180,13 +7180,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "bb5f9eb136d207f214c1eef12b89774edfa55e0b6721c66c7f83b1dfd6c11855",
strip_prefix = "github.com/tikv/client-go/[email protected].20240604045705-156cebc2defa",
sha256 = "de925cd3715472de4dd5ca829e4bb6b0495cc0574af1c3ad2ae97dbf23b59786",
strip_prefix = "github.com/tikv/client-go/[email protected].20240614064455-ac8fa1d73a0c",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240604045705-156cebc2defa.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240604045705-156cebc2defa.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240604045705-156cebc2defa.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240604045705-156cebc2defa.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240614064455-ac8fa1d73a0c.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240614064455-ac8fa1d73a0c.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240614064455-ac8fa1d73a0c.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240614064455-ac8fa1d73a0c.zip",
],
)
go_repository(
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ func NewPdController(
}, nil
}

func NewPdControllerWithPDClient(pdClient pd.Client, pdHTTPCli pdhttp.Client, v *semver.Version) *PdController {
return &PdController{
pdClient: pdClient,
pdHTTPCli: pdHTTPCli,
version: v,
schedulerPauseCh: make(chan struct{}, 1),
}
}

func parseVersion(versionStr string) *semver.Version {
// we need trim space or semver will parse failed
v := strings.TrimSpace(versionStr)
Expand Down
14 changes: 13 additions & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,26 @@ go_library(
go_test(
name = "restore_test",
timeout = "short",
srcs = ["misc_test.go"],
srcs = [
"import_mode_switcher_test.go",
"misc_test.go",
],
flaky = True,
race = "off",
shard_count = 6,
deps = [
":restore",
"//br/pkg/conn",
"//br/pkg/mock",
"//br/pkg/pdutil",
"//br/pkg/utiltest",
"//pkg/infoschema",
"//pkg/parser/model",
"@com_github_coreos_go_semver//semver",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:grpc",
],
)
145 changes: 145 additions & 0 deletions br/pkg/restore/import_mode_switcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package restore_test

import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/utiltest"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

type mockImportServer struct {
import_sstpb.ImportSSTServer

count int
ch chan struct{}
}

func (s *mockImportServer) SwitchMode(_ context.Context, req *import_sstpb.SwitchModeRequest) (*import_sstpb.SwitchModeResponse, error) {
s.count -= 1
if s.count == 0 {
s.ch <- struct{}{}
}
return &import_sstpb.SwitchModeResponse{}, nil
}

func TestRestorePreWork(t *testing.T) {
ctx := context.Background()
var port int
var lis net.Listener
var err error
for port = 0; port < 1000; port += 1 {
addr := fmt.Sprintf(":%d", 51111+port)
lis, err = net.Listen("tcp", addr)
if err == nil {
break
}
t.Log(err)
}

s := grpc.NewServer()
ch := make(chan struct{})
import_sstpb.RegisterImportSSTServer(s, &mockImportServer{count: 3, ch: ch})

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := s.Serve(lis)
require.NoError(t, err)
}()

pdClient := utiltest.NewFakePDClient([]*metapb.Store{
{
Id: 1,
Address: fmt.Sprintf(":%d", 51111+port),
},
}, false, nil)
pdHTTPCli := utiltest.NewFakePDHTTPClient()
mgr := &conn.Mgr{
PdController: pdutil.NewPdControllerWithPDClient(
pdClient, pdHTTPCli, &semver.Version{Major: 4, Minor: 0, Patch: 9}),
}
mgr.PdController.SchedulerPauseTTL = 3 * time.Second
switcher := restore.NewImportModeSwitcher(pdClient, time.Millisecond*200, nil)
undo, cfg, err := restore.RestorePreWork(ctx, mgr, switcher, false, true)
require.NoError(t, err)
// check the cfg
{
require.Equal(t, len(pdutil.Schedulers), len(cfg.Schedulers))
for _, key := range cfg.Schedulers {
_, ok := pdutil.Schedulers[key]
require.True(t, ok)
}
require.Equal(t, len(utiltest.ExistPDCfgGeneratorBefore), len(cfg.ScheduleCfg))
for key, value := range cfg.ScheduleCfg {
expectValue, ok := utiltest.ExistPDCfgGeneratorBefore[key]
require.True(t, ok)
require.Equal(t, expectValue, value)
}
cfgs, err := pdHTTPCli.GetConfig(context.TODO())
require.NoError(t, err)
require.Equal(t, len(utiltest.ExpectPDCfgGeneratorsResult), len(cfg.ScheduleCfg))
for key, value := range cfgs {
expectValue, ok := utiltest.ExpectPDCfgGeneratorsResult[key[len("schedule."):]]
require.True(t, ok)
require.Equal(t, expectValue, value)
}
delaySchedulers := pdHTTPCli.GetDelaySchedulers()
require.Equal(t, len(pdutil.Schedulers), len(delaySchedulers))
for delayScheduler := range delaySchedulers {
_, ok := pdutil.Schedulers[delayScheduler]
require.True(t, ok)
}
}
<-ch
restore.RestorePostWork(ctx, switcher, undo, false)
// check the cfg done
{
cfgs, err := pdHTTPCli.GetConfig(context.TODO())
require.NoError(t, err)
require.Equal(t, len(utiltest.ExistPDCfgGeneratorBefore), len(cfg.ScheduleCfg))
for key, value := range cfgs {
expectValue, ok := utiltest.ExistPDCfgGeneratorBefore[key[len("schedule."):]]
require.True(t, ok)
require.Equal(t, expectValue, value)
}
delaySchedulers := pdHTTPCli.GetDelaySchedulers()
require.Equal(t, 0, len(delaySchedulers))
}

s.Stop()
lis.Close()
}

func TestRestorePreWorkOnline(t *testing.T) {
ctx := context.Background()
undo, _, err := restore.RestorePreWork(ctx, nil, nil, true, false)
require.NoError(t, err)
restore.RestorePostWork(ctx, nil, undo, true)
}
21 changes: 20 additions & 1 deletion br/pkg/restore/internal/import_client/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "import_client",
Expand All @@ -18,3 +18,22 @@ go_library(
"@org_golang_google_grpc//status",
],
)

go_test(
name = "import_client_test",
timeout = "short",
srcs = ["import_client_test.go"],
flaky = True,
deps = [
":import_client",
"//br/pkg/restore/split",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
],
)
148 changes: 148 additions & 0 deletions br/pkg/restore/internal/import_client/import_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importclient_test

import (
"context"
"fmt"
"net"
"sync"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
importclient "github.com/pingcap/tidb/br/pkg/restore/internal/import_client"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type storeClient struct {
split.SplitClient
addr string
}

func (sc *storeClient) GetStore(_ context.Context, _ uint64) (*metapb.Store, error) {
return &metapb.Store{
Address: sc.addr,
}, nil
}

type mockImportServer struct {
import_sstpb.ImportSSTServer

ErrCount int
}

func (s *mockImportServer) ClearFiles(_ context.Context, req *import_sstpb.ClearRequest) (*import_sstpb.ClearResponse, error) {
return &import_sstpb.ClearResponse{Error: &import_sstpb.Error{Message: req.Prefix}}, nil
}

func (s *mockImportServer) Apply(_ context.Context, req *import_sstpb.ApplyRequest) (*import_sstpb.ApplyResponse, error) {
return &import_sstpb.ApplyResponse{Error: &import_sstpb.Error{Message: req.StorageCacheId}}, nil
}

func (s *mockImportServer) Download(_ context.Context, req *import_sstpb.DownloadRequest) (*import_sstpb.DownloadResponse, error) {
return &import_sstpb.DownloadResponse{Error: &import_sstpb.Error{Message: req.Name}}, nil
}

func (s *mockImportServer) SetDownloadSpeedLimit(_ context.Context, req *import_sstpb.SetDownloadSpeedLimitRequest) (*import_sstpb.SetDownloadSpeedLimitResponse, error) {
return &import_sstpb.SetDownloadSpeedLimitResponse{}, nil
}

func (s *mockImportServer) MultiIngest(_ context.Context, req *import_sstpb.MultiIngestRequest) (*import_sstpb.IngestResponse, error) {
if s.ErrCount <= 0 {
return nil, errors.Errorf("test")
}
s.ErrCount -= 1
if req.Context == nil {
return &import_sstpb.IngestResponse{}, nil
}
return &import_sstpb.IngestResponse{Error: &errorpb.Error{Message: req.Context.RequestSource}}, nil
}

func TestImportClient(t *testing.T) {
ctx := context.Background()
var port int
var lis net.Listener
var err error
for port = 0; port < 1000; port += 1 {
addr := fmt.Sprintf(":%d", 51111+port)
lis, err = net.Listen("tcp", addr)
if err == nil {
break
}
t.Log(err)
}

s := grpc.NewServer()
import_sstpb.RegisterImportSSTServer(s, &mockImportServer{ErrCount: 3})

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := s.Serve(lis)
require.NoError(t, err)
}()

client := importclient.NewImportClient(&storeClient{addr: fmt.Sprintf(":%d", 51111+port)}, nil, keepalive.ClientParameters{})

{
resp, err := client.ClearFiles(ctx, 1, &import_sstpb.ClearRequest{Prefix: "test"})
require.NoError(t, err)
require.Equal(t, "test", resp.Error.Message)
}

{
resp, err := client.ApplyKVFile(ctx, 1, &import_sstpb.ApplyRequest{StorageCacheId: "test"})
require.NoError(t, err)
require.Equal(t, "test", resp.Error.Message)
}

{
resp, err := client.DownloadSST(ctx, 1, &import_sstpb.DownloadRequest{Name: "test"})
require.NoError(t, err)
require.Equal(t, "test", resp.Error.Message)
}

{
_, err := client.SetDownloadSpeedLimit(ctx, 1, &import_sstpb.SetDownloadSpeedLimitRequest{SpeedLimit: 123})
require.NoError(t, err)
}

{
resp, err := client.MultiIngest(ctx, 1, &import_sstpb.MultiIngestRequest{Context: &kvrpcpb.Context{RequestSource: "test"}})
require.NoError(t, err)
require.Equal(t, "test", resp.Error.Message)
}

{
err := client.CheckMultiIngestSupport(ctx, []uint64{1})
require.NoError(t, err)
err = client.CheckMultiIngestSupport(ctx, []uint64{3, 4, 5})
require.Error(t, err)
}

err = client.CloseGrpcClient()
require.NoError(t, err)

s.Stop()
lis.Close()
}
Loading

0 comments on commit 6734ed3

Please sign in to comment.