Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ var (
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))
ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDRegionsNotFullyScatter = errors.Normalize("regions not fully scattered", errors.RFCCodeText("BR:PD:ErrPDRegionsNotFullyScatter"))

ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch"))
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ go_test(
],
embed = [":split"],
flaky = True,
shard_count = 17,
shard_count = 18,
deps = [
"//br/pkg/errors",
"//br/pkg/utils",
Expand Down
13 changes: 11 additions & 2 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,13 @@ func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo)
// the retry is for the temporary network errors during sending request.
return utils.WithRetry(ctx, func() error {
err := c.tryScatterRegions(ctx, newRegions)
if isUnsupportedError(err) {
log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err))
// If batch scatter is unsupported, we need to fall back to the old method.
// ErrPDRegionsNotFullyScatter means the regions are not fully scattered:
// in newer versions of PD, the scatter regions API returns the IDs of the failed regions,
// but older versions of PD only return the FinishedPercentage,
// so we need to retry the regions one by one.
if isUnsupportedError(err) || berrors.ErrPDRegionsNotFullyScatter.Equal(err) {
log.Warn("failed to batch scatter regions, rollback to sequentially scatter", logutil.ShortError(err))
c.scatterRegionsSequentially(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
Expand Down Expand Up @@ -222,6 +227,10 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn
return errors.Annotatef(berrors.ErrPDInvalidResponse,
"pd returns error during batch scattering: %s", pbErr)
}
if finished := resp.GetFinishedPercentage(); finished < 100 {
return errors.Annotatef(berrors.ErrPDRegionsNotFullyScatter, "scatter finished percentage %d less than 100", finished)
}

return nil
}

Expand Down
10 changes: 6 additions & 4 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type MockPDClientForSplit struct {
count map[uint64]int
}
scatterRegions struct {
notImplemented bool
regionCount int
notImplemented bool
regionCount int
finishedPercentage int
}
getOperator struct {
responses map[uint64][]*pdpb.GetOperatorResponse
Expand All @@ -51,6 +52,7 @@ func NewMockPDClientForSplit() *MockPDClientForSplit {
ret := &MockPDClientForSplit{}
ret.Regions = &pdtypes.RegionTree{}
ret.scatterRegion.count = make(map[uint64]int)
ret.scatterRegions.finishedPercentage = 100
return ret
}

Expand Down Expand Up @@ -178,8 +180,8 @@ func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uin
if c.scatterRegions.notImplemented {
return nil, status.Error(codes.Unimplemented, "Ah, yep")
}
c.scatterRegions.regionCount += len(regionIDs)
return &pdpb.ScatterRegionResponse{}, nil
c.scatterRegions.regionCount += len(regionIDs) * c.scatterRegions.finishedPercentage / 100
return &pdpb.ScatterRegionResponse{FinishedPercentage: uint64(c.scatterRegions.finishedPercentage)}, nil
}

func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
Expand Down
41 changes: 41 additions & 0 deletions br/pkg/restore/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,44 @@ func TestRegionConsistency(t *testing.T) {
require.Regexp(t, ca.err, err.Error())
}
}

// TestRegionsNotFullyScatter checks that scatterRegions still succeeds when the
// batch scatter call reports a finished percentage below 100, and that in this
// case the regions are also scattered one by one via the per-region call.
func TestRegionsNotFullyScatter(t *testing.T) {
	pd := NewMockPDClientForSplit()
	cli := pdClient{
		needScatterVal: true,
		client:         pd,
	}
	// Consume the sync.Once up front so needScatterVal set above is kept as-is.
	cli.needScatterInit.Do(func() {})
	ctx := context.Background()

	// Two regions, with IDs 1 and 2.
	targets := make([]*RegionInfo, 0, 2)
	for _, id := range []uint64{1, 2} {
		targets = append(targets, &RegionInfo{Region: &metapb.Region{Id: id}})
	}

	// Phase 1: the mock reports 100% by default, so only the batch call is
	// used and no per-region scatter is recorded.
	require.NoError(t, cli.scatterRegions(ctx, targets))
	require.Equal(t, 2, pd.scatterRegions.regionCount)
	require.Len(t, pd.scatterRegion.count, 0)

	// Phase 2: the batch call reports only 50% finished. The mock counts
	// 2*50/100 = 1 more batch-scattered region, and every region is then
	// scattered individually once.
	pd.scatterRegions.finishedPercentage = 50
	require.NoError(t, cli.scatterRegions(ctx, targets))
	require.Equal(t, 3, pd.scatterRegions.regionCount)
	require.Equal(t, map[uint64]int{1: 1, 2: 1}, pd.scatterRegion.count)

	// Phase 3: additionally configure the per-region mock via
	// eachRegionFailBefore (presumably it fails that many attempts per region
	// before succeeding; confirm against the mock). The batch count grows by
	// one again and each region is expected to record 7 more per-region calls.
	pd.scatterRegion.eachRegionFailBefore = 7
	require.NoError(t, cli.scatterRegions(ctx, targets))
	require.Equal(t, 4, pd.scatterRegions.regionCount)
	require.Equal(t, map[uint64]int{1: 8, 2: 8}, pd.scatterRegion.count)
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ error = '''
PD leader not found
'''

["BR:PD:ErrPDRegionsNotFullyScatter"]
error = '''
regions not fully scattered
'''

["BR:PD:ErrPDUknownScatterResult"]
error = '''
failed to wait region splitted
Expand Down