Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove backoffutils and added the files to retry package #390

Merged
merged 5 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions interceptors/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
package retry

import (
"math/rand"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/util/backoffutils"
)

// BackoffLinear is very simple: it waits for a fixed period of time between calls.
Expand All @@ -16,29 +15,40 @@ func BackoffLinear(waitBetween time.Duration) BackoffFunc {
}
}

// jitterUp adds random jitter to the duration.
// This adds or subtracts time from the duration within a given jitter fraction.
// For example for 10s and jitter 0.1, it will return a time within [9s, 11s])
func jitterUp(duration time.Duration, jitter float64) time.Duration {
multiplier := jitter * (rand.Float64()*2 - 1)
return time.Duration(float64(duration) * (1 + multiplier))
}

// exponentBase2 computes 2^(a-1) where a >= 1. If a is 0, the result is 0.
func exponentBase2(a uint) uint {
return (1 << a) >> 1
}

// BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
//
// For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc {
return func(attempt uint) time.Duration {
return backoffutils.JitterUp(waitBetween, jitterFraction)
return jitterUp(waitBetween, jitterFraction)
}
}

// BackoffExponential produces increasing intervals for each attempt.
//
// The scalar is multiplied times 2 raised to the current attempt. So the first
// retry with a scalar of 100ms is 100ms, while the 5th attempt would be 1.6s.
func BackoffExponential(scalar time.Duration) BackoffFunc {
return func(attempt uint) time.Duration {
return scalar * time.Duration(backoffutils.ExponentBase2(attempt))
return scalar * time.Duration(exponentBase2(attempt))
}
}

// BackoffExponentialWithJitter creates an exponential backoff like
// BackoffExponential does, but adds jitter.
func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc {
return func(attempt uint) time.Duration {
return backoffutils.JitterUp(scalar*time.Duration(backoffutils.ExponentBase2(attempt)), jitterFraction)
return jitterUp(scalar*time.Duration(exponentBase2(attempt)), jitterFraction)
}
}
2 changes: 1 addition & 1 deletion interceptors/retry/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache License 2.0.

