Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .changelog/14930.txt

This file was deleted.

2 changes: 1 addition & 1 deletion agent/consul/peering_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (b *PeeringBackend) fetchPeerServerAddresses(ws memdb.WatchSet, peerID stri
if !peering.IsActive() {
return nil, fmt.Errorf("there is no active peering for %q", peerID)
}
return bufferFromAddresses(peering.PeerServerAddresses)
return bufferFromAddresses(peering.GetAddressesToDial())
}

// maybeFetchGatewayAddresses will return a ring buffer with the latest gateway addresses if the
Expand Down
19 changes: 19 additions & 0 deletions agent/consul/peering_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,25 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
err: "no known addresses",
},
},
{
name: "manual server addrs are returned when defined",
setup: func(store *state.Store) {
require.NoError(t, store.PeeringWrite(2, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "dialer",
ID: dialerPeerID,
ManualServerAddresses: []string{"5.6.7.8:8502"},
PeerServerAddresses: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
},
}))
// Mesh config entry does not exist
},
peerID: dialerPeerID,
expect: expectation{
haveGateways: false,
addrs: []string{"5.6.7.8:8502"},
},
},
{
name: "only server addrs are returned when mesh config does not exist",
setup: func(store *state.Store) {
Expand Down
44 changes: 38 additions & 6 deletions agent/grpc-external/services/peerstream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1596,17 +1596,20 @@ func Test_processResponse_Validation(t *testing.T) {
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"

type testCase struct {
name string
in *pbpeerstream.ReplicationMessage_Response
expect *pbpeerstream.ReplicationMessage
wantErr bool
name string
in *pbpeerstream.ReplicationMessage_Response
expect *pbpeerstream.ReplicationMessage
extraTests func(t *testing.T, s *state.Store)
wantErr bool
}

srv, store := newTestServer(t, nil)
require.NoError(t, store.PeeringWrite(31, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
ID: peerID,
Name: peerName,
Name: peerName,
ID: peerID,
ManualServerAddresses: []string{"manual"},
PeerServerAddresses: []string{"one", "two"},
},
}))

Expand All @@ -1622,6 +1625,9 @@ func Test_processResponse_Validation(t *testing.T) {
require.NoError(t, err)
}
require.Equal(t, tc.expect, reply)
if tc.extraTests != nil {
tc.extraTests(t, store)
}
}

tt := []testCase{
Expand Down Expand Up @@ -1729,6 +1735,32 @@ func Test_processResponse_Validation(t *testing.T) {
},
wantErr: true,
},
{
name: "manual server addresses are not overwritten",
in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses,
Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, &pbpeering.PeeringServerAddresses{
Addresses: []string{"three"},
}),
},
expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses,
ResponseNonce: "1",
},
},
},
extraTests: func(t *testing.T, s *state.Store) {
_, peer, err := s.PeeringReadByID(nil, peerID)
require.NoError(t, err)
require.Equal(t, []string{"manual"}, peer.ManualServerAddresses)
require.Equal(t, []string{"three"}, peer.PeerServerAddresses)
},
wantErr: false,
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
Expand Down
34 changes: 34 additions & 0 deletions agent/peering_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,40 @@ func TestHTTP_Peering_GenerateToken(t *testing.T) {
// The PeerID in the token is randomly generated so we don't assert on its value.
require.NotEmpty(t, token.PeerID)
})

t.Run("Success with external address", func(t *testing.T) {
externalAddress := "32.1.2.3"
body := &pbpeering.GenerateTokenRequest{
PeerName: "peering-a",
ServerExternalAddresses: []string{externalAddress},
}

bodyBytes, err := json.Marshal(body)
require.NoError(t, err)

req, err := http.NewRequest("POST", "/v1/peering/token", bytes.NewReader(bodyBytes))
require.NoError(t, err)
resp := httptest.NewRecorder()
a.srv.h.ServeHTTP(resp, req)
require.Equal(t, http.StatusOK, resp.Code, "expected 200, got %d: %v", resp.Code, resp.Body.String())

var r pbpeering.GenerateTokenResponse
require.NoError(t, json.NewDecoder(resp.Body).Decode(&r))

tokenJSON, err := base64.StdEncoding.DecodeString(r.PeeringToken)
require.NoError(t, err)

var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))

require.NotNil(t, token.CA)
require.Equal(t, []string{externalAddress}, token.ManualServerAddresses)
require.Equal(t, []string{fmt.Sprintf("127.0.0.1:%d", a.config.GRPCTLSPort)}, token.ServerAddresses)
require.Equal(t, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", token.ServerName)

// The PeerID in the token is randomly generated so we don't assert on its value.
require.NotEmpty(t, token.PeerID)
})
}

