Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions internal/xds/balancer/clusterresolver/configbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/internal/xds/balancer/outlierdetection"
"google.golang.org/grpc/internal/xds/balancer/priority"
"google.golang.org/grpc/internal/xds/balancer/wrrlocality"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/ringhash"
Expand Down Expand Up @@ -108,7 +109,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi
}
continue
case DiscoveryMechanismTypeLogicalDNS:
name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism)
name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism, xdsLBPolicy)
retConfig.Priorities = append(retConfig.Priorities, name)
retEndpoints = append(retEndpoints, endpoints...)
odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection)
Expand Down Expand Up @@ -138,21 +139,23 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out
return &odCfgRet
}

func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) {
// Endpoint picking policy for DNS is hardcoded to pick_first.
const childPolicy = "pick_first"
func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) (string, *clusterimpl.LBConfig, []resolver.Endpoint) {
retEndpoints := make([]resolver.Endpoint, len(endpoints))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to return a slice that contains a single endpoint.

pName := fmt.Sprintf("priority-%v", g.prefix)
for i, e := range endpoints {
retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName})
// Use the canonical string representation for the locality to match
// the keys expected by the parent Load Balancing policy.
localityStr := xdsinternal.LocalityString(clients.Locality{})
retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName, localityStr})
retEndpoints[i] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[i], wrrlocality.AddrInfo{LocalityWeight: 1})
Comment on lines +146 to +150
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still not doing what we want.

The DNS resolver implementation returns addresses and not endpoints, because the DNS protocol does not know about endpoints. See:

state := resolver.State{Addresses: addrs}

We have code in the channel that converts these addresses to endpoints, with one address per endpoint. See:

func addressesToEndpoints(addrs []resolver.Address) []resolver.Endpoint {

But the above code will execute only when the DNS resolver is used on the gRPC channel. Here in the cluster_resolver LB policy, we are creating a DNS resolver ourselves, and therefore the code to convert from addresses to endpoints (that exists in the channel) will not be run here. So, the cluster_resolver will see a set of addresses and no endpoints from the DNS resolver in [resource_resolver_dns.go](https://github.com/grpc/grpc-go/pull/8733/files#diff-6249528a41f17a06cec41f598b885840f09ef05ee3740ab297fc5a905f2875ca), and will probably convert it into a single endpoint with all the addresses in it (this is the change that you have in line 138 of resource_resolver_dns.go).

But, at some point, we will change the DNS resolver implementation to do what is being done today in the channel, i.e., return a set of endpoints, with one address per endpoint. That means that this code should handle multiple endpoints as input, but output a single endpoint that contains all the addresses from the input endpoints.

// Copy the nested address field as slice fields are shared by the
// iteration variable and the original slice.
retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...)
}
return pName, &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
TelemetryLabels: mechanism.TelemetryLabels,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
ChildPolicy: xdsLBPolicy,
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
LoadReportingServer: mechanism.LoadReportingServer,
}, retEndpoints
Expand Down
26 changes: 15 additions & 11 deletions internal/xds/balancer/clusterresolver/configbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ func TestBuildPriorityConfig(t *testing.T) {
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: clusterimpl.Name,
Config: &clusterimpl.LBConfig{
Cluster: testClusterName2,
ChildPolicy: &iserviceconfig.BalancerConfig{Name: "pick_first"},
Cluster: testClusterName2,
},
},
},
Expand All @@ -306,20 +305,25 @@ func TestBuildPriorityConfig(t *testing.T) {
}
}

func testEndpointForDNS(addrStrs []string, localityWeight uint32, path []string) resolver.Endpoint {
endpoint := resolver.Endpoint{}
endpoint.Addresses = append(endpoint.Addresses, resolver.Address{Addr: addrStrs[0]})
endpoint = hierarchy.SetInEndpoint(endpoint, path)
endpoint = wrrlocality.SetAddrInfoInEndpoint(endpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight})
return endpoint
}

func TestBuildClusterImplConfigForDNS(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to test all possibilities here:

  • one endpoint with one address
  • one endpoint with multiple addresses
  • multiple endpoints, all with one address each
  • multiple endpoints, all with multiple addresses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also want to make sure we have a more e2e style test to cover this new logic. A LOGICAL_DNS cluster that specifies a round_robin LB policy and the DNS resolver returning multiple addresses. In this case, because cluster_resolver will convert these addresses into a single endpoint, we should end up with all traffic going to the first address (because RR will delegate to PF).

gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), testResolverEndpoints[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS})
gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), testResolverEndpoints[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil)
wantName := "priority-3"
localityStr := xdsinternal.LocalityString(clients.Locality{})
wantConfig := &clusterimpl.LBConfig{
Cluster: testClusterName2,
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: "pick_first",
},
Cluster: testClusterName2,
ChildPolicy: nil,
}
e1 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][0].Addresses[0]}}}
e2 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][1].Addresses[0]}}}
wantEndpoints := []resolver.Endpoint{
hierarchy.SetInEndpoint(e1, []string{"priority-3"}),
hierarchy.SetInEndpoint(e2, []string{"priority-3"}),
testEndpointForDNS(testEndpoints[0][0].Addresses, 1, []string{wantName, localityStr}),
testEndpointForDNS(testEndpoints[0][1].Addresses, 1, []string{wantName, localityStr}),
}

if diff := cmp.Diff(gotName, wantName); diff != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
if err == nil {
t.Fatal("EmptyCall() succeeded when expected to fail")
}
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") {
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "no targets to pick from") {
break
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
dr.mu.Lock()
var endpoints = state.Endpoints
if len(endpoints) == 0 {
endpoints = make([]resolver.Endpoint, len(state.Addresses))
for i, a := range state.Addresses {
endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}}
endpoints[i].Attributes = a.BalancerAttributes
}
endpoints = []resolver.Endpoint{{Addresses: state.Addresses}}
}
dr.endpoints = endpoints
dr.updateReceived = true
Expand Down