Skip to content

Commit 0c27830

Browse files
committed
br: fix restore ratelimit
Signed-off-by: haojinming <[email protected]>
1 parent 4fac4d6 commit 0c27830

File tree

4 files changed

+182
-18
lines changed

4 files changed

+182
-18
lines changed

br/pkg/restore/client.go

+36-11
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"google.golang.org/grpc/keepalive"
3535
)
3636

37+
const resetSpeedLimitRetryTimes = 3
38+
3739
// Client sends requests to restore files.
3840
type Client struct {
3941
pdClient pd.Client
@@ -59,7 +61,6 @@ type Client struct {
5961

6062
// NewRestoreClient returns a new RestoreClient.
6163
func NewRestoreClient(
62-
g glue.Glue,
6364
pdClient pd.Client,
6465
tlsConf *tls.Config,
6566
keepaliveConf keepalive.ClientParameters,
@@ -134,7 +135,7 @@ func (rc *Client) InitBackupMeta(
134135
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
135136
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
136137
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv,
137-
rc.backupMeta.ApiVersion, rc.rateLimit)
138+
rc.backupMeta.ApiVersion)
138139
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
139140
}
140141

@@ -224,21 +225,39 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
224225
return restoreTS, nil
225226
}
226227

227-
// nolint:unused
228-
func (rc *Client) setSpeedLimit(ctx context.Context) error {
229-
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
230-
stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.SkipTiFlash)
228+
func (rc *Client) setSpeedLimit(ctx context.Context, rateLimit uint64) error {
229+
stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.SkipTiFlash)
230+
if err != nil {
231+
return errors.Trace(err)
232+
}
233+
for _, store := range stores {
234+
err = rc.fileImporter.setDownloadSpeedLimit(ctx, store.GetId(), rateLimit)
231235
if err != nil {
232236
return errors.Trace(err)
233237
}
234-
for _, store := range stores {
235-
err = rc.fileImporter.setDownloadSpeedLimit(ctx, store.GetId())
236-
if err != nil {
237-
return errors.Trace(err)
238+
}
239+
rc.hasSpeedLimited = true
240+
return nil
241+
}
242+
243+
func (rc *Client) resetSpeedLimit(ctx context.Context) error {
244+
if rc.hasSpeedLimited {
245+
var resetErr error
246+
for retry := 0; retry < resetSpeedLimitRetryTimes; retry++ {
247+
resetErr = rc.setSpeedLimit(ctx, 0)
248+
if resetErr != nil {
249+
log.Warn("failed to reset speed limit, retry it",
250+
zap.Int("retry time", retry), logutil.ShortError(resetErr))
251+
time.Sleep(time.Duration(retry+3) * time.Second)
252+
continue
238253
}
254+
break
255+
}
256+
if resetErr != nil {
257+
log.Error("failed to reset speed limit", zap.Error(resetErr))
239258
}
240-
rc.hasSpeedLimited = true
241259
}
260+
rc.hasSpeedLimited = false
242261
return nil
243262
}
244263

@@ -270,6 +289,12 @@ func (rc *Client) RestoreRaw(
270289
if err != nil {
271290
return errors.Trace(err)
272291
}
292+
err = rc.setSpeedLimit(ctx, rc.rateLimit)
293+
if err != nil {
294+
return errors.Trace(err)
295+
}
296+
// TODO: Need a mechanism to set the speed limit with a TTL.
297+
defer rc.resetSpeedLimit(ctx)
273298

274299
for _, file := range files {
275300
fileReplica := file

br/pkg/restore/client_test.go

+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
2+
3+
package restore
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"sync"
9+
"testing"
10+
"time"
11+
12+
"github.com/jarcoal/httpmock"
13+
"github.com/pingcap/kvproto/pkg/import_sstpb"
14+
"github.com/pingcap/kvproto/pkg/kvrpcpb"
15+
"github.com/pingcap/kvproto/pkg/metapb"
16+
"github.com/stretchr/testify/require"
17+
pd "github.com/tikv/pd/client"
18+
"google.golang.org/grpc/keepalive"
19+
)
20+
21+
var defaultKeepaliveCfg = keepalive.ClientParameters{
22+
Time: 3 * time.Second,
23+
Timeout: 10 * time.Second,
24+
}
25+
26+
type fakePDClient struct {
27+
pd.Client
28+
stores []*metapb.Store
29+
}
30+
31+
func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
32+
return append([]*metapb.Store{}, fpdc.stores...), nil
33+
}
34+
35+
// FakeImporterClient is a mock of the ImporterClient interface.
36+
type FakeImporterClient struct {
37+
ImporterClient
38+
}
39+
40+
// RecordStores records the rate limit most recently sent to each store, keyed by store ID.
41+
type RecordStores struct {
42+
mu sync.Mutex
43+
stores map[uint64]uint64
44+
}
45+
46+
func NewRecordStores() RecordStores {
47+
return RecordStores{stores: make(map[uint64]uint64, 0)}
48+
}
49+
50+
func (r *RecordStores) put(id uint64, rateLimit uint64) {
51+
r.mu.Lock()
52+
defer r.mu.Unlock()
53+
r.stores[id] = rateLimit
54+
}
55+
56+
func (r *RecordStores) len() int {
57+
r.mu.Lock()
58+
defer r.mu.Unlock()
59+
return len(r.stores)
60+
}
61+
62+
func (r *RecordStores) get(id uint64) uint64 {
63+
r.mu.Lock()
64+
defer r.mu.Unlock()
65+
return r.stores[id]
66+
}
67+
68+
func (r *RecordStores) toString() string {
69+
r.mu.Lock()
70+
defer r.mu.Unlock()
71+
return fmt.Sprintf("%v", r.stores)
72+
}
73+
74+
var recordStores RecordStores
75+
76+
const (
77+
WORKING_TIME = 10
78+
)
79+
80+
func (fakeImportCli FakeImporterClient) SetDownloadSpeedLimit(
81+
ctx context.Context,
82+
storeID uint64,
83+
req *import_sstpb.SetDownloadSpeedLimitRequest,
84+
) (*import_sstpb.SetDownloadSpeedLimitResponse, error) {
85+
time.Sleep(WORKING_TIME * time.Millisecond) // simulate doing 10 ms of work per store
86+
recordStores.put(storeID, req.SpeedLimit)
87+
return nil, nil
88+
}
89+
90+
func TestSetSpeedLimit(t *testing.T) {
91+
mockStores := []*metapb.Store{
92+
{Id: 1},
93+
{Id: 2},
94+
{Id: 3},
95+
{Id: 4},
96+
{Id: 5},
97+
{Id: 6},
98+
{Id: 7},
99+
{Id: 8},
100+
{Id: 9},
101+
{Id: 10},
102+
}
103+
httpmock.Activate()
104+
defer httpmock.DeactivateAndReset()
105+
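// Regexp URL match: the "=~" prefix makes httpmock treat the rest as a pattern, so any path starting with /config matches.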
106+
httpmock.RegisterResponder("GET", `=~^/config`,
107+
httpmock.NewStringResponder(200, `{"storage":{"api-version":2, "enable-ttl":true}}`))
108+
// 1. setSpeedLimit contacts the stores one at a time, so the total cost is expected to be no less than the serial cost.
109+
client, err := NewRestoreClient(fakePDClient{
110+
stores: mockStores,
111+
}, nil, defaultKeepaliveCfg, true)
112+
require.NoError(t, err)
113+
client.fileImporter = NewFileImporter(nil, FakeImporterClient{}, nil, true, kvrpcpb.APIVersion_V2)
114+
ctx := context.Background()
115+
116+
rateLimit := uint64(10)
117+
recordStores = NewRecordStores()
118+
start := time.Now()
119+
err = client.setSpeedLimit(ctx, rateLimit)
120+
cost := time.Since(start)
121+
require.NoError(t, err)
122+
123+
t.Logf("Total Cost: %v\n", cost)
124+
t.Logf("Has Communicated: %v\n", recordStores.toString())
125+
126+
serialCost := time.Duration(len(mockStores)*WORKING_TIME) * time.Millisecond
127+
require.LessOrEqual(t, serialCost, cost)
128+
require.Equal(t, len(mockStores), recordStores.len())
129+
for i := 0; i < len(mockStores); i++ {
130+
require.Equal(t, rateLimit, recordStores.get(mockStores[i].Id))
131+
}
132+
133+
recordStores = NewRecordStores()
134+
start = time.Now()
135+
err = client.resetSpeedLimit(ctx)
136+
cost = time.Since(start)
137+
require.NoError(t, err)
138+
require.LessOrEqual(t, serialCost, cost)
139+
require.Equal(t, len(mockStores), recordStores.len())
140+
for i := 0; i < len(mockStores); i++ {
141+
require.Equal(t, uint64(0), recordStores.get(mockStores[i].Id))
142+
}
143+
}

br/pkg/restore/import.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ type FileImporter struct {
201201
metaClient SplitClient
202202
importClient ImporterClient
203203
backend *backuppb.StorageBackend
204-
rateLimit uint64
205204

206205
isRawKvMode bool
207206
sstAPIVersion kvrpcpb.APIVersion
@@ -217,15 +216,13 @@ func NewFileImporter(
217216
backend *backuppb.StorageBackend,
218217
isRawKvMode bool,
219218
apiVersion kvrpcpb.APIVersion,
220-
rateLimit uint64,
221219
) FileImporter {
222220
return FileImporter{
223221
metaClient: metaClient,
224222
backend: backend,
225223
importClient: importClient,
226224
isRawKvMode: isRawKvMode,
227225
sstAPIVersion: apiVersion,
228-
rateLimit: rateLimit,
229226
}
230227
}
231228

@@ -451,10 +448,9 @@ func (importer *FileImporter) Import(
451448
return errors.Trace(err)
452449
}
453450

454-
// nolint:unused
455-
func (importer *FileImporter) setDownloadSpeedLimit(ctx context.Context, storeID uint64) error {
451+
func (importer *FileImporter) setDownloadSpeedLimit(ctx context.Context, storeID uint64, rateLimit uint64) error {
456452
req := &import_sstpb.SetDownloadSpeedLimitRequest{
457-
SpeedLimit: importer.rateLimit,
453+
SpeedLimit: rateLimit,
458454
}
459455
_, err := importer.importClient.SetDownloadSpeedLimit(ctx, storeID, req)
460456
return errors.Trace(err)

br/pkg/task/restore_raw.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
4545
// sometimes we have pooled the connections.
4646
// sending heartbeats in idle times is useful.
4747
keepaliveCfg.PermitWithoutStream = true
48-
client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, true)
48+
client, err := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, true)
4949
if err != nil {
5050
return errors.Trace(err)
5151
}

0 commit comments

Comments
 (0)