From 03c8b0e508a6e9453b1b7dd13e9ccda21e558396 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 1 Dec 2025 14:42:31 +0000 Subject: [PATCH 01/10] remove hardcoded 'pickfirst' from logical_dns --- .../e2e_test/aggregate_cluster_test.go | 2 +- .../balancer/clusterresolver/configbuilder.go | 18 ++++++++---- .../clusterresolver/configbuilder_test.go | 28 +++++++++++-------- .../clusterresolver/resource_resolver_dns.go | 7 ++--- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go index 9e6281f11af1..4471b3bcedaf 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go @@ -1000,7 +1000,7 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { if err == nil { t.Fatal("EmptyCall() succeeded when expected to fail") } - if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") { + if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "no targets to pick from") { break } } diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index 702ba76c63e4..8682254b6abd 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/internal/xds/balancer/outlierdetection" "google.golang.org/grpc/internal/xds/balancer/priority" "google.golang.org/grpc/internal/xds/balancer/wrrlocality" + "google.golang.org/grpc/internal/xds/clients" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/ringhash" @@ -109,7 +110,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi } continue case DiscoveryMechanismTypeLogicalDNS: - name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism) + name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism, xdsLBPolicy) retConfig.Priorities = append(retConfig.Priorities, name) retEndpoints = append(retEndpoints, endpoints...) odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection) @@ -139,13 +140,18 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out return &odCfgRet } -func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { - // Endpoint picking policy for DNS is hardcoded to pick_first. - const childPolicy = "pick_first" +func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { retEndpoints := make([]resolver.Endpoint, len(endpoints)) pName := fmt.Sprintf("priority-%v", g.prefix) for i, e := range endpoints { - retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName}) + lid := clients.Locality{} + localityStr := xdsinternal.LocalityString(lid) + retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName, localityStr}) + retEndpoints[i] = xdsinternal.SetLocalityIDInEndpoint(retEndpoints[i], lid) + var lw uint32 = 1 + retEndpoints[i] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[i], wrrlocality.AddrInfo{LocalityWeight: lw}) + var ew uint32 = 1 + retEndpoints[i] = weight.Set(retEndpoints[i], weight.EndpointInfo{Weight: lw * ew}) // Copy the nested address field as slice fields are shared by the // iteration variable and the original slice. retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...) @@ -153,7 +159,7 @@ func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoin return pName, &clusterimpl.LBConfig{ Cluster: mechanism.Cluster, TelemetryLabels: mechanism.TelemetryLabels, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}, + ChildPolicy: xdsLBPolicy, MaxConcurrentRequests: mechanism.MaxConcurrentRequests, LoadReportingServer: mechanism.LoadReportingServer, }, retEndpoints diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index a82c0eb70d98..ad5086054c5b 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -292,8 +292,7 @@ func TestBuildPriorityConfig(t *testing.T) { ChildPolicy: &iserviceconfig.BalancerConfig{ Name: clusterimpl.Name, Config: &clusterimpl.LBConfig{ - Cluster: testClusterName2, - ChildPolicy: &iserviceconfig.BalancerConfig{Name: "pick_first"}, + Cluster: testClusterName2, }, }, }, @@ -308,20 +307,27 @@ func TestBuildPriorityConfig(t *testing.T) { } } +func testEndpointForDNS(addrStrs []string, localityWeight, endpointWeight uint32, path []string, lID *clients.Locality) resolver.Endpoint { + endpoint := resolver.Endpoint{} + endpoint.Addresses = append(endpoint.Addresses, resolver.Address{Addr: addrStrs[0]}) + endpoint = hierarchy.SetInEndpoint(endpoint, path) + endpoint = xdsinternal.SetLocalityIDInEndpoint(endpoint, *lID) + endpoint = wrrlocality.SetAddrInfoInEndpoint(endpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) + endpoint = weight.Set(endpoint, weight.EndpointInfo{Weight: localityWeight * endpointWeight}) + return endpoint +} + func TestBuildClusterImplConfigForDNS(t *testing.T) { - gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), testResolverEndpoints[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}) + gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), testResolverEndpoints[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) wantName := "priority-3" + localityStr := xdsinternal.LocalityString(clients.Locality{}) wantConfig := &clusterimpl.LBConfig{ - Cluster: testClusterName2, - ChildPolicy: &iserviceconfig.BalancerConfig{ - Name: "pick_first", - }, + Cluster: testClusterName2, + ChildPolicy: nil, } - e1 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][0].ResolverEndpoint.Addresses[0].Addr}}} - e2 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][1].ResolverEndpoint.Addresses[0].Addr}}} wantEndpoints := []resolver.Endpoint{ - hierarchy.SetInEndpoint(e1, []string{"priority-3"}), - hierarchy.SetInEndpoint(e2, []string{"priority-3"}), + testEndpointForDNS(testEndpoints[0][0].Addresses, 1, 1, []string{wantName, localityStr}, &clients.Locality{}), + testEndpointForDNS(testEndpoints[0][1].Addresses, 1, 1, []string{wantName, localityStr}, &clients.Locality{}), } if diff := cmp.Diff(gotName, wantName); diff != "" { diff --git a/internal/xds/balancer/clusterresolver/resource_resolver_dns.go b/internal/xds/balancer/clusterresolver/resource_resolver_dns.go index 5f7a21153057..0f912d9fadb9 100644 --- a/internal/xds/balancer/clusterresolver/resource_resolver_dns.go +++ b/internal/xds/balancer/clusterresolver/resource_resolver_dns.go @@ -135,11 +135,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { dr.mu.Lock() var endpoints = state.Endpoints if len(endpoints) == 0 { - endpoints = make([]resolver.Endpoint, len(state.Addresses)) - for i, a := range state.Addresses { - endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}} - endpoints[i].Attributes = a.BalancerAttributes - } + endpoints = []resolver.Endpoint{{Addresses: state.Addresses}} } dr.endpoints = endpoints dr.updateReceived = true @@ -172,6 +168,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) { } func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) { + fmt.Println("NewAddress") dr.UpdateState(resolver.State{Addresses: addresses}) } From a3abb831b861d5c14c730f51ec978a8e7e61b91d Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 3 Dec 2025 17:39:27 +0000 Subject: [PATCH 02/10] resolving comments --- .../xds/balancer/clusterresolver/configbuilder.go | 11 ++++------- .../balancer/clusterresolver/configbuilder_test.go | 8 +++----- .../balancer/clusterresolver/resource_resolver_dns.go | 1 - 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index 8682254b6abd..23e8e25cc6d3 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -144,14 +144,11 @@ func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoin retEndpoints := make([]resolver.Endpoint, len(endpoints)) pName := fmt.Sprintf("priority-%v", g.prefix) for i, e := range endpoints { - lid := clients.Locality{} - localityStr := xdsinternal.LocalityString(lid) + // Use the canonical string representation for the locality to match + // the keys expected by the parent Load Balancing policy. + localityStr := xdsinternal.LocalityString(clients.Locality{}) retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName, localityStr}) - retEndpoints[i] = xdsinternal.SetLocalityIDInEndpoint(retEndpoints[i], lid) - var lw uint32 = 1 - retEndpoints[i] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[i], wrrlocality.AddrInfo{LocalityWeight: lw}) - var ew uint32 = 1 - retEndpoints[i] = weight.Set(retEndpoints[i], weight.EndpointInfo{Weight: lw * ew}) + retEndpoints[i] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[i], wrrlocality.AddrInfo{LocalityWeight: 1}) // Copy the nested address field as slice fields are shared by the // iteration variable and the original slice. retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...) diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index ad5086054c5b..df23795ffe9c 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -307,13 +307,11 @@ func TestBuildPriorityConfig(t *testing.T) { } } -func testEndpointForDNS(addrStrs []string, localityWeight, endpointWeight uint32, path []string, lID *clients.Locality) resolver.Endpoint { +func testEndpointForDNS(addrStrs []string, localityWeight uint32, path []string) resolver.Endpoint { endpoint := resolver.Endpoint{} endpoint.Addresses = append(endpoint.Addresses, resolver.Address{Addr: addrStrs[0]}) endpoint = hierarchy.SetInEndpoint(endpoint, path) - endpoint = xdsinternal.SetLocalityIDInEndpoint(endpoint, *lID) endpoint = wrrlocality.SetAddrInfoInEndpoint(endpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) - endpoint = weight.Set(endpoint, weight.EndpointInfo{Weight: localityWeight * endpointWeight}) return endpoint } @@ -326,8 +324,8 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { ChildPolicy: nil, } wantEndpoints := []resolver.Endpoint{ - testEndpointForDNS(testEndpoints[0][0].Addresses, 1, 1, []string{wantName, localityStr}, &clients.Locality{}), - testEndpointForDNS(testEndpoints[0][1].Addresses, 1, 1, []string{wantName, localityStr}, &clients.Locality{}), + testEndpointForDNS(testEndpoints[0][0].Addresses, 1, []string{wantName, localityStr}), + testEndpointForDNS(testEndpoints[0][1].Addresses, 1, []string{wantName, localityStr}), } if diff := cmp.Diff(gotName, wantName); diff != "" { diff --git a/internal/xds/balancer/clusterresolver/resource_resolver_dns.go b/internal/xds/balancer/clusterresolver/resource_resolver_dns.go index 0f912d9fadb9..2c86e974db3f 100644 --- a/internal/xds/balancer/clusterresolver/resource_resolver_dns.go +++ b/internal/xds/balancer/clusterresolver/resource_resolver_dns.go @@ -168,7 +168,6 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) { } func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) { - fmt.Println("NewAddress") dr.UpdateState(resolver.State{Addresses: addresses}) } From 23c3c9b5a81b2536407781209dc2cb1129a37008 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 15 Dec 2025 10:13:41 +0000 Subject: [PATCH 03/10] resolving comments resolve conflicts --- .../cdsbalancer/e2e_test/dns_impl_test.go | 155 ++++++++++++++++++ .../balancer/clusterresolver/configbuilder.go | 19 ++- .../clusterresolver/configbuilder_test.go | 99 ++++++++--- 3 files changed, 245 insertions(+), 28 deletions(-) create mode 100644 internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go new file mode 100644 index 000000000000..a31a53bd6ca8 --- /dev/null +++ b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go @@ -0,0 +1,155 @@ +/* + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/internal/xds/xdsclient" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/internal/xds/balancer/clusterresolver" +) + +// TestLogicalDNS_MultipleEndpoints tests the cluster_resolver LB policy +// using a LOGICAL_DNS discovery mechanism. +// +// The test verifies that multiple addresses returned by the DNS resolver are +// grouped into a single endpoint (as per gRFC A61). Because the round_robin +// LB policy (configured via xdsLbPolicy) sees only one endpoint, it should +// not rotate traffic between the addresses. Instead, the single endpoint +// (which contains all addresses) is picked, and connects to the first address. +func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { + // Spin up a management server to receive xDS resources from. + managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + // Create bootstrap configuration pointing to the above management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address) + + // Start backend servers which provide an implementation of the TestService. + server1 := stubserver.StartTestService(t, nil) + defer server1.Stop() + server2 := stubserver.StartTestService(t, nil) + defer server2.Stop() + + // Mock the DNS Resolver + const dnsScheme = "dns" + dnsR := manual.NewBuilderWithScheme(dnsScheme) + originalDNS := resolver.Get("dns") + resolver.Register(dnsR) + t.Cleanup(func() { resolver.Register(originalDNS) }) + + // Capture the ClientConn created by the cluster_resolver so we can push updates. + // We use a channel to synchronize access and avoid race conditions. + dnsCCCh := make(chan resolver.ClientConn, 1) + dnsR.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + select { + case dnsCCCh <- cc: + default: + } + } + + // Create an xDS client for use by the cluster_resolver LB policy. + config, err := bootstrap.NewConfigFromContents(bootstrapContents) + if err != nil { + t.Fatalf("Failed to parse bootstrap: %v", err) + } + pool := xdsclient.NewPool(config) + xdsC, closeClient, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer closeClient() + + // Create a manual resolver and push service config specifying the use of + // the cluster_resolver LB policy with LOGICAL_DNS discovery mechanism. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "test-cluster", + "type": "LOGICAL_DNS", + "dnsHostname": "%s:///target-name", + "outlierDetection": {} + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, dnsScheme) + + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.NewClient(r.Scheme()+":///test.service", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + ) + cc.Connect() + if err != nil { + t.Fatalf("failed to create new client for local test server: %v", err) + } + defer cc.Close() + + testClient := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var dnsClientConn resolver.ClientConn + select { + case dnsClientConn = <-dnsCCCh: + case <-ctx.Done(): + t.Fatal("Timeout waiting for cluster_resolver to build the DNS resolver") + } + + // For LOGICAL_DNS, this updates the SINGLE endpoint to have 2 IPs. + dnsClientConn.UpdateState(resolver.State{ + Addresses: []resolver.Address{ + {Addr: server1.Address}, + {Addr: server2.Address}, + }, + }) + + // Ensure the RPC is routed to the first backend. + var peer peer.Peer + if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + t.Fatalf("RPC failed: %v", err) + } + + if got, want := peer.Addr.String(), server1.Address; got != want { + t.Errorf("peer.Addr = %q, want = %q", got, want) + } +} diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index 23e8e25cc6d3..ac645ea30e76 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -141,17 +141,18 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out } func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { - retEndpoints := make([]resolver.Endpoint, len(endpoints)) + var retEndpoints []resolver.Endpoint pName := fmt.Sprintf("priority-%v", g.prefix) - for i, e := range endpoints { - // Use the canonical string representation for the locality to match - // the keys expected by the parent Load Balancing policy. + if len(endpoints) >= 1 { + retEndpoints = make([]resolver.Endpoint, 1) + for _, e := range endpoints { + // Copy the nested address field as slice fields are shared by the + // iteration variable and the original slice. + retEndpoints[0].Addresses = append(retEndpoints[0].Addresses, e.Addresses...) + } localityStr := xdsinternal.LocalityString(clients.Locality{}) - retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName, localityStr}) - retEndpoints[i] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[i], wrrlocality.AddrInfo{LocalityWeight: 1}) - // Copy the nested address field as slice fields are shared by the - // iteration variable and the original slice. - retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...) + retEndpoints[0] = hierarchy.SetInEndpoint(retEndpoints[0], []string{pName, localityStr}) + retEndpoints[0] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[0], wrrlocality.AddrInfo{LocalityWeight: 1}) } return pName, &clusterimpl.LBConfig{ Cluster: mechanism.Cluster, diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index df23795ffe9c..e410e14565f0 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -307,35 +307,96 @@ func TestBuildPriorityConfig(t *testing.T) { } } -func testEndpointForDNS(addrStrs []string, localityWeight uint32, path []string) resolver.Endpoint { - endpoint := resolver.Endpoint{} - endpoint.Addresses = append(endpoint.Addresses, resolver.Address{Addr: addrStrs[0]}) - endpoint = hierarchy.SetInEndpoint(endpoint, path) - endpoint = wrrlocality.SetAddrInfoInEndpoint(endpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) - return endpoint +func testEndpointForDNS(endpoint []resolver.Endpoint, localityWeight uint32, path []string) resolver.Endpoint { + retEndpoint := resolver.Endpoint{} + for _, e := range endpoint { + retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...) + } + retEndpoint = hierarchy.SetInEndpoint(retEndpoint, path) + retEndpoint = wrrlocality.SetAddrInfoInEndpoint(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) + return retEndpoint } func TestBuildClusterImplConfigForDNS(t *testing.T) { - gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), testResolverEndpoints[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) wantName := "priority-3" localityStr := xdsinternal.LocalityString(clients.Locality{}) wantConfig := &clusterimpl.LBConfig{ Cluster: testClusterName2, ChildPolicy: nil, } - wantEndpoints := []resolver.Endpoint{ - testEndpointForDNS(testEndpoints[0][0].Addresses, 1, []string{wantName, localityStr}), - testEndpointForDNS(testEndpoints[0][1].Addresses, 1, []string{wantName, localityStr}), - } + for _, tt := range []struct { + name string + endpoint []resolver.Endpoint + wantEndpoint []resolver.Endpoint + }{ + { + name: "one endpoint with one address", + endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, + wantEndpoint: []resolver.Endpoint{testEndpointForDNS([]resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, 1, []string{wantName, localityStr})}, + }, + { + name: "one endpoint with multiple addresses", + endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{ + {Addr: "addr-0-0"}, + {Addr: "addr-0-1"}, + }}}, + wantEndpoint: []resolver.Endpoint{ + testEndpointForDNS([]resolver.Endpoint{{Addresses: []resolver.Address{ + {Addr: "addr-0-0"}, + {Addr: "addr-0-1"}, + }}}, 1, []string{wantName, localityStr}), + }, + }, + { + name: "multiple endpoints, all with one address each", + endpoint: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "addr-0-0"}}}, + {Addresses: []resolver.Address{{Addr: "addr-0-1"}}}, + }, + wantEndpoint: []resolver.Endpoint{ + testEndpointForDNS([]resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "addr-0-0"}}}, + {Addresses: []resolver.Address{{Addr: "addr-0-1"}}}, + }, 1, []string{wantName, localityStr}), + }, + }, + { + name: "multiple endpoints, all with multiple addresses", + endpoint: []resolver.Endpoint{ + {Addresses: []resolver.Address{ + {Addr: "addr-0-0"}, + {Addr: "addr-0-1"}, + }}, + {Addresses: []resolver.Address{ + {Addr: "addr-1-0"}, + {Addr: "addr-1-1"}, + }}, + }, + wantEndpoint: []resolver.Endpoint{ + testEndpointForDNS([]resolver.Endpoint{ + {Addresses: []resolver.Address{ + {Addr: "addr-0-0"}, + {Addr: "addr-0-1"}, + }}, + {Addresses: []resolver.Address{ + {Addr: "addr-1-0"}, + {Addr: "addr-1-1"}, + }}, + }, 1, []string{wantName, localityStr}), + }, + }, + } { + gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoint, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) - if diff := cmp.Diff(gotName, wantName); diff != "" { - t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) - } - if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { - t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) - } - if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" { - t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + if diff := cmp.Diff(gotName, wantName); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotEndpoints, tt.wantEndpoint, endpointCmpOpts); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } } } From 9555a199ff10a457aca60452d89b0d642af9705d Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 17 Dec 2025 07:33:51 +0000 Subject: [PATCH 04/10] resolving comments --- .../cdsbalancer/e2e_test/dns_impl_test.go | 3 +- .../balancer/clusterresolver/configbuilder.go | 33 ++++++++----- .../clusterresolver/configbuilder_test.go | 46 ++++++++++--------- 3 files changed, 47 insertions(+), 35 deletions(-) diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go index a31a53bd6ca8..fa640008524a 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go @@ -62,7 +62,8 @@ func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { server2 := stubserver.StartTestService(t, nil) defer server2.Stop() - // Mock the DNS Resolver + // Override the DNS resolver with a manual resolver that returns the + // addresses of the above server backends. const dnsScheme = "dns" dnsR := manual.NewBuilderWithScheme(dnsScheme) originalDNS := resolver.Get("dns") diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index ac645ea30e76..ef88db114ed8 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -141,26 +141,35 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out } func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { - var retEndpoints []resolver.Endpoint pName := fmt.Sprintf("priority-%v", g.prefix) - if len(endpoints) >= 1 { - retEndpoints = make([]resolver.Endpoint, 1) - for _, e := range endpoints { - // Copy the nested address field as slice fields are shared by the - // iteration variable and the original slice. - retEndpoints[0].Addresses = append(retEndpoints[0].Addresses, e.Addresses...) - } - localityStr := xdsinternal.LocalityString(clients.Locality{}) - retEndpoints[0] = hierarchy.SetInEndpoint(retEndpoints[0], []string{pName, localityStr}) - retEndpoints[0] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[0], wrrlocality.AddrInfo{LocalityWeight: 1}) + if len(endpoints) == 0 { + return pName, &clusterimpl.LBConfig{ + Cluster: mechanism.Cluster, + TelemetryLabels: mechanism.TelemetryLabels, + ChildPolicy: xdsLBPolicy, + MaxConcurrentRequests: mechanism.MaxConcurrentRequests, + LoadReportingServer: mechanism.LoadReportingServer, + }, []resolver.Endpoint{} + } + var retEndpoint resolver.Endpoint + for _, e := range endpoints { + // Copy the nested address field as slice fields are shared by the + // iteration variable and the original slice. + retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...) } + localityStr := xdsinternal.LocalityString(clients.Locality{}) + retEndpoint = hierarchy.SetInEndpoint(retEndpoint, []string{pName, localityStr}) + // Set the locality weight to 1. This is required because the child policy + // like wrr which relies on locality weights to distribute traffic. These + // policies may drop traffic if the weight is 0. + retEndpoint = wrrlocality.SetAddrInfoInEndpoint(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: 1}) return pName, &clusterimpl.LBConfig{ Cluster: mechanism.Cluster, TelemetryLabels: mechanism.TelemetryLabels, ChildPolicy: xdsLBPolicy, MaxConcurrentRequests: mechanism.MaxConcurrentRequests, LoadReportingServer: mechanism.LoadReportingServer, - }, retEndpoints + }, []resolver.Endpoint{retEndpoint} } // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index e410e14565f0..ac6529773eeb 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -140,7 +140,7 @@ func init() { // TestBuildPriorityConfigJSON is a sanity check that the built balancer config // can be parsed. The behavior test is covered by TestBuildPriorityConfig. -func TestBuildPriorityConfigJSON(t *testing.T) { +func (s) TestBuildPriorityConfigJSON(t *testing.T) { testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ URI: "trafficdirector.googleapis.com:443", ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}}, @@ -203,7 +203,7 @@ func TestBuildPriorityConfigJSON(t *testing.T) { // TestBuildPriorityConfig tests the priority config generation. Each top level // balancer per priority should be an Outlier Detection balancer, with a Cluster // Impl Balancer as a child. -func TestBuildPriorityConfig(t *testing.T) { +func (s) TestBuildPriorityConfig(t *testing.T) { gotConfig, _, _ := buildPriorityConfig([]priorityConfig{ { // EDS - OD config should be the top level for both of the EDS @@ -317,7 +317,7 @@ func testEndpointForDNS(endpoint []resolver.Endpoint, localityWeight uint32, pat return retEndpoint } -func TestBuildClusterImplConfigForDNS(t *testing.T) { +func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { wantName := "priority-3" localityStr := xdsinternal.LocalityString(clients.Locality{}) wantConfig := &clusterimpl.LBConfig{ @@ -330,12 +330,12 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { wantEndpoint []resolver.Endpoint }{ { - name: "one endpoint with one address", + name: "one_endpoint_one_address", endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, wantEndpoint: []resolver.Endpoint{testEndpointForDNS([]resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, 1, []string{wantName, localityStr})}, }, { - name: "one endpoint with multiple addresses", + name: "one_endpoint_multiple_addresses", endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{ {Addr: "addr-0-0"}, {Addr: "addr-0-1"}, @@ -348,7 +348,7 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { }, }, { - name: "multiple endpoints, all with one address each", + name: "multiple_endpoints_one_address_each", endpoint: []resolver.Endpoint{ {Addresses: []resolver.Address{{Addr: "addr-0-0"}}}, {Addresses: []resolver.Address{{Addr: "addr-0-1"}}}, @@ -361,7 +361,7 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { }, }, { - name: "multiple endpoints, all with multiple addresses", + name: "multiple_endpoints_multiple_addresses", endpoint: []resolver.Endpoint{ {Addresses: []resolver.Address{ {Addr: "addr-0-0"}, @@ -386,21 +386,23 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { }, }, } { - gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoint, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) + t.Run(tt.name, func(t *testing.T) { + gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoint, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) - if diff := cmp.Diff(gotName, wantName); diff != "" { - t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) - } - if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { - t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) - } - if diff := cmp.Diff(gotEndpoints, tt.wantEndpoint, endpointCmpOpts); diff != "" { - t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) - } + if diff := cmp.Diff(gotName, wantName); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotEndpoints, tt.wantEndpoint, endpointCmpOpts); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } + }) } } -func TestBuildClusterImplConfigForEDS(t *testing.T) { +func (s) TestBuildClusterImplConfigForEDS(t *testing.T) { testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ URI: "trafficdirector.googleapis.com:443", ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}}, @@ -506,7 +508,7 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { } -func TestGroupLocalitiesByPriority(t *testing.T) { +func (s) TestGroupLocalitiesByPriority(t *testing.T) { tests := []struct { name string localities []xdsresource.Locality @@ -568,7 +570,7 @@ func TestGroupLocalitiesByPriority(t *testing.T) { } } -func TestDedupSortedIntSlice(t *testing.T) { +func (s) TestDedupSortedIntSlice(t *testing.T) { tests := []struct { name string a []int @@ -599,7 +601,7 @@ func TestDedupSortedIntSlice(t *testing.T) { } } -func TestPriorityLocalitiesToClusterImpl(t *testing.T) { +func (s) TestPriorityLocalitiesToClusterImpl(t *testing.T) { tests := []struct { name string localities []xdsresource.Locality @@ -746,7 +748,7 @@ func testEndpointWithAttrs(endpoint resolver.Endpoint, localityWeight, endpointW return endpoint } -func TestConvertClusterImplMapToOutlierDetection(t *testing.T) { +func (s) TestConvertClusterImplMapToOutlierDetection(t *testing.T) { tests := []struct { name string ciCfgsMap map[string]*clusterimpl.LBConfig From 03b81e07bea45b9c3b053e06c9dc7267ec10d5db Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 30 Dec 2025 06:37:11 +0000 Subject: [PATCH 05/10] resolving comments --- .../cdsbalancer/e2e_test/dns_impl_test.go | 23 +++---- .../balancer/clusterresolver/configbuilder.go | 23 +++---- .../clusterresolver/configbuilder_test.go | 63 ++++++------------- 3 files changed, 38 insertions(+), 71 deletions(-) diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go index fa640008524a..240f5ea913ba 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/google/uuid" "google.golang.org/grpc" @@ -62,8 +61,10 @@ func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { server2 := stubserver.StartTestService(t, nil) defer server2.Stop() - // Override the DNS resolver with a manual resolver that returns the - // addresses of the above server backends. + // Register a manual resolver with the "dns" scheme to mock DNS resolution. + // This global override is safe because connection to the xDS management + // server uses the passthrough scheme instead and therefore overriding + // the DNS resolver does not affect it in any way const dnsScheme = "dns" dnsR := manual.NewBuilderWithScheme(dnsScheme) originalDNS := resolver.Get("dns") @@ -106,27 +107,22 @@ func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { "dnsHostname": "%s:///target-name", "outlierDetection": {} }], - "xdsLbPolicy":[{"round_robin":{}}] + "xdsLbPolicy":[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}] } }] }`, dnsScheme) - scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC)) // Create a ClientConn and make a successful RPC. - cc, err := grpc.NewClient(r.Scheme()+":///test.service", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - ) - cc.Connect() + cc, err := grpc.NewClient(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) if err != nil { - t.Fatalf("failed to create new client for local test server: %v", err) + t.Fatalf("Failed to create new client for local test server: %v", err) } + cc.Connect() defer cc.Close() - testClient := testpb.NewTestServiceClient(cc) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() var dnsClientConn resolver.ClientConn @@ -145,6 +141,7 @@ func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { }) // Ensure the RPC is routed to the first backend. + testClient := testpb.NewTestServiceClient(cc) var peer peer.Peer if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { t.Fatalf("RPC failed: %v", err) diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index ef88db114ed8..bc7755608209 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -142,14 +142,15 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { pName := fmt.Sprintf("priority-%v", g.prefix) + lbconfig := &clusterimpl.LBConfig{ + Cluster: mechanism.Cluster, + TelemetryLabels: mechanism.TelemetryLabels, + ChildPolicy: xdsLBPolicy, + MaxConcurrentRequests: mechanism.MaxConcurrentRequests, + LoadReportingServer: mechanism.LoadReportingServer, + } if len(endpoints) == 0 { - return pName, &clusterimpl.LBConfig{ - Cluster: mechanism.Cluster, - TelemetryLabels: mechanism.TelemetryLabels, - ChildPolicy: xdsLBPolicy, - MaxConcurrentRequests: mechanism.MaxConcurrentRequests, - LoadReportingServer: mechanism.LoadReportingServer, - }, []resolver.Endpoint{} + return pName, lbconfig, nil } var retEndpoint resolver.Endpoint for _, e := range endpoints { @@ -163,13 +164,7 @@ func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoin // like wrr which relies on locality weights to distribute traffic. These // policies may drop traffic if the weight is 0. retEndpoint = wrrlocality.SetAddrInfoInEndpoint(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: 1}) - return pName, &clusterimpl.LBConfig{ - Cluster: mechanism.Cluster, - TelemetryLabels: mechanism.TelemetryLabels, - ChildPolicy: xdsLBPolicy, - MaxConcurrentRequests: mechanism.MaxConcurrentRequests, - LoadReportingServer: mechanism.LoadReportingServer, - }, []resolver.Endpoint{retEndpoint} + return pName, lbconfig, []resolver.Endpoint{retEndpoint} } // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index ac6529773eeb..1b39bf6471ad 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -307,9 +307,9 @@ func (s) TestBuildPriorityConfig(t *testing.T) { } } -func testEndpointForDNS(endpoint []resolver.Endpoint, localityWeight uint32, path []string) resolver.Endpoint { +func testEndpointForDNS(endpoints []resolver.Endpoint, localityWeight uint32, path []string) resolver.Endpoint { retEndpoint := resolver.Endpoint{} - for _, e := range endpoint { + for _, e := range endpoints { retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...) } retEndpoint = hierarchy.SetInEndpoint(retEndpoint, path) @@ -318,51 +318,31 @@ func testEndpointForDNS(endpoint []resolver.Endpoint, localityWeight uint32, pat } func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { - wantName := "priority-3" - localityStr := xdsinternal.LocalityString(clients.Locality{}) - wantConfig := &clusterimpl.LBConfig{ - Cluster: testClusterName2, - ChildPolicy: nil, - } for _, tt := range []struct { - name string - endpoint []resolver.Endpoint - wantEndpoint []resolver.Endpoint + name string + endpoints []resolver.Endpoint }{ { - name: "one_endpoint_one_address", - endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, - wantEndpoint: []resolver.Endpoint{testEndpointForDNS([]resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, 1, []string{wantName, localityStr})}, + name: "one_endpoint_one_address", + endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, }, { name: "one_endpoint_multiple_addresses", - endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{ + endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{ {Addr: "addr-0-0"}, {Addr: "addr-0-1"}, }}}, - wantEndpoint: []resolver.Endpoint{ - testEndpointForDNS([]resolver.Endpoint{{Addresses: []resolver.Address{ - {Addr: "addr-0-0"}, - {Addr: "addr-0-1"}, - }}}, 1, []string{wantName, localityStr}), - }, }, { name: "multiple_endpoints_one_address_each", - endpoint: []resolver.Endpoint{ + endpoints: []resolver.Endpoint{ {Addresses: []resolver.Address{{Addr: "addr-0-0"}}}, {Addresses: []resolver.Address{{Addr: "addr-0-1"}}}, }, - wantEndpoint: []resolver.Endpoint{ - testEndpointForDNS([]resolver.Endpoint{ - {Addresses: []resolver.Address{{Addr: "addr-0-0"}}}, - {Addresses: []resolver.Address{{Addr: "addr-0-1"}}}, - }, 1, []string{wantName, localityStr}), - }, }, { name: "multiple_endpoints_multiple_addresses", - endpoint: []resolver.Endpoint{ + endpoints: []resolver.Endpoint{ {Addresses: []resolver.Address{ {Addr: "addr-0-0"}, {Addr: "addr-0-1"}, @@ -372,30 +352,25 @@ func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { {Addr: "addr-1-1"}, }}, }, - wantEndpoint: []resolver.Endpoint{ - testEndpointForDNS([]resolver.Endpoint{ - {Addresses: []resolver.Address{ - {Addr: "addr-0-0"}, - {Addr: "addr-0-1"}, - }}, - {Addresses: []resolver.Address{ - {Addr: "addr-1-0"}, - {Addr: "addr-1-1"}, - }}, - }, 1, []string{wantName, localityStr}), - }, }, } { t.Run(tt.name, func(t *testing.T) { - gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoint, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) + gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoints, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) - if diff := cmp.Diff(gotName, wantName); diff != "" { + if diff := cmp.Diff(gotName, "priority-3"); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } + + wantConfig := &clusterimpl.LBConfig{ + Cluster: testClusterName2, + ChildPolicy: nil, + } if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } - if diff := cmp.Diff(gotEndpoints, tt.wantEndpoint, endpointCmpOpts); diff != "" { + + wantEndpoints := []resolver.Endpoint{testEndpointForDNS(tt.endpoints, 1, []string{"priority-3", xdsinternal.LocalityString(clients.Locality{})})} + if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } }) From b1a981f5e40c0430d369558089c79cf42a128ea9 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 5 Jan 2026 12:40:42 +0000 Subject: [PATCH 06/10] convert test into e2e test --- .../cdsbalancer/e2e_test/dns_impl_test.go | 125 ++++++++---------- 1 file changed, 53 insertions(+), 72 deletions(-) diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go index 240f5ea913ba..9684b7901723 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go @@ -27,13 +27,13 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" - "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/serviceconfig" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" testpb "google.golang.org/grpc/interop/grpc_testing" _ "google.golang.org/grpc/internal/xds/balancer/clusterresolver" @@ -44,9 +44,9 @@ import ( // // The test verifies that multiple addresses returned by the DNS resolver are // grouped into a single endpoint (as per gRFC A61). Because the round_robin -// LB policy (configured via xdsLbPolicy) sees only one endpoint, it should -// not rotate traffic between the addresses. Instead, the single endpoint -// (which contains all addresses) is picked, and connects to the first address. +// LB policy sees only one endpoint, it should not rotate traffic between the +// addresses. Instead, the single endpoint is picked, and connects to the +// first address. func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { // Spin up a management server to receive xDS resources from. managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -54,6 +54,10 @@ func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { // Create bootstrap configuration pointing to the above management server. nodeID := uuid.New().String() bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address) + resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } // Start backend servers which provide an implementation of the TestService. server1 := stubserver.StartTestService(t, nil) @@ -64,90 +68,67 @@ func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { // Register a manual resolver with the "dns" scheme to mock DNS resolution. // This global override is safe because connection to the xDS management // server uses the passthrough scheme instead and therefore overriding - // the DNS resolver does not affect it in any way + // the DNS resolver does not affect it in any way. const dnsScheme = "dns" dnsR := manual.NewBuilderWithScheme(dnsScheme) originalDNS := resolver.Get("dns") resolver.Register(dnsR) t.Cleanup(func() { resolver.Register(originalDNS) }) - // Capture the ClientConn created by the cluster_resolver so we can push updates. - // We use a channel to synchronize access and avoid race conditions. - dnsCCCh := make(chan resolver.ClientConn, 1) - dnsR.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { - select { - case dnsCCCh <- cc: - default: - } - } - - // Create an xDS client for use by the cluster_resolver LB policy. - config, err := bootstrap.NewConfigFromContents(bootstrapContents) - if err != nil { - t.Fatalf("Failed to parse bootstrap: %v", err) - } - pool := xdsclient.NewPool(config) - xdsC, closeClient, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), + // For LOGICAL_DNS, this updates the SINGLE endpoint to have 2 IPs. + dnsR.InitialState(resolver.State{ + Addresses: []resolver.Address{ + {Addr: server1.Address}, + {Addr: server2.Address}, + }, }) - if err != nil { - t.Fatalf("Failed to create xDS client: %v", err) - } - defer closeClient() - - // Create a manual resolver and push service config specifying the use of - // the cluster_resolver LB policy with LOGICAL_DNS discovery mechanism. - r := manual.NewBuilderWithScheme("whatever") - jsonSC := fmt.Sprintf(`{ - "loadBalancingConfig":[{ - "cluster_resolver_experimental":{ - "discoveryMechanisms": [{ - "cluster": "test-cluster", - "type": "LOGICAL_DNS", - "dnsHostname": "%s:///target-name", - "outlierDetection": {} - }], - "xdsLbPolicy":[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}] - } - }] - }`, dnsScheme) - scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) - r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC)) - // Create a ClientConn and make a successful RPC. - cc, err := grpc.NewClient(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) - if err != nil { - t.Fatalf("Failed to create new client for local test server: %v", err) + const ( + serviceName = "test-xds-service" + clusterName = "cluster-test-xds-service" + endpointsName = "endpoints-test-xds-service" + rdsName = "route-test-xds-service" + ) + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: endpointsName, + Type: e2e.ClusterTypeLogicalDNS, + DNSHostName: "dns", + DNSPort: uint32(8080), + Policy: e2e.LoadBalancingPolicyRoundRobin, + })}, + Endpoints: nil, } - cc.Connect() - defer cc.Close() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - var dnsClientConn resolver.ClientConn - select { - case dnsClientConn = <-dnsCCCh: - case <-ctx.Done(): - t.Fatal("Timeout waiting for cluster_resolver to build the DNS resolver") + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server: %v", err) } - // For LOGICAL_DNS, this updates the SINGLE endpoint to have 2 IPs. - dnsClientConn.UpdateState(resolver.State{ - Addresses: []resolver.Address{ - {Addr: server1.Address}, - {Addr: server2.Address}, - }, - }) + // Create a ClientConn and make a successful RPC. + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) + if err != nil { + t.Fatalf("Failed to create new client for local test server: %v", err) + } + defer cc.Close() // Ensure the RPC is routed to the first backend. testClient := testpb.NewTestServiceClient(cc) - var peer peer.Peer - if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { - t.Fatalf("RPC failed: %v", err) - } + for i := 0; i < 10; i++ { + var peer peer.Peer + if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + t.Fatalf("RPC failed: %v", err) + } - if got, want := peer.Addr.String(), server1.Address; got != want { - t.Errorf("peer.Addr = %q, want = %q", got, want) + if got, want := peer.Addr.String(), server1.Address; got != want { + t.Errorf("peer.Addr = %q, want = %q", got, want) + } } } From b99da6f5245cd4a811ac3a5110cb3f1fbbaf05bd Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 6 Jan 2026 03:34:23 +0000 Subject: [PATCH 07/10] add comment --- internal/xds/balancer/clusterresolver/configbuilder.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index bc7755608209..1e721321cd02 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -158,6 +158,10 @@ func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoin // iteration variable and the original slice. retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...) } + // Even though localities are not a thing for the LOGICAL_DNS cluster and + // its endpoint(s), we add an empty locality attribute here to ensure that + // LB policies that rely on locality information (like weighted_target) + // continue to work. localityStr := xdsinternal.LocalityString(clients.Locality{}) retEndpoint = hierarchy.SetInEndpoint(retEndpoint, []string{pName, localityStr}) // Set the locality weight to 1. This is required because the child policy From d7e8178e6aadd75d52914a330d7560c82f685b59 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 6 Jan 2026 07:47:57 +0000 Subject: [PATCH 08/10] resolving comments --- .../cdsbalancer/e2e_test/dns_impl_test.go | 2 +- .../clusterresolver/configbuilder_test.go | 25 ++++++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go index 9684b7901723..e31bd29bef9d 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go @@ -113,7 +113,7 @@ func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) { } // Create a ClientConn and make a successful RPC. - cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) + cc, err := grpc.NewClient(fmt.Sprintf("xds:///"+serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) if err != nil { t.Fatalf("Failed to create new client for local test server: %v", err) } diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index 1b39bf6471ad..bf1c75a3685f 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -29,6 +29,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst" "google.golang.org/grpc/balancer/ringhash" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/internal/balancer/weight" @@ -318,13 +319,16 @@ func testEndpointForDNS(endpoints []resolver.Endpoint, localityWeight uint32, pa } func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { + for _, tt := range []struct { - name string - endpoints []resolver.Endpoint + name string + endpoints []resolver.Endpoint + xdsLBPolicy *iserviceconfig.BalancerConfig }{ { - name: "one_endpoint_one_address", - endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, + name: "one_endpoint_one_address", + endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, + xdsLBPolicy: &iserviceconfig.BalancerConfig{Name: pickfirst.Name}, }, { name: "one_endpoint_multiple_addresses", @@ -332,6 +336,7 @@ func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { {Addr: "addr-0-0"}, {Addr: "addr-0-1"}, }}}, + xdsLBPolicy: &iserviceconfig.BalancerConfig{Name: wrrlocality.Name}, }, { name: "multiple_endpoints_one_address_each", @@ -339,6 +344,7 @@ func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { {Addresses: []resolver.Address{{Addr: "addr-0-0"}}}, {Addresses: []resolver.Address{{Addr: "addr-0-1"}}}, }, + xdsLBPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, { name: "multiple_endpoints_multiple_addresses", @@ -352,24 +358,25 @@ func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { {Addr: "addr-1-1"}, }}, }, + xdsLBPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, } { t.Run(tt.name, func(t *testing.T) { - gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoints, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil) - - if diff := cmp.Diff(gotName, "priority-3"); diff != "" { + gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoints, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, tt.xdsLBPolicy) + wantName := "priority-3" + if diff := cmp.Diff(gotName, wantName); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } wantConfig := &clusterimpl.LBConfig{ Cluster: testClusterName2, - ChildPolicy: nil, + ChildPolicy: tt.xdsLBPolicy, } if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } - wantEndpoints := []resolver.Endpoint{testEndpointForDNS(tt.endpoints, 1, []string{"priority-3", xdsinternal.LocalityString(clients.Locality{})})} + wantEndpoints := []resolver.Endpoint{testEndpointForDNS(tt.endpoints, 1, []string{wantName, xdsinternal.LocalityString(clients.Locality{})})} if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } From fa04dda9e3dedea8b5fce3812b64d72738f87646 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 6 Jan 2026 19:59:25 +0000 Subject: [PATCH 09/10] fix test --- internal/xds/balancer/clusterimpl/tests/balancer_test.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/internal/xds/balancer/clusterimpl/tests/balancer_test.go b/internal/xds/balancer/clusterimpl/tests/balancer_test.go index f0418aba3030..48b97b0e96d6 100644 --- a/internal/xds/balancer/clusterimpl/tests/balancer_test.go +++ b/internal/xds/balancer/clusterimpl/tests/balancer_test.go @@ -987,14 +987,9 @@ func (s) TestReResolutionAfterTransientFailure(t *testing.T) { } // Stopping the server listener will close the transport on the client, - // which will lead to the channel eventually moving to IDLE. + // which will lead to the channel eventually moving to TRANSIENT_FAILURE. lis.Stop() - testutils.AwaitState(ctx, t, conn, connectivity.Idle) - - // An RPC at this point is expected to fail with TRANSIENT_FAILURE. - if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { - t.Fatalf("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE, got %v want %v", err, codes.Unavailable) - } + testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure) // Expect resolver's ResolveNow to be called due to TF state. select { From 99fc16f33b25919075c43bb99e62c57c7628280e Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 7 Jan 2026 09:21:59 +0000 Subject: [PATCH 10/10] resolving comments --- .../xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go | 2 +- internal/xds/balancer/clusterresolver/configbuilder.go | 7 ++++--- .../xds/balancer/clusterresolver/configbuilder_test.go | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go index e31bd29bef9d..bdda9f386eb3 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go @@ -1,5 +1,5 @@ /* - * Copyright 2025 gRPC authors. + * Copyright 2026 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index 1e721321cd02..3da889ec9ad5 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -154,8 +154,9 @@ func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoin } var retEndpoint resolver.Endpoint for _, e := range endpoints { - // Copy the nested address field as slice fields are shared by the - // iteration variable and the original slice. + // LOGICAL_DNS requires all resolved addresses to be grouped into a + // single logical endpoint. We iterate over the input endpoints and + // aggregate their addresses into a new endpoint variable. retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...) } // Even though localities are not a thing for the LOGICAL_DNS cluster and @@ -167,7 +168,7 @@ func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoin // Set the locality weight to 1. This is required because the child policy // like wrr which relies on locality weights to distribute traffic. These // policies may drop traffic if the weight is 0. - retEndpoint = wrrlocality.SetAddrInfoInEndpoint(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: 1}) + retEndpoint = wrrlocality.SetAddrInfo(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: 1}) return pName, lbconfig, []resolver.Endpoint{retEndpoint} } diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index bf1c75a3685f..14abba5deeaa 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -314,7 +314,7 @@ func testEndpointForDNS(endpoints []resolver.Endpoint, localityWeight uint32, pa retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...) } retEndpoint = hierarchy.SetInEndpoint(retEndpoint, path) - retEndpoint = wrrlocality.SetAddrInfoInEndpoint(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) + retEndpoint = wrrlocality.SetAddrInfo(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) return retEndpoint } @@ -363,7 +363,7 @@ func (s) TestBuildClusterImplConfigForDNS(t *testing.T) { } { t.Run(tt.name, func(t *testing.T) { gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoints, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, tt.xdsLBPolicy) - wantName := "priority-3" + const wantName = "priority-3" if diff := cmp.Diff(gotName, wantName); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) }