diff --git a/balancer/pickfirst/internal/internal.go b/balancer/pickfirst/internal/internal.go index 7d66cb491c40..cc902a4de6fb 100644 --- a/balancer/pickfirst/internal/internal.go +++ b/balancer/pickfirst/internal/internal.go @@ -26,6 +26,8 @@ import ( var ( // RandShuffle pseudo-randomizes the order of addresses. RandShuffle = rand.Shuffle + // RandFloat64 returns, as a float64, a pseudo-random number in [0.0,1.0). + RandFloat64 = rand.Float64 // TimeAfterFunc allows mocking the timer for testing connection delay // related functionality. TimeAfterFunc = func(d time.Duration, f func()) func() { diff --git a/balancer/pickfirst/pickfirst.go b/balancer/pickfirst/pickfirst.go index b4bc3a2bf368..dccd9f0bf398 100644 --- a/balancer/pickfirst/pickfirst.go +++ b/balancer/pickfirst/pickfirst.go @@ -21,11 +21,14 @@ package pickfirst import ( + "cmp" "encoding/json" "errors" "fmt" + "math" "net" "net/netip" + "slices" "sync" "time" @@ -34,6 +37,8 @@ import ( "google.golang.org/grpc/connectivity" expstats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/balancer/weight" + "google.golang.org/grpc/internal/envconfig" internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" @@ -258,8 +263,42 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState // will change the order of endpoints but not touch the order of the // addresses within each endpoint. - A61 if cfg.ShuffleAddressList { - endpoints = append([]resolver.Endpoint{}, endpoints...) - internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) + if envconfig.PickFirstWeightedShuffling { + type weightedEndpoint struct { + endpoint resolver.Endpoint + weight float64 + } + + // For each endpoint, compute a key as described in A113 and + // https://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf: + var weightedEndpoints []weightedEndpoint + for _, endpoint := range endpoints { + u := internal.RandFloat64() // Random number in [0.0, 1.0) + weight := weightAttribute(endpoint) + weightedEndpoints = append(weightedEndpoints, weightedEndpoint{ + endpoint: endpoint, + weight: math.Pow(u, 1.0/float64(weight)), + }) + } + // Sort endpoints by key in descending order and reconstruct the + // endpoints slice. + slices.SortFunc(weightedEndpoints, func(a, b weightedEndpoint) int { + return cmp.Compare(b.weight, a.weight) + }) + + // Here, and in the "else" block below, we clone the endpoints + // slice to avoid mutating the resolver state. Doing the latter + // would lead to data races if the caller is accessing the same + // slice concurrently. + sortedEndpoints := make([]resolver.Endpoint, len(endpoints)) + for i, we := range weightedEndpoints { + sortedEndpoints[i] = we.endpoint + } + endpoints = sortedEndpoints + } else { + endpoints = slices.Clone(endpoints) + internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) + } } // "Flatten the list by concatenating the ordered list of addresses for @@ -906,3 +945,17 @@ func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool { return a.Addr == b.Addr && a.ServerName == b.ServerName && a.Attributes.Equal(b.Attributes) } + +// weightAttribute is a convenience function which returns the value of the +// weight endpoint Attribute. +// +// When used in the xDS context, the weight attribute is guaranteed to be +// non-zero. But, when used in a non-xDS context, the weight attribute could be +// unset. A Default of 1 is used in the latter case. +func weightAttribute(e resolver.Endpoint) uint32 { + w := weight.FromEndpoint(e).Weight + if w == 0 { + return 1 + } + return w +} diff --git a/balancer/pickfirst/pickfirst_ext_test.go b/balancer/pickfirst/pickfirst_ext_test.go index 7296aa9636b9..e9b8edf03b6b 100644 --- a/balancer/pickfirst/pickfirst_ext_test.go +++ b/balancer/pickfirst/pickfirst_ext_test.go @@ -39,7 +39,9 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/balancer/weight" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" @@ -425,6 +427,8 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) { // Tests the PF LB policy with shuffling enabled. func (s) TestPickFirst_ShuffleAddressList(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, false) + const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}` // Install a shuffler that always reverses two entries. @@ -485,6 +489,8 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) { // Endpoints field in the resolver update to test the shuffling of the // Addresses. func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, false) + // Install a shuffler that always reverses two entries. origShuf := pfinternal.RandShuffle defer func() { pfinternal.RandShuffle = origShuf }() @@ -560,8 +566,73 @@ func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) { } } +// Tests the PF LB policy with weighted shuffling enabled. +func (s) TestPickFirst_ShuffleAddressList_WeightedShuffling(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, true) + + const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}` + + // Install a rand func that returns a constant value. The test sets up three + // endpoints with increasing weights. This means that in the weighted + // shuffling algorithm, the endpoints will end up with increasing values for + // their keys. And since the algorithm sorts in descending order, the last + // endpoint should be the one that would get picked. + origRand := pfinternal.RandFloat64 + defer func() { pfinternal.RandFloat64 = origRand }() + pfinternal.RandFloat64 = func() float64 { + return 0.5 + } + + // Set up our backends. + cc, r, backends := setupPickFirst(t, 3) + addrs := stubBackendsToResolverAddrs(backends) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create endpoints for the above backends with increasing weights. + ep1 := resolver.Endpoint{Addresses: []resolver.Address{addrs[0]}} + ep1 = weight.Set(ep1, weight.EndpointInfo{Weight: 357913941}) // Normalized weight of 1/6 + ep2 := resolver.Endpoint{Addresses: []resolver.Address{addrs[1]}} + ep2 = weight.Set(ep2, weight.EndpointInfo{Weight: 715827882}) // Normalized weight of 2/6 + ep3 := resolver.Endpoint{Addresses: []resolver.Address{addrs[2]}} + ep3 = weight.Set(ep3, weight.EndpointInfo{Weight: 1073741824}) // Normalized weight of 3/6 + + // Push an update with all addresses and shuffling disabled. We should + // connect to backend 0. + r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{ep1, ep2, ep3}}) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + // Send a config with shuffling enabled. This will reverse the addresses, + // but the channel should still be connected to backend 0. + shufState := resolver.State{ + ServiceConfig: parseServiceConfig(t, r, serviceConfig), + Endpoints: []resolver.Endpoint{ep1, ep2, ep3}, + } + r.UpdateState(shufState) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + // Send a resolver update with no addresses. This should push the channel + // into TransientFailure. + r.UpdateState(resolver.State{}) + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) + + // Send the same config as last time with shuffling enabled. Since we are + // not connected to backend 0, we should connect to backend 2. + r.UpdateState(shufState) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[2]); err != nil { + t.Fatal(err) + } +} + // Test config parsing with the env var turned on and off for various scenarios. func (s) TestPickFirst_ParseConfig_Success(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, false) + // Install a shuffler that always reverses two entries. origShuf := pfinternal.RandShuffle defer func() { pfinternal.RandShuffle = origShuf }() diff --git a/balancer/ringhash/ringhash_e2e_test.go b/balancer/ringhash/ringhash_e2e_test.go index 23ce37efa383..915db5ac646f 100644 --- a/balancer/ringhash/ringhash_e2e_test.go +++ b/balancer/ringhash/ringhash_e2e_test.go @@ -1240,29 +1240,30 @@ func (s) TestRingHash_UnsupportedHashPolicyUntilChannelIdHashing(t *testing.T) { // Tests that ring hash policy that hashes using a random value can spread RPCs // across all the backends according to locality weight. func (s) TestRingHash_RandomHashingDistributionAccordingToLocalityAndEndpointWeight(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, true) backends := backendAddrs(startTestServiceBackends(t, 2)) const clusterName = "cluster" - const locality1Weight = uint32(1) - const endpoint1Weight = uint32(1) - const locality2Weight = uint32(2) - const endpoint2Weight = uint32(2) + const locality0Weight = uint32(1) + const endpoint0Weight = uint32(1) + const locality1Weight = uint32(2) + const endpoint1Weight = uint32(2) endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: clusterName, Localities: []e2e.LocalityOptions{ { Backends: []e2e.BackendOptions{{ Ports: []uint32{testutils.ParsePort(t, backends[0])}, - Weight: endpoint1Weight, + Weight: endpoint0Weight, }}, - Weight: locality1Weight, + Weight: locality0Weight, }, { Backends: []e2e.BackendOptions{{ Ports: []uint32{testutils.ParsePort(t, backends[1])}, - Weight: endpoint2Weight, + Weight: endpoint1Weight, }}, - Weight: locality2Weight, + Weight: locality1Weight, }, }, }) @@ -1289,21 +1290,26 @@ func (s) TestRingHash_RandomHashingDistributionAccordingToLocalityAndEndpointWei defer conn.Close() client := testgrpc.NewTestServiceClient(conn) - const weight1 = endpoint1Weight * locality1Weight - const weight2 = endpoint2Weight * locality2Weight - const wantRPCs1 = float64(weight1) / float64(weight1+weight2) - const wantRPCs2 = float64(weight2) / float64(weight1+weight2) - numRPCs := computeIdealNumberOfRPCs(t, math.Min(wantRPCs1, wantRPCs2), errorTolerance) + // The target fraction of RPCs to each backend is computed as the product of + // the probability of selecting the locality and the probability of + // selecting the endpoint within the locality. The probability of selecting + // locality0 is 1/3 and locality1 is 2/3. Since there is only one endpoint + // in each locality, the probability of selecting the endpoint within the + // locality is 1. Therefore, the target fractions end up as 1/3 and 2/3 + // respectively. + const wantRPCs0 = float64(1) / float64(3) + const wantRPCs1 = float64(2) / float64(3) + numRPCs := computeIdealNumberOfRPCs(t, math.Min(wantRPCs0, wantRPCs1), errorTolerance) // Send a large number of RPCs and check that they are distributed randomly. gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) got := float64(gotPerBackend[backends[0]]) / float64(numRPCs) - if !cmp.Equal(got, wantRPCs1, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, wantRPCs1, errorTolerance) + if !cmp.Equal(got, wantRPCs0, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[0], got, wantRPCs0, errorTolerance) } got = float64(gotPerBackend[backends[1]]) / float64(numRPCs) - if !cmp.Equal(got, wantRPCs2, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, wantRPCs2, errorTolerance) + if !cmp.Equal(got, wantRPCs1, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[1], got, wantRPCs1, errorTolerance) } } diff --git a/internal/envconfig/envconfig.go b/internal/envconfig/envconfig.go index ec5b37293b61..536f6f5990d8 100644 --- a/internal/envconfig/envconfig.go +++ b/internal/envconfig/envconfig.go @@ -90,6 +90,12 @@ var ( // This feature is defined in gRFC A81 and is enabled by setting the // environment variable GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE to "true". XDSAuthorityRewrite = boolFromEnv("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false) + + // PickFirstWeightedShuffling indicates whether weighted endpoint shuffling + // is enabled in the pick_first LB policy, as defined in gRFC A113. This + // feature can be disabled by setting the environment variable + // GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING to "false". + PickFirstWeightedShuffling = boolFromEnv("GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING", true) ) func boolFromEnv(envVar string, def bool) bool { diff --git a/internal/xds/balancer/clusterresolver/configbuilder.go b/internal/xds/balancer/clusterresolver/configbuilder.go index 56344f9b7b66..d5675efb1bec 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder.go +++ b/internal/xds/balancer/clusterresolver/configbuilder.go @@ -21,10 +21,11 @@ package clusterresolver import ( "encoding/json" "fmt" + "maps" "slices" - "sort" "google.golang.org/grpc/internal/balancer/weight" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/hierarchy" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" xdsinternal "google.golang.org/grpc/internal/xds" @@ -213,53 +214,49 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint // For example, for L0-p0, L1-p0, L2-p1, results will be // - [[L0, L1], [L2]] func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresource.Locality { - var priorityIntSlice []int priorities := make(map[int][]xdsresource.Locality) for _, locality := range localities { priority := int(locality.Priority) priorities[priority] = append(priorities[priority], locality) - priorityIntSlice = append(priorityIntSlice, priority) } // Sort the priorities based on the int value, deduplicate, and then turn // the sorted list into a string list. This will be child names, in priority // order. - sort.Ints(priorityIntSlice) - priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice) - ret := make([][]xdsresource.Locality, 0, len(priorityIntSliceDeduped)) - for _, p := range priorityIntSliceDeduped { + priorityIntSlice := slices.Sorted(maps.Keys(priorities)) + ret := make([][]xdsresource.Locality, 0, len(priorityIntSlice)) + for _, p := range priorityIntSlice { ret = append(ret, priorities[p]) } return ret } -func dedupSortedIntSlice(a []int) []int { - if len(a) == 0 { - return a - } - i, j := 0, 1 - for ; j < len(a); j++ { - if a[i] == a[j] { - continue - } - i++ - if i != j { - a[i] = a[j] - } - } - return a[:i+1] -} - // priorityLocalitiesToClusterImpl takes a list of localities (with the same // priority), and generates a cluster impl policy config, and a list of // addresses with their path hierarchy set to [priority-name, locality-name], so // priority and the xDS LB Policy know which child policy each address is for. func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Endpoint, error) { var retEndpoints []resolver.Endpoint + + // Compute the sum of locality weights to normalize locality weights. The + // xDS client guarantees that the sum of locality weights (within a + // priority) will not exceed MaxUint32. + var localityWeightSum uint32 + for _, locality := range localities { + localityWeightSum += locality.Weight + } + for _, locality := range localities { - var lw uint32 = 1 - if locality.Weight != 0 { - lw = locality.Weight + // Compute the sum of endpoint weights to normalize endpoint weights. + // The xDS client does not currently guarantee that the sum of endpoint + // weights (within a locality) will not exceed MaxUint32. TODO(i/8862): + // Once the xDS client guarantees that the sum of endpoint weights does + // not exceed MaxUint32, we can change the type of this variable from + // uint64 to uint32. + var endpointWeightSum uint64 + for _, endpoint := range locality.Endpoints { + endpointWeightSum += uint64(endpoint.Weight) } + localityStr := xdsinternal.LocalityString(locality.ID) for _, endpoint := range locality.Endpoints { // Filter out all "unhealthy" endpoints (unknown and healthy are @@ -282,12 +279,19 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority // populate a new locality weight attribute for each address The // attribute will have the weight (as an integer) of the locality // the address is part of." - A52 - resolverEndpoint = wrrlocality.SetAddrInfo(resolverEndpoint, wrrlocality.AddrInfo{LocalityWeight: lw}) - var ew uint32 = 1 - if endpoint.Weight != 0 { - ew = endpoint.Weight + resolverEndpoint = wrrlocality.SetAddrInfo(resolverEndpoint, wrrlocality.AddrInfo{LocalityWeight: locality.Weight}) + + if envconfig.PickFirstWeightedShuffling { + normalizedLocalityWeight := fractionToFixedPoint(uint64(locality.Weight), uint64(localityWeightSum)) + normalizedEndpointWeight := fractionToFixedPoint(uint64(endpoint.Weight), endpointWeightSum) + endpointWeight := fixedPointMultiply(normalizedEndpointWeight, normalizedLocalityWeight) + if endpointWeight == 0 { + endpointWeight = 1 + } + resolverEndpoint = weight.Set(resolverEndpoint, weight.EndpointInfo{Weight: endpointWeight}) + } else { + resolverEndpoint = weight.Set(resolverEndpoint, weight.EndpointInfo{Weight: locality.Weight * endpoint.Weight}) } - resolverEndpoint = weight.Set(resolverEndpoint, weight.EndpointInfo{Weight: lw * ew}) retEndpoints = append(retEndpoints, resolverEndpoint) } } @@ -301,3 +305,35 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority ChildPolicy: xdsLBPolicy, }, retEndpoints, nil } + +// fixedPointFractionalBits is the number of bits used for the fractional part +// of normalized endpoint and locality weights. +// +// We use the UQ1.31 fixed-point format (Unsigned, 1 integer bit, 31 fractional bits). +// This allows representing values in the range [0.0, 2.0) with a precision +// of 2^-31. +// +// Bit Layout: +// [ 31 ] [ 30 ................. 0 ] +// +// | | +// | +--- Fractional Part (31 bits) +// +------------------ Integer Part (1 bit) +// +// See gRFC A113 for more details. +const fixedPointFractionalBits = 31 + +// fractionToFixedPoint converts a fraction represented by numerator and +// denominator to a fixed-point number between 0 and 1 represented as a uint32. +// +// The xDS client guarantees that the sum of locality weights (within a +// priority) will not exceed MaxUint32. TODO(i/8862): Once the xDS client +// guarantees that the sum of endpoint weights does not exceed MaxUint32, we can +// change the types of this function's arguments from uint64 to uint32. +func fractionToFixedPoint(numerator, denominator uint64) uint32 { + return uint32(uint64(numerator) << fixedPointFractionalBits / uint64(denominator)) +} + +func fixedPointMultiply(a, b uint32) uint32 { + return uint32((uint64(a) * uint64(b)) >> fixedPointFractionalBits) +} diff --git a/internal/xds/balancer/clusterresolver/configbuilder_test.go b/internal/xds/balancer/clusterresolver/configbuilder_test.go index a82c0eb70d98..3f5ecc87aa2c 100644 --- a/internal/xds/balancer/clusterresolver/configbuilder_test.go +++ b/internal/xds/balancer/clusterresolver/configbuilder_test.go @@ -32,9 +32,11 @@ import ( "google.golang.org/grpc/balancer/ringhash" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/internal/balancer/weight" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/hierarchy" iringhash "google.golang.org/grpc/internal/ringhash" iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/testutils" xdsinternal "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/internal/xds/balancer/clusterimpl" "google.golang.org/grpc/internal/xds/balancer/outlierdetection" @@ -47,7 +49,6 @@ import ( ) const ( - testLRSServer = "test-lrs-server" testMaxRequests = 314 testEDSServiceName = "service-name-from-parent" testDropCategory = "test-drops" @@ -102,6 +103,7 @@ func init() { {Addr: fmt.Sprintf("addr-%d-%d-additional-2", i, j)}, }, }, + Weight: 1, }) } testResolverEndpoints = append(testResolverEndpoints, endpoints) @@ -335,7 +337,9 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { } } -func TestBuildClusterImplConfigForEDS(t *testing.T) { +func TestBuildClusterImplConfigForEDS_PickFirstWeightedShuffling_Disabled(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, false) + testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ URI: "trafficdirector.googleapis.com:443", ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}}, @@ -418,15 +422,16 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { }, }, } + // Endpoint weight is the product of locality weight and endpoint weight. wantEndpoints := []resolver.Endpoint{ - testEndpointWithAttrs(testEndpoints[0][0].ResolverEndpoint, 20, 1, "priority-2-0", &testLocalityIDs[0]), - testEndpointWithAttrs(testEndpoints[0][1].ResolverEndpoint, 20, 1, "priority-2-0", &testLocalityIDs[0]), - testEndpointWithAttrs(testEndpoints[1][0].ResolverEndpoint, 80, 1, "priority-2-0", &testLocalityIDs[1]), - testEndpointWithAttrs(testEndpoints[1][1].ResolverEndpoint, 80, 1, "priority-2-0", &testLocalityIDs[1]), - testEndpointWithAttrs(testEndpoints[2][0].ResolverEndpoint, 20, 1, "priority-2-1", &testLocalityIDs[2]), - testEndpointWithAttrs(testEndpoints[2][1].ResolverEndpoint, 20, 1, "priority-2-1", &testLocalityIDs[2]), - testEndpointWithAttrs(testEndpoints[3][0].ResolverEndpoint, 80, 1, "priority-2-1", &testLocalityIDs[3]), - testEndpointWithAttrs(testEndpoints[3][1].ResolverEndpoint, 80, 1, "priority-2-1", &testLocalityIDs[3]), + testEndpointWithAttrs(testEndpoints[0][0].ResolverEndpoint, 20, 20*1, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(testEndpoints[0][1].ResolverEndpoint, 20, 20*1, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(testEndpoints[1][0].ResolverEndpoint, 80, 80*1, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(testEndpoints[1][1].ResolverEndpoint, 80, 80*1, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(testEndpoints[2][0].ResolverEndpoint, 20, 20*1, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(testEndpoints[2][1].ResolverEndpoint, 20, 20*1, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(testEndpoints[3][0].ResolverEndpoint, 80, 80*1, "priority-2-1", &testLocalityIDs[3]), + testEndpointWithAttrs(testEndpoints[3][1].ResolverEndpoint, 80, 80*1, "priority-2-1", &testLocalityIDs[3]), } if diff := cmp.Diff(gotNames, wantNames); diff != "" { @@ -438,9 +443,132 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" { t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) } - } +func TestBuildClusterImplConfigForEDS_PickFirstWeightedShuffling_Enabled(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, true) + + testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ + URI: "trafficdirector.googleapis.com:443", + ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}}, + }) + if err != nil { + t.Fatalf("Failed to create LRS server config for testing: %v", err) + } + + gotNames, gotConfigs, gotEndpoints, _ := buildClusterImplConfigForEDS( + newNameGenerator(2), + xdsresource.EndpointsUpdate{ + Drops: []xdsresource.OverloadDropConfig{ + { + Category: testDropCategory, + Numerator: testDropOverMillion, + Denominator: million, + }, + }, + Localities: []xdsresource.Locality{ + { + Endpoints: testEndpoints[3], + ID: testLocalityIDs[3], + Weight: 80, + Priority: 1, + }, { + Endpoints: testEndpoints[1], + ID: testLocalityIDs[1], + Weight: 80, + Priority: 0, + }, { + Endpoints: testEndpoints[2], + ID: testLocalityIDs[2], + Weight: 20, + Priority: 1, + }, { + Endpoints: testEndpoints[0], + ID: testLocalityIDs[0], + Weight: 20, + Priority: 0, + }, + }, + }, + DiscoveryMechanism{ + Cluster: testClusterName, + MaxConcurrentRequests: newUint32(testMaxRequests), + LoadReportingServer: testLRSServerConfig, + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServiceName, + }, + nil, + ) + + wantNames := []string{ + fmt.Sprintf("priority-%v-%v", 2, 0), + fmt.Sprintf("priority-%v-%v", 2, 1), + } + wantConfigs := map[string]*clusterimpl.LBConfig{ + "priority-2-0": { + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + DropCategories: []clusterimpl.DropConfig{ + { + Category: testDropCategory, + RequestsPerMillion: testDropOverMillion, + }, + }, + }, + "priority-2-1": { + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + DropCategories: []clusterimpl.DropConfig{ + { + Category: testDropCategory, + RequestsPerMillion: testDropOverMillion, + }, + }, + }, + } + // Endpoints weights are the product of normalized locality weight and + // endpoint weight, represented as a fixed-point number in uQ1.31 format. + // Locality weights are normalized as: + // P1: locality 3: 80 / (100) = 0.8 + // P0: locality 1: 80 / (100) = 0.8 + // P1: locality 2: 20 / (100) = 0.2 + // P0: locality 0: 20 / (100) = 0.2 + // In fixed-point uQ1.31 format, the weights are: + // locality 3: 0.8 * 2^31 = 1717986918 + // locality 1: 0.8 * 2^31 = 1717986918 + // locality 2: 0.2 * 2^31 = 429496729 + // locality 0: 0.2 * 2^31 = 429496729 + // + // There are two endpoints in each locality, each with weight 1. So, their + // normalized weights are 0.5 each. And the final endpoint weights are a + // product of their locality weights and 0.5, which turns out to be either + // 1717986918 * 0.5 = 858993459, or, + // 429496729 * 0.5 = 214748364 + wantEndpoints := []resolver.Endpoint{ + testEndpointWithAttrs(testEndpoints[0][0].ResolverEndpoint, 20, 214748364, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(testEndpoints[0][1].ResolverEndpoint, 20, 214748364, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(testEndpoints[1][0].ResolverEndpoint, 80, 858993459, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(testEndpoints[1][1].ResolverEndpoint, 80, 858993459, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(testEndpoints[2][0].ResolverEndpoint, 20, 214748364, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(testEndpoints[2][1].ResolverEndpoint, 20, 214748364, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(testEndpoints[3][0].ResolverEndpoint, 80, 858993459, "priority-2-1", &testLocalityIDs[3]), + testEndpointWithAttrs(testEndpoints[3][1].ResolverEndpoint, 80, 858993459, "priority-2-1", &testLocalityIDs[3]), + } + + if diff := cmp.Diff(gotNames, wantNames); diff != "" { + t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotConfigs, wantConfigs); diff != "" { + t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" { + t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) + } +} func TestGroupLocalitiesByPriority(t *testing.T) { tests := []struct { name string @@ -503,38 +631,147 @@ func TestGroupLocalitiesByPriority(t *testing.T) { } } -func TestDedupSortedIntSlice(t *testing.T) { +func TestPriorityLocalitiesToClusterImpl_PickFirstWeightedShuffling_Disabled(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, false) tests := []struct { - name string - a []int - want []int + name string + localities []xdsresource.Locality + priorityName string + mechanism DiscoveryMechanism + childPolicy *iserviceconfig.BalancerConfig + wantConfig *clusterimpl.LBConfig + wantEndpoints []resolver.Endpoint + wantErr bool }{ { - name: "empty", - a: []int{}, - want: []int{}, - }, - { - name: "no dup", - a: []int{0, 1, 2, 3}, - want: []int{0, 1, 2, 3}, + name: "round_robin_as_child_no_LRS", + localities: []xdsresource.Locality{ + { + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, + Weight: 90, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, + Weight: 10, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + }, + ID: clients.Locality{Zone: "test-zone-1"}, + Weight: 20, + }, + { + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, + Weight: 90, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, + Weight: 10, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + }, + ID: clients.Locality{Zone: "test-zone-2"}, + Weight: 80, + }, + }, + priorityName: "test-priority", + childPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, + mechanism: DiscoveryMechanism{ + Cluster: testClusterName, + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSService, + }, + // lrsServer is nil, so LRS policy will not be used. + wantConfig: &clusterimpl.LBConfig{ + Cluster: testClusterName, + EDSServiceName: testEDSService, + ChildPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + // Endpoint weight is the product of locality weight and endpoint weight. + wantEndpoints: []resolver.Endpoint{ + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 20*90, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 20*10, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 80*90, "test-priority", &clients.Locality{Zone: "test-zone-2"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 80*10, "test-priority", &clients.Locality{Zone: "test-zone-2"}), + }, }, { - name: "with dup", - a: []int{0, 0, 1, 1, 1, 2, 3}, - want: []int{0, 1, 2, 3}, + name: "ring_hash_as_child", + localities: []xdsresource.Locality{ + { + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, + Weight: 90, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, + Weight: 10, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + }, + ID: clients.Locality{Zone: "test-zone-1"}, + Weight: 20, + }, + { + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, + Weight: 90, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, + Weight: 10, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + }, + ID: clients.Locality{Zone: "test-zone-2"}, + Weight: 80, + }, + }, + priorityName: "test-priority", + childPolicy: &iserviceconfig.BalancerConfig{Name: ringhash.Name, Config: &iringhash.LBConfig{MinRingSize: 1, MaxRingSize: 2}}, + // lrsServer is nil, so LRS policy will not be used. + wantConfig: &clusterimpl.LBConfig{ + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: ringhash.Name, + Config: &iringhash.LBConfig{MinRingSize: 1, MaxRingSize: 2}, + }, + }, + // Endpoint weight is the product of locality weight and endpoint weight. + wantEndpoints: []resolver.Endpoint{ + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 20*90, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 20*10, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 80*90, "test-priority", &clients.Locality{Zone: "test-zone-2"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 80*10, "test-priority", &clients.Locality{Zone: "test-zone-2"}), + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := dedupSortedIntSlice(tt.a); !cmp.Equal(got, tt.want) { - t.Errorf("dedupSortedIntSlice() = %v, want %v, diff %v", got, tt.want, cmp.Diff(got, tt.want)) + gotConfig, gotEndpoints, err := priorityLocalitiesToClusterImpl(tt.localities, tt.priorityName, tt.mechanism, nil, tt.childPolicy) + if (err != nil) != tt.wantErr { + t.Fatalf("priorityLocalitiesToClusterImpl() error = %v, wantErr %v", err, tt.wantErr) + } + if diff := cmp.Diff(gotConfig, tt.wantConfig); diff != "" { + t.Errorf("priorityLocalitiesToClusterImpl() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotEndpoints, tt.wantEndpoints, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { + t.Errorf("priorityLocalitiesToClusterImpl() diff (-got +want) %v", diff) } }) } } -func TestPriorityLocalitiesToClusterImpl(t *testing.T) { +func TestPriorityLocalitiesToClusterImpl_PickFirstWeightedShuffling_Enabled(t *testing.T) { + testutils.SetEnvConfig(t, &envconfig.PickFirstWeightedShuffling, true) tests := []struct { name string localities []xdsresource.Locality @@ -544,64 +781,84 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { wantConfig *clusterimpl.LBConfig wantEndpoints []resolver.Endpoint wantErr bool - }{{ - name: "round robin as child, no LRS", - localities: []xdsresource.Locality{ - { - Endpoints: []xdsresource.Endpoint{ - { - ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, - Weight: 90, - HealthStatus: xdsresource.EndpointHealthStatusHealthy, - }, - { - ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, - Weight: 10, - HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }{ + { + name: "round_robin_as_child_no_LRS", + localities: []xdsresource.Locality{ + { + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, + Weight: 90, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, + Weight: 10, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, }, + ID: clients.Locality{Zone: "test-zone-1"}, + Weight: 20, }, - ID: clients.Locality{Zone: "test-zone-1"}, - Weight: 20, - }, - { - Endpoints: []xdsresource.Endpoint{ - { - ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, - Weight: 90, - HealthStatus: xdsresource.EndpointHealthStatusHealthy, - }, - { - ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, - Weight: 10, - HealthStatus: xdsresource.EndpointHealthStatusHealthy, + { + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, + Weight: 90, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, + Weight: 10, + HealthStatus: xdsresource.EndpointHealthStatusHealthy, + }, }, + ID: clients.Locality{Zone: "test-zone-2"}, + Weight: 80, }, - ID: clients.Locality{Zone: "test-zone-2"}, - Weight: 80, + }, + priorityName: "test-priority", + childPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, + mechanism: DiscoveryMechanism{ + Cluster: testClusterName, + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSService, + }, + // lrsServer is nil, so LRS policy will not be used. + wantConfig: &clusterimpl.LBConfig{ + Cluster: testClusterName, + EDSServiceName: testEDSService, + ChildPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + // Endpoints weights are the product of normalized locality weight and + // endpoint weight, represented as a fixed-point number in uQ1.31 format. + // Locality weights are normalized as: + // locality 0: 20 / (100) = 0.2 + // locality 1: 80 / (100) = 0.8 + // In fixed-point uQ1.31 format, the weights are: + // locality 0: 0.2 * 2^31 = 429496729 + // locality 1: 0.8 * 2^31 = 1717986918 + // + // The normalized weights of endpoints in each locality are: + // locality 0: endpoint 0: 90 / (100) = 0.9, endpoint 1: 10 / (100) = 0.1 + // locality 1: endpoint 0: 90 / (100) = 0.9, endpoint 1: 10 / (100) = 0.1 + // + // The final endpoint weights are a product of the above normalized weights, + // which turns out to be: + // locality 0, endpoint 0: 0.2 * 0.9 = 386547056 + // locality 0, endpoint 1: 0.2 * 0.1 = 42949672 + // locality 1, endpoint 0: 0.8 * 0.9 = 1546188226 + // locality 1, endpoint 1: 0.8 * 0.1 = 171798691 + wantEndpoints: []resolver.Endpoint{ + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 386547056, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 42949672, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 1546188226, "test-priority", &clients.Locality{Zone: "test-zone-2"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 171798691, "test-priority", &clients.Locality{Zone: "test-zone-2"}), }, }, - priorityName: "test-priority", - childPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, - mechanism: DiscoveryMechanism{ - Cluster: testClusterName, - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSService, - }, - // lrsServer is nil, so LRS policy will not be used. - wantConfig: &clusterimpl.LBConfig{ - Cluster: testClusterName, - EDSServiceName: testEDSService, - ChildPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - wantEndpoints: []resolver.Endpoint{ - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 90, "test-priority", &clients.Locality{Zone: "test-zone-1"}), - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 10, "test-priority", &clients.Locality{Zone: "test-zone-1"}), - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 90, "test-priority", &clients.Locality{Zone: "test-zone-2"}), - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 10, "test-priority", &clients.Locality{Zone: "test-zone-2"}), - }, - }, { - name: "ring_hash as child", + name: "ring_hash_as_child", localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{ @@ -645,25 +902,44 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { Config: &iringhash.LBConfig{MinRingSize: 1, MaxRingSize: 2}, }, }, + // Endpoints weights are the product of normalized locality weight and + // endpoint weight, represented as a fixed-point number in uQ1.31 format. + // Locality weights are normalized as: + // locality 0: 20 / (100) = 0.2 + // locality 1: 80 / (100) = 0.8 + // In fixed-point uQ1.31 format, the weights are: + // locality 0: 0.2 * 2^31 = 429496729 + // locality 1: 0.8 * 2^31 = 1717986918 + // + // The normalized weights of endpoints in each locality are: + // locality 0: endpoint 0: 90 / (100) = 0.9, endpoint 1: 10 / (100) = 0.1 + // locality 1: endpoint 0: 90 / (100) = 0.9, endpoint 1: 10 / (100) = 0.1 + // + // The final endpoint weights are a product of the above normalized weights, + // which turns out to be: + // locality 0, endpoint 0: 0.2 * 0.9 = 386547056 + // locality 0, endpoint 1: 0.2 * 0.1 = 42949672 + // locality 1, endpoint 0: 0.8 * 0.9 = 1546188226 + // locality 1, endpoint 1: 0.8 * 0.1 = 171798691 wantEndpoints: []resolver.Endpoint{ - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 90, "test-priority", &clients.Locality{Zone: "test-zone-1"}), - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 10, "test-priority", &clients.Locality{Zone: "test-zone-1"}), - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 90, "test-priority", &clients.Locality{Zone: "test-zone-2"}), - testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 10, "test-priority", &clients.Locality{Zone: "test-zone-2"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 386547056, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 42949672, "test-priority", &clients.Locality{Zone: "test-zone-1"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 1546188226, "test-priority", &clients.Locality{Zone: "test-zone-2"}), + testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 171798691, "test-priority", &clients.Locality{Zone: "test-zone-2"}), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got1, err := priorityLocalitiesToClusterImpl(tt.localities, tt.priorityName, tt.mechanism, nil, tt.childPolicy) + gotConfig, gotEndpoints, err := priorityLocalitiesToClusterImpl(tt.localities, tt.priorityName, tt.mechanism, nil, tt.childPolicy) if (err != nil) != tt.wantErr { t.Fatalf("priorityLocalitiesToClusterImpl() error = %v, wantErr %v", err, tt.wantErr) } - if diff := cmp.Diff(got, tt.wantConfig); diff != "" { - t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) + if diff := cmp.Diff(gotConfig, tt.wantConfig); diff != "" { + t.Errorf("priorityLocalitiesToClusterImpl() diff (-got +want) %v", diff) } - if diff := cmp.Diff(got1, tt.wantEndpoints, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { - t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) + if diff := cmp.Diff(gotEndpoints, tt.wantEndpoints, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { + t.Errorf("priorityLocalitiesToClusterImpl() diff (-got +want) %v", diff) } }) } @@ -677,7 +953,7 @@ func testEndpointWithAttrs(endpoint resolver.Endpoint, localityWeight, endpointW } endpoint = hierarchy.SetInEndpoint(endpoint, path) endpoint = wrrlocality.SetAddrInfo(endpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) - endpoint = weight.Set(endpoint, weight.EndpointInfo{Weight: localityWeight * endpointWeight}) + endpoint = weight.Set(endpoint, weight.EndpointInfo{Weight: endpointWeight}) return endpoint }