Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 73 additions & 63 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/hashicorp/consul/agent/grpc-external/services/connectca"
"github.com/hashicorp/consul/agent/grpc-external/services/dataplane"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
"github.com/hashicorp/consul/agent/grpc-external/services/resource"
"github.com/hashicorp/consul/agent/grpc-external/services/serverdiscovery"
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
Expand Down Expand Up @@ -728,69 +729,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

// Initialize external gRPC server - register services on external gRPC server.
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
InPrimaryDatacenter: s.InPrimaryDatacenter(),
LoadAuthMethod: func(methodName string, entMeta *acl.EnterpriseMeta) (*structs.ACLAuthMethod, aclgrpc.Validator, error) {
return s.loadAuthMethod(methodName, entMeta)
},
LocalTokensEnabled: s.LocalTokensEnabled,
Logger: logger.Named("grpc-api.acl"),
NewLogin: func() aclgrpc.Login { return s.aclLogin() },
NewTokenWriter: func() aclgrpc.TokenWriter { return s.aclTokenWriter() },
PrimaryDatacenter: s.config.PrimaryDatacenter,
ValidateEnterpriseRequest: s.validateEnterpriseRequest,
})
s.externalACLServer.Register(s.externalGRPCServer)

s.externalConnectCAServer = connectca.NewServer(connectca.Config{
Publisher: s.publisher,
GetStore: func() connectca.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.connect-ca"),
ACLResolver: s.ACLResolver,
CAManager: s.caManager,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
ConnectEnabled: s.config.ConnectEnabled,
})
s.externalConnectCAServer.Register(s.externalGRPCServer)

dataplane.NewServer(dataplane.Config{
GetStore: func() dataplane.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.dataplane"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
}).Register(s.externalGRPCServer)

serverdiscovery.NewServer(serverdiscovery.Config{
Publisher: s.publisher,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.server-discovery"),
}).Register(s.externalGRPCServer)

s.peeringBackend = NewPeeringBackend(s)
s.operatorBackend = NewOperatorBackend(s)
s.peerStreamServer = peerstream.NewServer(peerstream.Config{
Backend: s.peeringBackend,
GetStore: func() peerstream.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.peerstream"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
ConnectEnabled: s.config.ConnectEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
// Only forward the request if the dc in the request matches the server's datacenter.
if info.RequestDatacenter() != "" && info.RequestDatacenter() != config.Datacenter {
return false, fmt.Errorf("requests to generate peering tokens cannot be forwarded to remote datacenters")
}
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
})
s.peerStreamServer.Register(s.externalGRPCServer)
// Initialize external gRPC server
s.setupExternalGRPC(config, logger)

// Initialize internal gRPC server.
//
Expand Down Expand Up @@ -1220,6 +1160,76 @@ func (s *Server) setupRPC() error {
return nil
}

// Initialize and register services on external gRPC server.
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {

s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
InPrimaryDatacenter: s.InPrimaryDatacenter(),
LoadAuthMethod: func(methodName string, entMeta *acl.EnterpriseMeta) (*structs.ACLAuthMethod, aclgrpc.Validator, error) {
return s.loadAuthMethod(methodName, entMeta)
},
LocalTokensEnabled: s.LocalTokensEnabled,
Logger: logger.Named("grpc-api.acl"),
NewLogin: func() aclgrpc.Login { return s.aclLogin() },
NewTokenWriter: func() aclgrpc.TokenWriter { return s.aclTokenWriter() },
PrimaryDatacenter: s.config.PrimaryDatacenter,
ValidateEnterpriseRequest: s.validateEnterpriseRequest,
})
s.externalACLServer.Register(s.externalGRPCServer)

s.externalConnectCAServer = connectca.NewServer(connectca.Config{
Publisher: s.publisher,
GetStore: func() connectca.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.connect-ca"),
ACLResolver: s.ACLResolver,
CAManager: s.caManager,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
ConnectEnabled: s.config.ConnectEnabled,
})
s.externalConnectCAServer.Register(s.externalGRPCServer)

dataplane.NewServer(dataplane.Config{
GetStore: func() dataplane.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.dataplane"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
}).Register(s.externalGRPCServer)

serverdiscovery.NewServer(serverdiscovery.Config{
Publisher: s.publisher,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.server-discovery"),
}).Register(s.externalGRPCServer)

s.peeringBackend = NewPeeringBackend(s)
s.operatorBackend = NewOperatorBackend(s)

s.peerStreamServer = peerstream.NewServer(peerstream.Config{
Backend: s.peeringBackend,
GetStore: func() peerstream.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.peerstream"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
ConnectEnabled: s.config.ConnectEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
// Only forward the request if the dc in the request matches the server's datacenter.
if info.RequestDatacenter() != "" && info.RequestDatacenter() != config.Datacenter {
return false, fmt.Errorf("requests to generate peering tokens cannot be forwarded to remote datacenters")
}
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
})
s.peerStreamServer.Register(s.externalGRPCServer)

