Skip to content

Commit

Permalink
Fix client chain elements order
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Aug 10, 2021
1 parent c315438 commit fa4a19e
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 17 deletions.
4 changes: 2 additions & 2 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) ne
return chain.NewNetworkServiceClient(
append(
opts.additionalFunctionality,
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
opts.authorizeClient,
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
networkservice.NewNetworkServiceClient(cc),
)...,
)
Expand Down Expand Up @@ -152,9 +152,9 @@ func NewClientFactory(clientOpts ...Option) connect.ClientFactory {
serialize.NewClient(),
refresh.NewClient(ctx),
metadata.NewClient(),
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
}, opts.additionalFunctionality...),
opts.authorizeClient,
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
networkservice.NewNetworkServiceClient(cc),
)...)
return rv
Expand Down
196 changes: 181 additions & 15 deletions pkg/networkservice/chains/client/client_heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,59 @@ package client_test
import (
"context"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/connect"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/tools/addressof"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

func TestClientHeal(t *testing.T) {
func startServer(ctx context.Context, t *testing.T, serverURL *url.URL, opts ...endpoint.Option) networkservice.NetworkServiceServer {
nse := endpoint.NewServer(ctx, sandbox.GenerateTestToken, opts...)

select {
case err := <-endpoint.Serve(ctx, serverURL, nse):
require.NoError(t, err)
default:
}

return nse
}

func TestClient_Heal(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

serverURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"}
serverCancel := startEmptyServer(ctx, t, serverURL)
serverCtx, serverCancel := context.WithCancel(ctx)
defer serverCancel()

serverURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"}
startServer(serverCtx, t, serverURL)

nsc := client.NewClient(ctx,
serverURL,
client.WithDialOptions(sandbox.DefaultDialOptions(sandbox.GenerateTestToken)...),
client.WithDialTimeout(time.Second),
)

_, err := nsc.Request(ctx, &networkservice.NetworkServiceRequest{})
require.NoError(t, err)

Expand All @@ -55,25 +81,165 @@ func TestClientHeal(t *testing.T) {
}, time.Second, time.Millisecond*10)
require.NoError(t, ctx.Err())

serverCancel = startEmptyServer(ctx, t, serverURL)
defer serverCancel()
startServer(ctx, t, serverURL)

require.Eventually(t, func() bool {
_, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{})
return err == nil
}, time.Second*2, time.Millisecond*50)
}

func startEmptyServer(ctx context.Context, t *testing.T, serverURL *url.URL) context.CancelFunc {
serverCtx, serverCancel := context.WithCancel(ctx)
func TestClient_StopHealingOnFailure(t *testing.T) {
var samples = []struct {
name string
optsSupplier func(counter networkservice.NetworkServiceClient) []client.Option
}{
{
name: "Authorize failure",
optsSupplier: func(counter networkservice.NetworkServiceClient) []client.Option {
return []client.Option{
client.WithAuthorizeClient(new(refreshFailureClient)),
client.WithAdditionalFunctionality(counter),
}
},
},
{
name: "Additional functionality failure",
optsSupplier: func(counter networkservice.NetworkServiceClient) []client.Option {
return []client.Option{
client.WithAdditionalFunctionality(
new(refreshFailureClient),
counter,
),
}
},
},
}

nse := endpoint.NewServer(serverCtx, sandbox.GenerateTestToken)
for _, sample := range samples {
// nolint:scopelint
t.Run(sample.name, func(t *testing.T) {
testStopHealingOnFailure(t, func(ctx context.Context, serverURL *url.URL, counter networkservice.NetworkServiceClient) networkservice.NetworkServiceClient {
return client.NewClient(ctx,
serverURL,
append([]client.Option{
client.WithDialOptions(sandbox.DefaultDialOptions(sandbox.GenerateTestToken)...),
client.WithDialTimeout(time.Second),
}, sample.optsSupplier(counter)...)...,
)
})
})
}
}

select {
case err := <-endpoint.Serve(serverCtx, serverURL, nse):
require.NoError(t, err)
default:
func TestClientFactory_StopHealingOnFailure(t *testing.T) {
var samples = []struct {
name string
opts []client.Option
}{
{
name: "Authorize failure",
opts: []client.Option{
client.WithAuthorizeClient(new(refreshFailureClient)),
},
},
{
name: "Additional functionality failure",
opts: []client.Option{
client.WithAdditionalFunctionality(new(refreshFailureClient)),
},
},
}

for _, sample := range samples {
// nolint:scopelint
t.Run(sample.name, func(t *testing.T) {
testStopHealingOnFailure(t, func(ctx context.Context, serverURL *url.URL, counter networkservice.NetworkServiceClient) networkservice.NetworkServiceClient {
clientServerURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"}

clientServer := new(struct {
networkservice.NetworkServiceServer
})
clientServer.NetworkServiceServer = startServer(ctx, t, clientServerURL,
endpoint.WithName("name"),
endpoint.WithAdditionalFunctionality(
heal.NewServer(ctx,
heal.WithOnHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(clientServer)))),
adapters.NewClientToServer(counter),
clienturl.NewServer(serverURL),
connect.NewServer(ctx, client.NewClientFactory(
append([]client.Option{
client.WithName("name"),
}, sample.opts...)...),
connect.WithDialOptions(sandbox.DefaultDialOptions(sandbox.GenerateTestToken)...),
connect.WithDialTimeout(time.Second),
),
),
)

return client.NewClient(ctx,
clientServerURL,
client.WithDialOptions(sandbox.DefaultDialOptions(sandbox.GenerateTestToken)...),
client.WithDialTimeout(time.Second),
)
})
})
}
}

func testStopHealingOnFailure(
t *testing.T,
clientSupplier func(ctx context.Context, serverURL *url.URL, counter networkservice.NetworkServiceClient) networkservice.NetworkServiceClient,
) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

serverURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"}
startServer(ctx, t, serverURL)

counter := new(count.Client)
nsc := clientSupplier(ctx, serverURL, counter)

conn, err := nsc.Request(ctx, new(networkservice.NetworkServiceRequest))
require.NoError(t, err)

_, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{
Connection: conn.Clone(),
})
require.Errorf(t, err, "refresh error")

require.Never(t, func() bool {
// 1. Request
// 2. Failed refresh Request
// 3+. Heal Requests
return counter.Requests() > 2
}, time.Second*2, time.Millisecond*50)
}

type refreshFailureClient struct {
flag int32
}

func (c *refreshFailureClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
}

if err = ctx.Err(); err == nil && atomic.CompareAndSwapInt32(&c.flag, 0, 1) {
return conn, nil
}

_, _ = next.Client(ctx).Close(ctx, conn, opts...)

if err == nil {
err = errors.New("refresh error")
}
return nil, err
}

return serverCancel
func (c *refreshFailureClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return next.Client(ctx).Close(ctx, conn, opts...)
}

0 comments on commit fa4a19e

Please sign in to comment.