/*
`retry` provides client-side request retry logic for gRPC.
Package retry provides client-side request retry logic for gRPC.

Client-Side Request Retry Interceptor

Expand Down
36 changes: 20 additions & 16 deletions interceptors/retry/examples_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.

package retry_test
package retry

import (
"context"
Expand All @@ -11,7 +11,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/testing/testpb"
)

Expand All @@ -20,33 +19,33 @@ var cc *grpc.ClientConn
// Simple example of using the default interceptor configuration.
func Example_initialization() {
_, _ = grpc.Dial("myservice.example.com",
grpc.WithStreamInterceptor(retry.StreamClientInterceptor()),
grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(StreamClientInterceptor()),
grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)
}

// Complex example with a 100ms linear backoff interval, and retry only on NotFound and Unavailable.
func Example_initializationWithOptions() {
opts := []retry.CallOption{
retry.WithBackoff(retry.BackoffLinear(100 * time.Millisecond)),
retry.WithCodes(codes.NotFound, codes.Aborted),
opts := []CallOption{
WithBackoff(BackoffLinear(100 * time.Millisecond)),
WithCodes(codes.NotFound, codes.Aborted),
}
_, _ = grpc.Dial("myservice.example.com",
grpc.WithStreamInterceptor(retry.StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(UnaryClientInterceptor(opts...)),
)
}

// Example with an exponential backoff starting with 100ms.
//
// Each next interval is the previous interval multiplied by 2.
func Example_initializationWithExponentialBackoff() {
opts := []retry.CallOption{
retry.WithBackoff(retry.BackoffExponential(100 * time.Millisecond)),
opts := []CallOption{
WithBackoff(BackoffExponential(100 * time.Millisecond)),
}
_, _ = grpc.Dial("myservice.example.com",
grpc.WithStreamInterceptor(retry.StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(StreamClientInterceptor(opts...)),
grpc.WithUnaryInterceptor(UnaryClientInterceptor(opts...)),
)
}

Expand All @@ -56,7 +55,7 @@ func Example_simpleCall() {
defer cancel()

client := testpb.NewTestServiceClient(cc)
stream, _ := client.PingList(ctx, &testpb.PingListRequest{}, retry.WithMax(3))
stream, _ := client.PingList(ctx, &testpb.PingListRequest{}, WithMax(3))

for {
_, err := stream.Recv() // retries happen here
Expand Down Expand Up @@ -85,6 +84,11 @@ func ExampleWithPerRetryTimeout() {
_, _ = client.Ping(
ctx,
&testpb.PingRequest{},
retry.WithMax(3),
retry.WithPerRetryTimeout(1*time.Second))
WithMax(3),
WithPerRetryTimeout(1*time.Second))
}

// Scale duration by a factor.
func scaleDuration(d time.Duration, factor float64) time.Duration {
return time.Duration(float64(d) * factor)
}
96 changes: 63 additions & 33 deletions interceptors/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.

package retry_test
package retry

import (
"context"
Expand All @@ -18,7 +18,6 @@ import (
"google.golang.org/grpc/status"

middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/testing/testpb"
)

Expand Down Expand Up @@ -94,15 +93,15 @@ func TestRetrySuite(t *testing.T) {
service := &failingService{
TestServiceServer: &testpb.TestPingService{T: t},
}
unaryInterceptor := retry.UnaryClientInterceptor(
retry.WithCodes(retriableErrors...),
retry.WithMax(3),
retry.WithBackoff(retry.BackoffLinear(retryTimeout)),
unaryInterceptor := UnaryClientInterceptor(
WithCodes(retriableErrors...),
WithMax(3),
WithBackoff(BackoffLinear(retryTimeout)),
)
streamInterceptor := retry.StreamClientInterceptor(
retry.WithCodes(retriableErrors...),
retry.WithMax(3),
retry.WithBackoff(retry.BackoffLinear(retryTimeout)),
streamInterceptor := StreamClientInterceptor(
WithCodes(retriableErrors...),
WithMax(3),
WithBackoff(BackoffLinear(retryTimeout)),
)
s := &RetrySuite{
srv: service,
Expand Down Expand Up @@ -148,10 +147,10 @@ func (s *RetrySuite) TestCallOptionsDontPanicWithoutInterceptor() {
s.srv.resetFailingConfiguration(100, codes.DataLoss, noSleep) // doesn't matter all requests should fail
nonMiddlewareClient := s.NewClient()
_, err := nonMiddlewareClient.Ping(s.SimpleCtx(), testpb.GoodPing,
retry.WithMax(5),
retry.WithBackoff(retry.BackoffLinear(1*time.Millisecond)),
retry.WithCodes(codes.DataLoss),
retry.WithPerRetryTimeout(1*time.Millisecond),
WithMax(5),
WithBackoff(BackoffLinear(1*time.Millisecond)),
WithCodes(codes.DataLoss),
WithPerRetryTimeout(1*time.Millisecond),
)
require.Error(s.T(), err)
}
Expand All @@ -175,7 +174,7 @@ func (s *RetrySuite) TestUnary_SucceedsOnRetriableError() {

func (s *RetrySuite) TestUnary_OverrideFromDialOpts() {
s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithCodes(codes.ResourceExhausted), retry.WithMax(5))
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithCodes(codes.ResourceExhausted), WithMax(5))
require.NoError(s.T(), err, "the fifth invocation should succeed")
require.NotNil(s.T(), out, "Pong must be not nil")
require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
Expand All @@ -188,8 +187,8 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_Succeeds() {
// a retry call with a 5 millisecond deadline. The 5th one doesn't sleep and succeeds.
deadlinePerCall := 5 * time.Millisecond
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "the fifth invocation should succeed")
require.NotNil(s.T(), out, "Pong must be not nil")
require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
Expand All @@ -209,8 +208,8 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() {
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
ctx, cancel := context.WithTimeout(context.TODO(), parentDeadline)
defer cancel()
_, err := s.Client.Ping(ctx, testpb.GoodPing, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
_, err := s.Client.Ping(ctx, testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.Error(s.T(), err, "the retries must fail due to context deadline exceeded")
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
}
Expand All @@ -220,7 +219,7 @@ func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() {

s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing,
retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
retryCallbackCount++
}),
)
Expand All @@ -240,7 +239,7 @@ func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() {

func (s *RetrySuite) TestServerStream_OverrideFromContext() {
s.srv.resetFailingConfiguration(5, codes.ResourceExhausted, noSleep) // default is 3 and retriable_errors
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithCodes(codes.ResourceExhausted), retry.WithMax(5))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithCodes(codes.ResourceExhausted), WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
s.assertPingListWasCorrect(stream)
require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
Expand All @@ -253,8 +252,8 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_Succeeds() {
// a retry call with a 50 millisecond deadline. The 5th one doesn't sleep and succeeds.
deadlinePerCall := 100 * time.Millisecond
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
s.assertPingListWasCorrect(stream)
require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
Expand All @@ -274,8 +273,8 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
parentCtx, cancel := context.WithTimeout(context.TODO(), parentDeadline)
defer cancel()
stream, err := s.Client.PingList(parentCtx, testpb.GoodPingList, retry.WithPerRetryTimeout(deadlinePerCall),
retry.WithMax(5))
stream, err := s.Client.PingList(parentCtx, testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
_, err = stream.Recv()
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
Expand All @@ -286,7 +285,7 @@ func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() {

s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList,
retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
retryCallbackCount++
}),
)
Expand Down Expand Up @@ -322,7 +321,7 @@ func (s *RetrySuite) TestServerStream_CallRetrySucceeds() {
restarted := s.RestartServer(retryTimeout)

_, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList,
retry.WithMax(40),
WithMax(40),
)

assert.NoError(s.T(), err, "establishing the connection should succeed")
Expand Down Expand Up @@ -371,8 +370,8 @@ func TestChainedRetrySuite(t *testing.T) {
InterceptorTestSuite: &testpb.InterceptorTestSuite{
TestService: service,
ClientOpts: []grpc.DialOption{
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, retry.UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, retry.StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(preRetryInterceptor.UnaryClientInterceptor, UnaryClientInterceptor(), postRetryInterceptor.UnaryClientInterceptor)),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(preRetryInterceptor.StreamClientInterceptor, StreamClientInterceptor(), postRetryInterceptor.StreamClientInterceptor)),
},
},
}
Expand All @@ -393,7 +392,7 @@ func (s *ChainedRetrySuite) SetupTest() {
}

func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() {
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithMax(2))
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithMax(2))
require.NoError(s.T(), err, "the invocation should succeed")
require.EqualValues(s.T(), 1, s.srv.requestCount(), "one request should have been made")
require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
Expand All @@ -402,15 +401,15 @@ func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_NoFailure() {

func (s *ChainedRetrySuite) TestUnaryWithChainedInterceptors_WithRetry() {
s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithMax(2))
_, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithMax(2))
require.NoError(s.T(), err, "the second invocation should succeed")
require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
}

func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() {
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithMax(2))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithMax(2))
require.NoError(s.T(), err, "the invocation should succeed")
_, err = stream.Recv()
require.NoError(s.T(), err, "the Recv should succeed")
Expand All @@ -421,11 +420,42 @@ func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_NoFailure() {

func (s *ChainedRetrySuite) TestStreamWithChainedInterceptors_WithRetry() {
s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep)
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithMax(2))
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithMax(2))
require.NoError(s.T(), err, "the second invocation should succeed")
_, err = stream.Recv()
require.NoError(s.T(), err, "the Recv should succeed")
require.EqualValues(s.T(), 2, s.srv.requestCount(), "two requests should have been made")
require.EqualValues(s.T(), 1, s.preRetryInterceptor.called, "pre-retry interceptor should be called once")
require.EqualValues(s.T(), 2, s.postRetryInterceptor.called, "post-retry interceptor should be called twice")
}

func TestJitterUp(t *testing.T) {
// Arguments to jitterup.
duration := 10 * time.Second
variance := 0.10

// Bound to check.
max := 11000 * time.Millisecond
min := 9000 * time.Millisecond
high := scaleDuration(max, 0.98)
low := scaleDuration(min, 1.02)

highCount := 0
lowCount := 0

for i := 0; i < 1000; i++ {
out := jitterUp(duration, variance)
assert.True(t, out <= max, "value %s must be <= %s", out, max)
assert.True(t, out >= min, "value %s must be >= %s", out, min)

if out > high {
highCount++
}
if out < low {
lowCount++
}
}

assert.True(t, highCount != 0, "at least one sample should reach to >%s", high)
assert.True(t, lowCount != 0, "at least one sample should to <%s", low)
}