-
Notifications
You must be signed in to change notification settings - Fork 6.1k
ingest: retry failed regions when batch scatter regions #61722
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -189,21 +189,38 @@ func (c *pdClient) needScatter(ctx context.Context) bool { | |
| func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo) error { | ||
| log.Info("scatter regions", zap.Int("regions", len(newRegions))) | ||
| // the retry is for the temporary network errors during sending request. | ||
| return utils.WithRetry(ctx, func() error { | ||
| err := c.tryScatterRegions(ctx, newRegions) | ||
| err := utils.WithRetry(ctx, func() error { | ||
| failedRegionsID, err := c.tryScatterRegions(ctx, newRegions) | ||
| if isUnsupportedError(err) { | ||
| log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) | ||
| c.scatterRegionsSequentially( | ||
| ctx, newRegions, | ||
| // backoff about 6s, or we give up scattering this region. | ||
| utils.NewBackoffRetryAllErrorStrategy(7, 100*time.Millisecond, 2*time.Second)) | ||
| // backoff about 1h total, or we give up scattering this region. | ||
| utils.NewBackoffRetryAllErrorStrategy(1800, 100*time.Millisecond, 2*time.Second)) | ||
| return nil | ||
| } | ||
| // If there are failed regions, retry them | ||
| if len(failedRegionsID) > 0 { | ||
| failedRegions := make([]*RegionInfo, 0, len(failedRegionsID)) | ||
| for _, region := range newRegions { | ||
| if _, exists := failedRegionsID[region.Region.Id]; exists { | ||
| failedRegions = append(failedRegions, region) | ||
| } | ||
| } | ||
| newRegions = failedRegions | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I’m concerned that the failure-handling logic here may not be robust. If a region is mistakenly judged as needing to be scattered and the operation fails, it appears that this region won’t be retried later. Consider this scenario: a small number of regions fail to add scatter operators, while others succeed. However, due to the asynchronous nature of operator execution, even regions that successfully had operators added might still time out or encounter other issues afterward. Does this scenario align with the intended design? |
||
| return errors.Annotatef(berrors.ErrPDNotFullyScatter, | ||
| "pd returns error during batch scattering: %d regions failed to scatter", len(failedRegionsID)) | ||
| } | ||
| return err | ||
| }, utils.NewBackoffRetryAllErrorStrategy(3, 500*time.Millisecond, 2*time.Second)) | ||
| }, utils.NewBackoffRetryAllErrorStrategy(1800, 500*time.Millisecond, 2*time.Second)) | ||
| if err != nil && berrors.ErrPDNotFullyScatter.Equal(err) { | ||
| log.Warn("some regions haven't been scattered", zap.Error(err)) | ||
| return nil | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error { | ||
| func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionInfo) (map[uint64]struct{}, error) { | ||
tangenta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| regionsID := make([]uint64, 0, len(regionInfo)) | ||
| for _, v := range regionInfo { | ||
| regionsID = append(regionsID, v.Region.Id) | ||
|
|
@@ -213,13 +230,19 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn | |
| } | ||
| resp, err := c.client.ScatterRegions(ctx, regionsID, opt.WithSkipStoreLimit()) | ||
| if err != nil { | ||
| return err | ||
| return nil, err | ||
| } | ||
| if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK { | ||
| return errors.Annotatef(berrors.ErrPDInvalidResponse, | ||
| return nil, errors.Annotatef(berrors.ErrPDInvalidResponse, | ||
| "pd returns error during batch scattering: %s", pbErr) | ||
| } | ||
| return nil | ||
|
|
||
| failedRegionsID := make(map[uint64]struct{}) | ||
| for _, id := range resp.FailedRegionsId { | ||
| failedRegionsID[id] = struct{}{} | ||
| } | ||
|
|
||
| return failedRegionsID, nil | ||
| } | ||
|
|
||
| func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { | ||
|
|
@@ -1001,6 +1024,8 @@ func PdErrorCanRetry(err error) bool { | |
| // (1) region %d has no leader | ||
| // (2) region %d is hot | ||
| // (3) region %d is not fully replicated | ||
| // (4) operator canceled because cannot add an operator to the execute queue [PD:store-limit] | ||
| // (5) failed to create scatter region operator [PD:schedule:ErrCreateOperator] | ||
| // | ||
| // (2) shouldn't happen in a recently splitted region. | ||
| // (1) and (3) might happen, and should be retried. | ||
|
|
@@ -1009,7 +1034,9 @@ func PdErrorCanRetry(err error) bool { | |
| return false | ||
| } | ||
| return strings.Contains(grpcErr.Message(), "is not fully replicated") || | ||
| strings.Contains(grpcErr.Message(), "has no leader") | ||
| strings.Contains(grpcErr.Message(), "has no leader") || | ||
| strings.Contains(grpcErr.Message(), "cannot add an operator to the execute queue") || | ||
| strings.Contains(grpcErr.Message(), "failed to create scatter region operator") | ||
| } | ||
|
|
||
| // NextBackoff returns a duration to wait before retrying again. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.