Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ var (
ErrMigrationNotFound = errors.Normalize("no migration found", errors.RFCCodeText("BR:Common:ErrMigrationNotFound"))
ErrMigrationVersionNotSupported = errors.Normalize("the migration version isn't supported", errors.RFCCodeText("BR:Common:ErrMigrationVersionNotSupported"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDNotFullyScatter = errors.Normalize("pd not fully scattered", errors.RFCCodeText("BR:PD:ErrPDNotFullyScatter"))
ErrPDSplitFailed = errors.Normalize("failed to wait region split", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDNotFullyScatter = errors.Normalize("pd not fully scattered", errors.RFCCodeText("BR:PD:ErrPDNotFullyScatter"))
ErrPDSplitFailed = errors.Normalize("failed to wait region split", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDRegionsNotFullyScatter = errors.Normalize("regions not fully scattered", errors.RFCCodeText("BR:PD:ErrPDRegionsNotFullyScatter"))

ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch"))
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ go_test(
],
embed = [":split"],
flaky = True,
shard_count = 28,
shard_count = 29,
deps = [
"//br/pkg/errors",
"//br/pkg/restore/utils",
Expand Down
24 changes: 18 additions & 6 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,13 @@ func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo)
// the retry is for the temporary network errors during sending request.
err := utils.WithRetry(ctx, func() error {
failedRegionsID, err := c.tryScatterRegions(ctx, newRegions)
if isUnsupportedError(err) {
log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err))
// if err is unsupported, we need to fallback to the old method.
// ErrPDRegionsNotFullyScatter means the regions are not fully scattered,
// in new version of PD, the scatter regions API will return the failed regions id,
// but the old version of PD will only return the FinishedPercentage.
// so we need to retry the regions one by one.
if isUnsupportedError(err) || berrors.ErrPDRegionsNotFullyScatter.Equal(err) {
log.Warn("failed to batch scatter regions, rollback to sequentially scatter", logutil.ShortError(err))
c.scatterRegionsSequentially(
ctx, newRegions,
// backoff about 1h total, or we give up scattering this region.
Expand Down Expand Up @@ -237,12 +242,19 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn
"pd returns error during batch scattering: %s", pbErr)
}

failedRegionsID := make(map[uint64]struct{})
for _, id := range resp.FailedRegionsId {
failedRegionsID[id] = struct{}{}
if len(resp.FailedRegionsId) > 0 {
failedRegionsID := make(map[uint64]struct{})
for _, id := range resp.FailedRegionsId {
failedRegionsID[id] = struct{}{}
}
return failedRegionsID, nil
}

if finished := resp.GetFinishedPercentage(); finished < 100 {
return nil, errors.Annotatef(berrors.ErrPDRegionsNotFullyScatter, "scatter finished percentage %d less than 100", finished)
}

return failedRegionsID, nil
return nil, nil
}

func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
Expand Down
12 changes: 7 additions & 5 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ type MockPDClientForSplit struct {
count map[uint64]int
}
scatterRegions struct {
notImplemented bool
regionCount int
failedCount int
notImplemented bool
regionCount int
failedCount int
finishedPercentage int
}
getOperator struct {
responses map[uint64][]*pdpb.GetOperatorResponse
Expand All @@ -206,6 +207,7 @@ func NewMockPDClientForSplit() *MockPDClientForSplit {
ret := &MockPDClientForSplit{}
ret.Regions = &pdtypes.RegionTree{}
ret.scatterRegion.count = make(map[uint64]int)
ret.scatterRegions.finishedPercentage = 100
return ret
}

Expand Down Expand Up @@ -389,8 +391,8 @@ func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uin
FailedRegionsId: regionIDs[:],
}, nil
}
c.scatterRegions.regionCount += len(regionIDs)
return &pdpb.ScatterRegionResponse{}, nil
c.scatterRegions.regionCount += len(regionIDs) * c.scatterRegions.finishedPercentage / 100
return &pdpb.ScatterRegionResponse{FinishedPercentage: uint64(c.scatterRegions.finishedPercentage)}, nil
}

func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
Expand Down
41 changes: 41 additions & 0 deletions br/pkg/restore/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,3 +1031,44 @@ func TestSplitPoint2(t *testing.T) {
})
require.NoError(t, err)
}

func TestRegionsNotFullyScatter(t *testing.T) {
mockClient := NewMockPDClientForSplit()
client := pdClient{
needScatterVal: true,
client: mockClient,
}
client.needScatterInit.Do(func() {})
ctx := context.Background()

regions := []*RegionInfo{
{
Region: &metapb.Region{
Id: 1,
},
},
{
Region: &metapb.Region{
Id: 2,
},
},
}
err := client.scatterRegions(ctx, regions)
require.NoError(t, err)
require.Equal(t, 2, mockClient.scatterRegions.regionCount)
require.Len(t, mockClient.scatterRegion.count, 0)

// simulate that one region is not fully scattered when scatterRegions
mockClient.scatterRegions.finishedPercentage = 50
err = client.scatterRegions(ctx, regions)
require.NoError(t, err)
require.Equal(t, 2+1, mockClient.scatterRegions.regionCount)
require.Equal(t, map[uint64]int{1: 1, 2: 1}, mockClient.scatterRegion.count)

// simulate that the regions is not fully scattered when scatterRegion
mockClient.scatterRegion.eachRegionFailBefore = 7
err = client.scatterRegions(ctx, regions)
require.NoError(t, err)
require.Equal(t, 2+1+1, mockClient.scatterRegions.regionCount)
require.Equal(t, map[uint64]int{1: 1 + 7, 2: 1 + 7}, mockClient.scatterRegion.count)
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ error = '''
pd not fully scattered
'''

["BR:PD:ErrPDRegionsNotFullyScatter"]
error = '''
regions not fully scattered
'''

["BR:PD:ErrPDUknownScatterResult"]
error = '''
failed to wait region split
Expand Down
Loading