// Test for GenerateToken calls at various points in a peer's lifecycle
Expand Down
2 changes: 1 addition & 1 deletion agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
continue
}

hostnames, ips := peerHostnamesAndIPs(meshLogger, peering.Name, peering.PeerServerAddresses)
hostnames, ips := peerHostnamesAndIPs(meshLogger, peering.Name, peering.GetAddressesToDial())
if len(hostnames) > 0 {
peerServers[peering.PeerServerName] = PeerServersValue{
Addresses: hostnames,
Expand Down
37 changes: 20 additions & 17 deletions agent/rpc/peering/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,12 @@ func (s *Server) GenerateToken(

tok := structs.PeeringToken{
// Store the UUID so that we can do a global search when handling inbound streams.
PeerID: peering.ID,
CA: caPEMs,
ServerAddresses: serverAddrs,
ServerName: serverName,
EstablishmentSecret: secretID,
PeerID: peering.ID,
CA: caPEMs,
ManualServerAddresses: req.ServerExternalAddresses,
ServerAddresses: serverAddrs,
ServerName: serverName,
EstablishmentSecret: secretID,
Remote: structs.PeeringTokenRemote{
Partition: req.PartitionOrDefault(),
Datacenter: s.Datacenter,
Expand Down Expand Up @@ -423,13 +424,14 @@ func (s *Server) Establish(
}

peering := &pbpeering.Peering{
ID: id,
Name: req.PeerName,
PeerCAPems: tok.CA,
PeerServerAddresses: serverAddrs,
PeerServerName: tok.ServerName,
PeerID: tok.PeerID,
Meta: req.Meta,
ID: id,
Name: req.PeerName,
PeerCAPems: tok.CA,
ManualServerAddresses: tok.ManualServerAddresses,
PeerServerAddresses: serverAddrs,
PeerServerName: tok.ServerName,
PeerID: tok.PeerID,
Meta: req.Meta,

// State is intentionally not set until after the secret exchange succeeds.
// This is to prevent a scenario where an active peering is re-established,
Expand Down Expand Up @@ -857,11 +859,12 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete
// We only need to include the name and partition for the peering to be identified.
// All other data associated with the peering can be discarded because once marked
// for deletion the peering is effectively gone.
ID: existing.ID,
Name: req.Name,
State: pbpeering.PeeringState_DELETING,
PeerServerAddresses: existing.PeerServerAddresses,
DeletedAt: structs.TimeToProto(time.Now().UTC()),
ID: existing.ID,
Name: req.Name,
State: pbpeering.PeeringState_DELETING,
ManualServerAddresses: existing.ManualServerAddresses,
PeerServerAddresses: existing.PeerServerAddresses,
DeletedAt: structs.TimeToProto(time.Now().UTC()),

// PartitionOrEmpty is used to avoid writing "default" in OSS.
Partition: entMeta.PartitionOrEmpty(),
Expand Down
39 changes: 39 additions & 0 deletions agent/rpc/peering/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,45 @@ func TestPeeringService_GenerateToken(t *testing.T) {

}

func TestPeeringService_GenerateTokenExternalAddress(t *testing.T) {
dir := testutil.TempDir(t, "consul")

signer, _, _ := tlsutil.GeneratePrivateKey()
ca, _, _ := tlsutil.GenerateCA(tlsutil.CAOpts{Signer: signer})
cafile := path.Join(dir, "cacert.pem")
require.NoError(t, ioutil.WriteFile(cafile, []byte(ca), 0600))

// TODO(peering): see note on newTestServer, refactor to not use this
s := newTestServer(t, func(c *consul.Config) {
c.SerfLANConfig.MemberlistConfig.AdvertiseAddr = "127.0.0.1"
c.TLSConfig.GRPC.CAFile = cafile
c.DataDir = dir
})
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

externalAddresses := []string{"32.1.2.3:8502"}
// happy path
req := pbpeering.GenerateTokenRequest{PeerName: "peerB", Meta: map[string]string{"foo": "bar"}, ServerExternalAddresses: externalAddresses}
resp, err := client.GenerateToken(ctx, &req)
require.NoError(t, err)

tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)

token := &structs.PeeringToken{}
require.NoError(t, json.Unmarshal(tokenJSON, token))
require.Equal(t, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", token.ServerName)
require.Equal(t, externalAddresses, token.ManualServerAddresses)
require.Equal(t, []string{s.PublicGRPCAddr}, token.ServerAddresses)

// The roots utilized should be the ConnectCA roots and not the ones manually configured.
_, roots, err := s.Server.FSM().State().CARoots(nil)
require.NoError(t, err)
require.Equal(t, []string{roots.Active().RootCert}, token.CA)
}

func TestPeeringService_GenerateToken_ACLEnforcement(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this
s := newTestServer(t, func(conf *consul.Config) {
Expand Down
15 changes: 13 additions & 2 deletions agent/rpc/peering/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func validatePeeringToken(tok *structs.PeeringToken) error {
}
}

if len(tok.ServerAddresses) == 0 {
if len(tok.ServerAddresses) == 0 && len(tok.ManualServerAddresses) == 0 {
return errPeeringTokenEmptyServerAddresses
}
for _, addr := range tok.ServerAddresses {
validAddr := func(addr string) error {
_, portRaw, err := net.SplitHostPort(addr)
if err != nil {
return &errPeeringInvalidServerAddress{addr}
Expand All @@ -36,6 +36,17 @@ func validatePeeringToken(tok *structs.PeeringToken) error {
if port < 1 || port > 65535 {
return &errPeeringInvalidServerAddress{addr}
}
return nil
}
for _, addr := range tok.ManualServerAddresses {
if err := validAddr(addr); err != nil {
return err
}
}
for _, addr := range tok.ServerAddresses {
if err := validAddr(addr); err != nil {
return err
}
}

if len(tok.CA) > 0 && tok.ServerName == "" {
Expand Down
10 changes: 10 additions & 0 deletions agent/rpc/peering/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ func TestValidatePeeringToken(t *testing.T) {
"1.2.3.4",
},
},
{
name: "invalid address port - manual",
token: &structs.PeeringToken{
CA: []string{validCA},
ManualServerAddresses: []string{"1.2.3.4"},
},
wantErr: &errPeeringInvalidServerAddress{
"1.2.3.4",
},
},
{
name: "invalid server name",
token: &structs.PeeringToken{
Expand Down
13 changes: 7 additions & 6 deletions agent/structs/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package structs

// PeeringToken identifies a peer in order for a connection to be established.
type PeeringToken struct {
CA []string
ServerAddresses []string
ServerName string
PeerID string
EstablishmentSecret string
Remote PeeringTokenRemote
CA []string
ManualServerAddresses []string
ServerAddresses []string
ServerName string
PeerID string
EstablishmentSecret string
Remote PeeringTokenRemote
}

type PeeringTokenRemote struct {
Expand Down
4 changes: 4 additions & 0 deletions api/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ type PeeringGenerateTokenRequest struct {
Partition string `json:",omitempty"`
// Meta is a mapping of some string value to any other string value
Meta map[string]string `json:",omitempty"`
// ServerExternalAddresses is a list of addresses to put into the generated token. This could be used to specify
// load balancer(s) or external IPs to reach the servers from the dialing side, and will override any server
// addresses obtained from the "consul" service.
ServerExternalAddresses []string `json:",omitempty"`
}

type PeeringGenerateTokenResponse struct {
Expand Down
35 changes: 35 additions & 0 deletions api/peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package api

import (
"context"
"encoding/base64"
"encoding/json"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -224,6 +226,39 @@ func TestAPI_Peering_List(t *testing.T) {
})
}

func TestAPI_Peering_GenerateToken_ExternalAddresses(t *testing.T) {
t.Parallel()

c, s := makeClient(t) // this is "dc1"
defer s.Stop()
s.WaitForSerfCheck(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

externalAddress := "32.1.2.3:8502"

// Generate a token happy path
p1 := PeeringGenerateTokenRequest{
PeerName: "peer1",
Meta: map[string]string{"foo": "bar"},
ServerExternalAddresses: []string{externalAddress},
}
resp, wm, err := c.Peerings().GenerateToken(ctx, p1, nil)
require.NoError(t, err)
require.NotNil(t, wm)
require.NotNil(t, resp)

tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)

// Put the token in an arbitrary map, because the struct isn't available in the api package.
token := make(map[string]interface{})
require.NoError(t, json.Unmarshal(tokenJSON, &token))
require.Equal(t, []interface{}{s.GRPCTLSAddr}, token["ServerAddresses"])
require.Equal(t, []interface{}{externalAddress}, token["ManualServerAddresses"])
}

// TestAPI_Peering_GenerateToken_Read_Establish_Delete tests the following use case:
// a server creates a peering token, reads the token, then another server calls establish peering
// finally, we delete the token on the first server
Expand Down
Loading