Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,565 changes: 859 additions & 706 deletions DEPS.bzl

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDNotFullyScatter = errors.Normalize("pd not fully scattered", errors.RFCCodeText("BR:PD:ErrPDNotFullyScatter"))
ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))

ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch"))
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ go_test(
],
embed = [":split"],
flaky = True,
<<<<<<< HEAD
shard_count = 17,
=======
shard_count = 28,
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
deps = [
"//br/pkg/errors",
"//br/pkg/utils",
Expand Down
50 changes: 43 additions & 7 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,50 @@ func (c *pdClient) needScatter(ctx context.Context) bool {
func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo) error {
log.Info("scatter regions", zap.Int("regions", len(newRegions)))
// the retry is for the temporary network errors during sending request.
return utils.WithRetry(ctx, func() error {
err := c.tryScatterRegions(ctx, newRegions)
err := utils.WithRetry(ctx, func() error {
failedRegionsID, err := c.tryScatterRegions(ctx, newRegions)
if isUnsupportedError(err) {
log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err))
c.scatterRegionsSequentially(
ctx, newRegions,
<<<<<<< HEAD
// backoff about 6s, or we give up scattering this region.
&ExponentialBackoffer{
Attempts: 7,
BaseBackoff: 100 * time.Millisecond,
})
=======
// backoff about 1h total, or we give up scattering this region.
utils.NewBackoffRetryAllErrorStrategy(1800, 100*time.Millisecond, 2*time.Second))
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
return nil
}
// If there are failed regions, retry them
if len(failedRegionsID) > 0 {
failedRegions := make([]*RegionInfo, 0, len(failedRegionsID))
for _, region := range newRegions {
if _, exists := failedRegionsID[region.Region.Id]; exists {
failedRegions = append(failedRegions, region)
}
}
newRegions = failedRegions
return errors.Annotatef(berrors.ErrPDNotFullyScatter,
"pd returns error during batch scattering: %d regions failed to scatter", len(failedRegionsID))
}
return err
<<<<<<< HEAD
}, &ExponentialBackoffer{Attempts: 3, BaseBackoff: 500 * time.Millisecond})
=======
}, utils.NewBackoffRetryAllErrorStrategy(1800, 500*time.Millisecond, 2*time.Second))
if err != nil && berrors.ErrPDNotFullyScatter.Equal(err) {
log.Warn("some regions haven't been scattered", zap.Error(err))
return nil
}
return err
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
}

