Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
cfg.SerfLANConfig = consul.CloneSerfLANConfig(cfg.SerfLANConfig)

cfg.PeeringEnabled = runtimeCfg.PeeringEnabled
cfg.PeeringTestAllowPeerRegistrations = runtimeCfg.PeeringTestAllowPeerRegistrations

enterpriseConsulConfig(cfg, runtimeCfg)
return cfg, nil
Expand Down
54 changes: 54 additions & 0 deletions agent/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,60 @@ import (
"github.com/hashicorp/consul/testrpc"
)

func TestCatalogRegister_PeeringRegistration(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()

t.Run("deny peer registrations by default", func(t *testing.T) {
a := NewTestAgent(t, "")
defer a.Shutdown()

// Register request with peer
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))

obj, err := a.srv.CatalogRegister(nil, req)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
require.Nil(t, obj)
})

t.Run("cannot hcl set the peer registrations config", func(t *testing.T) {
// this will have no effect, as the value is overriden in non user source
a := NewTestAgent(t, "peering = { test_allow_peer_registrations = true }")
defer a.Shutdown()

// Register request with peer
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))

obj, err := a.srv.CatalogRegister(nil, req)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
require.Nil(t, obj)
})

t.Run("allow peer registrations with test overrides", func(t *testing.T) {
// the only way to set the config in the agent is via the overrides
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()

// Register request with peer
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))

obj, err := a.srv.CatalogRegister(nil, req)
require.NoError(t, err)
applied, ok := obj.(bool)
require.True(t, ok)
require.True(t, applied)
})

}

func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
121 changes: 61 additions & 60 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,66 +1009,67 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
LogRotateBytes: intVal(c.LogRotateBytes),
LogRotateMaxFiles: intVal(c.LogRotateMaxFiles),
},
MaxQueryTime: b.durationVal("max_query_time", c.MaxQueryTime),
NodeID: types.NodeID(stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
ReadReplica: boolVal(c.ReadReplica),
PeeringEnabled: boolVal(c.Peering.Enabled),
PidFile: stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),
PrimaryGatewaysInterval: b.durationVal("primary_gateways_interval", c.PrimaryGatewaysInterval),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(float64Val(c.Limits.RPCRate)),
RPCConfig: consul.RPCConfig{EnableStreaming: boolValWithDefault(c.RPC.EnableStreaming, serverMode)},
RaftProtocol: intVal(c.RaftProtocol),
RaftSnapshotThreshold: intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
RaftTrailingLogs: intVal(c.RaftTrailingLogs),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: stringVal(c.SegmentName),
Segments: segments,
SegmentLimit: intVal(c.SegmentLimit),
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfAllowedCIDRsLAN: serfAllowedCIDRSLAN,
SerfAllowedCIDRsWAN: serfAllowedCIDRSWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: uint64Val(c.Limits.TxnMaxReqLen),
UIConfig: b.uiConfigVal(c.UIConfig),
UnixSocketGroup: stringVal(c.UnixSocket.Group),
UnixSocketMode: stringVal(c.UnixSocket.Mode),
UnixSocketUser: stringVal(c.UnixSocket.User),
Watches: c.Watches,
AutoReloadConfigCoalesceInterval: 1 * time.Second,
MaxQueryTime: b.durationVal("max_query_time", c.MaxQueryTime),
NodeID: types.NodeID(stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
ReadReplica: boolVal(c.ReadReplica),
PeeringEnabled: boolVal(c.Peering.Enabled),
PeeringTestAllowPeerRegistrations: boolValWithDefault(c.Peering.TestAllowPeerRegistrations, false),
PidFile: stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),
PrimaryGatewaysInterval: b.durationVal("primary_gateways_interval", c.PrimaryGatewaysInterval),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(float64Val(c.Limits.RPCRate)),
RPCConfig: consul.RPCConfig{EnableStreaming: boolValWithDefault(c.RPC.EnableStreaming, serverMode)},
RaftProtocol: intVal(c.RaftProtocol),
RaftSnapshotThreshold: intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
RaftTrailingLogs: intVal(c.RaftTrailingLogs),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: stringVal(c.SegmentName),
Segments: segments,
SegmentLimit: intVal(c.SegmentLimit),
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfAllowedCIDRsLAN: serfAllowedCIDRSLAN,
SerfAllowedCIDRsWAN: serfAllowedCIDRSWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: uint64Val(c.Limits.TxnMaxReqLen),
UIConfig: b.uiConfigVal(c.UIConfig),
UnixSocketGroup: stringVal(c.UnixSocket.Group),
UnixSocketMode: stringVal(c.UnixSocket.Mode),
UnixSocketUser: stringVal(c.UnixSocket.User),
Watches: c.Watches,
AutoReloadConfigCoalesceInterval: 1 * time.Second,
}

