Skip to content

Commit

Permalink
Merge 459b35d into 4fac4d6
Browse files Browse the repository at this point in the history
  • Loading branch information
haojinming authored Oct 20, 2022
2 parents 4fac4d6 + 459b35d commit 681e667
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 19 deletions.
48 changes: 36 additions & 12 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"google.golang.org/grpc/keepalive"
)

const resetSpeedLimitRetryTimes = 3

// Client sends requests to restore files.
type Client struct {
pdClient pd.Client
Expand All @@ -59,7 +61,6 @@ type Client struct {

// NewRestoreClient returns a new RestoreClient.
func NewRestoreClient(
g glue.Glue,
pdClient pd.Client,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
Expand Down Expand Up @@ -134,7 +135,7 @@ func (rc *Client) InitBackupMeta(
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv,
rc.backupMeta.ApiVersion, rc.rateLimit)
rc.backupMeta.ApiVersion)
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
}

Expand Down Expand Up @@ -224,22 +225,39 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
return restoreTS, nil
}

// nolint:unused
func (rc *Client) setSpeedLimit(ctx context.Context) error {
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.SkipTiFlash)
func (rc *Client) setSpeedLimit(ctx context.Context, rateLimit uint64) error {
stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
for _, store := range stores {
err = rc.fileImporter.setDownloadSpeedLimit(ctx, store.GetId(), rateLimit)
if err != nil {
return errors.Trace(err)
}
for _, store := range stores {
err = rc.fileImporter.setDownloadSpeedLimit(ctx, store.GetId())
if err != nil {
return errors.Trace(err)
}
rc.hasSpeedLimited = true
return nil
}

func (rc *Client) resetSpeedLimit(ctx context.Context) {
if rc.hasSpeedLimited {
var resetErr error
for retry := 0; retry < resetSpeedLimitRetryTimes; retry++ {
resetErr = rc.setSpeedLimit(ctx, 0)
if resetErr != nil {
log.Warn("failed to reset speed limit, retry it",
zap.Int("retry time", retry), logutil.ShortError(resetErr))
time.Sleep(time.Duration(retry+3) * time.Second)
continue
}
break
}
if resetErr != nil {
log.Error("failed to reset speed limit", zap.Error(resetErr))
}
rc.hasSpeedLimited = true
}
return nil
rc.hasSpeedLimited = false
}

// RestoreRaw tries to restore raw keys in the specified range.
Expand Down Expand Up @@ -270,6 +288,12 @@ func (rc *Client) RestoreRaw(
if err != nil {
return errors.Trace(err)
}
err = rc.setSpeedLimit(ctx, rc.rateLimit)
if err != nil {
return errors.Trace(err)
}
// TODO: Need a mechanism to set speed limit in ttl.
defer rc.resetSpeedLimit(ctx)

for _, file := range files {
fileReplica := file
Expand Down
140 changes: 140 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/jarcoal/httpmock"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc/keepalive"
)

var defaultKeepaliveCfg = keepalive.ClientParameters{
Time: 3 * time.Second,
Timeout: 10 * time.Second,
}

type fakePDClient struct {
pd.Client
stores []*metapb.Store
}

func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

// Mock ImporterClient interface
type FakeImporterClient struct {
ImporterClient
}

// Record the stores that have communicated
type RecordStores struct {
mu sync.Mutex
stores map[uint64]uint64
}

func NewRecordStores() RecordStores {
return RecordStores{stores: make(map[uint64]uint64, 0)}
}

func (r *RecordStores) put(id uint64, rateLimit uint64) {
r.mu.Lock()
defer r.mu.Unlock()
r.stores[id] = rateLimit
}

func (r *RecordStores) len() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.stores)
}

func (r *RecordStores) get(id uint64) uint64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.stores[id]
}

func (r *RecordStores) toString() string {
r.mu.Lock()
defer r.mu.Unlock()
return fmt.Sprintf("%v", r.stores)
}

