Skip to content

Commit

Permalink
Remove backoffutils and added the files to retry package (#390)
Browse files Browse the repository at this point in the history
* removed backoffutils and added the files to retry package

Signed-off-by: Yash Sharma <[email protected]>

* linting changes

Signed-off-by: Yash Sharma <[email protected]>

* move the jitter up function

Signed-off-by: Yash Sharma <[email protected]>

* renamed exponentbase2 to private

Signed-off-by: Yash Sharma <[email protected]>

* renamed the test package to retry, making it internal

Signed-off-by: Yash Sharma <[email protected]>
  • Loading branch information
yashrsharma44 authored Aug 3, 2021
1 parent 5b8ad84 commit 7d56e76
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 57 deletions.
24 changes: 17 additions & 7 deletions interceptors/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
package retry

import (
"math/rand"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/util/backoffutils"
)

// BackoffLinear is very simple: it waits for a fixed period of time between calls.
Expand All @@ -16,29 +15,40 @@ func BackoffLinear(waitBetween time.Duration) BackoffFunc {
}
}

// jitterUp adds random jitter to the duration.
// This adds or subtracts time from the duration within a given jitter fraction.
// For example for 10s and jitter 0.1, it will return a time within [9s, 11s])
func jitterUp(duration time.Duration, jitter float64) time.Duration {
multiplier := jitter * (rand.Float64()*2 - 1)
return time.Duration(float64(duration) * (1 + multiplier))
}

// exponentBase2 computes 2^(a-1) where a >= 1. If a is 0, the result is 0.
func exponentBase2(a uint) uint {
return (1 << a) >> 1
}

// BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
//
// For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc {
return func(attempt uint) time.Duration {
return backoffutils.JitterUp(waitBetween, jitterFraction)
return jitterUp(waitBetween, jitterFraction)
}
}

// BackoffExponential produces increasing intervals for each attempt.
//
// The scalar is multiplied times 2 raised to the current attempt. So the first
// retry with a scalar of 100ms is 100ms, while the 5th attempt would be 1.6s.
func BackoffExponential(scalar time.Duration) BackoffFunc {
return func(attempt uint) time.Duration {
return scalar * time.Duration(backoffutils.ExponentBase2(attempt))
return scalar * time.Duration(exponentBase2(attempt))
}
}

// BackoffExponentialWithJitter creates an exponential backoff like
// BackoffExponential does, but adds jitter.
func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc {
return func(attempt uint) time.Duration {
return backoffutils.JitterUp(scalar*time.Duration(backoffutils.ExponentBase2(attempt)), jitterFraction)
return jitterUp(scalar*time.Duration(exponentBase2(attempt)), jitterFraction)
}
}
2 changes: 1 addition & 1 deletion interceptors/retry/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache License 2.0.