resource.NewServer(resource.Config{}).Register(s.externalGRPCServer)
Copy link
Contributor Author

@analogue analogue Mar 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L1229 is the only new addition here outside of extracting code to setupExternalGRPC(...)

}

// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
s.logger.Info("shutting down server")
Expand Down
56 changes: 56 additions & 0 deletions agent/grpc-external/services/resource/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package resource

import (
"context"

"google.golang.org/grpc"

"github.com/hashicorp/consul/proto-public/pbresource"
)

type Server struct {
Config
}

type Config struct {
}

func NewServer(cfg Config) *Server {
return &Server{cfg}
}

var _ pbresource.ResourceServiceServer = (*Server)(nil)

func (s *Server) Register(grpcServer *grpc.Server) {
pbresource.RegisterResourceServiceServer(grpcServer, s)
}

func (s *Server) Read(ctx context.Context, req *pbresource.ReadRequest) (*pbresource.ReadResponse, error) {
// TODO
return &pbresource.ReadResponse{}, nil
}

func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbresource.WriteResponse, error) {
// TODO
return &pbresource.WriteResponse{}, nil
}

func (s *Server) WriteStatus(ctx context.Context, req *pbresource.WriteStatusRequest) (*pbresource.WriteStatusResponse, error) {
// TODO
return &pbresource.WriteStatusResponse{}, nil
}

func (s *Server) List(ctx context.Context, req *pbresource.ListRequest) (*pbresource.ListResponse, error) {
// TODO
return &pbresource.ListResponse{}, nil
}

func (s *Server) Delete(ctx context.Context, req *pbresource.DeleteRequest) (*pbresource.DeleteResponse, error) {
// TODO
return &pbresource.DeleteResponse{}, nil
}

func (s *Server) Watch(req *pbresource.WatchRequest, ws pbresource.ResourceService_WatchServer) error {
// TODO
return nil
}
75 changes: 75 additions & 0 deletions agent/grpc-external/services/resource/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package resource

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/hashicorp/consul/agent/grpc-external/testutils"
"github.com/hashicorp/consul/proto-public/pbresource"
)

func testClient(t *testing.T, server *Server) pbresource.ResourceServiceClient {
t.Helper()

addr := testutils.RunTestServer(t, server)

//nolint:staticcheck
conn, err := grpc.DialContext(context.Background(), addr.String(), grpc.WithInsecure())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, conn.Close())
})

return pbresource.NewResourceServiceClient(conn)
}

func TestRead_TODO(t *testing.T) {
server := NewServer(Config{})
client := testClient(t, server)
resp, err := client.Read(context.Background(), &pbresource.ReadRequest{})
require.NoError(t, err)
require.NotNil(t, resp)
}

func TestWrite_TODO(t *testing.T) {
server := NewServer(Config{})
client := testClient(t, server)
resp, err := client.Write(context.Background(), &pbresource.WriteRequest{})
require.NoError(t, err)
require.NotNil(t, resp)
}

func TestWriteStatus_TODO(t *testing.T) {
server := NewServer(Config{})
client := testClient(t, server)
resp, err := client.WriteStatus(context.Background(), &pbresource.WriteStatusRequest{})
require.NoError(t, err)
require.NotNil(t, resp)
}

func TestList_TODO(t *testing.T) {
server := NewServer(Config{})
client := testClient(t, server)
resp, err := client.List(context.Background(), &pbresource.ListRequest{})
require.NoError(t, err)
require.NotNil(t, resp)
}

func TestDelete_TODO(t *testing.T) {
server := NewServer(Config{})
client := testClient(t, server)
resp, err := client.Delete(context.Background(), &pbresource.DeleteRequest{})
require.NoError(t, err)
require.NotNil(t, resp)
}

func TestWatch_TODO(t *testing.T) {
server := NewServer(Config{})
client := testClient(t, server)
wc, err := client.Watch(context.Background(), &pbresource.WatchRequest{})
require.NoError(t, err)
require.NotNil(t, wc)
}
6 changes: 6 additions & 0 deletions agent/grpc-middleware/rate_limit_mappings.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ var rpcRateLimitSpecs = map[string]rate.OperationType{
"/hashicorp.consul.internal.peering.PeeringService/TrustBundleRead": rate.OperationTypeRead,
"/hashicorp.consul.internal.peerstream.PeerStreamService/ExchangeSecret": rate.OperationTypeWrite,
"/hashicorp.consul.internal.peerstream.PeerStreamService/StreamResources": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/Delete": rate.OperationTypeWrite,
"/hashicorp.consul.resource.ResourceService/List": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/Read": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/Watch": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/Write": rate.OperationTypeWrite,
"/hashicorp.consul.resource.ResourceService/WriteStatus": rate.OperationTypeWrite,
"/hashicorp.consul.serverdiscovery.ServerDiscoveryService/WatchServers": rate.OperationTypeRead,
"/subscribe.StateChangeSubscription/Subscribe": rate.OperationTypeRead,
}
Loading