diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index 130569514db35..056ffb81b1577 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -41,12 +41,13 @@ var ( ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation")) ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange")) - ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed")) - ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound")) - ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse")) - ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion")) - ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) - ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) + ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed")) + ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound")) + ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse")) + ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion")) + ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) + ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) + ErrPDRegionsNotFullyScatter = errors.Normalize("regions not fully scattered", errors.RFCCodeText("BR:PD:ErrPDRegionsNotFullyScatter")) ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch")) ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange")) diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index d56c1f7f9888f..448f67d62a579 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -58,7 +58,7 @@ go_test( ], embed = [":split"], flaky = True, - shard_count = 17, + shard_count = 18, deps = [ "//br/pkg/errors", "//br/pkg/utils", diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index bb9f6e497445a..18bb63b9b59b7 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -191,8 +191,13 @@ func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo) // the retry is for the temporary network errors during sending request. return utils.WithRetry(ctx, func() error { err := c.tryScatterRegions(ctx, newRegions) - if isUnsupportedError(err) { - log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) + // if err is unsupported, we need to fallback to the old method. + // ErrPDRegionsNotFullyScatter means the regions are not fully scattered, + // in new version of PD, the scatter regions API will return the failed regions id, + // but the old version of PD will only return the FinishedPercentage. + // so we need to retry the regions one by one. + if isUnsupportedError(err) || berrors.ErrPDRegionsNotFullyScatter.Equal(err) { + log.Warn("failed to batch scatter regions, rollback to sequentially scatter", logutil.ShortError(err)) c.scatterRegionsSequentially( ctx, newRegions, // backoff about 6s, or we give up scattering this region. @@ -222,6 +227,10 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn return errors.Annotatef(berrors.ErrPDInvalidResponse, "pd returns error during batch scattering: %s", pbErr) } + if finished := resp.GetFinishedPercentage(); finished < 100 { + return errors.Annotatef(berrors.ErrPDRegionsNotFullyScatter, "scatter finished percentage %d less than 100", finished) + } + return nil } diff --git a/br/pkg/restore/split/mock_pd_client.go b/br/pkg/restore/split/mock_pd_client.go index a621219f2a83f..2c121a3358707 100644 --- a/br/pkg/restore/split/mock_pd_client.go +++ b/br/pkg/restore/split/mock_pd_client.go @@ -38,8 +38,9 @@ type MockPDClientForSplit struct { count map[uint64]int } scatterRegions struct { - notImplemented bool - regionCount int + notImplemented bool + regionCount int + finishedPercentage int } getOperator struct { responses map[uint64][]*pdpb.GetOperatorResponse @@ -51,6 +52,7 @@ func NewMockPDClientForSplit() *MockPDClientForSplit { ret := &MockPDClientForSplit{} ret.Regions = &pdtypes.RegionTree{} ret.scatterRegion.count = make(map[uint64]int) + ret.scatterRegions.finishedPercentage = 100 return ret } @@ -178,8 +180,8 @@ func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uin if c.scatterRegions.notImplemented { return nil, status.Error(codes.Unimplemented, "Ah, yep") } - c.scatterRegions.regionCount += len(regionIDs) - return &pdpb.ScatterRegionResponse{}, nil + c.scatterRegions.regionCount += len(regionIDs) * c.scatterRegions.finishedPercentage / 100 + return &pdpb.ScatterRegionResponse{FinishedPercentage: uint64(c.scatterRegions.finishedPercentage)}, nil } func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index 4d5b01435193c..ac43d6401ee4e 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -689,3 +689,44 @@ func TestRegionConsistency(t *testing.T) { require.Regexp(t, ca.err, err.Error()) } } + +func TestRegionsNotFullyScatter(t *testing.T) { + mockClient := NewMockPDClientForSplit() + client := pdClient{ + needScatterVal: true, + client: mockClient, + } + client.needScatterInit.Do(func() {}) + ctx := context.Background() + + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + }, + }, + { + Region: &metapb.Region{ + Id: 2, + }, + }, + } + err := client.scatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, 2, mockClient.scatterRegions.regionCount) + require.Len(t, mockClient.scatterRegion.count, 0) + + // simulate that one region is not fully scattered when scatterRegions + mockClient.scatterRegions.finishedPercentage = 50 + err = client.scatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, 2+1, mockClient.scatterRegions.regionCount) + require.Equal(t, map[uint64]int{1: 1, 2: 1}, mockClient.scatterRegion.count) + + // simulate that the regions is not fully scattered when scatterRegion + mockClient.scatterRegion.eachRegionFailBefore = 7 + err = client.scatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, 2+1+1, mockClient.scatterRegions.regionCount) + require.Equal(t, map[uint64]int{1: 1 + 7, 2: 1 + 7}, mockClient.scatterRegion.count) +} diff --git a/errors.toml b/errors.toml index b3a2e27a2a0df..cd014a6f39dc4 100644 --- a/errors.toml +++ b/errors.toml @@ -181,6 +181,11 @@ error = ''' PD leader not found ''' +["BR:PD:ErrPDRegionsNotFullyScatter"] +error = ''' +regions not fully scattered +''' + ["BR:PD:ErrPDUknownScatterResult"] error = ''' failed to wait region splitted