Skip to content

Commit

Permalink
handle the merged SST response
Browse files Browse the repository at this point in the history
Signed-off-by: Jianjun Liao <[email protected]>
  • Loading branch information
Leavrth committed Nov 21, 2024
1 parent ed2d749 commit a5b92cf
Show file tree
Hide file tree
Showing 8 changed files with 1,531 additions and 110 deletions.
3 changes: 2 additions & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ go_test(
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 15,
shard_count = 16,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb/mock",
Expand All @@ -84,6 +84,7 @@ go_test(
"//pkg/testkit/testsetup",
"//pkg/util/table-filter",
"@com_github_golang_protobuf//proto",
"@com_github_google_btree//:btree",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
Expand Down
55 changes: 37 additions & 18 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ mainLoop:
// Compute the left ranges that not backuped yet
start := time.Now()

var inCompleteRanges []rtree.Range
var inCompleteRanges []*backuppb.SubRanges
var allTxnLocks []*txnlock.Lock
select {
case <-ctx.Done():
Expand All @@ -211,8 +211,7 @@ mainLoop:
mainCancel()
return ctx.Err()
default:
iter := loop.GlobalProgressTree.Iter()
inCompleteRanges = iter.GetIncompleteRanges()
inCompleteRanges = loop.GlobalProgressTree.GetIncompleteRanges()
if len(inCompleteRanges) == 0 {
// all range backuped
logutil.CL(ctx).Info("This round finished all backup ranges", zap.Uint64("round", round))
Expand All @@ -225,7 +224,7 @@ mainLoop:
logutil.CL(mainCtx).Info("backup ranges", zap.Uint64("round", round),
zap.Int("incomplete-ranges", len(inCompleteRanges)), zap.Duration("cost", time.Since(start)))

loop.BackupReq.SubRanges = getBackupRanges(inCompleteRanges)
loop.BackupReq.SubRangesGroups = inCompleteRanges

allStores, err := bc.getBackupStores(mainCtx, loop.ReplicaReadLabel)
if err != nil {
Expand Down Expand Up @@ -1155,6 +1154,23 @@ func (bc *Client) getBackupStores(ctx context.Context, replicaReadLabel map[stri
return targetStores, nil
}

func minEndKey(left, right []byte) []byte {
if len(right) == 0 {
return left
}
if len(left) == 0 || bytes.Compare(left, right) > 0 {
return right
}
return left
}

func maxStartKey(left, right []byte) []byte {
if bytes.Compare(left, right) > 0 {
return left
}
return right
}

func (bc *Client) OnBackupResponse(
ctx context.Context,
r *ResponseAndStore,
Expand All @@ -1169,28 +1185,31 @@ func (bc *Client) OnBackupResponse(
storeID := r.GetStoreID()
if resp.GetError() == nil {
start := time.Now()
pr, err := globalProgressTree.FindContained(resp.StartKey, resp.EndKey)
prs, err := globalProgressTree.FindContained(resp.StartKey, resp.EndKey)
logutil.CL(ctx).Debug("find the range tree contains response ranges", zap.Duration("take", time.Since(start)))
if err != nil {
logutil.CL(ctx).Error("failed to update the backup response",
zap.Reflect("error", err))
return nil, err
}
if bc.checkpointRunner != nil {
if err := checkpoint.AppendForBackup(
ctx,
bc.checkpointRunner,
pr.GroupKey,
resp.StartKey,
resp.EndKey,
resp.Files,
); err != nil {
// flush checkpoint failed,
logutil.CL(ctx).Error("failed to flush checkpoint", zap.Error(err))
return nil, err
for _, pr := range prs {
startKey, endKey := maxStartKey(pr.Origin.StartKey, resp.StartKey), minEndKey(pr.Origin.EndKey, resp.EndKey)
if bc.checkpointRunner != nil {
if err := checkpoint.AppendForBackup(
ctx,
bc.checkpointRunner,
pr.GroupKey,
startKey,
endKey,
resp.Files,
); err != nil {
// flush checkpoint failed,
logutil.CL(ctx).Error("failed to flush checkpoint", zap.Error(err))
return nil, err
}
}
pr.Res.Put(startKey, endKey, resp.Files)
}
pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files)
apiVersion := resp.ApiVersion
bc.SetApiVersion(apiVersion)
} else {
Expand Down
148 changes: 142 additions & 6 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/google/btree"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
Expand Down Expand Up @@ -387,18 +388,18 @@ func TestOnBackupResponse(t *testing.T) {
require.NoError(t, err)
require.Nil(t, lock)

incomplete := tree.Iter().GetIncompleteRanges()
incomplete := tree.GetIncompleteRanges()
require.Len(t, incomplete, 1)
require.Equal(t, []byte("b"), incomplete[0].StartKey)
require.Equal(t, []byte("c"), incomplete[0].EndKey)
require.Len(t, incomplete[0].SubRanges, 1)
require.Equal(t, &kvrpcpb.KeyRange{StartKey: []byte("b"), EndKey: []byte("c")}, incomplete[0].SubRanges[0])

// case #4: success case, make up incomplete range
r.Resp.StartKey = []byte("b")
r.Resp.EndKey = []byte("c")
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.NoError(t, err)
require.Nil(t, lock)
incomplete = tree.Iter().GetIncompleteRanges()
incomplete = tree.GetIncompleteRanges()
require.Len(t, incomplete, 0)

// case #5: failed case, key is locked
Expand Down Expand Up @@ -426,6 +427,108 @@ func TestOnBackupResponse(t *testing.T) {
require.Equal(t, []byte("b"), lock.Primary)
}

func TestOnBackupResponse2(t *testing.T) {
s := createBackupSuite(t)

ctx := context.Background()
buildProgressRangeFn := func(startKey []byte, endKey []byte) *rtree.ProgressRange {
return &rtree.ProgressRange{
Res: rtree.NewRangeTree(),
Origin: rtree.Range{
StartKey: startKey,
EndKey: endKey,
},
}
}
buildBackupResponseFn := func(startKey, endKey []byte, fileName string) *backup.ResponseAndStore {
return &backup.ResponseAndStore{
StoreID: 0,
Resp: &backuppb.BackupResponse{
StartKey: startKey,
EndKey: endKey,
Files: []*backuppb.File{
{Name: fileName},
},
},
}
}
tree := rtree.NewProgressRangeTree()
require.NoError(t, tree.Insert(buildProgressRangeFn([]byte("aa"), []byte("cc"))))
require.NoError(t, tree.Insert(buildProgressRangeFn([]byte("dd"), []byte("ff"))))

errContext := utils.NewErrorContext("test", 1)
r := buildBackupResponseFn([]byte("a"), []byte("ccc"), "123")
lock, err := s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.Error(t, err)
require.Nil(t, lock)
r = buildBackupResponseFn([]byte("aaa"), []byte("dd"), "123")
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.Error(t, err)
require.Nil(t, lock)
r = buildBackupResponseFn([]byte("aaa"), []byte("ca"), "123")
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.NoError(t, err)
require.Nil(t, lock)
r = buildBackupResponseFn([]byte("ca"), []byte("de"), "456")
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.NoError(t, err)
require.Nil(t, lock)
r = buildBackupResponseFn([]byte("aa"), []byte("aaa"), "789")
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.NoError(t, err)
require.Nil(t, lock)
r = buildBackupResponseFn([]byte("de"), []byte("ff"), "012")
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.NoError(t, err)
require.Nil(t, lock)

expectResultFileNames := []struct {
name string
startKey []byte
endKey []byte
}{
{
name: "789",
startKey: []byte("aa"),
endKey: []byte("aaa"),
},
{
name: "123",
startKey: []byte("aaa"),
endKey: []byte("ca"),
},
{
name: "456",
startKey: []byte("ca"),
endKey: []byte("cc"),
},
{
name: "456",
startKey: []byte("dd"),
endKey: []byte("de"),
},
{
name: "012",
startKey: []byte("de"),
endKey: []byte("ff"),
},
}
i := 0
tree.Ascend(func(item *rtree.ProgressRange) bool {
item.Res.Ascend(func(item btree.Item) bool {
rg := item.(*rtree.Range)
expectRg := expectResultFileNames[i]
require.Equal(t, expectRg.startKey, rg.StartKey)
require.Equal(t, expectRg.endKey, rg.EndKey)
require.Len(t, rg.Files, 1)
require.Equal(t, expectRg.name, rg.Files[0].Name)
i += 1
return true
})
return true
})
}

func TestMainBackupLoop(t *testing.T) {
s := createBackupSuite(t)
backgroundCtx := context.Background()
Expand Down Expand Up @@ -760,8 +863,41 @@ func TestBuildProgressRangeTree(t *testing.T) {

contained, err = tree.FindContained([]byte("aa"), []byte("b"))
require.NotNil(t, contained)
require.Equal(t, []byte("aa"), contained.Origin.StartKey)
require.Equal(t, []byte("b"), contained.Origin.EndKey)
require.Len(t, contained, 1)
require.Equal(t, []byte("aa"), contained[0].Origin.StartKey)
require.Equal(t, []byte("b"), contained[0].Origin.EndKey)
require.NoError(t, err)

contained, err = tree.FindContained([]byte("aaa"), []byte("b"))
require.NotNil(t, contained)
require.Len(t, contained, 1)
require.Equal(t, []byte("aa"), contained[0].Origin.StartKey)
require.Equal(t, []byte("b"), contained[0].Origin.EndKey)
require.NoError(t, err)

contained, err = tree.FindContained([]byte("a"), []byte("b"))
require.Nil(t, contained)
require.Error(t, err)

contained, err = tree.FindContained([]byte("bb"), []byte("cc"))
require.Nil(t, contained)
require.Error(t, err)

contained, err = tree.FindContained([]byte("b"), []byte("cc"))
require.Nil(t, contained)
require.Error(t, err)

contained, err = tree.FindContained([]byte("aaa"), []byte("c"))
require.Nil(t, contained)
require.Error(t, err)

contained, err = tree.FindContained([]byte("aaa"), []byte("cc"))
require.NotNil(t, contained)
require.Len(t, contained, 2)
require.Equal(t, []byte("aa"), contained[0].Origin.StartKey)
require.Equal(t, []byte("b"), contained[0].Origin.EndKey)
require.Equal(t, []byte("c"), contained[1].Origin.StartKey)
require.Equal(t, []byte("d"), contained[1].Origin.EndKey)
require.NoError(t, err)
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/rtree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/util/redact",
"@com_github_google_btree//:btree",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_log//:log",
"@com_github_pkg_errors//:errors",
"@org_uber_go_zap//:zap",
Expand All @@ -39,6 +40,7 @@ go_test(
"//pkg/tablecodec",
"//pkg/testkit/testsetup",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
Expand Down
Loading

0 comments on commit a5b92cf

Please sign in to comment.