diff --git a/balancer/ringhash/ringhash_e2e_test.go b/balancer/ringhash/ringhash_e2e_test.go index b901e971344e..23ce37efa383 100644 --- a/balancer/ringhash/ringhash_e2e_test.go +++ b/balancer/ringhash/ringhash_e2e_test.go @@ -475,7 +475,9 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( } dnsR := replaceDNSResolver(t) - dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0]}}}) + dnsR.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: backends[0]}}}}, + }) if err := xdsServer.Update(ctx, updateOpts); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) @@ -553,7 +555,9 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN } dnsR := replaceDNSResolver(t) - dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0]}}}) + dnsR.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: backends[0]}}}}, + }) if err := xdsServer.Update(ctx, updateOpts); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) diff --git a/balancer/weightedtarget/weightedtarget.go b/balancer/weightedtarget/weightedtarget.go index b47925a34caf..02230d1a9b0f 100644 --- a/balancer/weightedtarget/weightedtarget.go +++ b/balancer/weightedtarget/weightedtarget.go @@ -106,8 +106,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat if !ok { return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } - addressesSplit := hierarchy.Group(s.ResolverState.Addresses) - endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints) + endpointsSplit := hierarchy.Group(s.ResolverState.Endpoints) b.stateAggregator.PauseStateUpdates() defer b.stateAggregator.ResumeStateUpdates() @@ -154,7 +153,6 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat // TODO: handle error? How to aggregate errors and return? _ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{ ResolverState: resolver.State{ - Addresses: addressesSplit[name], Endpoints: endpointsSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes.WithValue(localityKey, name), diff --git a/internal/hierarchy/hierarchy.go b/internal/hierarchy/hierarchy.go index 362c05fa2aa6..979d4cd7d205 100644 --- a/internal/hierarchy/hierarchy.go +++ b/internal/hierarchy/hierarchy.go @@ -60,70 +60,7 @@ func SetInEndpoint(endpoint resolver.Endpoint, path []string) resolver.Endpoint return endpoint } -// Get returns the hierarchical path of addr. -func Get(addr resolver.Address) []string { - attrs := addr.BalancerAttributes - if attrs == nil { - return nil - } - path, _ := attrs.Value(pathKey).(pathValue) - return ([]string)(path) -} - -// Set overrides the hierarchical path in addr with path. -func Set(addr resolver.Address, path []string) resolver.Address { - addr.BalancerAttributes = addr.BalancerAttributes.WithValue(pathKey, pathValue(path)) - return addr -} - -// Group splits a slice of addresses into groups based on -// the first hierarchy path. The first hierarchy path will be removed from the -// result. -// -// Input: -// [ -// -// {addr0, path: [p0, wt0]} -// {addr1, path: [p0, wt1]} -// {addr2, path: [p1, wt2]} -// {addr3, path: [p1, wt3]} -// -// ] -// -// Addresses will be split into p0/p1, and the p0/p1 will be removed from the -// path. -// -// Output: -// -// { -// p0: [ -// {addr0, path: [wt0]}, -// {addr1, path: [wt1]}, -// ], -// p1: [ -// {addr2, path: [wt2]}, -// {addr3, path: [wt3]}, -// ], -// } -// -// If hierarchical path is not set, or has no path in it, the address is -// dropped. -func Group(addrs []resolver.Address) map[string][]resolver.Address { - ret := make(map[string][]resolver.Address) - for _, addr := range addrs { - oldPath := Get(addr) - if len(oldPath) == 0 { - continue - } - curPath := oldPath[0] - newPath := oldPath[1:] - newAddr := Set(addr, newPath) - ret[curPath] = append(ret[curPath], newAddr) - } - return ret -} - -// GroupEndpoints splits a slice of endpoints into groups based on +// Group splits a slice of endpoints into groups based on // the first hierarchy path. The first hierarchy path will be removed from the // result. // @@ -155,7 +92,7 @@ func Group(addrs []resolver.Address) map[string][]resolver.Address { // // If hierarchical path is not set, or has no path in it, the endpoint is // dropped. -func GroupEndpoints(endpoints []resolver.Endpoint) map[string][]resolver.Endpoint { +func Group(endpoints []resolver.Endpoint) map[string][]resolver.Endpoint { ret := make(map[string][]resolver.Endpoint) for _, endpoint := range endpoints { oldPath := FromEndpoint(endpoint) diff --git a/internal/hierarchy/hierarchy_test.go b/internal/hierarchy/hierarchy_test.go index 1043d5f81dfa..fadfd0a19630 100644 --- a/internal/hierarchy/hierarchy_test.go +++ b/internal/hierarchy/hierarchy_test.go @@ -26,59 +26,59 @@ import ( "google.golang.org/grpc/resolver" ) -func TestGet(t *testing.T) { +func TestFromEndpoint(t *testing.T) { tests := []struct { name string - addr resolver.Address + ep resolver.Endpoint want []string }{ { name: "not set", - addr: resolver.Address{}, + ep: resolver.Endpoint{}, want: nil, }, { name: "set", - addr: resolver.Address{ - BalancerAttributes: attributes.New(pathKey, pathValue{"a", "b"}), + ep: resolver.Endpoint{ + Attributes: attributes.New(pathKey, pathValue{"a", "b"}), }, want: []string{"a", "b"}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := Get(tt.addr); !cmp.Equal(got, tt.want) { - t.Errorf("Get() = %v, want %v", got, tt.want) + if got := FromEndpoint(tt.ep); !cmp.Equal(got, tt.want) { + t.Errorf("FromEndpoint() = %v, want %v", got, tt.want) } }) } } -func TestSet(t *testing.T) { +func TestSetInEndpoint(t *testing.T) { tests := []struct { name string - addr resolver.Address + ep resolver.Endpoint path []string }{ { name: "before is not set", - addr: resolver.Address{}, + ep: resolver.Endpoint{}, path: []string{"a", "b"}, }, { name: "before is set", - addr: resolver.Address{ - BalancerAttributes: attributes.New(pathKey, pathValue{"before", "a", "b"}), + ep: resolver.Endpoint{ + Attributes: attributes.New(pathKey, pathValue{"before", "a", "b"}), }, path: []string{"a", "b"}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - newAddr := Set(tt.addr, tt.path) - newPath := Get(newAddr) + newEP := SetInEndpoint(tt.ep, tt.path) + newPath := FromEndpoint(newEP) if !cmp.Equal(newPath, tt.path) { - t.Errorf("path after Set() = %v, want %v", newPath, tt.path) + t.Errorf("path after SetInEndpoint() = %v, want %v", newPath, tt.path) } }) } @@ -86,66 +86,66 @@ func TestSet(t *testing.T) { func TestGroup(t *testing.T) { tests := []struct { - name string - addrs []resolver.Address - want map[string][]resolver.Address + name string + eps []resolver.Endpoint + want map[string][]resolver.Endpoint }{ { name: "all with hierarchy", - addrs: []resolver.Address{ - {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, - {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, - {Addr: "b0", BalancerAttributes: attributes.New(pathKey, pathValue{"b"})}, - {Addr: "b1", BalancerAttributes: attributes.New(pathKey, pathValue{"b"})}, + eps: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "a0"}}, Attributes: attributes.New(pathKey, pathValue{"a"})}, + {Addresses: []resolver.Address{{Addr: "a1"}}, Attributes: attributes.New(pathKey, pathValue{"a"})}, + {Addresses: []resolver.Address{{Addr: "b0"}}, Attributes: attributes.New(pathKey, pathValue{"b"})}, + {Addresses: []resolver.Address{{Addr: "b1"}}, Attributes: attributes.New(pathKey, pathValue{"b"})}, }, - want: map[string][]resolver.Address{ + want: map[string][]resolver.Endpoint{ "a": { - {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, - {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "a0"}}, Attributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "a1"}}, Attributes: attributes.New(pathKey, pathValue{})}, }, "b": { - {Addr: "b0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, - {Addr: "b1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "b0"}}, Attributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "b1"}}, Attributes: attributes.New(pathKey, pathValue{})}, }, }, }, { - // Addresses without hierarchy are ignored. + // Endpoints without hierarchy are ignored. name: "without hierarchy", - addrs: []resolver.Address{ - {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, - {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, - {Addr: "b0", BalancerAttributes: nil}, - {Addr: "b1", BalancerAttributes: nil}, + eps: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "a0"}}, Attributes: attributes.New(pathKey, pathValue{"a"})}, + {Addresses: []resolver.Address{{Addr: "a1"}}, Attributes: attributes.New(pathKey, pathValue{"a"})}, + {Addresses: []resolver.Address{{Addr: "b0"}}, Attributes: nil}, + {Addresses: []resolver.Address{{Addr: "b1"}}, Attributes: nil}, }, - want: map[string][]resolver.Address{ + want: map[string][]resolver.Endpoint{ "a": { - {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, - {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "a0"}}, Attributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "a1"}}, Attributes: attributes.New(pathKey, pathValue{})}, }, }, }, { // If hierarchy is set to a wrong type (which should never happen), - // the address is ignored. + // the endpoint is ignored. name: "wrong type", - addrs: []resolver.Address{ - {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, - {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, - {Addr: "b0", BalancerAttributes: attributes.New(pathKey, "b")}, - {Addr: "b1", BalancerAttributes: attributes.New(pathKey, 314)}, + eps: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "a0"}}, Attributes: attributes.New(pathKey, pathValue{"a"})}, + {Addresses: []resolver.Address{{Addr: "a1"}}, Attributes: attributes.New(pathKey, pathValue{"a"})}, + {Addresses: []resolver.Address{{Addr: "b0"}}, Attributes: attributes.New(pathKey, "b")}, + {Addresses: []resolver.Address{{Addr: "b1"}}, Attributes: attributes.New(pathKey, 314)}, }, - want: map[string][]resolver.Address{ + want: map[string][]resolver.Endpoint{ "a": { - {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, - {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "a0"}}, Attributes: attributes.New(pathKey, pathValue{})}, + {Addresses: []resolver.Address{{Addr: "a1"}}, Attributes: attributes.New(pathKey, pathValue{})}, }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := Group(tt.addrs); !cmp.Equal(got, tt.want, cmp.AllowUnexported(attributes.Attributes{})) { + if got := Group(tt.eps); !cmp.Equal(got, tt.want, cmp.AllowUnexported(attributes.Attributes{})) { t.Errorf("Group() = %v, want %v", got, tt.want) t.Errorf("diff: %v", cmp.Diff(got, tt.want, cmp.AllowUnexported(attributes.Attributes{}))) } @@ -165,28 +165,28 @@ func TestGroupE2E(t *testing.T) { }, } - var addrsWithHierarchy []resolver.Address + var epsWithHierarchy []resolver.Endpoint for p, wts := range hierarchy { path1 := pathValue{p} for wt, addrs := range wts { path2 := append(pathValue(nil), path1...) path2 = append(path2, wt) for _, addr := range addrs { - a := resolver.Address{ - Addr: addr, - BalancerAttributes: attributes.New(pathKey, path2), + a := resolver.Endpoint{ + Addresses: []resolver.Address{{Addr: addr}}, + Attributes: attributes.New(pathKey, path2), } - addrsWithHierarchy = append(addrsWithHierarchy, a) + epsWithHierarchy = append(epsWithHierarchy, a) } } } gotHierarchy := make(map[string]map[string][]string) - for p1, wts := range Group(addrsWithHierarchy) { + for p1, wts := range Group(epsWithHierarchy) { gotHierarchy[p1] = make(map[string][]string) - for p2, addrs := range Group(wts) { - for _, addr := range addrs { - gotHierarchy[p1][p2] = append(gotHierarchy[p1][p2], addr.Addr) + for p2, eps := range Group(wts) { + for _, ep := range eps { + gotHierarchy[p1][p2] = append(gotHierarchy[p1][p2], ep.Addresses[0].Addr) } } } diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go index 9e6281f11af1..f3ea4c8946fb 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/aggregate_cluster_test.go @@ -589,7 +589,7 @@ func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) { } // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) + dnsR.UpdateState(resolver.State{Endpoints: addrsToEndpoints(addrs[1:])}) // Make an RPC and ensure that it gets routed to the first backend since the // EDS cluster is of higher priority than the LOGICAL_DNS cluster. @@ -761,7 +761,7 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) { } // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs}) + dnsR.UpdateState(resolver.State{Endpoints: addrsToEndpoints(addrs)}) // Ensure that RPCs start getting routed to the first backend since the // child policy for a LOGICAL_DNS cluster is pick_first by default. @@ -1244,3 +1244,11 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address) } } + +func addrsToEndpoints(addrs []resolver.Address) []resolver.Endpoint { + endpoints := make([]resolver.Endpoint, len(addrs)) + for i, addr := range addrs { + endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{addr}} + } + return endpoints +} diff --git a/internal/xds/balancer/cdsbalancer/e2e_test/eds_impl_test.go b/internal/xds/balancer/cdsbalancer/e2e_test/eds_impl_test.go index dc150dc185e2..6b1697b6deac 100644 --- a/internal/xds/balancer/cdsbalancer/e2e_test/eds_impl_test.go +++ b/internal/xds/balancer/cdsbalancer/e2e_test/eds_impl_test.go @@ -1004,18 +1004,15 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { name string dualstackEndpointsEnabled bool wantEndpointPorts []uint32 - wantAddrPorts []uint32 }{ { name: "flag_enabled", dualstackEndpointsEnabled: true, wantEndpointPorts: ports, - wantAddrPorts: ports[:1], }, { name: "flag_disabled", wantEndpointPorts: ports[:1], - wantAddrPorts: ports[:1], }, } @@ -1109,14 +1106,6 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { if diff := cmp.Diff(gotEndpointPorts, tc.wantEndpointPorts); diff != "" { t.Errorf("Unexpected endpoint address ports in resolver update, diff (-got +want): %v", diff) } - - gotAddrPorts := []uint32{} - for _, a := range gotState.Addresses { - gotAddrPorts = append(gotAddrPorts, testutils.ParsePort(t, a.Addr)) - } - if diff := cmp.Diff(gotAddrPorts, tc.wantAddrPorts); diff != "" { - t.Errorf("Unexpected address ports in resolver update, diff (-got +want): %v", diff) - } }) } } diff --git a/internal/xds/balancer/clusterimpl/tests/balancer_test.go b/internal/xds/balancer/clusterimpl/tests/balancer_test.go index f0418aba3030..7debfd793ff9 100644 --- a/internal/xds/balancer/clusterimpl/tests/balancer_test.go +++ b/internal/xds/balancer/clusterimpl/tests/balancer_test.go @@ -965,7 +965,9 @@ func (s) TestReResolutionAfterTransientFailure(t *testing.T) { dnsR.ResolveNowCallback = func(resolver.ResolveNowOptions) { close(resolveNowCh) } - dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", host, port)}}}) + dnsR.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", host, port)}}}}, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() diff --git a/internal/xds/balancer/clustermanager/clustermanager.go b/internal/xds/balancer/clustermanager/clustermanager.go index e0a75afa8e99..05e10e579dd1 100644 --- a/internal/xds/balancer/clustermanager/clustermanager.go +++ b/internal/xds/balancer/clustermanager/clustermanager.go @@ -83,10 +83,7 @@ func (b *bal) setErrorPickerForChild(childName string, err error) { } func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) error { - // TODO: Get rid of handling hierarchy in addresses. This LB policy never - // gets addresses from the resolver. - addressesSplit := hierarchy.Group(s.ResolverState.Addresses) - endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints) + endpointsSplit := hierarchy.Group(s.ResolverState.Endpoints) // Remove sub-balancers that are not in the new list from the aggregator and // balancergroup. @@ -138,7 +135,6 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) er if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{ ResolverState: resolver.State{ - Addresses: addressesSplit[childName], Endpoints: endpointsSplit[childName], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, diff --git a/internal/xds/balancer/clustermanager/clustermanager_test.go b/internal/xds/balancer/clustermanager/clustermanager_test.go index 01c1a8e80dad..17a302612f98 100644 --- a/internal/xds/balancer/clustermanager/clustermanager_test.go +++ b/internal/xds/balancer/clustermanager/clustermanager_test.go @@ -105,13 +105,9 @@ func (s) TestClusterPicks(t *testing.T) { } m1 := make(map[resolver.Address]balancer.SubConn) - // Verify that a subconn is created with the address, and the hierarchy path - // in the address is cleared. + // Verify that a subconn is created with the address. for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh - if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) - } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. addrs[0].BalancerAttributes = nil @@ -186,13 +182,9 @@ func (s) TestConfigUpdateAddCluster(t *testing.T) { } m1 := make(map[resolver.Address]balancer.SubConn) - // Verify that a subconn is created with the address, and the hierarchy path - // in the address is cleared. + // Verify that a subconn is created with the address. for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh - if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) - } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. addrs[0].BalancerAttributes = nil @@ -258,9 +250,6 @@ func (s) TestConfigUpdateAddCluster(t *testing.T) { // Expect exactly one new subconn. addrs := <-cc.NewSubConnAddrsCh - if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) - } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. addrs[0].BalancerAttributes = nil @@ -350,9 +339,6 @@ func (s) TestRoutingConfigUpdateDeleteAll(t *testing.T) { // in the address is cleared. for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh - if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) - } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. addrs[0].BalancerAttributes = nil @@ -436,9 +422,6 @@ func (s) TestRoutingConfigUpdateDeleteAll(t *testing.T) { // in the address is cleared. for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh - if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) - } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. addrs[0].BalancerAttributes = nil diff --git a/internal/xds/balancer/clusterresolver/clusterresolver.go b/internal/xds/balancer/clusterresolver/clusterresolver.go index f5a30e1acbec..199070468239 100644 --- a/internal/xds/balancer/clusterresolver/clusterresolver.go +++ b/internal/xds/balancer/clusterresolver/clusterresolver.go @@ -248,18 +248,10 @@ func (b *clusterResolverBalancer) updateChildConfig() { b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg)) } - flattenedAddrs := make([]resolver.Address, len(endpoints)) for i := range endpoints { for j := range endpoints[i].Addresses { addr := endpoints[i].Addresses[j] addr.BalancerAttributes = endpoints[i].Attributes - // If the endpoint has multiple addresses, only the first is added - // to the flattened address list. This ensures that LB policies - // that don't support endpoints create only one subchannel to a - // backend. - if j == 0 { - flattenedAddrs[i] = addr - } // BalancerAttributes need to be present in endpoint addresses. This // temporary workaround is required to make load reporting work // with the old pickfirst policy which creates SubConns with multiple @@ -274,7 +266,6 @@ func (b *clusterResolverBalancer) updateChildConfig() { if err := b.child.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ Endpoints: endpoints, - Addresses: flattenedAddrs, ServiceConfig: b.configRaw, Attributes: b.attrsWithClient, }, diff --git a/internal/xds/balancer/clusterresolver/resource_resolver_dns.go b/internal/xds/balancer/clusterresolver/resource_resolver_dns.go index 5f7a21153057..ea292e712d19 100644 --- a/internal/xds/balancer/clusterresolver/resource_resolver_dns.go +++ b/internal/xds/balancer/clusterresolver/resource_resolver_dns.go @@ -133,15 +133,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { } dr.mu.Lock() - var endpoints = state.Endpoints - if len(endpoints) == 0 { - endpoints = make([]resolver.Endpoint, len(state.Addresses)) - for i, a := range state.Addresses { - endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}} - endpoints[i].Attributes = a.BalancerAttributes - } - } - dr.endpoints = endpoints + dr.endpoints = state.Endpoints dr.updateReceived = true dr.mu.Unlock() @@ -171,8 +163,8 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) { dr.topLevelResolver.onUpdate(func() {}) } -func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) { - dr.UpdateState(resolver.State{Addresses: addresses}) +func (dr *dnsDiscoveryMechanism) NewAddress([]resolver.Address) { + dr.logger.Errorf("NewAddress called unexpectedly.") } func (dr *dnsDiscoveryMechanism) ParseServiceConfig(string) *serviceconfig.ParseResult { diff --git a/internal/xds/balancer/priority/balancer.go b/internal/xds/balancer/priority/balancer.go index 194e0319086b..950cd13e651e 100644 --- a/internal/xds/balancer/priority/balancer.go +++ b/internal/xds/balancer/priority/balancer.go @@ -121,8 +121,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err if !ok { return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } - addressesSplit := hierarchy.Group(s.ResolverState.Addresses) - endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints) + endpointsSplit := hierarchy.Group(s.ResolverState.Endpoints) b.mu.Lock() // Create and remove children, since we know all children from the config @@ -141,7 +140,6 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err // priority. If necessary, it will be built when syncing priorities. cb := newChildBalancer(name, b, bb.Name(), b.cc) cb.updateConfig(newSubConfig, resolver.State{ - Addresses: addressesSplit[name], Endpoints: endpointsSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, @@ -163,7 +161,6 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err // updates to non-started child balancers (the child balancer might not // be built, if it's a low priority). currentChild.updateConfig(newSubConfig, resolver.State{ - Addresses: addressesSplit[name], Endpoints: endpointsSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager.go b/internal/xds/xdsdepmgr/xds_dependency_manager.go index f4d956770868..4dd792fccc29 100644 --- a/internal/xds/xdsdepmgr/xds_dependency_manager.go +++ b/internal/xds/xdsdepmgr/xds_dependency_manager.go @@ -758,16 +758,7 @@ func (m *DependencyManager) onDNSUpdate(resourceName string, update *resolver.St if m.logger.V(2) { m.logger.Infof("Received update from DNS resolver for resource %q: %+v", resourceName, update) } - endpoints := update.Endpoints - if len(endpoints) == 0 { - endpoints = make([]resolver.Endpoint, len(update.Addresses)) - for i, a := range update.Addresses { - endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}} - endpoints[i].Attributes = a.BalancerAttributes - } - } - - m.dnsResolvers[resourceName].setLastUpdate(&xdsresource.DNSUpdate{Endpoints: endpoints}) + m.dnsResolvers[resourceName].setLastUpdate(&xdsresource.DNSUpdate{Endpoints: update.Endpoints}) m.maybeSendUpdateLocked() } diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager_test.go b/internal/xds/xdsdepmgr/xds_dependency_manager_test.go index 6285959233df..7d9ba5658ff8 100644 --- a/internal/xds/xdsdepmgr/xds_dependency_manager_test.go +++ b/internal/xds/xdsdepmgr/xds_dependency_manager_test.go @@ -278,9 +278,6 @@ func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient } pool := xdsclient.NewPool(config) - if err != nil { - t.Fatalf("Failed to create an xDS client pool: %v", err) - } c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()}) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) @@ -1055,9 +1052,9 @@ func (s) TestAggregateCluster(t *testing.T) { dnsR := replaceDNSResolver(t) dnsR.UpdateState(resolver.State{ - Addresses: []resolver.Address{ - {Addr: "127.0.0.1:8081"}, - {Addr: "[::1]:8081"}, + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "127.0.0.1:8081"}}}, + {Addresses: []resolver.Address{{Addr: "[::1]:8081"}}}, }, }) @@ -1203,9 +1200,9 @@ func (s) TestAggregateClusterChildError(t *testing.T) { dnsR := replaceDNSResolver(t) dnsR.UpdateState(resolver.State{ - Addresses: []resolver.Address{ - {Addr: "127.0.0.1:8081"}, - {Addr: "[::1]:8081"}, + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "127.0.0.1:8081"}}}, + {Addresses: []resolver.Address{{Addr: "[::1]:8081"}}}, }, })