Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Path to Registry Find Requests #1536

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/registry/common/grpcmetadata/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"

"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -92,3 +93,23 @@ func sendPath(ctx context.Context, path *Path) error {
header := metadata.Pairs(pathKey, string(bytes))
return grpc.SendHeader(ctx, header)
}

func nsFindServerSendPath(server registry.NetworkServiceRegistry_FindServer, path *Path) error {
bytes, err := json.Marshal(path)
if err != nil {
return errors.Wrap(err, "failed to convert a provided path into JSON")
}

header := metadata.Pairs(pathKey, string(bytes))
return server.SendHeader(header)
}

func nseFindServerSendPath(server registry.NetworkServiceEndpointRegistry_FindServer, path *Path) error {
bytes, err := json.Marshal(path)
if err != nil {
return errors.Wrap(err, "failed to convert a provided path into JSON")
}

header := metadata.Pairs(pathKey, string(bytes))
return server.SendHeader(header)
}
32 changes: 30 additions & 2 deletions pkg/registry/common/grpcmetadata/ns_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -19,8 +19,10 @@ package grpcmetadata

import (
"context"
"fmt"

"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -64,7 +66,33 @@ func (c *grpcMetadataNSClient) Register(ctx context.Context, ns *registry.Networ
}

func (c *grpcMetadataNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) {
return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
path := PathFromContext(ctx)
ctx, err := appendToMetadata(ctx, path)
if err != nil {
return nil, err
}

var header metadata.MD
opts = append(opts, grpc.Header(&header))
resp, err := next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
if err != nil {
return nil, err
}

header, err = resp.Header()
if err != nil {
return nil, errors.Wrap(err, "couldn't get registry path from find server")
}

newpath, err := fromMD(header)
if err == nil {
path.Index = newpath.Index
path.PathSegments = newpath.PathSegments
}

fmt.Printf("newpath: %v\n", newpath)

return resp, nil
}

func (c *grpcMetadataNSClient) Unregister(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) {
Expand Down
18 changes: 16 additions & 2 deletions pkg/registry/common/grpcmetadata/ns_server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -24,6 +24,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamcontext"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

Expand Down Expand Up @@ -58,7 +59,20 @@ func (s *grpcMetadataNSServer) Register(ctx context.Context, ns *registry.Networ
}

func (s *grpcMetadataNSServer) Find(query *registry.NetworkServiceQuery, server registry.NetworkServiceRegistry_FindServer) error {
return next.NetworkServiceRegistryServer(server.Context()).Find(query, server)
ctx := server.Context()
path, err := fromContext(ctx)
if err != nil {
log.FromContext(ctx).Warnf("Unregister: failed to load grpc metadata from context: %v", err.Error())
return next.NetworkServiceRegistryServer(ctx).Find(query, server)
}

ctx = PathWithContext(ctx, path)
err = next.NetworkServiceRegistryServer(ctx).Find(query, streamcontext.NetworkServiceRegistryFindServer(ctx, server))
if err != nil {
return err
}

return nsFindServerSendPath(server, path)
}

func (s *grpcMetadataNSServer) Unregister(ctx context.Context, ns *registry.NetworkService) (*empty.Empty, error) {
Expand Down
37 changes: 30 additions & 7 deletions pkg/registry/common/grpcmetadata/ns_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -77,7 +77,10 @@ func (p *pathCheckerNSClient) Register(ctx context.Context, in *registry.Network
}

func (p *pathCheckerNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) {
return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
pBefore := p.funcBefore(ctx)
c, e := next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
p.funcAfter(ctx, pBefore)
return c, e
}

func (p *pathCheckerNSClient) Unregister(ctx context.Context, in *registry.NetworkService, opts ...grpc.CallOption) (*emptypb.Empty, error) {
Expand All @@ -87,10 +90,11 @@ func (p *pathCheckerNSClient) Unregister(ctx context.Context, in *registry.Netwo
return r, e
}

// nolint: funlen
func TestGRPCMetadataNetworkService(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2000)
defer cancel()

clockMock := clockmock.New(ctx)
Expand Down Expand Up @@ -164,23 +168,33 @@ func TestGRPCMetadataNetworkService(t *testing.T) {
ctx = grpcmetadata.PathWithContext(ctx, &path)

ns := &registry.NetworkService{Name: "ns"}
_, err = client.Register(ctx, ns)
ns, err = client.Register(ctx, ns)
require.NoError(t, err)

require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 3)
require.Len(t, ns.PathIds, 3)

// Simulate refresh
_, err = client.Register(ctx, ns)
require.NoError(t, err)

query := &registry.NetworkServiceQuery{NetworkService: ns}
path = grpcmetadata.Path{}
ctx = grpcmetadata.PathWithContext(ctx, &path)
_, err = client.Find(ctx, query)
require.NoError(t, err)
require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 3)
//require.Len(t, query.NetworkService.PathIds, 3)

_, err = client.Unregister(ctx, ns)
require.NoError(t, err)

serverGRPCServer.Stop()
proxyGRPCServer.Stop()
}

// nolint: funlen
func TestGRPCMetadataNetworkService_BackwardCompatibility(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

Expand Down Expand Up @@ -255,16 +269,25 @@ func TestGRPCMetadataNetworkService_BackwardCompatibility(t *testing.T) {
ctx = grpcmetadata.PathWithContext(ctx, &path)

ns := &registry.NetworkService{Name: "ns"}
_, err = client.Register(ctx, ns)
ns, err = client.Register(ctx, ns)
require.NoError(t, err)

require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 2)
require.Len(t, ns.PathIds, 2)

// Simulate refresh
_, err = client.Register(ctx, ns)
require.NoError(t, err)

query := &registry.NetworkServiceQuery{NetworkService: ns}
path = grpcmetadata.Path{}
ctx = grpcmetadata.PathWithContext(ctx, &path)
_, err = client.Find(ctx, query)
require.NoError(t, err)
require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 2)
//require.Len(t, query.NetworkService.PathIds, 2)

_, err = client.Unregister(ctx, ns)
require.NoError(t, err)
}
29 changes: 27 additions & 2 deletions pkg/registry/common/grpcmetadata/nse_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -20,6 +20,7 @@ import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -62,7 +63,31 @@ func (c *grpcMetadataNSEClient) Register(ctx context.Context, nse *registry.Netw
}

