Skip to content

Commit

Permalink
xdsclient: always backoff between new streams even after successful s…
Browse files Browse the repository at this point in the history
…tream (#5280)
  • Loading branch information
dfawley authored Apr 1, 2022
1 parent 4e78093 commit 3cccf6a
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 35 deletions.
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *clientImpl) newAuthority(config *bootstrap.ServerConfig) (_ *authority,
ret.close()
}
}()
ctr, err := newController(config, ret.pubsub, c.updateValidator, c.logger)
ctr, err := newController(config, ret.pubsub, c.updateValidator, c.logger, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type testController struct {
func overrideNewController(t *testing.T) *testutils.Channel {
origNewController := newController
ch := testutils.NewChannel()
newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (controllerInterface, error) {
newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, _ func(int) time.Duration) (controllerInterface, error) {
ret := newTestController(config)
ch.Send(ret)
return ret, nil
Expand Down
6 changes: 4 additions & 2 deletions xds/internal/xdsclient/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package xdsclient

import (
"time"

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/controller"
Expand All @@ -33,6 +35,6 @@ type controllerInterface interface {
Close()
}

var newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (controllerInterface, error) {
return controller.New(config, pubsub, validator, logger)
var newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, boff func(int) time.Duration) (controllerInterface, error) {
return controller.New(config, pubsub, validator, logger, boff)
}
7 changes: 5 additions & 2 deletions xds/internal/xdsclient/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func SetGRPCDial(dialer func(target string, opts ...grpc.DialOption) (*grpc.Clie
}

// New creates a new controller.
func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (_ *Controller, retErr error) {
func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, boff func(int) time.Duration) (_ *Controller, retErr error) {
switch {
case config == nil:
return nil, errors.New("xds: no xds_server provided")
Expand All @@ -120,12 +120,15 @@ func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, val
}),
}

if boff == nil {
boff = backoff.DefaultExponential.Backoff
}
ret := &Controller{
config: config,
updateValidator: validator,
updateHandler: updateHandler,

backoff: backoff.DefaultExponential.Backoff, // TODO: should this be configurable?
backoff: boff,
streamCh: make(chan grpc.ClientStream, 1),
sendCh: buffer.NewUnbounded(),
watchMap: make(map[xdsresource.ResourceType]map[string]bool),
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/xdsclient/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s) TestNew(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c, err := New(test.config, noopUpdateHandler, nil, nil) // Only testing the config, other inputs are left as nil.
c, err := New(test.config, noopUpdateHandler, nil, nil, nil) // Only testing the config, other inputs are left as nil.
defer func() {
if c != nil {
c.Close()
Expand Down Expand Up @@ -123,7 +123,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) {

// Set the dialer and make sure it is called.
SetGRPCDial(customDialer)
c, err := New(config, noopUpdateHandler, nil, nil)
c, err := New(config, noopUpdateHandler, nil, nil, nil)
if err != nil {
t.Fatalf("New(%+v) = %v, want no error", config, err)
}
Expand All @@ -138,7 +138,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) {

// Reset the dialer and make sure it is not called.
SetGRPCDial(grpc.Dial)
c, err = New(config, noopUpdateHandler, nil, nil)
c, err = New(config, noopUpdateHandler, nil, nil, nil)
defer func() {
if c != nil {
c.Close()
Expand Down
36 changes: 14 additions & 22 deletions xds/internal/xdsclient/controller/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,21 @@ func (t *Controller) run(ctx context.Context) {
// report error (and log) when stats is transient failure.

retries := 0
for {
select {
case <-ctx.Done():
return
default:
}

if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
lastStreamStartTime := time.Time{}
for ctx.Err() == nil {
dur := time.Until(lastStreamStartTime.Add(t.backoff(retries)))
if dur > 0 {
timer := time.NewTimer(dur)
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
return
}
}

retries++
lastStreamStartTime = time.Now()
stream, err := t.vClient.NewStream(ctx, t.cc)
if err != nil {
t.updateHandler.NewConnectionError(err)
Expand Down Expand Up @@ -370,24 +365,21 @@ func (t *Controller) processAckInfo(ack *ackAction, stream grpc.ClientStream) (t
// It blocks until the context is cancelled.
func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts controllerversion.LoadReportingOptions) {
retries := 0
for {
if ctx.Err() != nil {
return
}

if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
lastStreamStartTime := time.Time{}
for ctx.Err() == nil {
dur := time.Until(lastStreamStartTime.Add(t.backoff(retries)))
if dur > 0 {
timer := time.NewTimer(dur)
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
return
}
}

retries++
lastStreamStartTime = time.Now()
stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
if err != nil {
t.logger.Warningf("lrs: failed to create stream: %v", err)
Expand Down
5 changes: 1 addition & 4 deletions xds/internal/xdsclient/controller/v2_testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,10 @@ func newTestController(p pubsub.UpdateHandler, controlPlanAddr string, n *basepb
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
TransportAPI: version.TransportV2,
NodeProto: n,
}, p, nil, l)
}, p, nil, l, b)
if err != nil {
return nil, err
}
// This direct setting backoff seems a bit hacky, but should be OK for the
// tests. Or we need to make it configurable in New().
c.backoff = b
return c, nil
}

Expand Down

0 comments on commit 3cccf6a

Please sign in to comment.