From 4d5373c259d83f8212c2005655dbebb55f48ed1b Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Thu, 28 Jul 2022 14:14:24 -0700 Subject: [PATCH 1/6] peering: retry some disconnect errors more quickly When we receive a FailedPrecondition error, retry that more quickly because we expect it will resolve shortly. This is particularly important in the context of Consul servers behind a load balancer because when establishing a connection we have to retry until we randomly land on a leader node. The default retry backoff goes from 2s, 4s, 8s, etc. which can result in very long delays quite quickly. Instead, this backoff retries in 8ms five times, then goes exponentially from there: 16ms, 32ms, ... up to a max of 8152ms. --- agent/consul/leader_peering.go | 100 +++++++++++++++++- .../services/peerstream/stream_resources.go | 93 ++++++++-------- 2 files changed, 144 insertions(+), 49 deletions(-) diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 3288a141a15..c2f14224b9b 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -16,8 +16,10 @@ import ( "github.com/hashicorp/go-uuid" "golang.org/x/time/rate" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + grpcstatus "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -300,7 +302,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer } // Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes. - go retryLoopBackoff(retryCtx, func() error { + go retryLoopBackoffPeering(retryCtx, logger, func() error { // Try a new address on each iteration by advancing the ring buffer on errors. defer func() { buffer = buffer.Next() @@ -358,13 +360,47 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer return err }, func(err error) { + // TODO(peering): why are we using TrackSendError here? This could also be a receive error. streamStatus.TrackSendError(err.Error()) - logger.Error("error managing peering stream", "peer_id", peer.ID, "error", err) - }) + if isFailedPreconditionErr(err) { + logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting", + "error", err) + return + } + logger.Error("error managing peering stream", "error", err) + }, peeringRetryTimeout) return nil } +// peeringRetryTimeout returns the time that should be waited between re-establishing a peering +// connection after an error. We follow the default backoff from retryLoopBackoff +// unless the error is a "failed precondition" error in which case we retry much more quickly. +// Retrying quickly is important in the case of a failed precondition error because we expect it to resolve +// quickly. For example in the case of connecting with a follower through a load balancer, we just need to retry +// until our request lands on a leader. +func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration { + if loopErr != nil && isFailedPreconditionErr(loopErr) { + // Wait 8ms first five times. + if failedAttempts < 6 { + return 8 * time.Millisecond + } + // Then follow exponential backoff maxing out at 8192ms. + // The minus two here is so we start out the 6th retry at 16ms. + ms := 1 << (failedAttempts - 2) + if ms > 8192 { + return 8192 * time.Millisecond + } + return time.Duration(ms) * time.Millisecond + } + + // Else we go with the default backoff from retryLoopBackoff. + if (1 << failedAttempts) < maxRetryBackoff { + return (1 << failedAttempts) * time.Second + } + return time.Duration(maxRetryBackoff) * time.Second +} + func (s *Server) startPeeringDeferredDeletion(ctx context.Context) { s.leaderRoutineManager.Start(ctx, peeringDeletionRoutineName, s.runPeeringDeletions) } @@ -517,3 +553,61 @@ func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Li _, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, req) return err } + +// retryLoopBackoffPeering re-runs loopFn with a backoff on error. errFn is run whenever +// loopFn returns an error. retryTimeFn is used to calculate the time between retries on error. +// It is passed the number of errors in a row that loopFn has returned and the latest error +// from loopFn. +// +// This function is modelled off of retryLoopBackoffHandleSuccess but is specific to peering +// because peering needs to use different retry times depending on which error is returned. +// This function doesn't use a rate limiter, unlike retryLoopBackoffHandleSuccess, because +// the rate limiter is only needed when retrying when loopFn returns a nil error. In the streaming +// case, when loopFn returns a nil error, the ctx has already been cancelled, and so we won't re-loop +// but instead exit. +func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn func() error, errFn func(error), + retryTimeFn func(failedAttempts uint, loopErr error) time.Duration) { + var failedAttempts uint + var err error + for { + if err = loopFn(); err != nil { + errFn(err) + + failedAttempts++ + + retryTime := retryTimeFn(failedAttempts, err) + logger.Trace("in connection retry backoff", "delay", retryTime) + timer := time.NewTimer(retryTime) + + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } + continue + } + + failedAttempts = 0 + select { + // NOTE: this is important to check here before re-entering the loop because our + // caller will cancel the context when the stream exits gracefully, and so we don't want to + // reconnect. + case <-ctx.Done(): + return + default: + } + } +} + +// isFailedPreconditionErr returns true if err is a gRPC error with code FailedPrecondition. +func isFailedPreconditionErr(err error) bool { + if err == nil { + return false + } + grpcErr, ok := grpcstatus.FromError(err) + if !ok { + return false + } + return grpcErr.Code() == codes.FailedPrecondition +} diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 1feb7f01d62..2079560e7bd 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -37,9 +37,8 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes // handling code in HandleStream() if !s.Backend.IsLeader() { - // we are not the leader so we will hang up on the dialer - - logger.Error("cannot establish a peering stream on a follower node") + // We are not the leader so we will hang up on the dialer. + logger.Debug("cannot establish a peering stream on a follower node") st, err := grpcstatus.New(codes.FailedPrecondition, "cannot establish a peering stream on a follower node").WithDetails( @@ -180,6 +179,10 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { With("dialed", streamReq.WasDialed()) logger.Trace("handling stream for peer") + // handleStreamCtx is local to this function. + handleStreamCtx, cancel := context.WithCancel(streamReq.Stream.Context()) + defer cancel() + status, err := s.Tracker.Connected(streamReq.LocalID) if err != nil { return fmt.Errorf("failed to register stream: %v", err) @@ -242,58 +245,53 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { PeerID: streamReq.RemoteID, }) if err := streamSend(sub); err != nil { - if err == io.EOF { - logger.Info("stream ended by peer") - return nil - } // TODO(peering) Test error handling in calls to Send/Recv return fmt.Errorf("failed to send subscription for %q to stream: %w", resourceURL, err) } } - // TODO(peering): Should this be buffered? - recvChan := make(chan *pbpeerstream.ReplicationMessage) + // recvCh sends messages from the gRPC stream. + recvCh := make(chan *pbpeerstream.ReplicationMessage) + // recvErrCh sends errors received from the gRPC stream. + recvErrCh := make(chan error) + + // Start a goroutine to read from the stream and pass to recvCh and recvErrCh. + // Using a separate goroutine allows us to process sends and receives all in the main for{} loop. go func() { - defer close(recvChan) for { msg, err := streamReq.Stream.Recv() - if err == nil { - logTraceRecv(logger, msg) - recvChan <- msg - continue + if err != nil { + recvErrCh <- err + return } - - if err == io.EOF { - logger.Info("stream ended by peer") - status.TrackRecvError(err.Error()) + logTraceRecv(logger, msg) + select { + case recvCh <- msg: + case <-handleStreamCtx.Done(): return } - logger.Error("failed to receive from stream", "error", err) - status.TrackRecvError(err.Error()) - return } }() - // Heartbeat sender. + // Start a goroutine to send heartbeats at a regular interval. go func() { tick := time.NewTicker(s.outgoingHeartbeatInterval) defer tick.Stop() for { select { - case <-streamReq.Stream.Context().Done(): + case <-handleStreamCtx.Done(): return case <-tick.C: - } - - heartbeat := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{ - Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{}, - }, - } - if err := streamSend(heartbeat); err != nil { - logger.Warn("error sending heartbeat", "err", err) + heartbeat := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{ + Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{}, + }, + } + if err := streamSend(heartbeat); err != nil { + logger.Warn("error sending heartbeat", "err", err) + } } } }() @@ -308,6 +306,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { incomingHeartbeatCtxCancel() }() + // The main loop that processes sends and receives. for { select { // When the doneCh is closed that means that the peering was deleted locally. @@ -331,28 +330,30 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { return nil + // Handle errors received from the stream by shutting down our handler. + case err := <-recvErrCh: + if err == io.EOF { + // NOTE: We don't expect to receive an io.EOF error here when the stream is disconnected gracefully. + // When the peering is deleted locally, status.Done() returns which is handled elsewhere and this method + // exits. When we receive a Terminated message, that's also handled elsewhere and this method + // exits. After the method exits this code here won't receive any recv errors and those will be handled + // by DrainStream(). + err = fmt.Errorf("stream ended unexpectedly") + } + status.TrackRecvError(err.Error()) + return err + // We haven't received a heartbeat within the expected interval. Kill the stream. case <-incomingHeartbeatCtx.Done(): - logger.Error("ending stream due to heartbeat timeout") return fmt.Errorf("heartbeat timeout") - case msg, open := <-recvChan: - if !open { - // The only time we expect the stream to end is when we've received a "Terminated" message. - // We handle the case of receiving the Terminated message below and then this function exits. - // So if the channel is closed while this function is still running then we haven't received a Terminated - // message which means we want to try and reestablish the stream. - // It's the responsibility of the caller of this function to reestablish the stream on error and so that's - // why we return an error here. - return fmt.Errorf("stream ended unexpectedly") - } - + case msg := <-recvCh: // NOTE: this code should have similar error handling to the // initial handling code in StreamResources() if !s.Backend.IsLeader() { - // we are not the leader anymore so we will hang up on the dialer - logger.Error("node is not a leader anymore; cannot continue streaming") + // We are not the leader anymore, so we will hang up on the dialer. + logger.Info("node is not a leader anymore; cannot continue streaming") st, err := grpcstatus.New(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming").WithDetails( From 7a80cca16e53265aaa30842ded32477c495853c1 Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Fri, 29 Jul 2022 10:21:16 -0700 Subject: [PATCH 2/6] Fix test --- agent/grpc-external/services/peerstream/stream_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 174ecf59f3e..c4458acf0c6 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -589,7 +589,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { testutil.RunStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) { lastRecvError = it.FutureNow(1) disconnectTime := it.FutureNow(2) - lastRecvErrorMsg = io.EOF.Error() + lastRecvErrorMsg = "stream ended unexpectedly" client.Close() @@ -597,7 +597,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: false, - DisconnectErrorMessage: "stream ended unexpectedly", + DisconnectErrorMessage: lastRecvErrorMsg, LastAck: lastSendSuccess, LastNack: lastNack, LastNackMessage: lastNackMsg, From 026e4d82b6230e9b250b96c1927c6674df5fe889 Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Fri, 29 Jul 2022 10:22:48 -0700 Subject: [PATCH 3/6] Move function down to bottom --- agent/consul/leader_peering.go | 56 +++++++++++++++++----------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index c2f14224b9b..c5216f75da9 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -373,34 +373,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer return nil } -// peeringRetryTimeout returns the time that should be waited between re-establishing a peering -// connection after an error. We follow the default backoff from retryLoopBackoff -// unless the error is a "failed precondition" error in which case we retry much more quickly. -// Retrying quickly is important in the case of a failed precondition error because we expect it to resolve -// quickly. For example in the case of connecting with a follower through a load balancer, we just need to retry -// until our request lands on a leader. -func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration { - if loopErr != nil && isFailedPreconditionErr(loopErr) { - // Wait 8ms first five times. - if failedAttempts < 6 { - return 8 * time.Millisecond - } - // Then follow exponential backoff maxing out at 8192ms. - // The minus two here is so we start out the 6th retry at 16ms. - ms := 1 << (failedAttempts - 2) - if ms > 8192 { - return 8192 * time.Millisecond - } - return time.Duration(ms) * time.Millisecond - } - - // Else we go with the default backoff from retryLoopBackoff. - if (1 << failedAttempts) < maxRetryBackoff { - return (1 << failedAttempts) * time.Second - } - return time.Duration(maxRetryBackoff) * time.Second -} - func (s *Server) startPeeringDeferredDeletion(ctx context.Context) { s.leaderRoutineManager.Start(ctx, peeringDeletionRoutineName, s.runPeeringDeletions) } @@ -600,6 +572,34 @@ func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn fu } } +// peeringRetryTimeout returns the time that should be waited between re-establishing a peering +// connection after an error. We follow the default backoff from retryLoopBackoff +// unless the error is a "failed precondition" error in which case we retry much more quickly. +// Retrying quickly is important in the case of a failed precondition error because we expect it to resolve +// quickly. For example in the case of connecting with a follower through a load balancer, we just need to retry +// until our request lands on a leader. +func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration { + if loopErr != nil && isFailedPreconditionErr(loopErr) { + // Wait 8ms first five times. + if failedAttempts < 6 { + return 8 * time.Millisecond + } + // Then follow exponential backoff maxing out at 8192ms. + // The minus two here is so we start out the 6th retry at 16ms. + ms := 1 << (failedAttempts - 2) + if ms > 8192 { + return 8192 * time.Millisecond + } + return time.Duration(ms) * time.Millisecond + } + + // Else we go with the default backoff from retryLoopBackoff. + if (1 << failedAttempts) < maxRetryBackoff { + return (1 << failedAttempts) * time.Second + } + return time.Duration(maxRetryBackoff) * time.Second +} + // isFailedPreconditionErr returns true if err is a gRPC error with code FailedPrecondition. func isFailedPreconditionErr(err error) bool { if err == nil { From ac8f97b8e45cae6516ac56dd625deaddeca257cc Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Fri, 29 Jul 2022 11:04:59 -0700 Subject: [PATCH 4/6] Add some tests --- agent/consul/leader_peering.go | 39 ++++++++++++------ agent/consul/leader_peering_test.go | 62 +++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 12 deletions(-) diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index c5216f75da9..3ccc293c11d 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "math" "time" "github.com/armon/go-metrics" @@ -40,6 +41,15 @@ var LeaderPeeringMetrics = []prometheus.GaugeDefinition{ "We emit this metric every 9 seconds", }, } +var ( + // fastConnRetryTimeout is how long we wait between retrying connections following the "fast" path + // which is triggered on specific connection errors. + fastConnRetryTimeout = 8 * time.Millisecond + // maxFastConnRetries is the maximum number of fast connection retries before we follow exponential backoff. + maxFastConnRetries = uint(5) + // maxFastRetryBackoff is the maximum amount of time we'll wait between retries following the fast path. + maxFastRetryBackoff = 8192 * time.Millisecond +) func (s *Server) startPeeringStreamSync(ctx context.Context) { s.leaderRoutineManager.Start(ctx, peeringStreamsRoutineName, s.runPeeringSync) @@ -545,7 +555,9 @@ func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn fu if err = loopFn(); err != nil { errFn(err) - failedAttempts++ + if failedAttempts < math.MaxUint { + failedAttempts++ + } retryTime := retryTimeFn(failedAttempts, err) logger.Trace("in connection retry backoff", "delay", retryTime) @@ -580,17 +592,20 @@ func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn fu // until our request lands on a leader. func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration { if loopErr != nil && isFailedPreconditionErr(loopErr) { - // Wait 8ms first five times. - if failedAttempts < 6 { - return 8 * time.Millisecond - } - // Then follow exponential backoff maxing out at 8192ms. - // The minus two here is so we start out the 6th retry at 16ms. - ms := 1 << (failedAttempts - 2) - if ms > 8192 { - return 8192 * time.Millisecond - } - return time.Duration(ms) * time.Millisecond + // Wait a constant time for the first number of retries. + if failedAttempts <= maxFastConnRetries { + return fastConnRetryTimeout + } + // From here, follow an exponential backoff maxing out at maxFastRetryBackoff. + // The below equation multiples the constantRetryTimeout by 2^n where n is the number of failed attempts + // we're on, starting at 1 now that we're past our maxFastConnRetries. + // For example if fastConnRetryTimeout == 8ms and maxFastConnRetries == 5, then at 6 failed retries + // we'll do 8ms * 2^1 = 16ms, then 8ms * 2^2 = 32ms, etc. + ms := fastConnRetryTimeout * (1 << (failedAttempts - maxFastConnRetries)) + if ms > maxFastRetryBackoff { + return maxFastRetryBackoff + } + return ms } // Else we go with the default backoff from retryLoopBackoff. diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index feaf5be027d..67bc24b6303 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" "io/ioutil" "testing" @@ -12,6 +13,8 @@ import ( "github.com/armon/go-metrics" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -1122,3 +1125,62 @@ func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) { return found }, 7*time.Second, 1*time.Second, "peering should not have been established") } + +// Test peeringRetryTimeout when the errors are FailedPrecondition errors because these +// errors have a different backoff. +func TestLeader_Peering_peeringRetryTimeout_failedPreconditionErrors(t *testing.T) { + cases := []struct { + failedAttempts uint + expDuration time.Duration + }{ + // Constant time backoff. + {0, 8 * time.Millisecond}, + {1, 8 * time.Millisecond}, + {2, 8 * time.Millisecond}, + {3, 8 * time.Millisecond}, + {4, 8 * time.Millisecond}, + {5, 8 * time.Millisecond}, + // Then exponential. + {6, 16 * time.Millisecond}, + {7, 32 * time.Millisecond}, + {13, 2048 * time.Millisecond}, + {14, 4096 * time.Millisecond}, + {15, 8192 * time.Millisecond}, + // Max. + {16, 8192 * time.Millisecond}, + {17, 8192 * time.Millisecond}, + } + + for _, c := range cases { + t.Run(fmt.Sprintf("failed attempts %d", c.failedAttempts), func(t *testing.T) { + err := grpcstatus.Error(codes.FailedPrecondition, "msg") + require.Equal(t, c.expDuration, peeringRetryTimeout(c.failedAttempts, err)) + }) + } +} + +// Test peeringRetryTimeout with non-FailedPrecondition errors because these errors have a different +// backoff from FailedPrecondition errors. +func TestLeader_Peering_peeringRetryTimeout_regularErrors(t *testing.T) { + cases := []struct { + failedAttempts uint + expDuration time.Duration + }{ + // Exponential. + {0, 1 * time.Second}, + {1, 2 * time.Second}, + {2, 4 * time.Second}, + {3, 8 * time.Second}, + // Until max. + {8, 256 * time.Second}, + {9, 256 * time.Second}, + {10, 256 * time.Second}, + } + + for _, c := range cases { + t.Run(fmt.Sprintf("failed attempts %d", c.failedAttempts), func(t *testing.T) { + err := errors.New("error") + require.Equal(t, c.expDuration, peeringRetryTimeout(c.failedAttempts, err)) + }) + } +} From c2696846ab5080569587cc9d88acc37ed602d3a5 Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Fri, 29 Jul 2022 12:14:17 -0700 Subject: [PATCH 5/6] Test retryLoopBackoffPeering --- agent/consul/leader_peering_test.go | 67 +++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 67bc24b6303..0ad97738b13 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -1184,3 +1185,69 @@ func TestLeader_Peering_peeringRetryTimeout_regularErrors(t *testing.T) { }) } } + +// This test exercises all the functionality of retryLoopBackoffPeering. +func TestLeader_Peering_retryLoopBackoffPeering(t *testing.T) { + // We'll cancel the ctx to ensure the loop exits. + ctx, cancel := context.WithCancel(context.Background()) + logger := hclog.NewNullLogger() + + // loopCount counts how many times we executed loopFn. + loopCount := 0 + // loopTimes holds the times at which each loopFn was executed. We use this to test the timeout functionality. + var loopTimes []time.Time + // loopFn will run 5 times and do something different on each loop. + loopFn := func() error { + loopCount++ + loopTimes = append(loopTimes, time.Now()) + if loopCount == 1 { + return fmt.Errorf("error 1") + } + if loopCount == 2 { + return fmt.Errorf("error 2") + } + if loopCount == 3 { + return nil + } + if loopCount == 4 { + return fmt.Errorf("error 4") + } + // On the 5th loop stop. + cancel() + return nil + } + // allErrors collects all the errors passed into errFn. + var allErrors []error + errFn := func(e error) { + allErrors = append(allErrors, e) + } + retryTimeFn := func(_ uint, _ error) time.Duration { + return 1 * time.Millisecond + } + + retryLoopBackoffPeering(ctx, logger, loopFn, errFn, retryTimeFn) + + // Ensure loopFn ran the number of expected times. + require.Equal(t, 5, loopCount) + // Ensure errFn ran as expected. + require.Equal(t, []error{ + fmt.Errorf("error 1"), + fmt.Errorf("error 2"), + fmt.Errorf("error 4"), + }, allErrors) + + // Test retryTimeFn by comparing the difference between when each loopFn ran. + // Except for the success case, the difference between each loop time should be > 1ms. + for i := range loopTimes { + if i == 0 { + // Can't compare first time. + continue + } + if i == 3 { + // At i == 3, loopFn was successful which then gets retried immediately. + continue + } + require.True(t, loopTimes[i].Sub(loopTimes[i-1]) >= 1*time.Millisecond, + "time between indices %d and %d was > 1ms", i, i-1) + } +} From 95d1e8bd6de3c408683a029bd3d2bf51185f8aa5 Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Fri, 29 Jul 2022 12:50:32 -0700 Subject: [PATCH 6/6] Exit loop on success --- agent/consul/leader_peering.go | 19 ++--------- agent/consul/leader_peering_test.go | 50 +++++++++++++++++++++-------- 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 3ccc293c11d..954daddce4c 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -361,10 +361,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer if err == nil { stream.CloseSend() s.peerStreamServer.DrainStream(streamReq) - - // This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream. cancel() - logger.Info("closed outbound stream") } return err @@ -544,9 +541,8 @@ func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Li // This function is modelled off of retryLoopBackoffHandleSuccess but is specific to peering // because peering needs to use different retry times depending on which error is returned. // This function doesn't use a rate limiter, unlike retryLoopBackoffHandleSuccess, because -// the rate limiter is only needed when retrying when loopFn returns a nil error. In the streaming -// case, when loopFn returns a nil error, the ctx has already been cancelled, and so we won't re-loop -// but instead exit. +// the rate limiter is only needed in the success case when loopFn returns nil and we want to +// loop again. In the peering case, we exit on a successful loop so we don't need the limter. func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn func() error, errFn func(error), retryTimeFn func(failedAttempts uint, loopErr error) time.Duration) { var failedAttempts uint @@ -571,16 +567,7 @@ func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn fu } continue } - - failedAttempts = 0 - select { - // NOTE: this is important to check here before re-entering the loop because our - // caller will cancel the context when the stream exits gracefully, and so we don't want to - // reconnect. - case <-ctx.Done(): - return - default: - } + return } } diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 0ad97738b13..8a0461c38aa 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -1188,8 +1188,7 @@ func TestLeader_Peering_peeringRetryTimeout_regularErrors(t *testing.T) { // This test exercises all the functionality of retryLoopBackoffPeering. func TestLeader_Peering_retryLoopBackoffPeering(t *testing.T) { - // We'll cancel the ctx to ensure the loop exits. - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() logger := hclog.NewNullLogger() // loopCount counts how many times we executed loopFn. @@ -1207,13 +1206,9 @@ func TestLeader_Peering_retryLoopBackoffPeering(t *testing.T) { return fmt.Errorf("error 2") } if loopCount == 3 { + // On the 3rd loop, return success which ends the loop. return nil } - if loopCount == 4 { - return fmt.Errorf("error 4") - } - // On the 5th loop stop. - cancel() return nil } // allErrors collects all the errors passed into errFn. @@ -1228,26 +1223,53 @@ func TestLeader_Peering_retryLoopBackoffPeering(t *testing.T) { retryLoopBackoffPeering(ctx, logger, loopFn, errFn, retryTimeFn) // Ensure loopFn ran the number of expected times. - require.Equal(t, 5, loopCount) + require.Equal(t, 3, loopCount) // Ensure errFn ran as expected. require.Equal(t, []error{ fmt.Errorf("error 1"), fmt.Errorf("error 2"), - fmt.Errorf("error 4"), }, allErrors) // Test retryTimeFn by comparing the difference between when each loopFn ran. - // Except for the success case, the difference between each loop time should be > 1ms. for i := range loopTimes { if i == 0 { // Can't compare first time. continue } - if i == 3 { - // At i == 3, loopFn was successful which then gets retried immediately. - continue - } require.True(t, loopTimes[i].Sub(loopTimes[i-1]) >= 1*time.Millisecond, "time between indices %d and %d was > 1ms", i, i-1) } } + +// Test that if the context is cancelled the loop exits. +func TestLeader_Peering_retryLoopBackoffPeering_cancelContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + logger := hclog.NewNullLogger() + + // loopCount counts how many times we executed loopFn. + loopCount := 0 + loopFn := func() error { + loopCount++ + return fmt.Errorf("error %d", loopCount) + } + // allErrors collects all the errors passed into errFn. + var allErrors []error + errFn := func(e error) { + allErrors = append(allErrors, e) + } + // Set the retry time to a huge number. + retryTimeFn := func(_ uint, _ error) time.Duration { + return 1 * time.Millisecond + } + + // Cancel the context before the loop runs. It should run once and then exit. + cancel() + retryLoopBackoffPeering(ctx, logger, loopFn, errFn, retryTimeFn) + + // Ensure loopFn ran the number of expected times. + require.Equal(t, 1, loopCount) + // Ensure errFn ran as expected. + require.Equal(t, []error{ + fmt.Errorf("error 1"), + }, allErrors) +}