diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index ec5838ca1c60c..a21855f95c589 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -43,13 +43,14 @@ var ( ErrMigrationNotFound = errors.Normalize("no migration found", errors.RFCCodeText("BR:Common:ErrMigrationNotFound")) ErrMigrationVersionNotSupported = errors.Normalize("the migration version isn't supported", errors.RFCCodeText("BR:Common:ErrMigrationVersionNotSupported")) - ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed")) - ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound")) - ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse")) - ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion")) - ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) - ErrPDNotFullyScatter = errors.Normalize("pd not fully scattered", errors.RFCCodeText("BR:PD:ErrPDNotFullyScatter")) - ErrPDSplitFailed = errors.Normalize("failed to wait region split", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) + ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed")) + ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound")) + ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse")) + ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion")) + ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) + ErrPDNotFullyScatter = errors.Normalize("pd not fully scattered", errors.RFCCodeText("BR:PD:ErrPDNotFullyScatter")) + ErrPDSplitFailed = errors.Normalize("failed to wait region split", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult")) + ErrPDRegionsNotFullyScatter = errors.Normalize("regions not fully scattered", errors.RFCCodeText("BR:PD:ErrPDRegionsNotFullyScatter")) ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch")) ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange")) diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 2247b758f893b..5e96c352d4e03 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -65,7 +65,7 @@ go_test( ], embed = [":split"], flaky = True, - shard_count = 28, + shard_count = 29, deps = [ "//br/pkg/errors", "//br/pkg/restore/utils", diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index 0b9036a34f6d1..83319b04f0931 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -191,8 +191,13 @@ func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo) // the retry is for the temporary network errors during sending request. err := utils.WithRetry(ctx, func() error { failedRegionsID, err := c.tryScatterRegions(ctx, newRegions) - if isUnsupportedError(err) { - log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) + // if err is unsupported, we need to fallback to the old method. + // ErrPDRegionsNotFullyScatter means the regions are not fully scattered, + // in new version of PD, the scatter regions API will return the failed regions id, + // but the old version of PD will only return the FinishedPercentage. + // so we need to retry the regions one by one. + if isUnsupportedError(err) || berrors.ErrPDRegionsNotFullyScatter.Equal(err) { + log.Warn("failed to batch scatter regions, rollback to sequentially scatter", logutil.ShortError(err)) c.scatterRegionsSequentially( ctx, newRegions, // backoff about 1h total, or we give up scattering this region. @@ -237,12 +242,19 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn "pd returns error during batch scattering: %s", pbErr) } - failedRegionsID := make(map[uint64]struct{}) - for _, id := range resp.FailedRegionsId { - failedRegionsID[id] = struct{}{} + if len(resp.FailedRegionsId) > 0 { + failedRegionsID := make(map[uint64]struct{}) + for _, id := range resp.FailedRegionsId { + failedRegionsID[id] = struct{}{} + } + return failedRegionsID, nil + } + + if finished := resp.GetFinishedPercentage(); finished < 100 { + return nil, errors.Annotatef(berrors.ErrPDRegionsNotFullyScatter, "scatter finished percentage %d less than 100", finished) } - return failedRegionsID, nil + return nil, nil } func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { diff --git a/br/pkg/restore/split/mock_pd_client.go b/br/pkg/restore/split/mock_pd_client.go index 5833be433e3ff..1f864787a9903 100644 --- a/br/pkg/restore/split/mock_pd_client.go +++ b/br/pkg/restore/split/mock_pd_client.go @@ -192,9 +192,10 @@ type MockPDClientForSplit struct { count map[uint64]int } scatterRegions struct { - notImplemented bool - regionCount int - failedCount int + notImplemented bool + regionCount int + failedCount int + finishedPercentage int } getOperator struct { responses map[uint64][]*pdpb.GetOperatorResponse @@ -206,6 +207,7 @@ func NewMockPDClientForSplit() *MockPDClientForSplit { ret := &MockPDClientForSplit{} ret.Regions = &pdtypes.RegionTree{} ret.scatterRegion.count = make(map[uint64]int) + ret.scatterRegions.finishedPercentage = 100 return ret } @@ -389,8 +391,8 @@ func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uin FailedRegionsId: regionIDs[:], }, nil } - c.scatterRegions.regionCount += len(regionIDs) - return &pdpb.ScatterRegionResponse{}, nil + c.scatterRegions.regionCount += len(regionIDs) * c.scatterRegions.finishedPercentage / 100 + return &pdpb.ScatterRegionResponse{FinishedPercentage: uint64(c.scatterRegions.finishedPercentage)}, nil } func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index 60b0df2054265..9e8a186195341 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -1031,3 +1031,44 @@ func TestSplitPoint2(t *testing.T) { }) require.NoError(t, err) } + +func TestRegionsNotFullyScatter(t *testing.T) { + mockClient := NewMockPDClientForSplit() + client := pdClient{ + needScatterVal: true, + client: mockClient, + } + client.needScatterInit.Do(func() {}) + ctx := context.Background() + + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + }, + }, + { + Region: &metapb.Region{ + Id: 2, + }, + }, + } + err := client.scatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, 2, mockClient.scatterRegions.regionCount) + require.Len(t, mockClient.scatterRegion.count, 0) + + // simulate that one region is not fully scattered when scatterRegions + mockClient.scatterRegions.finishedPercentage = 50 + err = client.scatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, 2+1, mockClient.scatterRegions.regionCount) + require.Equal(t, map[uint64]int{1: 1, 2: 1}, mockClient.scatterRegion.count) + + // simulate that the regions is not fully scattered when scatterRegion + mockClient.scatterRegion.eachRegionFailBefore = 7 + err = client.scatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, 2+1+1, mockClient.scatterRegions.regionCount) + require.Equal(t, map[uint64]int{1: 1 + 7, 2: 1 + 7}, mockClient.scatterRegion.count) +} diff --git a/errors.toml b/errors.toml index 696a5e98203fb..53c726e931bf0 100644 --- a/errors.toml +++ b/errors.toml @@ -201,6 +201,11 @@ error = ''' pd not fully scattered ''' +["BR:PD:ErrPDRegionsNotFullyScatter"] +error = ''' +regions not fully scattered +''' + ["BR:PD:ErrPDUknownScatterResult"] error = ''' failed to wait region split