diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index fc88f174394f3..f6065e6757506 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -57,5 +57,6 @@ go_test( "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", "@org_uber_go_goleak//:goleak", + "@org_uber_go_multierr//:multierr", ], ) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 9cb3c4dc1c6fa..38461c4dd5652 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -81,16 +81,20 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, func() error { stores, err = util.GetAllTiKVStores(ctx, pdClient, storeBehavior) failpoint.Inject("hint-GetAllTiKVStores-error", func(val failpoint.Value) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.") if val.(bool) { - logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.") err = status.Error(codes.Unknown, "Retryable error") + } else { + err = context.Canceled } }) failpoint.Inject("hint-GetAllTiKVStores-cancel", func(val failpoint.Value) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.") if val.(bool) { - logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.") err = status.Error(codes.Canceled, "Cancel Retry") + } else { + err = context.Canceled } }) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index fc822fac123d9..d9544cb32eafc 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -18,12 +18,13 @@ import ( "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" + "go.uber.org/multierr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { - _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel", "return(true)") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel", "1*return(true)->1*return(false)") defer func() { _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel") }() @@ -59,11 +60,13 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { _, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) - require.Equal(t, codes.Canceled, status.Code(errors.Cause(err))) + errs := multierr.Errors(err) + require.Equal(t, 2, len(errs)) + require.Equal(t, codes.Canceled, status.Code(errors.Cause(errs[0]))) } func TestGetAllTiKVStoresWithUnknown(t *testing.T) { - _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "return(true)") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "1*return(true)->1*return(false)") defer func() { _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error") }() @@ -99,7 +102,9 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) { _, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) - require.Equal(t, codes.Unknown, status.Code(errors.Cause(err))) + errs := multierr.Errors(err) + require.Equal(t, 2, len(errs)) + require.Equal(t, codes.Unknown, status.Code(errors.Cause(errs[0]))) } func TestCheckStoresAlive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -397,14 +402,18 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, } - ctx := context.Background() + pctx := context.Background() for _, ca := range cases { + ctx, cancel := context.WithCancel(pctx) pdCli := utils.FakePDClient{Stores: ca.stores} require.Equal(t, len(ca.content), len(ca.stores)) count := 0 mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case "/config": + if len(ca.content[count]) == 0 { + cancel() + } _, _ = fmt.Fprint(w, ca.content[count]) default: http.NotFoundHandler().ServeHTTP(w, r) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 21e92c0a0e694..cadd5ad40d226 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1170,12 +1170,14 @@ func (rc *Client) RestoreSSTFiles( for rangeFiles, leftFiles = drainFilesByRange(files, rc.fileImporter.supportMultiIngest); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles, rc.fileImporter.supportMultiIngest) { filesReplica := rangeFiles rc.workerPool.ApplyOnErrorGroup(eg, - func() error { + func() (err error) { fileStart := time.Now() defer func() { - log.Info("import files done", logutil.Files(filesReplica), - zap.Duration("take", time.Since(fileStart))) - updateCh.Inc() + if err == nil { + log.Info("import files done", logutil.Files(filesReplica), + zap.Duration("take", time.Since(fileStart))) + updateCh.Inc() + } }() return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.backupMeta.ApiVersion) }) diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index d0a21d92b45e8..6d6120a0eea98 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -26,9 +26,9 @@ const ( downloadSSTWaitInterval = 1 * time.Second downloadSSTMaxWaitInterval = 4 * time.Second - resetTSRetryTime = 16 + resetTSRetryTime = 32 resetTSWaitInterval = 50 * time.Millisecond - resetTSMaxWaitInterval = 500 * time.Millisecond + resetTSMaxWaitInterval = 2 * time.Second resetTSRetryTimeExt = 600 resetTSWaitIntervalExt = 500 * time.Millisecond @@ -135,7 +135,6 @@ func NewDownloadSSTBackoffer() Backoffer { } func (bo *importerBackoffer) NextBackoff(err error) time.Duration { - log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) // we don't care storeID here. res := bo.errContext.HandleErrorMsg(err.Error(), 0) if res.Strategy == RetryStrategy { @@ -209,8 +208,12 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { bo.delayTime = 2 * bo.delayTime bo.attempt-- default: + // If the connection timeout, pd client would cancel the context, and return grpc context cancel error. + // So make the codes.Canceled retryable too. + // It's OK to retry the grpc context cancel error, because the parent context cancel returns context.Canceled. + // For example, cancel the `ectx` and then pdClient.GetTS(ectx) returns context.Canceled instead of grpc context canceled. switch status.Code(e) { - case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss, codes.Unknown: + case codes.DeadlineExceeded, codes.Canceled, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss, codes.Unknown: bo.delayTime = 2 * bo.delayTime bo.attempt-- default: diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index dc09826fd7806..aa5d048e90890 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -123,9 +123,12 @@ func TestPdBackoffWithRetryableError(t *testing.T) { if counter == 2 { return io.EOF } + if counter == 6 { + return context.Canceled + } return gRPCError }, backoffer) - require.Equal(t, 16, counter) + require.Equal(t, 7, counter) require.Equal(t, []error{ gRPCError, gRPCError, @@ -133,16 +136,7 @@ func TestPdBackoffWithRetryableError(t *testing.T) { gRPCError, gRPCError, gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, + context.Canceled, }, multierr.Errors(err)) } diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index afe400820eb7e..96faac6ef88ae 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -104,6 +104,8 @@ check_contains "restore log success summary" check_not_contains "rewrite delete range" echo "" > $res_file echo "check sql result" +run_sql "select * from mysql.gc_delete_range" +run_sql "select * from mysql.gc_delete_range_done" run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range group by ts order by DELETE_RANGE_CNT desc limit 1;" expect_delete_range=$(($incremental_delete_range_count-$prepare_delete_range_count)) check_contains "DELETE_RANGE_CNT: $expect_delete_range"