rt.TLS, err = b.buildTLSConfig(rt, c.TLS)
Expand Down
4 changes: 4 additions & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,4 +891,8 @@ type TLS struct {

type Peering struct {
Enabled *bool `mapstructure:"enabled"`

// TestAllowPeerRegistrations controls whether CatalogRegister endpoints allow registrations for objects with `PeerName`
// This always gets overridden in NonUserSource()
TestAllowPeerRegistrations *bool `mapstructure:"test_allow_peer_registrations"`
}
5 changes: 5 additions & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ func NonUserSource() Source {
# the max time before leaf certs can be generated after a roots change.
test_ca_leaf_root_change_spread = "0s"
}

peering = {
# We use peer registration for various testing
test_allow_peer_registrations = false
}
`,
}
}
Expand Down
4 changes: 4 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,10 @@ type RuntimeConfig struct {
// hcl: peering { enabled = (true|false) }
PeeringEnabled bool

// TestAllowPeerRegistrations controls whether CatalogRegister endpoints allow
// registrations for objects with `PeerName`
PeeringTestAllowPeerRegistrations bool

// PidFile is the file to store our PID in.
//
// hcl: pid_file = string
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@
"NodeMeta": {},
"NodeName": "",
"PeeringEnabled": false,
"PeeringTestAllowPeerRegistrations": false,
"PidFile": "",
"PrimaryDatacenter": "",
"PrimaryGateways": [
Expand Down
27 changes: 27 additions & 0 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,36 @@ type Catalog struct {
logger hclog.Logger
}

func hasPeerNameInRequest(req *structs.RegisterRequest) bool {
if req == nil {
return false
}
// nodes, services, checks
if req.PeerName != structs.DefaultPeerKeyword {
return true
}
if req.Service != nil && req.Service.PeerName != structs.DefaultPeerKeyword {
return true
}
if req.Check != nil && req.Check.PeerName != structs.DefaultPeerKeyword {
return true
}
for _, check := range req.Checks {
if check.PeerName != structs.DefaultPeerKeyword {
return true
}
}

return false
}

// Register a service and/or check(s) in a node, creating the node if it doesn't exist.
// It is valid to pass no service or checks to simply create the node itself.
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
if !c.srv.config.PeeringTestAllowPeerRegistrations && hasPeerNameInRequest(args) {
return fmt.Errorf("cannot register requests with PeerName in them")
}

if done, err := c.srv.ForwardRPC("Catalog.Register", args, reply); done {
return err
}
Expand Down
98 changes: 98 additions & 0 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2765,6 +2765,104 @@ node_prefix "" {
return
}

// TestCatalog_Register_DenyPeeringRegistration makes sure that users cannot send structs.RegisterRequest
// with a PeerName in any part of the request.
func TestCatalog_Register_DenyPeeringRegistration(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()
_, s := testServerWithConfig(t)
codec := rpcClient(t, s)

// we will add PeerName to copies of arg
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
Check: &structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
},
}

type testcase struct {
name string
reqCopyFn func(arg *structs.RegisterRequest) structs.RegisterRequest
}

testCases := []testcase{
{
name: "peer name on top level",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.PeerName = "foo"
return copyR
},
},
{
name: "peer name in service",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.Service.PeerName = "foo"
return copyR
},
},
{
name: "peer name in check",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.Check.PeerName = "foo"
return copyR
},
},
{
name: "peer name in checks",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.Checks[0].PeerName = "foo"
return copyR
},
},
{
name: "peer name everywhere",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg

copyR.PeerName = "foo1"
copyR.Service.PeerName = "foo2"
copyR.Check.PeerName = "foo3"
copyR.Checks[0].PeerName = "foo4"
return copyR
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
req := tc.reqCopyFn(&arg)

var out struct{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
})
}

}

func TestCatalog_ListServices_FilterACL(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
5 changes: 4 additions & 1 deletion agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ type Config struct {
// PeeringEnabled enables cluster peering.
PeeringEnabled bool

PeeringTestAllowPeerRegistrations bool

// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
}
Expand Down Expand Up @@ -515,7 +517,8 @@ func DefaultConfig() *Config {
DefaultQueryTime: 300 * time.Second,
MaxQueryTime: 600 * time.Second,

PeeringEnabled: true,
PeeringEnabled: true,
PeeringTestAllowPeerRegistrations: false,

EnterpriseConfig: DefaultEnterpriseConfig(),
}
Expand Down
Loading