Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ var (
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))
ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDRegionsNotFullyScatter = errors.Normalize("regions not fully scattered", errors.RFCCodeText("BR:PD:ErrPDRegionsNotFullyScatter"))

ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch"))
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ go_test(
],
embed = [":split"],
flaky = True,
shard_count = 19,
shard_count = 21,
deps = [
"//br/pkg/errors",
"//br/pkg/utils",
Expand Down
36 changes: 30 additions & 6 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,25 @@ func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo)
// the retry is for the temporary network errors during sending request.
return utils.WithRetry(ctx, func() error {
err := c.tryScatterRegions(ctx, newRegions)
if isUnsupportedError(err) {
log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err))
// if err is unsupported, we need to fallback to the old method.
// ErrPDRegionsNotFullyScatter means the regions are not fully scattered,
// in new version of PD, the scatter regions API will return the failed regions id,
// but the old version of PD will only return the FinishedPercentage.
// so we need to retry the regions one by one.
if isUnsupportedError(err) || berrors.ErrPDRegionsNotFullyScatter.Equal(err) {
log.Warn("failed to batch scatter regions, rollback to sequentially scatter", logutil.ShortError(err))
c.scatterRegionsSequentially(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
// backoff about 1h total, or we give up scattering this region.
&ExponentialBackoffer{
Attempts: 7,
Attempts: 1800,
BaseBackoff: 100 * time.Millisecond,
MaxDelay: 2 * time.Second,
})
return nil
}
return err
}, &ExponentialBackoffer{Attempts: 3, BaseBackoff: 500 * time.Millisecond})
}, &ExponentialBackoffer{Attempts: 3, BaseBackoff: 500 * time.Millisecond, MaxDelay: 2 * time.Second})
}

func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error {
Expand All @@ -221,6 +227,10 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn
return errors.Annotatef(berrors.ErrPDInvalidResponse,
"pd returns error during batch scattering: %s", pbErr)
}
if finished := resp.GetFinishedPercentage(); finished < 100 {
return errors.Annotatef(berrors.ErrPDRegionsNotFullyScatter, "scatter finished percentage %d less than 100", finished)
}

return nil
}