func (c *grpcMetadataNSEClient) Find(ctx context.Context, query *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) {
return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...)
path := PathFromContext(ctx)
ctx, err := appendToMetadata(ctx, path)
if err != nil {
return nil, err
}

var header metadata.MD
opts = append(opts, grpc.Header(&header))
resp, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...)
if err != nil {
return nil, err
}

header, err = resp.Header()
if err != nil {
return nil, errors.Wrap(err, "couldn't get registry path from find server")
}

newpath, err := fromMD(header)
if err == nil {
path.Index = newpath.Index
path.PathSegments = newpath.PathSegments
}

return resp, nil
}

func (c *grpcMetadataNSEClient) Unregister(ctx context.Context, nse *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*empty.Empty, error) {
Expand Down
20 changes: 18 additions & 2 deletions pkg/registry/common/grpcmetadata/nse_server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -24,6 +24,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamcontext"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

Expand Down Expand Up @@ -56,7 +57,22 @@ func (s *grpcMetadataNSEServer) Register(ctx context.Context, nse *registry.Netw
}

func (s *grpcMetadataNSEServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
ctx := server.Context()
path, err := fromContext(ctx)
if err != nil {
log.FromContext(ctx).Warnf("Unregister: failed to load grpc metadata from context: %v", err.Error())
return next.NetworkServiceEndpointRegistryServer(ctx).Find(query, server)
}

ctx = PathWithContext(ctx, path)
err = next.NetworkServiceEndpointRegistryServer(server.Context()).Find(
query, streamcontext.NetworkServiceEndpointRegistryFindServer(ctx, server))
if err != nil {
return err
}

err = nseFindServerSendPath(server, path)
return err
}

func (s *grpcMetadataNSEServer) Unregister(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
Expand Down
34 changes: 29 additions & 5 deletions pkg/registry/common/grpcmetadata/nse_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -72,7 +72,11 @@ func (p *pathCheckerNSEClient) Register(ctx context.Context, in *registry.Networ
}

func (p *pathCheckerNSEClient) Find(ctx context.Context, in *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) {
return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, in, opts...)
pBefore := p.funcBefore(ctx)
c, e := next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, in, opts...)
p.funcAfter(ctx, pBefore)

return c, e
}

func (p *pathCheckerNSEClient) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*empty.Empty, error) {
Expand All @@ -82,6 +86,7 @@ func (p *pathCheckerNSEClient) Unregister(ctx context.Context, in *registry.Netw
return r, e
}

// nolint: funlen
// This test checks that registry Path is correctly updated and passed through grpc metadata
// Test scheme: client ---> proxyServer ---> server
func TestGRPCMetadataNetworkServiceEndpoint(t *testing.T) {
Expand Down Expand Up @@ -163,23 +168,33 @@ func TestGRPCMetadataNetworkServiceEndpoint(t *testing.T) {
ctx = grpcmetadata.PathWithContext(ctx, &path)

nse := &registry.NetworkServiceEndpoint{Name: "nse"}
_, err = client.Register(ctx, nse)
nse, err = client.Register(ctx, nse)
require.NoError(t, err)

require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 3)
require.Len(t, nse.PathIds, 3)

// Simulate refresh
_, err = client.Register(ctx, nse)
require.NoError(t, err)

query := &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: nse}
path = grpcmetadata.Path{}
ctx = grpcmetadata.PathWithContext(ctx, &path)
_, err = client.Find(ctx, query)
require.NoError(t, err)
require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 3)
//require.Len(t, query.NetworkService.PathIds, 3)

_, err = client.Unregister(ctx, nse)
require.NoError(t, err)

serverGRPCServer.Stop()
proxyGRPCServer.Stop()
}

// nolint: funlen
func TestGRPCMetadataNetworkServiceEndpoint_BackwardCompatibility(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

Expand Down Expand Up @@ -255,11 +270,20 @@ func TestGRPCMetadataNetworkServiceEndpoint_BackwardCompatibility(t *testing.T)
ctx = grpcmetadata.PathWithContext(ctx, &path)

nse := &registry.NetworkServiceEndpoint{Name: "ns"}
_, err = client.Register(ctx, nse)
nse, err = client.Register(ctx, nse)
require.NoError(t, err)
require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 2)
require.Len(t, nse.PathIds, 2)

query := &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: nse}
path = grpcmetadata.Path{}
ctx = grpcmetadata.PathWithContext(ctx, &path)
_, err = client.Find(ctx, query)
require.NoError(t, err)
require.Equal(t, int(path.Index), 0)
require.Len(t, path.PathSegments, 2)
//require.Len(t, query.NetworkService.PathIds, 3)

// Simulate refresh
_, err = client.Register(ctx, nse)
Expand Down
Loading
Loading