Skip to content

Commit

Permalink
Move counter chain element to networkservice/utils
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Jul 26, 2021
1 parent 92b2824 commit f67a381
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 145 deletions.
9 changes: 5 additions & 4 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

Expand Down Expand Up @@ -76,7 +77,7 @@ func testNSMGRHealEndpoint(t *testing.T, nodeNum int) {

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := &counterServer{}
counter := new(count.Server)
nse := domain.Nodes[nodeNum].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)
Expand Down Expand Up @@ -150,7 +151,7 @@ func testNSMGRHealForwarder(t *testing.T, nodeNum int) {
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
require.NoError(t, err)

counter := &counterServer{}
counter := new(count.Server)
domain.Nodes[1].NewEndpoint(ctx, defaultRegistryEndpoint(nsReg.Name), sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)
Expand Down Expand Up @@ -232,7 +233,7 @@ func testNSMGRHealNSMgr(t *testing.T, nodeNum int, restored bool) {

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := &counterServer{}
counter := new(count.Server)
domain.Nodes[1].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)
Expand Down Expand Up @@ -302,7 +303,7 @@ func TestNSMGR_HealRegistry(t *testing.T) {

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := &counterServer{}
counter := new(count.Server)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)
Expand Down
3 changes: 2 additions & 1 deletion pkg/networkservice/chains/nsmgr/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

Expand Down Expand Up @@ -72,7 +73,7 @@ func TestCreateEndpointDuringRequest(t *testing.T) {
}

flag := atomic.Bool{}
requestCounter := &counterServer{}
requestCounter := new(count.Server)

makerServer := &nseMaker{
ctx: ctx,
Expand Down
3 changes: 2 additions & 1 deletion pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr"
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/tools/clientinfo"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)
Expand Down Expand Up @@ -216,7 +217,7 @@ func Test_Local_NoURLUsecase(t *testing.T) {

nseReg := defaultRegistryEndpoint(nsReg.Name)
request := defaultRequest(nsReg.Name)
counter := &counterServer{}
counter := new(count.Server)

domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

Expand Down
25 changes: 13 additions & 12 deletions pkg/networkservice/chains/nsmgr/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/replacelabels"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
Expand Down Expand Up @@ -96,7 +97,7 @@ func (s *nsmgrSuite) Test_Remote_ParallelUsecase() {
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
counter := &counterServer{}
counter := new(count.Server)

var unregisterWG sync.WaitGroup
var nse *sandbox.EndpointEntry
Expand Down Expand Up @@ -202,7 +203,7 @@ func (s *nsmgrSuite) Test_Remote_BusyEndpointsUsecase() {
nsReg, err := s.nsRegistryClient.Register(ctx, defaultRegistryService())
require.NoError(t, err)

counter := &counterServer{}
counter := new(count.Server)

var wg sync.WaitGroup
var nseRegs [4]*registry.NetworkServiceEndpoint
Expand Down Expand Up @@ -273,7 +274,7 @@ func (s *nsmgrSuite) Test_RemoteUsecase() {
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
counter := &counterServer{}
counter := new(count.Server)

nse := s.domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

Expand Down Expand Up @@ -317,7 +318,7 @@ func (s *nsmgrSuite) Test_ConnectToDeadNSEUsecase() {
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
counter := &counterServer{}
counter := new(count.Server)

nse := s.domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

Expand Down Expand Up @@ -362,7 +363,7 @@ func (s *nsmgrSuite) Test_LocalUsecase() {
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
counter := &counterServer{}
counter := new(count.Server)

nse := s.domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

Expand Down Expand Up @@ -402,7 +403,7 @@ func (s *nsmgrSuite) Test_PassThroughRemoteUsecase() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

counterClose := &counterServer{}
counterClose := new(count.Server)

nsReg := linearNS(nodesCount)
nsReg, err := s.nsRegistryClient.Register(ctx, nsReg)
Expand Down Expand Up @@ -463,7 +464,7 @@ func (s *nsmgrSuite) Test_PassThroughLocalUsecase() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

counterClose := &counterServer{}
counterClose := new(count.Server)

nsReg, err := s.nsRegistryClient.Register(ctx, linearNS(nsesCount))
require.NoError(t, err)
Expand Down Expand Up @@ -522,7 +523,7 @@ func (s *nsmgrSuite) Test_PassThroughSameSourceSelector() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

counterClose := &counterServer{}
counterClose := new(count.Server)

ns := linearNS(nsesCount)
ns.Matches[len(ns.Matches)-1].Fallthrough = true
Expand Down Expand Up @@ -602,7 +603,7 @@ func (s *nsmgrSuite) Test_PassThroughLocalUsecaseMultiLabel() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

counterClose := &counterServer{}
counterClose := new(count.Server)

nsReg, err := s.nsRegistryClient.Register(ctx, multiLabelNS())
require.NoError(t, err)
Expand Down Expand Up @@ -686,10 +687,10 @@ const (
labelB = "label_b"
)

func linearNS(count int) *registry.NetworkService {
func linearNS(hopsCount int) *registry.NetworkService {
matches := make([]*registry.Match, 0)

for i := 1; i < count; i++ {
for i := 1; i < hopsCount; i++ {
match := &registry.Match{
SourceSelector: map[string]string{
step: fmt.Sprintf("%v", i-1),
Expand All @@ -706,7 +707,7 @@ func linearNS(count int) *registry.NetworkService {
matches = append(matches, match)
}

if count > 1 {
if hopsCount > 1 {
// match with empty source selector must be the last
match := &registry.Match{
Routes: []*registry.Destination{
Expand Down
60 changes: 2 additions & 58 deletions pkg/networkservice/chains/nsmgr/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@ package nsmgr_test

import (
"context"
"sync"
"sync/atomic"
"testing"

"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

Expand Down Expand Up @@ -84,56 +81,3 @@ func testNSEAndClient(
_, err = nse.Unregister(ctx, nseReg)
require.NoError(t, err)
}

type counterServer struct {
Requests, Closes int32
requests map[string]int32
closes map[string]int32
mu sync.Mutex
}

func (c *counterServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
c.mu.Lock()
defer c.mu.Unlock()

atomic.AddInt32(&c.Requests, 1)
if c.requests == nil {
c.requests = make(map[string]int32)
}
c.requests[request.GetConnection().GetId()]++

return next.Server(ctx).Request(ctx, request)
}

func (c *counterServer) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) {
c.mu.Lock()
defer c.mu.Unlock()

atomic.AddInt32(&c.Closes, 1)
if c.closes == nil {
c.closes = make(map[string]int32)
}
c.closes[connection.GetId()]++

return next.Server(ctx).Close(ctx, connection)
}

func (c *counterServer) UniqueRequests() int {
c.mu.Lock()
defer c.mu.Unlock()

if c.requests == nil {
return 0
}
return len(c.requests)
}

func (c *counterServer) UniqueCloses() int {
c.mu.Lock()
defer c.mu.Unlock()

if c.closes == nil {
return 0
}
return len(c.closes)
}
44 changes: 20 additions & 24 deletions pkg/networkservice/common/connect/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/null"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
"github.com/networkservicemesh/sdk/pkg/tools/clienturlctx"
)
Expand Down Expand Up @@ -185,14 +186,14 @@ func TestConnectServer_RequestParallel(t *testing.T) {

// 1. Create connectServer

serverNext := new(countServer)
serverClient := new(countServer)
serverNext := new(count.Server)
serverClient := new(count.Client)

s := next.NewNetworkServiceServer(
connect.NewServer(context.Background(),
func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
return next.NewNetworkServiceClient(
adapters.NewServerToClient(serverClient),
serverClient,
newMonitorClient(ctx, cc),
networkservice.NewNetworkServiceClient(cc),
)
Expand All @@ -209,7 +210,7 @@ func TestConnectServer_RequestParallel(t *testing.T) {
defer cancel()

urlA := &url.URL{Scheme: "tcp", Host: "127.0.0.1:"}
serverA := new(countServer)
serverA := new(count.Server)

err := startServer(ctx, urlA, serverA)
require.NoError(t, err)
Expand All @@ -221,6 +222,8 @@ func TestConnectServer_RequestParallel(t *testing.T) {
wg := new(sync.WaitGroup)
wg.Add(parallelCount)

// IMPORTANT: please don't use any `require` statements from here to `barrier.Done()`. It would lead to strange
// errors if one of them fails. Please use `assert` instead of that.
barrier := new(sync.WaitGroup)
barrier.Add(1)

Expand Down Expand Up @@ -262,16 +265,23 @@ func TestConnectServer_RequestParallel(t *testing.T) {
wg.Wait()
wg.Add(parallelCount)

assert.Equal(t, int32(parallelCount), serverClient.count)
assert.Equal(t, int32(parallelCount), serverA.count)
assert.Equal(t, int32(parallelCount), serverNext.count)
assert.Equal(t, int32(parallelCount), atomic.LoadInt32(&serverClient.Requests))
assert.Equal(t, int32(0), atomic.LoadInt32(&serverClient.Closes))
assert.Equal(t, int32(parallelCount), atomic.LoadInt32(&serverA.Requests))
assert.Equal(t, int32(0), atomic.LoadInt32(&serverA.Closes))
assert.Equal(t, int32(parallelCount), atomic.LoadInt32(&serverNext.Requests))
assert.Equal(t, int32(0), atomic.LoadInt32(&serverNext.Closes))

// IMPORTANT: now feel free to use `require` statements.
barrier.Done()
wg.Wait()

require.Equal(t, int32(parallelCount), serverClient.count)
require.Equal(t, int32(parallelCount), serverA.count)
require.Equal(t, int32(parallelCount), serverNext.count)
require.Equal(t, int32(2*parallelCount), atomic.LoadInt32(&serverClient.Requests))
require.Equal(t, int32(parallelCount), atomic.LoadInt32(&serverClient.Closes))
require.Equal(t, int32(2*parallelCount), atomic.LoadInt32(&serverA.Requests))
require.Equal(t, int32(parallelCount), atomic.LoadInt32(&serverA.Closes))
require.Equal(t, int32(2*parallelCount), atomic.LoadInt32(&serverNext.Requests))
require.Equal(t, int32(parallelCount), atomic.LoadInt32(&serverNext.Closes))
}

func TestConnectServer_RequestFail(t *testing.T) {
Expand Down Expand Up @@ -549,17 +559,3 @@ func (s *captureServer) Close(ctx context.Context, conn *networkservice.Connecti
s.capturedRequest = nil
return next.Server(ctx).Close(ctx, conn)
}

type countServer struct {
count int32
}

func (s *countServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
atomic.AddInt32(&s.count, 1)
return next.Server(ctx).Request(ctx, request)
}

func (s *countServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
atomic.AddInt32(&s.count, -1)
return next.Server(ctx).Close(ctx, conn)
}
Loading

0 comments on commit f67a381

Please sign in to comment.