Expand Down Expand Up @@ -828,6 +838,11 @@ func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions []
logutil.Region(region.Region),
)
delete(newRegionSet, region.Region.Id)
} else {
log.Warn("scatter region meet error, will retry",
logutil.ShortError(err),
logutil.Region(region.Region),
)
}
errs = multierr.Append(errs, err)
}
Expand Down Expand Up @@ -1003,6 +1018,7 @@ func CheckRegionEpoch(_new, _old *RegionInfo) bool {
type ExponentialBackoffer struct {
Attempts int
BaseBackoff time.Duration
MaxDelay time.Duration
}

func (b *ExponentialBackoffer) exponentialBackoff() time.Duration {
Expand All @@ -1012,6 +1028,9 @@ func (b *ExponentialBackoffer) exponentialBackoff() time.Duration {
return 0
}
b.BaseBackoff *= 2
if b.MaxDelay > 0 && b.BaseBackoff > b.MaxDelay {
b.BaseBackoff = b.MaxDelay
}
return bo
}

Expand All @@ -1024,12 +1043,17 @@ func PdErrorCanRetry(err error) bool {
//
// (2) shouldn't happen in a recently splitted region.
// (1) and (3) might happen, and should be retried.
//
// (4) operator canceled because cannot add an operator to the execute queue [PD:store-limit]
// (5) failed to create scatter region operator [PD:schedule:ErrCreateOperator]
grpcErr := status.Convert(err)
if grpcErr == nil {
return false
}
return strings.Contains(grpcErr.Message(), "is not fully replicated") ||
strings.Contains(grpcErr.Message(), "has no leader")
strings.Contains(grpcErr.Message(), "has no leader") ||
strings.Contains(grpcErr.Message(), "cannot add an operator to the execute queue") ||
strings.Contains(grpcErr.Message(), "failed to create scatter region operator")
}

// NextBackoff returns a duration to wait before retrying again.
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/restore/split/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestBatchSplit(t *testing.T) {
Expand Down Expand Up @@ -301,3 +303,21 @@ func TestSplitMeetErrorAndRetry(t *testing.T) {
_, err = mockClient.SplitKeysAndScatter(ctx, [][]byte{{'d'}})
require.ErrorContains(t, err, "no valid key")
}

func TestPDErrorCanRetry(t *testing.T) {
// non-gRPC error should not retry
err := errors.New("random failure")
require.False(t, PdErrorCanRetry(err))

e1 := status.Error(codes.Unknown, "region 42 is not fully replicated")
require.True(t, PdErrorCanRetry(e1))

e2 := status.Error(codes.Unknown, "operator canceled because cannot add an operator to the execute queue")
require.True(t, PdErrorCanRetry(e2))

e3 := status.Error(codes.Unknown, "unable to create operator, failed to create scatter region operator for region 13813282")
require.True(t, PdErrorCanRetry(e3))

e4 := status.Error(codes.Unknown, "should be false")
require.False(t, PdErrorCanRetry(e4))
}
10 changes: 6 additions & 4 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type MockPDClientForSplit struct {
count map[uint64]int
}
scatterRegions struct {
notImplemented bool
regionCount int
notImplemented bool
regionCount int
finishedPercentage int
}
getOperator struct {
responses map[uint64][]*pdpb.GetOperatorResponse
Expand All @@ -52,6 +53,7 @@ func NewMockPDClientForSplit() *MockPDClientForSplit {
ret := &MockPDClientForSplit{}
ret.Regions = &pdtypes.RegionTree{}
ret.scatterRegion.count = make(map[uint64]int)
ret.scatterRegions.finishedPercentage = 100
return ret
}

Expand Down Expand Up @@ -221,8 +223,8 @@ func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uin
if c.scatterRegions.notImplemented {
return nil, status.Error(codes.Unimplemented, "Ah, yep")
}
c.scatterRegions.regionCount += len(regionIDs)
return &pdpb.ScatterRegionResponse{}, nil
c.scatterRegions.regionCount += len(regionIDs) * c.scatterRegions.finishedPercentage / 100
return &pdpb.ScatterRegionResponse{FinishedPercentage: uint64(c.scatterRegions.finishedPercentage)}, nil
}

func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
Expand Down
41 changes: 41 additions & 0 deletions br/pkg/restore/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,3 +756,44 @@ func TestScanRegionsWithRetry(t *testing.T) {
require.Equal(t, []byte("2"), regions[1].Region.StartKey)
}
}

func TestRegionsNotFullyScatter(t *testing.T) {
mockClient := NewMockPDClientForSplit()
client := pdClient{
needScatterVal: true,
client: mockClient,
}
client.needScatterInit.Do(func() {})
ctx := context.Background()

regions := []*RegionInfo{
{
Region: &metapb.Region{
Id: 1,
},
},
{
Region: &metapb.Region{
Id: 2,
},
},
}
err := client.scatterRegions(ctx, regions)
require.NoError(t, err)
require.Equal(t, 2, mockClient.scatterRegions.regionCount)
require.Len(t, mockClient.scatterRegion.count, 0)

// simulate that one region is not fully scattered when scatterRegions
mockClient.scatterRegions.finishedPercentage = 50
err = client.scatterRegions(ctx, regions)
require.NoError(t, err)
require.Equal(t, 2+1, mockClient.scatterRegions.regionCount)
require.Equal(t, map[uint64]int{1: 1, 2: 1}, mockClient.scatterRegion.count)

// simulate that the regions is not fully scattered when scatterRegion
mockClient.scatterRegion.eachRegionFailBefore = 7
err = client.scatterRegions(ctx, regions)
require.NoError(t, err)
require.Equal(t, 2+1+1, mockClient.scatterRegions.regionCount)
require.Equal(t, map[uint64]int{1: 1 + 7, 2: 1 + 7}, mockClient.scatterRegion.count)
}
6 changes: 3 additions & 3 deletions br/tests/br_replica_read/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fi
# set random store to read only
random_store_id=$(run_pd_ctl -u https://$PD_ADDR store | jq 'first(.stores[]|select(.store.labels|(.!= null and any(.key == "engine" and .value=="tiflash"))| not)|.store.id)')
echo "random store id: $random_store_id"
run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' 'read_only'
run_pd_ctl -u https://$PD_ADDR store label $random_store_id 'mode' 'read_only'

# set placement rule to add a learner replica for each region in the read only store
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle load --out=$TEST_DIR/default_rules.json
Expand Down Expand Up @@ -56,7 +56,7 @@ run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"

# backup db
echo "backup start..."
run_br -u https://$PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --replica-read-label '$mode:read_only'
run_br -u https://$PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --replica-read-label 'mode:read_only'

run_sql "DROP DATABASE $DB;"

Expand Down Expand Up @@ -86,5 +86,5 @@ run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE
run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE DATABASE'

run_sql "DROP DATABASE $DB;"
run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' ''
run_pd_ctl -u https://$PD_ADDR store label $random_store_id 'mode' ''
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle save --in $TEST_DIR/default_rules.json
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ error = '''
PD leader not found
'''

["BR:PD:ErrPDRegionsNotFullyScatter"]
error = '''
regions not fully scattered
'''

["BR:PD:ErrPDUknownScatterResult"]
error = '''
failed to wait region splitted
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/run_services
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export TIDB_STATUS_ADDR="127.0.0.1:10080"
# actual tikv_addr are TIKV_ADDR${i}
export TIKV_ADDR="127.0.0.1:2016"
export TIKV_STATUS_ADDR="127.0.0.1:2018"
export TIKV_COUNT=3
export TIKV_COUNT=6
export TIFLASH_HTTP="127.0.0.1:20292"
export TIKV_PIDS="${TEST_DIR:?}/tikv_pids.txt"

Expand Down