diff --git a/interceptors/retry/backoff.go b/interceptors/retry/backoff.go index 98ef36b18..49a9abe92 100644 --- a/interceptors/retry/backoff.go +++ b/interceptors/retry/backoff.go @@ -4,9 +4,8 @@ package retry import ( + "math/rand" "time" - - "github.com/grpc-ecosystem/go-grpc-middleware/v2/util/backoffutils" ) // BackoffLinear is very simple: it waits for a fixed period of time between calls. @@ -16,22 +15,33 @@ func BackoffLinear(waitBetween time.Duration) BackoffFunc { } } +// jitterUp adds random jitter to the duration. +// This adds or subtracts time from the duration within a given jitter fraction. +// For example for 10s and jitter 0.1, it will return a time within [9s, 11s]. +func jitterUp(duration time.Duration, jitter float64) time.Duration { + multiplier := jitter * (rand.Float64()*2 - 1) + return time.Duration(float64(duration) * (1 + multiplier)) +} + +// exponentBase2 computes 2^(a-1) where a >= 1. If a is 0, the result is 0. +func exponentBase2(a uint) uint { + return (1 << a) >> 1 +} + // BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment). -// // For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms. func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc { return func(attempt uint) time.Duration { - return backoffutils.JitterUp(waitBetween, jitterFraction) + return jitterUp(waitBetween, jitterFraction) } } // BackoffExponential produces increasing intervals for each attempt. -// // The scalar is multiplied times 2 raised to the current attempt. So the first // retry with a scalar of 100ms is 100ms, while the 5th attempt would be 1.6s. 
func BackoffExponential(scalar time.Duration) BackoffFunc { return func(attempt uint) time.Duration { - return scalar * time.Duration(backoffutils.ExponentBase2(attempt)) + return scalar * time.Duration(exponentBase2(attempt)) } } @@ -39,6 +49,6 @@ func BackoffExponential(scalar time.Duration) BackoffFunc { // BackoffExponential does, but adds jitter. func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc { return func(attempt uint) time.Duration { - return backoffutils.JitterUp(scalar*time.Duration(backoffutils.ExponentBase2(attempt)), jitterFraction) + return jitterUp(scalar*time.Duration(exponentBase2(attempt)), jitterFraction) } } diff --git a/interceptors/retry/doc.go b/interceptors/retry/doc.go index 66813c091..6f132c6b0 100644 --- a/interceptors/retry/doc.go +++ b/interceptors/retry/doc.go @@ -2,7 +2,7 @@ // Licensed under the Apache License 2.0. /* -`retry` provides client-side request retry logic for gRPC. +Package retry provides client-side request retry logic for gRPC. Client-Side Request Retry Interceptor diff --git a/interceptors/retry/examples_test.go b/interceptors/retry/examples_test.go index 321ce658f..ee53f205c 100644 --- a/interceptors/retry/examples_test.go +++ b/interceptors/retry/examples_test.go @@ -1,7 +1,7 @@ // Copyright (c) The go-grpc-middleware Authors. // Licensed under the Apache License 2.0. -package retry_test +package retry import ( "context" @@ -11,7 +11,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" - "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" "github.com/grpc-ecosystem/go-grpc-middleware/v2/testing/testpb" ) @@ -20,20 +19,20 @@ var cc *grpc.ClientConn // Simple example of using the default interceptor configuration. 
func Example_initialization() { _, _ = grpc.Dial("myservice.example.com", - grpc.WithStreamInterceptor(retry.StreamClientInterceptor()), - grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor()), + grpc.WithStreamInterceptor(StreamClientInterceptor()), + grpc.WithUnaryInterceptor(UnaryClientInterceptor()), ) } // Complex example with a 100ms linear backoff interval, and retry only on NotFound and Unavailable. func Example_initializationWithOptions() { - opts := []retry.CallOption{ - retry.WithBackoff(retry.BackoffLinear(100 * time.Millisecond)), - retry.WithCodes(codes.NotFound, codes.Aborted), + opts := []CallOption{ + WithBackoff(BackoffLinear(100 * time.Millisecond)), + WithCodes(codes.NotFound, codes.Aborted), } _, _ = grpc.Dial("myservice.example.com", - grpc.WithStreamInterceptor(retry.StreamClientInterceptor(opts...)), - grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(opts...)), + grpc.WithStreamInterceptor(StreamClientInterceptor(opts...)), + grpc.WithUnaryInterceptor(UnaryClientInterceptor(opts...)), ) } @@ -41,12 +40,12 @@ func Example_initializationWithOptions() { // // Each next interval is the previous interval multiplied by 2. 
func Example_initializationWithExponentialBackoff() { - opts := []retry.CallOption{ - retry.WithBackoff(retry.BackoffExponential(100 * time.Millisecond)), + opts := []CallOption{ + WithBackoff(BackoffExponential(100 * time.Millisecond)), } _, _ = grpc.Dial("myservice.example.com", - grpc.WithStreamInterceptor(retry.StreamClientInterceptor(opts...)), - grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(opts...)), + grpc.WithStreamInterceptor(StreamClientInterceptor(opts...)), + grpc.WithUnaryInterceptor(UnaryClientInterceptor(opts...)), ) } @@ -56,7 +55,7 @@ func Example_simpleCall() { defer cancel() client := testpb.NewTestServiceClient(cc) - stream, _ := client.PingList(ctx, &testpb.PingListRequest{}, retry.WithMax(3)) + stream, _ := client.PingList(ctx, &testpb.PingListRequest{}, WithMax(3)) for { _, err := stream.Recv() // retries happen here @@ -85,6 +84,11 @@ func ExampleWithPerRetryTimeout() { _, _ = client.Ping( ctx, &testpb.PingRequest{}, - retry.WithMax(3), - retry.WithPerRetryTimeout(1*time.Second)) + WithMax(3), + WithPerRetryTimeout(1*time.Second)) +} + +// Scale duration by a factor. +func scaleDuration(d time.Duration, factor float64) time.Duration { + return time.Duration(float64(d) * factor) } diff --git a/interceptors/retry/retry_test.go b/interceptors/retry/retry_test.go index a7e29b86d..bcff27572 100644 --- a/interceptors/retry/retry_test.go +++ b/interceptors/retry/retry_test.go @@ -1,7 +1,7 @@ // Copyright (c) The go-grpc-middleware Authors. // Licensed under the Apache License 2.0. 
-package retry_test +package retry import ( "context" @@ -18,7 +18,6 @@ import ( "google.golang.org/grpc/status" middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2" - "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" "github.com/grpc-ecosystem/go-grpc-middleware/v2/testing/testpb" ) @@ -94,15 +93,15 @@ func TestRetrySuite(t *testing.T) { service := &failingService{ TestServiceServer: &testpb.TestPingService{T: t}, } - unaryInterceptor := retry.UnaryClientInterceptor( - retry.WithCodes(retriableErrors...), - retry.WithMax(3), - retry.WithBackoff(retry.BackoffLinear(retryTimeout)), + unaryInterceptor := UnaryClientInterceptor( + WithCodes(retriableErrors...), + WithMax(3), + WithBackoff(BackoffLinear(retryTimeout)), ) - streamInterceptor := retry.StreamClientInterceptor( - retry.WithCodes(retriableErrors...), - retry.WithMax(3), - retry.WithBackoff(retry.BackoffLinear(retryTimeout)), + streamInterceptor := StreamClientInterceptor( + WithCodes(retriableErrors...), + WithMax(3), + WithBackoff(BackoffLinear(retryTimeout)), ) s := &RetrySuite{ srv: service, @@ -148,10 +147,10 @@ func (s *RetrySuite) TestCallOptionsDontPanicWithoutInterceptor() { s.srv.resetFailingConfiguration(100, codes.DataLoss, noSleep) // doesn't matter all requests should fail nonMiddlewareClient := s.NewClient() _, err := nonMiddlewareClient.Ping(s.SimpleCtx(), testpb.GoodPing, - retry.WithMax(5), - retry.WithBackoff(retry.BackoffLinear(1*time.Millisecond)), - retry.WithCodes(codes.DataLoss), - retry.WithPerRetryTimeout(1*time.Millisecond), + WithMax(5), + WithBackoff(BackoffLinear(1*time.Millisecond)), + WithCodes(codes.DataLoss), + WithPerRetryTimeout(1*time.Millisecond), ) require.Error(s.T(), err) } @@ -175,7 +174,7 @@ func (s *RetrySuite) TestUnary_SucceedsOnRetriableError() { func (s *RetrySuite) TestUnary_OverrideFromDialOpts() { s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors - out, err := 
s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithCodes(codes.ResourceExhausted), retry.WithMax(5)) + out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithCodes(codes.ResourceExhausted), WithMax(5)) require.NoError(s.T(), err, "the fifth invocation should succeed") require.NotNil(s.T(), out, "Pong must be not nil") require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made") @@ -188,8 +187,8 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_Succeeds() { // a retry call with a 5 millisecond deadline. The 5th one doesn't sleep and succeeds. deadlinePerCall := 5 * time.Millisecond s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall) - out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithPerRetryTimeout(deadlinePerCall), - retry.WithMax(5)) + out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall), + WithMax(5)) require.NoError(s.T(), err, "the fifth invocation should succeed") require.NotNil(s.T(), out, "Pong must be not nil") require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made") @@ -209,8 +208,8 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() { s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall) ctx, cancel := context.WithTimeout(context.TODO(), parentDeadline) defer cancel() - _, err := s.Client.Ping(ctx, testpb.GoodPing, retry.WithPerRetryTimeout(deadlinePerCall), - retry.WithMax(5)) + _, err := s.Client.Ping(ctx, testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall), + WithMax(5)) require.Error(s.T(), err, "the retries must fail due to context deadline exceeded") require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class") } @@ -220,7 +219,7 @@ func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() { s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors out, err := 
s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, - retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { + WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { retryCallbackCount++ }), ) @@ -240,7 +239,7 @@ func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() { func (s *RetrySuite) TestServerStream_OverrideFromContext() { s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors - stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithCodes(codes.ResourceExhausted), retry.WithMax(5)) + stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithCodes(codes.ResourceExhausted), WithMax(5)) require.NoError(s.T(), err, "establishing the connection must always succeed") s.assertPingListWasCorrect(stream) require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made") @@ -253,8 +252,8 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_Succeeds() { // a retry call with a 50 millisecond deadline. The 5th one doesn't sleep and succeeds. 
deadlinePerCall := 100 * time.Millisecond s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall) - stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithPerRetryTimeout(deadlinePerCall), - retry.WithMax(5)) + stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall), + WithMax(5)) require.NoError(s.T(), err, "establishing the connection must always succeed") s.assertPingListWasCorrect(stream) require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made") @@ -274,8 +273,8 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() { s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall) parentCtx, cancel := context.WithTimeout(context.TODO(), parentDeadline) defer cancel() - stream, err := s.Client.PingList(parentCtx, testpb.GoodPingList, retry.WithPerRetryTimeout(deadlinePerCall), - retry.WithMax(5)) + stream, err := s.Client.PingList(parentCtx, testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall), + WithMax(5)) require.NoError(s.T(), err, "establishing the connection must always succeed") _, err = stream.Recv() require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class") @@ -286,7 +285,7 @@ func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() { s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, - retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { + WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { retryCallbackCount++ }), ) @@ -322,7 +321,7 @@ func (s *RetrySuite) TestServerStream_CallRetrySucceeds() { restarted := s.RestartServer(retryTimeout) _, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, - retry.WithMax(40), + WithMax(40), ) assert.NoError(s.T(), err, "establishing the connection 
should succeed") @@ -371,8 +370,8 @@ func TestChainedRetrySuite(t *testing.T) { InterceptorTestSuite: &testpb.InterceptorTestSuite{ TestService: service, ClientOpts: []grpc.DialOption{ - grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, retry.UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)), - grpc.WithStreamInterceptor(middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, retry.StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)), + grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)), + grpc.WithStreamInterceptor(middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)), }, }, } @@ -393,7 +392,7 @@ func (s *ChainedRetrySuite) SetupTest() { } func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() { - _, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithMax(2)) + _, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithMax(2)) require.NoError(s.T(), err, "the invocation should succeed") require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made") require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once") @@ -402,7 +401,7 @@ func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() { func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_WithRetry() { s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep) - _, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithMax(2)) + _, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithMax(2)) require.NoError(s.T(), err, "the second invocation should succeed") require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made") 
require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once") @@ -410,7 +409,7 @@ func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_WithRetry() { } func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() { - stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithMax(2)) + stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithMax(2)) require.NoError(s.T(), err, "the invocation should succeed") _, err = stream.Recv() require.NoError(s.T(), err, "the Recv should succeed") @@ -421,7 +420,7 @@ func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() { func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_WithRetry() { s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep) - stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithMax(2)) + stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithMax(2)) require.NoError(s.T(), err, "the second invocation should succeed") _, err = stream.Recv() require.NoError(s.T(), err, "the Recv should succeed") @@ -429,3 +428,34 @@ func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_WithRetry() { require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once") require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice") } + +func TestJitterUp(t *testing.T) { + // Arguments to jitterUp. + duration := 10 * time.Second + variance := 0.10 + + // Bound to check. 
+ max := 11000 * time.Millisecond + min := 9000 * time.Millisecond + high := scaleDuration(max, 0.98) + low := scaleDuration(min, 1.02) + + highCount := 0 + lowCount := 0 + + for i := 0; i < 1000; i++ { + out := jitterUp(duration, variance) + assert.True(t, out <= max, "value %s must be <= %s", out, max) + assert.True(t, out >= min, "value %s must be >= %s", out, min) + + if out > high { + highCount++ + } + if out < low { + lowCount++ + } + } + + assert.True(t, highCount != 0, "at least one sample should reach to >%s", high) + assert.True(t, lowCount != 0, "at least one sample should to <%s", low) +}