Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17270.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
grpc: ensure grpc resolver correctly uses lan/wan addresses on servers
```
10 changes: 7 additions & 3 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,15 @@ func newClient(t *testing.T, config *Config) *Client {
return client
}

func newTestResolverConfig(t *testing.T, suffix string) resolver.Config {
func newTestResolverConfig(t *testing.T, suffix string, dc, agentType string) resolver.Config {
n := t.Name()
s := strings.Replace(n, "/", "", -1)
s = strings.Replace(s, "_", "", -1)
return resolver.Config{Authority: strings.ToLower(s) + "-" + suffix}
return resolver.Config{
Datacenter: dc,
AgentType: agentType,
Authority: strings.ToLower(s) + "-" + suffix,
}
}

func newDefaultDeps(t *testing.T, c *Config) Deps {
Expand All @@ -520,7 +524,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
tls, err := tlsutil.NewConfigurator(c.TLSConfig, logger)
require.NoError(t, err, "failed to create tls configuration")

resolverBuilder := resolver.NewServerResolverBuilder(newTestResolverConfig(t, c.NodeName+"-"+c.Datacenter))
resolverBuilder := resolver.NewServerResolverBuilder(newTestResolverConfig(t, c.NodeName+"-"+c.Datacenter, c.Datacenter, "server"))
resolver.Register(resolverBuilder)

balancerBuilder := balancer.NewBuilder(resolverBuilder.Authority(), testutil.Logger(t))
Expand Down
4 changes: 4 additions & 0 deletions agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/lib"
libserf "github.com/hashicorp/consul/lib/serf"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types"
)

const (
Expand Down Expand Up @@ -356,6 +357,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {

// Update server lookup
s.serverLookup.AddServer(serverMeta)
s.router.AddServer(types.AreaLAN, serverMeta)

// If we're still expecting to bootstrap, may need to handle this.
if s.config.BootstrapExpect != 0 {
Expand All @@ -377,6 +379,7 @@ func (s *Server) lanNodeUpdate(me serf.MemberEvent) {

// Update server lookup
s.serverLookup.AddServer(serverMeta)
s.router.AddServer(types.AreaLAN, serverMeta)
}
}

Expand Down Expand Up @@ -515,5 +518,6 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {

// Update id to address map
s.serverLookup.RemoveServer(serverMeta)
s.router.RemoveServer(types.AreaLAN, serverMeta)
}
}
5 changes: 4 additions & 1 deletion agent/consul/subscribe_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,10 @@ func newClientWithGRPCPlumbing(t *testing.T, ops ...func(*Config)) (*Client, *re
}

resolverBuilder := resolver.NewServerResolverBuilder(newTestResolverConfig(t,
"client."+config.Datacenter+"."+string(config.NodeID)))
"client."+config.Datacenter+"."+string(config.NodeID),
config.Datacenter,
"client",
))

resolver.Register(resolverBuilder)
t.Cleanup(func() {
Expand Down
79 changes: 59 additions & 20 deletions agent/grpc-internal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) {
require.NoError(t, err)
t.Cleanup(logError(t, lis.Close))

builder := resolver.NewServerResolverBuilder(newConfig(t))
builder.AddServer(types.AreaWAN, &metadata.Server{
builder := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
builder.AddServer(types.AreaLAN, &metadata.Server{
Name: "server-1",
ID: "ID1",
Datacenter: "dc1",
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) {
p.Wait()
}()

builder := resolver.NewServerResolverBuilder(newConfig(t))
builder := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
builder.AddServer(types.AreaWAN, &metadata.Server{
Name: "server-1",
ID: "ID1",
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) {
func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
// if this test is failing because of expired certificates
// use the procedure in test/CA-GENERATION.md
res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)

tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
Expand All @@ -159,9 +159,17 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
srv := newSimpleTestServer(t, "server-1", "dc1", tlsConf)

md := srv.Metadata()
res.AddServer(types.AreaWAN, md)
res.AddServer(types.AreaLAN, md)
t.Cleanup(srv.shutdown)

{
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srv := newPanicTestServer(t, hclog.Default(), "server-1", "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}

pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
TLSWrapper: TLSWrapper(tlsConf.OutgoingRPCWrapper()),
Expand Down Expand Up @@ -190,7 +198,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
// use the procedure in test/CA-GENERATION.md
gwAddr := ipaddr.FormatAddressPort("127.0.0.1", freeport.GetOne(t))

res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc2", "server"))
registerWithGRPC(t, res)

tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
Expand Down Expand Up @@ -266,7 +274,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)

func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
count := 4
res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)
pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
Expand All @@ -278,9 +286,18 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {

for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
{
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaLAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
{
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srv := newPanicTestServer(t, hclog.Default(), name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
}

conn, err := pool.ClientConn("dc1")
Expand All @@ -293,7 +310,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)

res.RemoveServer(types.AreaWAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"})
res.RemoveServer(types.AreaLAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"})

resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
Expand All @@ -302,7 +319,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {

func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
count := 3
res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)
pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
Expand All @@ -315,10 +332,19 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
var servers []testServer
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
{
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaLAN, srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
}
{
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srv := newPanicTestServer(t, hclog.Default(), name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
}

// Set the leader address to the first server.
Expand All @@ -345,17 +371,21 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
require.Equal(t, resp.ServerName, servers[1].name)
}

func newConfig(t *testing.T) resolver.Config {
func newConfig(t *testing.T, dc, agentType string) resolver.Config {
n := t.Name()
s := strings.Replace(n, "/", "", -1)
s = strings.Replace(s, "_", "", -1)
return resolver.Config{Authority: strings.ToLower(s)}
return resolver.Config{
Datacenter: dc,
AgentType: agentType,
Authority: strings.ToLower(s),
}
}

func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
dcs := []string{"dc1", "dc2", "dc3"}

res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)
pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
Expand All @@ -368,7 +398,16 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
for _, dc := range dcs {
name := "server-0-" + dc
srv := newSimpleTestServer(t, name, dc, nil)
res.AddServer(types.AreaWAN, srv.Metadata())
if dc == "dc1" {
res.AddServer(types.AreaLAN, srv.Metadata())
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srvBad := newPanicTestServer(t, hclog.Default(), name, dc, nil)
res.AddServer(types.AreaWAN, srvBad.Metadata())
t.Cleanup(srvBad.shutdown)
} else {
res.AddServer(types.AreaWAN, srv.Metadata())
}
t.Cleanup(srv.shutdown)
}

Expand Down
4 changes: 2 additions & 2 deletions agent/grpc-internal/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
Output: &buf,
})

res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)

srv := newPanicTestServer(t, logger, "server-1", "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
res.AddServer(types.AreaLAN, srv.Metadata())
t.Cleanup(srv.shutdown)

pool := NewClientConnPool(ClientConnPoolConfig{
Expand Down
Loading