From cb85233acb1cf561e7852491b3f1496df917778c Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Mon, 20 Sep 2021 16:17:32 +0700 Subject: [PATCH] Add refresh retries on failure Signed-off-by: Vladimir Popov --- pkg/networkservice/common/refresh/client.go | 31 ++++++++++++------- .../common/refresh/client_test.go | 4 +-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/pkg/networkservice/common/refresh/client.go b/pkg/networkservice/common/refresh/client.go index ec227337b7..8e38177a23 100644 --- a/pkg/networkservice/common/refresh/client.go +++ b/pkg/networkservice/common/refresh/client.go @@ -29,14 +29,13 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" - "github.com/networkservicemesh/sdk/pkg/tools/log" - "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/sdk/pkg/tools/log" ) type refreshClient struct { @@ -52,6 +51,8 @@ func NewClient(ctx context.Context) networkservice.NetworkServiceClient { } func (t *refreshClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + logger := log.FromContext(ctx).WithField("refreshClient", "Request") + conn, err := next.Client(ctx).Request(ctx, request, opts...) if err != nil { return nil, err @@ -77,14 +78,22 @@ func (t *refreshClient) Request(ctx context.Context, request *networkservice.Net timeClock := clock.FromContext(ctx) // Create the afterCh *outside* the go routine. This must be done to avoid picking up a later 'now' // from mockClock in testing - afterCh := timeClock.After(refreshAfter) - go func(cancelCtx context.Context, afterCh <-chan time.Time) { - select { - case <-cancelCtx.Done(): - case <-afterCh: - eventFactory.Request(begin.CancelContext(cancelCtx)) + afterTicker := timeClock.Ticker(refreshAfter) + go func() { + defer afterTicker.Stop() + for { + select { + case <-cancelCtx.Done(): + return + case <-afterTicker.C(): + if err := <-eventFactory.Request(begin.CancelContext(cancelCtx)); err != nil { + logger.Warnf("refresh failed: %s", err.Error()) + continue + } + return + } } - }(cancelCtx, afterCh) + }() return conn, nil } diff --git a/pkg/networkservice/common/refresh/client_test.go b/pkg/networkservice/common/refresh/client_test.go index b6d1efe37d..eadacb91da 100644 --- a/pkg/networkservice/common/refresh/client_test.go +++ b/pkg/networkservice/common/refresh/client_test.go @@ -302,7 +302,7 @@ func TestRefreshClient_NoRefreshOnFailure(t *testing.T) { require.Never(t, cloneClient.validator(2), testWait, testTick) } -func TestRefreshClient_NoRefreshOnRefreshFailure(t *testing.T) { +func TestRefreshClient_RefreshOnRefreshFailure(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) ctx, cancel := context.WithCancel(context.Background()) @@ -330,5 +330,5 @@ func TestRefreshClient_NoRefreshOnRefreshFailure(t *testing.T) { clockMock.Add(expireTimeout) - require.Never(t, cloneClient.validator(3), testWait, testTick) + require.Eventually(t, cloneClient.validator(3), testWait, testTick) }