/*
`retry` provides client-side request retry logic for gRPC.
Package retry provides client-side request retry logic for gRPC.
Client-Side Request Retry Interceptor
Expand Down
36 changes: 20 additions & 16 deletions interceptors/retry/examples_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.

package retry_test
package retry

import (
"context"
Expand All @@ -11,7 +11,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/testing/testpb"
)

Expand All @@ -20,33 +19,33 @@ var cc *grpc.ClientConn
// Simple example of using the default interceptor configuration.
func Example_initialization() {
_, _ = grpc.Dial("myservice.example.com",
grpc.WithStreamInterceptor(retry.StreamClientInterceptor()),
grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(StreamClientInterceptor()),
grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)
}

// Complex example with a 100ms linear backoff interval, and retry only on NotFound and Unavailable.
func Example_initializationWithOptions() {
opts := []retry.CallOption{
retry.WithBackoff(retry.BackoffLinear(100 * time.Millisecond)),
retry.WithCodes(codes.NotFound, codes.Aborted),
opts := []CallOption{
WithBackoff(BackoffLinear(100 * time.Millisecond)),
WithCodes(codes.NotFound, codes.Aborted),
}
_, _ = grpc.Dial("myservice.example.com",
grpc.WithStreamInterceptor(retry.StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(UnaryClientInterceptor(opts...)),
)
}

// Example with an exponential backoff starting with 100ms.
//
// Each next interval is the previous interval multiplied by 2.
func Example_initializationWithExponentialBackoff() {
opts := []retry.CallOption{
retry.WithBackoff(retry.BackoffExponential(100 * time.Millisecond)),
opts := []CallOption{
WithBackoff(BackoffExponential(100 * time.Millisecond)),
}
_, _ = grpc.Dial("myservice.example.com",
grpc.WithStreamInterceptor(retry.StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(UnaryClientInterceptor(opts...)),
)
}

Expand All @@ -56,7 +55,7 @@ func Example_simpleCall() {
defer cancel()

client := testpb.NewTestServiceClient(cc)
stream, _ := client.PingList(ctx, &testpb.PingListRequest{}, retry.WithMax(3))
stream, _ := client.PingList(ctx, &testpb.PingListRequest{}, WithMax(3))

for {
_, err := stream.Recv() // retries happen here
Expand Down Expand Up @@ -85,6 +84,11 @@ func ExampleWithPerRetryTimeout() {
_, _ = client.Ping(
ctx,
&testpb.PingRequest{},
retry.WithMax(3),
retry.WithPerRetryTimeout(1*time.Second))
WithMax(3),
WithPerRetryTimeout(1*time.Second))
}

// Scale duration by a factor.
func scaleDuration(d time.Duration, factor float64) time.Duration {
return time.Duration(float64(d) * factor)
}
96 changes: 63 additions & 33 deletions interceptors/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.

package retry_test
package retry

import (
"context"
Expand All @@ -18,7 +18,6 @@ import (
"google.golang.org/grpc/status"

middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/testing/testpb"
)

Expand Down Expand Up @@ -94,15 +93,15 @@ func TestRetrySuite(t *testing.T) {
service := &failingService{
TestServiceServer: &testpb.TestPingService{T: t},
}
unaryInterceptor := retry.UnaryClientInterceptor(
retry.WithCodes(retriableErrors...),
retry.WithMax(3),
retry.WithBackoff(retry.BackoffLinear(retryTimeout)),
unaryInterceptor := UnaryClientInterceptor(
WithCodes(retriableErrors...),
WithMax(3),
WithBackoff(BackoffLinear(retryTimeout)),
)
streamInterceptor := retry.StreamClientInterceptor(
retry.WithCodes(retriableErrors...),
retry.WithMax(3),
retry.WithBackoff(retry.BackoffLinear(retryTimeout)),
streamInterceptor := StreamClientInterceptor(
WithCodes(retriableErrors...),
WithMax(3),
WithBackoff(BackoffLinear(retryTimeout)),
)
s := &RetrySuite{
srv: service,
Expand Down Expand Up @@ -148,10 +147,10 @@ func (s *RetrySuite) TestCallOptionsDontPanicWithoutInterceptor() {
s.srv.resetFailingConfiguration(100, codes.DataLoss, noSleep) // doesn't matter all requests should fail
nonMiddlewareClient := s.NewClient()
_, err := nonMiddlewareClient.Ping(s.SimpleCtx(), testpb.GoodPing,
retry.WithMax(5),
retry.WithBackoff(retry.BackoffLinear(1*time.Millisecond)),
retry.WithCodes(codes.DataLoss),
retry.WithPerRetryTimeout(1*time.Millisecond),
WithMax(5),
WithBackoff(BackoffLinear(1*time.Millisecond)),
WithCodes(codes.DataLoss),
WithPerRetryTimeout(1*time.Millisecond),
)
require.Error(s.T(), err)
}
Expand All @@ -175,7 +174,7 @@ func (s *RetrySuite) TestUnary_SucceedsOnRetriableError() {

func (s *RetrySuite) TestUnary_OverrideFromDialOpts() {
s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithCodes(codes.ResourceExhausted), retry.WithMax(5))
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithCodes(codes.ResourceExhausted), WithMax(5))
require.NoError(s.T(), err, "the fifth invocation should succeed")
require.NotNil(s.T(), out, "Pong must be not nil")
require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
Expand All @@ -188,8 +187,8 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_Succeeds() {
// a retry call with a 5 millisecond deadline. The 5th one doesn't sleep and succeeds.
deadlinePerCall := 5 * time.Millisecond
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "the fifth invocation should succeed")
require.NotNil(s.T(), out, "Pong must be not nil")
require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
Expand All @@ -209,8 +208,8 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() {
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
ctx, cancel := context.WithTimeout(context.TODO(), parentDeadline)
defer cancel()
_, err := s.Client.Ping(ctx, testpb.GoodPing, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
_, err := s.Client.Ping(ctx, testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.Error(s.T(), err, "the retries must fail due to context deadline exceeded")
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
}
Expand All @@ -220,7 +219,7 @@ func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() {

s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing,
retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
retryCallbackCount++
}),
)
Expand All @@ -240,7 +239,7 @@ func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() {

func (s *RetrySuite) TestServerStream_OverrideFromContext() {
s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithCodes(codes.ResourceExhausted), retry.WithMax(5))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithCodes(codes.ResourceExhausted), WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
s.assertPingListWasCorrect(stream)
require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
Expand All @@ -253,8 +252,8 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_Succeeds() {
// a retry call with a 50 millisecond deadline. The 5th one doesn't sleep and succeeds.
deadlinePerCall := 100 * time.Millisecond
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
s.assertPingListWasCorrect(stream)
require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
Expand All @@ -274,8 +273,8 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
parentCtx, cancel := context.WithTimeout(context.TODO(), parentDeadline)
defer cancel()
stream, err := s.Client.PingList(parentCtx, testpb.GoodPingList, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
stream, err := s.Client.PingList(parentCtx, testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
_, err = stream.Recv()
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
Expand All @@ -286,7 +285,7 @@ func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() {

s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList,
retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
retryCallbackCount++
}),
)
Expand Down Expand Up @@ -322,7 +321,7 @@ func (s *RetrySuite) TestServerStream_CallRetrySucceeds() {
restarted := s.RestartServer(retryTimeout)

_, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList,
retry.WithMax(40),
WithMax(40),
)

assert.NoError(s.T(), err, "establishing the connection should succeed")
Expand Down Expand Up @@ -371,8 +370,8 @@ func TestChainedRetrySuite(t *testing.T) {
InterceptorTestSuite: &testpb.InterceptorTestSuite{
TestService: service,
ClientOpts: []grpc.DialOption{
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, retry.UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, retry.StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)),
},
},
}
Expand All @@ -393,7 +392,7 @@ func (s *ChainedRetrySuite) SetupTest() {
}

func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() {
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithMax(2))
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithMax(2))
require.NoError(s.T(), err, "the invocation should succeed")
require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
Expand All @@ -402,15 +401,15 @@ func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() {

func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_WithRetry() {
s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithMax(2))
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithMax(2))
require.NoError(s.T(), err, "the second invocation should succeed")
require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
}

func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() {
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithMax(2))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithMax(2))
require.NoError(s.T(), err, "the invocation should succeed")
_, err = stream.Recv()
require.NoError(s.T(), err, "the Recv should succeed")
Expand All @@ -421,11 +420,42 @@ func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() {

func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_WithRetry() {
s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithMax(2))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithMax(2))
require.NoError(s.T(), err, "the second invocation should succeed")
_, err = stream.Recv()
require.NoError(s.T(), err, "the Recv should succeed")
require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
}

func TestJitterUp(t *testing.T) {
// Arguments to jitterup.
duration := 10 * time.Second
variance := 0.10

// Bound to check.
max := 11000 * time.Millisecond
min := 9000 * time.Millisecond
high := scaleDuration(max, 0.98)
low := scaleDuration(min, 1.02)

highCount := 0
lowCount := 0

for i := 0; i < 1000; i++ {
out := jitterUp(duration, variance)
assert.True(t, out <= max, "value %s must be <= %s", out, max)
assert.True(t, out >= min, "value %s must be >= %s", out, min)

if out > high {
highCount++
}
if out < low {
lowCount++
}
}

assert.True(t, highCount != 0, "at least one sample should reach to >%s", high)
assert.True(t, lowCount != 0, "at least one sample should to <%s", low)
}

0 comments on commit 7d56e76

Please sign in to comment.