func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error {
func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionInfo) (map[uint64]struct{}, error) {
regionsID := make([]uint64, 0, len(regionInfo))
for _, v := range regionInfo {
regionsID = append(regionsID, v.Region.Id)
Expand All @@ -216,13 +242,19 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn
}
resp, err := c.client.ScatterRegions(ctx, regionsID, pd.WithSkipStoreLimit())
if err != nil {
return err
return nil, err
}
if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK {
return errors.Annotatef(berrors.ErrPDInvalidResponse,
return nil, errors.Annotatef(berrors.ErrPDInvalidResponse,
"pd returns error during batch scattering: %s", pbErr)
}
return nil

failedRegionsID := make(map[uint64]struct{})
for _, id := range resp.FailedRegionsId {
failedRegionsID[id] = struct{}{}
}

return failedRegionsID, nil
}

func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
Expand Down Expand Up @@ -1017,6 +1049,8 @@ func PdErrorCanRetry(err error) bool {
// (1) region %d has no leader
// (2) region %d is hot
// (3) region %d is not fully replicated
// (4) operator canceled because cannot add an operator to the execute queue [PD:store-limit]
// (5) failed to create scatter region operator [PD:schedule:ErrCreateOperator]
//
// (2) shouldn't happen in a recently split region.
// (1), (3), (4) and (5) might happen, and should be retried.
Expand All @@ -1025,7 +1059,9 @@ func PdErrorCanRetry(err error) bool {
return false
}
return strings.Contains(grpcErr.Message(), "is not fully replicated") ||
strings.Contains(grpcErr.Message(), "has no leader")
strings.Contains(grpcErr.Message(), "has no leader") ||
strings.Contains(grpcErr.Message(), "cannot add an operator to the execute queue") ||
strings.Contains(grpcErr.Message(), "failed to create scatter region operator")
}

// NextBackoff returns a duration to wait before retrying again.
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/restore/split/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestBatchSplit(t *testing.T) {
Expand Down Expand Up @@ -266,3 +268,21 @@ func TestSplitMeetErrorAndRetry(t *testing.T) {
_, err = mockClient.SplitKeysAndScatter(ctx, [][]byte{{'d'}})
require.ErrorContains(t, err, "no valid key")
}

// TestPDErrorCanRetry checks which errors PdErrorCanRetry reports as retryable.
func TestPDErrorCanRetry(t *testing.T) {
	// An error that does not come from gRPC must not be retried.
	require.False(t, PdErrorCanRetry(errors.New("random failure")))

	// Each case wraps the message in a gRPC status error with codes.Unknown
	// and states whether that message is expected to be retryable.
	cases := []struct {
		msg   string
		retry bool
	}{
		{"region 42 is not fully replicated", true},
		{"operator canceled because cannot add an operator to the execute queue", true},
		{"unable to create operator, failed to create scatter region operator for region 13813282", true},
		{"should be false", false},
	}
	for _, tc := range cases {
		grpcErr := status.Error(codes.Unknown, tc.msg)
		require.Equal(t, tc.retry, PdErrorCanRetry(grpcErr), tc.msg)
	}
}
8 changes: 8 additions & 0 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type MockPDClientForSplit struct {
scatterRegions struct {
notImplemented bool
regionCount int
failedCount int
}
getOperator struct {
responses map[uint64][]*pdpb.GetOperatorResponse
Expand Down Expand Up @@ -178,6 +179,13 @@ func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uin
if c.scatterRegions.notImplemented {
return nil, status.Error(codes.Unimplemented, "Ah, yep")
}
if c.scatterRegions.failedCount > 0 {
c.scatterRegions.failedCount--
return &pdpb.ScatterRegionResponse{
FinishedPercentage: 0,
FailedRegionsId: regionIDs[:],
}, nil
}
c.scatterRegions.regionCount += len(regionIDs)
return &pdpb.ScatterRegionResponse{}, nil
}
Expand Down
27 changes: 27 additions & 0 deletions br/pkg/restore/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,33 @@ func TestScatterSequentiallyRetryCnt(t *testing.T) {
require.Equal(t, 7, backoffer.already)
}

// TestBatchScatterRegionsRetryCnt configures the mock PD client to report every
// region as failed for its first 7 batch-scatter calls and expects
// scatterRegions to finish without returning an error.
func TestBatchScatterRegionsRetryCnt(t *testing.T) {
	pdMock := NewMockPDClientForSplit()
	pdMock.scatterRegions.failedCount = 7

	cli := pdClient{
		needScatterVal: true,
		client:         pdMock,
	}
	// NOTE(review): presumably this consumes the one-time needScatter
	// initialization so that needScatterVal above is used as-is — confirm.
	cli.needScatterInit.Do(func() {})

	// Two minimal regions, identified only by their IDs.
	regions := make([]*RegionInfo, 0, 2)
	for _, id := range []uint64{1, 2} {
		regions = append(regions, &RegionInfo{
			Region: &metapb.Region{Id: id},
		})
	}

	require.NoError(t, cli.scatterRegions(context.Background(), regions))
}

func TestScatterBackwardCompatibility(t *testing.T) {
mockClient := NewMockPDClientForSplit()
mockClient.scatterRegions.notImplemented = true
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ error = '''
PD leader not found
'''

["BR:PD:ErrPDNotFullyScatter"]
error = '''
pd not fully scattered
'''

["BR:PD:ErrPDUknownScatterResult"]
error = '''
failed to wait region splitted
Expand Down
87 changes: 87 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@ module github.com/pingcap/tidb
go 1.21

require (
<<<<<<< HEAD
cloud.google.com/go/storage v1.36.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1
=======
cloud.google.com/go/kms v1.15.8
cloud.google.com/go/storage v1.39.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/BurntSushi/toml v1.3.2
github.com/DATA-DOG/go-sqlmock v1.5.0
Expand Down Expand Up @@ -84,8 +91,13 @@ require (
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/fn v1.0.0
<<<<<<< HEAD
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
=======
github.com/pingcap/kvproto v0.0.0-20250605100108-dc99a8f6e348
github.com/pingcap/log v1.1.1-0.20250514022801-14f3b4ca066e
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20240318032315-55a7867ddd50
Expand Down Expand Up @@ -115,10 +127,18 @@ require (
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
<<<<<<< HEAD
go.etcd.io/etcd/api/v3 v3.5.12
go.etcd.io/etcd/client/pkg/v3 v3.5.12
go.etcd.io/etcd/client/v3 v3.5.12
go.etcd.io/etcd/server/v3 v3.5.12
=======
github.com/zyedidia/generic v1.2.1
go.etcd.io/etcd/api/v3 v3.5.15
go.etcd.io/etcd/client/pkg/v3 v3.5.15
go.etcd.io/etcd/client/v3 v3.5.15
go.etcd.io/etcd/server/v3 v3.5.15
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
go.etcd.io/etcd/tests/v3 v3.5.12
go.opencensus.io v0.24.0
go.uber.org/atomic v1.11.0
Expand All @@ -127,6 +147,7 @@ require (
go.uber.org/mock v0.4.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
<<<<<<< HEAD
golang.org/x/exp v0.0.0-20240404231335-c0f41cb1a7a0
golang.org/x/net v0.24.0
golang.org/x/oauth2 v0.18.0
Expand All @@ -140,6 +161,23 @@ require (
google.golang.org/grpc v1.62.1
gopkg.in/yaml.v2 v2.4.0
honnef.co/go/tools v0.4.7
=======
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/net v0.40.0
golang.org/x/oauth2 v0.30.0
golang.org/x/sync v0.15.0
golang.org/x/sys v0.33.0
golang.org/x/term v0.32.0
golang.org/x/text v0.25.0
golang.org/x/time v0.12.0
golang.org/x/tools v0.33.0
google.golang.org/api v0.170.0
google.golang.org/grpc v1.63.2
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/mysql v1.5.7
gorm.io/gorm v1.25.12
honnef.co/go/tools v0.6.1
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
k8s.io/api v0.29.11
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
Expand All @@ -148,6 +186,7 @@ require (
require (
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
<<<<<<< HEAD
)

require (
Expand All @@ -157,6 +196,33 @@ require (
cloud.google.com/go/iam v1.1.6 // indirect
cloud.google.com/go/pubsub v1.36.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 // indirect
=======
github.com/goccy/go-reflect v1.2.0 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/qri-io/jsonpointer v0.1.1 // indirect
github.com/segmentio/fasthash v1.0.3 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
)

require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go/pubsub v1.37.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect
github.com/DataDog/zstd v1.5.5 // indirect
Expand Down Expand Up @@ -202,7 +268,11 @@ require (
github.com/google/renameio/v2 v2.0.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
<<<<<<< HEAD
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
=======
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
github.com/gorilla/handlers v1.5.1 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
Expand Down Expand Up @@ -276,13 +346,23 @@ require (
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
<<<<<<< HEAD
go.etcd.io/bbolt v1.3.8 // indirect
go.etcd.io/etcd/client/v2 v2.305.12 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.12 // indirect
go.etcd.io/etcd/raft/v3 v3.5.12 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/otel v1.22.0 // indirect
=======
go.etcd.io/bbolt v1.3.10 // indirect
go.etcd.io/etcd/client/v2 v2.305.15 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.15 // indirect
go.etcd.io/etcd/raft/v3 v3.5.15 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
Expand All @@ -293,12 +373,19 @@ require (
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
<<<<<<< HEAD
gonum.org/v1/gonum v0.8.2 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240308144416-29370a3891b7 // indirect
google.golang.org/protobuf v1.33.0 // indirect
=======
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
google.golang.org/protobuf v1.36.6
>>>>>>> c6cf760c9af (ingest: retry failed regions when batch scatter regions (#61722))
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading
Loading