108 changes: 102 additions & 6 deletions agent/consul/leader_peering.go
@@ -6,6 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"math"
"time"

"github.com/armon/go-metrics"
@@ -16,8 +17,10 @@ import (
"github.com/hashicorp/go-uuid"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
grpcstatus "google.golang.org/grpc/status"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
@@ -38,6 +41,15 @@ var LeaderPeeringMetrics = []prometheus.GaugeDefinition{
"We emit this metric every 9 seconds",
},
}
var (
// fastConnRetryTimeout is how long we wait between retrying connections following the "fast" path
// which is triggered on specific connection errors.
fastConnRetryTimeout = 8 * time.Millisecond
// maxFastConnRetries is the maximum number of fast connection retries before we follow exponential backoff.
maxFastConnRetries = uint(5)
// maxFastRetryBackoff is the maximum amount of time we'll wait between retries following the fast path.
maxFastRetryBackoff = 8192 * time.Millisecond
)

func (s *Server) startPeeringStreamSync(ctx context.Context) {
s.leaderRoutineManager.Start(ctx, peeringStreamsRoutineName, s.runPeeringSync)
@@ -300,7 +312,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
}

// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoff(retryCtx, func() error {
go retryLoopBackoffPeering(retryCtx, logger, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors.
defer func() {
buffer = buffer.Next()
@@ -349,18 +361,21 @@
if err == nil {
stream.CloseSend()
s.peerStreamServer.DrainStream(streamReq)

// This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream.
cancel()

logger.Info("closed outbound stream")
}
return err

}, func(err error) {
Review comment (Contributor Author): Log errors differently so as not to spam the logs with errors that aren't actual problems.

// TODO(peering): why are we using TrackSendError here? This could also be a receive error.
streamStatus.TrackSendError(err.Error())
logger.Error("error managing peering stream", "peer_id", peer.ID, "error", err)
})
if isFailedPreconditionErr(err) {
logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
"error", err)
return
}
logger.Error("error managing peering stream", "error", err)
}, peeringRetryTimeout)

return nil
}
@@ -517,3 +532,84 @@ func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Li
_, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, req)
return err
}

// retryLoopBackoffPeering re-runs loopFn with a backoff on error. errFn is run whenever
// loopFn returns an error. retryTimeFn is used to calculate the time between retries on error.
// It is passed the number of errors in a row that loopFn has returned and the latest error
// from loopFn.
//
// This function is modelled off of retryLoopBackoffHandleSuccess but is specific to peering
// because peering needs to use different retry times depending on which error is returned.
// This function doesn't use a rate limiter, unlike retryLoopBackoffHandleSuccess, because
// the rate limiter is only needed in the success case when loopFn returns nil and we want to
// loop again. In the peering case, we exit on a successful loop so we don't need the limiter.
func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn func() error, errFn func(error),
retryTimeFn func(failedAttempts uint, loopErr error) time.Duration) {
var failedAttempts uint
var err error
Review comment (Contributor Author): err doesn't need to be here.

for {
if err = loopFn(); err != nil {
errFn(err)

if failedAttempts < math.MaxUint {
failedAttempts++
}

retryTime := retryTimeFn(failedAttempts, err)
logger.Trace("in connection retry backoff", "delay", retryTime)
timer := time.NewTimer(retryTime)

select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
continue
}
return
}
}

// peeringRetryTimeout returns the time that should be waited between re-establishing a peering
// connection after an error. We follow the default backoff from retryLoopBackoff
// unless the error is a "failed precondition" error, in which case we retry much more quickly.
// Retrying quickly is important for failed precondition errors because we expect them to resolve
// quickly. For example, when connecting to a follower through a load balancer, we just need to retry
// until our request lands on the leader.
func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
if loopErr != nil && isFailedPreconditionErr(loopErr) {
// Wait a constant time for the first number of retries.
if failedAttempts <= maxFastConnRetries {
return fastConnRetryTimeout
}
// From here, follow an exponential backoff maxing out at maxFastRetryBackoff.
// The below equation multiplies fastConnRetryTimeout by 2^n where n is the number of failed attempts
// we're on, starting at 1 now that we're past our maxFastConnRetries.
// For example, if fastConnRetryTimeout == 8ms and maxFastConnRetries == 5, then at 6 failed retries
// we'll do 8ms * 2^1 = 16ms, then 8ms * 2^2 = 32ms, etc.
ms := fastConnRetryTimeout * (1 << (failedAttempts - maxFastConnRetries))
if ms > maxFastRetryBackoff {
return maxFastRetryBackoff
}
return ms
}

// Else we go with the default backoff from retryLoopBackoff.
if (1 << failedAttempts) < maxRetryBackoff {
return (1 << failedAttempts) * time.Second
}
return time.Duration(maxRetryBackoff) * time.Second
}

// isFailedPreconditionErr returns true if err is a gRPC error with code FailedPrecondition.
func isFailedPreconditionErr(err error) bool {
if err == nil {
return false
}
grpcErr, ok := grpcstatus.FromError(err)
if !ok {
return false
}
return grpcErr.Code() == codes.FailedPrecondition
}
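
For intuition about the backoff behavior introduced above, here is a minimal, hypothetical sketch (not part of this change): it reimplements the backoff math as a standalone program rather than calling the unexported helpers, and it assumes maxRetryBackoff (defined elsewhere in the package) is 256 seconds, as the regular-error test cases below imply. It prints the wait time for the first several failed attempts under both error classes.

package main

import (
	"fmt"
	"time"
)

const (
	fastConnRetryTimeout = 8 * time.Millisecond
	maxFastConnRetries   = uint(5)
	maxFastRetryBackoff  = 8192 * time.Millisecond
	maxRetryBackoff      = 256 // seconds; assumed value of the existing default backoff cap
)

// retryWait mirrors peeringRetryTimeout for illustration only.
func retryWait(failedAttempts uint, failedPrecondition bool) time.Duration {
	if failedPrecondition {
		// Constant 8ms wait for the first maxFastConnRetries attempts.
		if failedAttempts <= maxFastConnRetries {
			return fastConnRetryTimeout
		}
		// Then exponential growth, capped at maxFastRetryBackoff.
		d := fastConnRetryTimeout * (1 << (failedAttempts - maxFastConnRetries))
		if d > maxFastRetryBackoff {
			return maxFastRetryBackoff
		}
		return d
	}
	// All other errors follow the default 2s, 4s, 8s, ... backoff capped at maxRetryBackoff seconds.
	if (1 << failedAttempts) < maxRetryBackoff {
		return (1 << failedAttempts) * time.Second
	}
	return time.Duration(maxRetryBackoff) * time.Second
}

func main() {
	// retryLoopBackoffPeering always passes failedAttempts >= 1 to the retry-time function.
	for attempts := uint(1); attempts <= 17; attempts++ {
		fmt.Printf("attempt %2d: failed-precondition=%-8v other=%v\n",
			attempts, retryWait(attempts, true), retryWait(attempts, false))
	}
}

Running it shows the two regimes side by side: 8ms for the first five attempts, then 16ms, 32ms, ... capped at 8192ms for failed-precondition errors, versus 2s, 4s, 8s, ... capped at 256s for everything else.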
151 changes: 151 additions & 0 deletions agent/consul/leader_peering_test.go
@@ -4,14 +4,18 @@ import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"testing"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
@@ -1122,3 +1126,150 @@ func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) {
return found
}, 7*time.Second, 1*time.Second, "peering should not have been established")
}

// Test peeringRetryTimeout when the errors are FailedPrecondition errors because these
// errors have a different backoff.
func TestLeader_Peering_peeringRetryTimeout_failedPreconditionErrors(t *testing.T) {
cases := []struct {
failedAttempts uint
expDuration time.Duration
}{
// Constant time backoff.
{0, 8 * time.Millisecond},
{1, 8 * time.Millisecond},
{2, 8 * time.Millisecond},
{3, 8 * time.Millisecond},
{4, 8 * time.Millisecond},
{5, 8 * time.Millisecond},
// Then exponential.
{6, 16 * time.Millisecond},
{7, 32 * time.Millisecond},
{13, 2048 * time.Millisecond},
{14, 4096 * time.Millisecond},
{15, 8192 * time.Millisecond},
// Max.
{16, 8192 * time.Millisecond},
{17, 8192 * time.Millisecond},
}

for _, c := range cases {
t.Run(fmt.Sprintf("failed attempts %d", c.failedAttempts), func(t *testing.T) {
err := grpcstatus.Error(codes.FailedPrecondition, "msg")
require.Equal(t, c.expDuration, peeringRetryTimeout(c.failedAttempts, err))
})
}
}

// Test peeringRetryTimeout with non-FailedPrecondition errors because these errors have a different
// backoff from FailedPrecondition errors.
func TestLeader_Peering_peeringRetryTimeout_regularErrors(t *testing.T) {
cases := []struct {
failedAttempts uint
expDuration time.Duration
}{
// Exponential.
{0, 1 * time.Second},
{1, 2 * time.Second},
{2, 4 * time.Second},
{3, 8 * time.Second},
// Until max.
{8, 256 * time.Second},
{9, 256 * time.Second},
{10, 256 * time.Second},
}

for _, c := range cases {
t.Run(fmt.Sprintf("failed attempts %d", c.failedAttempts), func(t *testing.T) {
err := errors.New("error")
require.Equal(t, c.expDuration, peeringRetryTimeout(c.failedAttempts, err))
})
}
}

// This test exercises all the functionality of retryLoopBackoffPeering.
func TestLeader_Peering_retryLoopBackoffPeering(t *testing.T) {
ctx := context.Background()
logger := hclog.NewNullLogger()

// loopCount counts how many times we executed loopFn.
loopCount := 0
// loopTimes holds the times at which each loopFn was executed. We use this to test the timeout functionality.
var loopTimes []time.Time
// loopFn will run 3 times and do something different on each loop.
loopFn := func() error {
loopCount++
loopTimes = append(loopTimes, time.Now())
if loopCount == 1 {
return fmt.Errorf("error 1")
}
if loopCount == 2 {
return fmt.Errorf("error 2")
}
if loopCount == 3 {
// On the 3rd loop, return success which ends the loop.
return nil
}
return nil
}
// allErrors collects all the errors passed into errFn.
var allErrors []error
errFn := func(e error) {
allErrors = append(allErrors, e)
}
retryTimeFn := func(_ uint, _ error) time.Duration {
return 1 * time.Millisecond
}

retryLoopBackoffPeering(ctx, logger, loopFn, errFn, retryTimeFn)

// Ensure loopFn ran the number of expected times.
require.Equal(t, 3, loopCount)
// Ensure errFn ran as expected.
require.Equal(t, []error{
fmt.Errorf("error 1"),
fmt.Errorf("error 2"),
}, allErrors)

// Test retryTimeFn by comparing the difference between when each loopFn ran.
for i := range loopTimes {
if i == 0 {
// Can't compare first time.
continue
}
require.True(t, loopTimes[i].Sub(loopTimes[i-1]) >= 1*time.Millisecond,
"time between loop executions %d and %d should be >= 1ms", i-1, i)
}
}

// Test that if the context is cancelled the loop exits.
func TestLeader_Peering_retryLoopBackoffPeering_cancelContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
logger := hclog.NewNullLogger()

// loopCount counts how many times we executed loopFn.
loopCount := 0
loopFn := func() error {
loopCount++
return fmt.Errorf("error %d", loopCount)
}
// allErrors collects all the errors passed into errFn.
var allErrors []error
errFn := func(e error) {
allErrors = append(allErrors, e)
}
// The retry time doesn't matter here because the context is cancelled before the loop ever waits to retry.
retryTimeFn := func(_ uint, _ error) time.Duration {
return 1 * time.Millisecond
}

// Cancel the context before the loop runs. It should run once and then exit.
cancel()
retryLoopBackoffPeering(ctx, logger, loopFn, errFn, retryTimeFn)

// Ensure loopFn ran the number of expected times.
require.Equal(t, 1, loopCount)
// Ensure errFn ran as expected.
require.Equal(t, []error{
fmt.Errorf("error 1"),
}, allErrors)
}