var recordStores RecordStores

const workingTime = 10

func (fakeImportCli FakeImporterClient) SetDownloadSpeedLimit(
ctx context.Context,
storeID uint64,
req *import_sstpb.SetDownloadSpeedLimitRequest,
) (*import_sstpb.SetDownloadSpeedLimitResponse, error) {
time.Sleep(workingTime * time.Millisecond) // simulate doing 100 ms work
recordStores.put(storeID, req.SpeedLimit)
return nil, nil
}

func TestSetSpeedLimit(t *testing.T) {
mockStores := []*metapb.Store{
{Id: 1},
{Id: 2},
{Id: 3},
{Id: 4},
{Id: 5},
{Id: 6},
{Id: 7},
{Id: 8},
{Id: 9},
{Id: 10},
}
httpmock.Activate()
defer httpmock.DeactivateAndReset()
// Exact URL match
httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":2, "enable-ttl":true}}`))
// 1. The cost of concurrent communication is expected to be less than the cost of serial communication.
client, err := NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, defaultKeepaliveCfg, true)
require.NoError(t, err)
client.fileImporter = NewFileImporter(nil, FakeImporterClient{}, nil, true, kvrpcpb.APIVersion_V2)
ctx := context.Background()

rateLimit := uint64(10)
recordStores = NewRecordStores()
start := time.Now()
err = client.setSpeedLimit(ctx, rateLimit)
cost := time.Since(start)
require.NoError(t, err)

t.Logf("Total Cost: %v\n", cost)
t.Logf("Has Communicated: %v\n", recordStores.toString())

serialCost := time.Duration(len(mockStores)*workingTime) * time.Millisecond
require.LessOrEqual(t, serialCost, cost)
require.Equal(t, len(mockStores), recordStores.len())
for i := 0; i < len(mockStores); i++ {
require.Equal(t, rateLimit, recordStores.get(mockStores[i].Id))
}

recordStores = NewRecordStores()
start = time.Now()
client.resetSpeedLimit(ctx)
cost = time.Since(start)
require.LessOrEqual(t, serialCost, cost)
require.Equal(t, len(mockStores), recordStores.len())
for i := 0; i < len(mockStores); i++ {
require.Equal(t, uint64(0), recordStores.get(mockStores[i].Id))
}
}
8 changes: 2 additions & 6 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ type FileImporter struct {
metaClient SplitClient
importClient ImporterClient
backend *backuppb.StorageBackend
rateLimit uint64

isRawKvMode bool
sstAPIVersion kvrpcpb.APIVersion
Expand All @@ -217,15 +216,13 @@ func NewFileImporter(
backend *backuppb.StorageBackend,
isRawKvMode bool,
apiVersion kvrpcpb.APIVersion,
rateLimit uint64,
) FileImporter {
return FileImporter{
metaClient: metaClient,
backend: backend,
importClient: importClient,
isRawKvMode: isRawKvMode,
sstAPIVersion: apiVersion,
rateLimit: rateLimit,
}
}

Expand Down Expand Up @@ -451,10 +448,9 @@ func (importer *FileImporter) Import(
return errors.Trace(err)
}

// nolint:unused
func (importer *FileImporter) setDownloadSpeedLimit(ctx context.Context, storeID uint64) error {
func (importer *FileImporter) setDownloadSpeedLimit(ctx context.Context, storeID uint64, rateLimit uint64) error {
req := &import_sstpb.SetDownloadSpeedLimitRequest{
SpeedLimit: importer.rateLimit,
SpeedLimit: rateLimit,
}
_, err := importer.importClient.SetDownloadSpeedLimit(ctx, storeID, req)
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
// sometimes we have pooled the connections.
// sending heartbeats in idle times is useful.
keepaliveCfg.PermitWithoutStream = true
client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, true)
client, err := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, true)
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 681e667

Please sign in to comment.