diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go new file mode 100644 index 000000000000..2e67d54dadb6 --- /dev/null +++ b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go @@ -0,0 +1,297 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package balancerconfig contains utility functions to build balancer config. +// The built config will generate a tree of balancers with priority, +// cluster_impl, weighted_target, lrs, and roundrobin. +// +// This is in a subpackage of cluster_resolver so that it can be used by the EDS +// balancer. Eventually we will delete the EDS balancer, and replace it with +// cluster_resolver, then we can move the functions to package cluster_resolver, +// and unexport them. +// +// TODO: move and unexport. Read above. +package balancerconfig + +import ( + "encoding/json" + "fmt" + "sort" + + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/balancer/weightedroundrobin" + "google.golang.org/grpc/internal/hierarchy" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal/balancer/clusterimpl" + "google.golang.org/grpc/xds/internal/balancer/lrs" + "google.golang.org/grpc/xds/internal/balancer/priority" + "google.golang.org/grpc/xds/internal/balancer/weightedtarget" + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +const million = 1000000 + +// PriorityConfig is config for one priority. For example, if there an EDS and a +// DNS, the priority list will be [priorityConfig{EDS}, PriorityConfig{DNS}]. +// +// Each PriorityConfig corresponds to one discovery mechanism from the LBConfig +// generated by the CDS balancer. The CDS balancer resolves the cluster name to +// an ordered list of discovery mechanisms (if the top cluster is an aggregated +// cluster), one for each underlying cluster. +type PriorityConfig struct { + Mechanism DiscoveryMechanism + // EDSResp is set only if type is EDS. + EDSResp xdsclient.EndpointsUpdate + // Addresses is set only if type is DNS. + Addresses []string +} + +// BuildPriorityConfigJSON builds balancer config for the passed in +// priorities. +// +// The built tree of balancers (see test for the output struct). +// +// ┌────────┐ +// │priority│ +// └┬──────┬┘ +// │ │ +// ┌───────────▼┐ ┌▼───────────┐ +// │cluster_impl│ │cluster_impl│ +// └─┬──────────┘ └──────────┬─┘ +// │ │ +// ┌──────────────▼─┐ ┌─▼──────────────┐ +// │locality_picking│ │locality_picking│ +// └┬──────────────┬┘ └┬──────────────┬┘ +// │ │ │ │ +// ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ +// │LRS│ │LRS│ │LRS│ │LRS│ +// └─┬─┘ └─┬─┘ └─┬─┘ └─┬─┘ +// │ │ │ │ +// ┌──────────▼─────┐ ┌─────▼──────────┐ ┌──────────▼─────┐ ┌─────▼──────────┐ +// │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ +// └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘ +// +// If endpointPickingPolicy is nil, roundrobin will be used. +// +// Custom locality picking policy isn't support, and weighted_target is always +// used. +// +// TODO: support setting locality picking policy, and add a parameter for +// locality picking policy. +func BuildPriorityConfigJSON(priorities []PriorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { + pc, addrs := buildPriorityConfig(priorities, endpointPickingPolicy) + ret, err := json.Marshal(pc) + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err) + } + return ret, addrs, nil +} + +func buildPriorityConfig(priorities []PriorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address) { + var ( + retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} + retAddrs []resolver.Address + ) + for i, p := range priorities { + switch p.Mechanism.Type { + case DiscoveryMechanismTypeEDS: + names, configs, addrs := buildClusterImplConfigForEDS(i, p.EDSResp, p.Mechanism, endpointPickingPolicy) + retConfig.Priorities = append(retConfig.Priorities, names...) + for n, c := range configs { + retConfig.Children[n] = &priority.Child{ + Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c}, + // Ignore all re-resolution from EDS children. + IgnoreReresolutionRequests: true, + } + } + retAddrs = append(retAddrs, addrs...) + case DiscoveryMechanismTypeLogicalDNS: + name, config, addrs := buildClusterImplConfigForDNS(i, p.Addresses) + retConfig.Priorities = append(retConfig.Priorities, name) + retConfig.Children[name] = &priority.Child{ + Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config}, + // Not ignore re-resolution from DNS children, they will trigger + // DNS to re-resolve. + IgnoreReresolutionRequests: false, + } + retAddrs = append(retAddrs, addrs...) + } + } + return retConfig, retAddrs +} + +func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string) (string, *clusterimpl.LBConfig, []resolver.Address) { + // Endpoint picking policy for DNS is hardcoded to pick_first. + const childPolicy = "pick_first" + var retAddrs []resolver.Address + pName := fmt.Sprintf("priority-%v", parentPriority) + for _, addrStr := range addrStrs { + retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) + } + return pName, &clusterimpl.LBConfig{ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}}, retAddrs +} + +// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for +// each priority, sorted by priority, and the addresses for each priority (with +// hierarchy attributes set). +// +// For example, if there are two priorities, the returned values will be +// - ["p0", "p1"] +// - map{"p0":p0_config, "p1":p1_config} +// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1] +// - p0 addresses' hierarchy attributes are set to p0 +func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsclient.EndpointsUpdate, mechanism DiscoveryMechanism, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address) { + var ( + retNames []string + retAddrs []resolver.Address + retConfigs = make(map[string]*clusterimpl.LBConfig) + ) + + if endpointPickingPolicy == nil { + endpointPickingPolicy = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name} + } + + drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops)) + for _, d := range edsResp.Drops { + drops = append(drops, clusterimpl.DropConfig{ + Category: d.Category, + RequestsPerMillion: d.Numerator * million / d.Denominator, + }) + } + + priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities) + for _, priorityName := range priorityChildNames { + priorityLocalities := priorities[priorityName] + // Prepend parent priority to the priority names, to avoid duplicates. + pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName) + retNames = append(retNames, pName) + wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.LoadReportingServerName, mechanism.Cluster, mechanism.EDSServiceName) + retConfigs[pName] = &clusterimpl.LBConfig{ + Cluster: mechanism.Cluster, + EDSServiceName: mechanism.EDSServiceName, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig}, + LoadReportingServerName: mechanism.LoadReportingServerName, + MaxConcurrentRequests: mechanism.MaxConcurrentRequests, + DropCategories: drops, + } + retAddrs = append(retAddrs, addrs...) + } + + return retNames, retConfigs, retAddrs +} + +// groupLocalitiesByPriority returns the localities grouped by priority. +// +// It also returns a list of strings where each string represents a priority, +// and the list is sorted from higher priority to lower priority. +// +// For example, for L0-p0, L1-p0, L2-p1, results will be +// - ["p0", "p1"] +// - map{"p0":[L0, L1], "p1":[L2]} +func groupLocalitiesByPriority(localities []xdsclient.Locality) ([]string, map[string][]xdsclient.Locality) { + var priorityIntSlice []int + priorities := make(map[string][]xdsclient.Locality) + for _, locality := range localities { + if locality.Weight == 0 { + continue + } + priorityName := fmt.Sprintf("%v", locality.Priority) + priorities[priorityName] = append(priorities[priorityName], locality) + priorityIntSlice = append(priorityIntSlice, int(locality.Priority)) + } + // Sort the priorities based on the int value, deduplicate, and then turn + // the sorted list into a string list. This will be child names, in priority + // order. + sort.Ints(priorityIntSlice) + priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice) + priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped)) + for _, p := range priorityIntSliceDeduped { + priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p)) + } + return priorityNameSlice, priorities +} + +func dedupSortedIntSlice(a []int) []int { + if len(a) == 0 { + return a + } + i, j := 0, 1 + for ; j < len(a); j++ { + if a[i] == a[j] { + continue + } + i++ + if i != j { + a[i] = a[j] + } + } + return a[:i+1] +} + +// localitiesToWeightedTarget takes a list of localities (with the same +// priority), and generates a weighted target config, and list of addresses. +// +// The addresses have path hierarchy set to [priority-name, locality-name], so +// priority and weighted target know which child policy they are for. +func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, lrsServer *string, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) { + weightedTargets := make(map[string]weightedtarget.Target) + var addrs []resolver.Address + for _, locality := range localities { + localityStr, err := locality.ID.ToString() + if err != nil { + localityStr = fmt.Sprintf("%+v", locality.ID) + } + + child := childPolicy + // If lrsServer is not set, we can skip this extra layer of the LRS + // policy. + if lrsServer != nil { + localityID := locality.ID + child = &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: cluster, + EDSServiceName: edsService, + ChildPolicy: childPolicy, + LoadReportingServerName: *lrsServer, + Locality: &localityID, + }, + } + } + weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: child} + + for _, endpoint := range locality.Endpoints { + // Filter out all "unhealthy" endpoints (unknown and healthy are + // both considered to be healthy: + // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). + if endpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown { + continue + } + + addr := resolver.Address{Addr: endpoint.Address} + if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 { + ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight} + addr = weightedroundrobin.SetAddrInfo(addr, ai) + } + addr = hierarchy.Set(addr, []string{priorityName, localityStr}) + addrs = append(addrs, addr) + } + } + return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs +} diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go new file mode 100644 index 000000000000..273dc6233478 --- /dev/null +++ b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go @@ -0,0 +1,839 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package balancerconfig + +import ( + "bytes" + "encoding/json" + "fmt" + "sort" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/balancer/weightedroundrobin" + "google.golang.org/grpc/internal/hierarchy" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/balancer/clusterimpl" + "google.golang.org/grpc/xds/internal/balancer/lrs" + "google.golang.org/grpc/xds/internal/balancer/priority" + "google.golang.org/grpc/xds/internal/balancer/weightedtarget" + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +const ( + testClusterName = "test-cluster-name" + testLRSServer = "test-lrs-server" + testMaxRequests = 314 + testEDSServcie = "test-eds-service-name" + testEDSServiceName = "service-name-from-parent" + testDropCategory = "test-drops" + testDropOverMillion = 1 + + localityCount = 5 + addressPerLocality = 2 +) + +var ( + testLocalityIDs []internal.LocalityID + testAddressStrs [][]string + testEndpoints [][]xdsclient.Endpoint + + testLocalitiesP0, testLocalitiesP1 []xdsclient.Locality + + addrCmpOpts = cmp.Options{ + cmp.AllowUnexported(attributes.Attributes{}), + cmp.Transformer("SortAddrs", func(in []resolver.Address) []resolver.Address { + out := append([]resolver.Address(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].Addr < out[j].Addr + }) + return out + })} +) + +func init() { + for i := 0; i < localityCount; i++ { + testLocalityIDs = append(testLocalityIDs, internal.LocalityID{Zone: fmt.Sprintf("test-zone-%d", i)}) + var ( + addrs []string + ends []xdsclient.Endpoint + ) + for j := 0; j < addressPerLocality; j++ { + addr := fmt.Sprintf("addr-%d-%d", i, j) + addrs = append(addrs, addr) + ends = append(ends, xdsclient.Endpoint{ + Address: addr, + HealthStatus: xdsclient.EndpointHealthStatusHealthy, + }) + } + testAddressStrs = append(testAddressStrs, addrs) + testEndpoints = append(testEndpoints, ends) + } + + testLocalitiesP0 = []xdsclient.Locality{ + { + Endpoints: testEndpoints[0], + ID: testLocalityIDs[0], + Weight: 20, + Priority: 0, + }, + { + Endpoints: testEndpoints[1], + ID: testLocalityIDs[1], + Weight: 80, + Priority: 0, + }, + } + testLocalitiesP1 = []xdsclient.Locality{ + { + Endpoints: testEndpoints[2], + ID: testLocalityIDs[2], + Weight: 20, + Priority: 1, + }, + { + Endpoints: testEndpoints[3], + ID: testLocalityIDs[3], + Weight: 80, + Priority: 1, + }, + } +} + +// TestBuildPriorityConfigJSON is a sanity check that the built balancer config +// can be parsed. The behavior test is covered by TestBuildPriorityConfig. +func TestBuildPriorityConfigJSON(t *testing.T) { + gotConfig, _, err := BuildPriorityConfigJSON([]PriorityConfig{ + { + Mechanism: DiscoveryMechanism{ + Cluster: testClusterName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServiceName, + }, + EDSResp: xdsclient.EndpointsUpdate{ + Drops: []xdsclient.OverloadDropConfig{ + { + Category: testDropCategory, + Numerator: testDropOverMillion, + Denominator: million, + }, + }, + Localities: []xdsclient.Locality{ + testLocalitiesP0[0], + testLocalitiesP0[1], + testLocalitiesP1[0], + testLocalitiesP1[1], + }, + }, + }, + { + Mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeLogicalDNS, + }, + Addresses: testAddressStrs[4], + }, + }, nil) + if err != nil { + t.Fatalf("buildPriorityConfigJSON(...) failed: %v", err) + } + + var prettyGot bytes.Buffer + if err := json.Indent(&prettyGot, gotConfig, ">>> ", " "); err != nil { + t.Fatalf("json.Indent() failed: %v", err) + } + // Print the indented json if this test fails. + t.Log(prettyGot.String()) + + priorityB := balancer.Get(priority.Name) + if _, err = priorityB.(balancer.ConfigParser).ParseConfig(gotConfig); err != nil { + t.Fatalf("ParseConfig(%+v) failed: %v", gotConfig, err) + } +} + +func TestBuildPriorityConfig(t *testing.T) { + gotConfig, gotAddrs := buildPriorityConfig([]PriorityConfig{ + { + Mechanism: DiscoveryMechanism{ + Cluster: testClusterName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServiceName, + }, + EDSResp: xdsclient.EndpointsUpdate{ + Drops: []xdsclient.OverloadDropConfig{ + { + Category: testDropCategory, + Numerator: testDropOverMillion, + Denominator: million, + }, + }, + Localities: []xdsclient.Locality{ + testLocalitiesP0[0], + testLocalitiesP0[1], + testLocalitiesP1[0], + testLocalitiesP1[1], + }, + }, + }, + { + Mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeLogicalDNS, + }, + Addresses: testAddressStrs[4], + }, + }, nil) + + wantConfig := &priority.LBConfig{ + Children: map[string]*priority.Child{ + "priority-0-0": { + Config: &internalserviceconfig.BalancerConfig{ + Name: clusterimpl.Name, + Config: &clusterimpl.LBConfig{ + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + DropCategories: []clusterimpl.DropConfig{ + { + Category: testDropCategory, + RequestsPerMillion: testDropOverMillion, + }, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: weightedtarget.Name, + Config: &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + assertString(testLocalityIDs[0].ToString): { + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[0], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + assertString(testLocalityIDs[1].ToString): { + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[1], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + }, + }, + }, + }, + }, + IgnoreReresolutionRequests: true, + }, + "priority-0-1": { + Config: &internalserviceconfig.BalancerConfig{ + Name: clusterimpl.Name, + Config: &clusterimpl.LBConfig{ + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + DropCategories: []clusterimpl.DropConfig{ + { + Category: testDropCategory, + RequestsPerMillion: testDropOverMillion, + }, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: weightedtarget.Name, + Config: &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + assertString(testLocalityIDs[2].ToString): { + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[2], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + assertString(testLocalityIDs[3].ToString): { + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[3], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + }, + }, + }, + }, + }, + IgnoreReresolutionRequests: true, + }, + "priority-1": { + Config: &internalserviceconfig.BalancerConfig{ + Name: clusterimpl.Name, + Config: &clusterimpl.LBConfig{ + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: "pick_first"}, + }, + }, + IgnoreReresolutionRequests: false, + }, + }, + Priorities: []string{"priority-0-0", "priority-0-1", "priority-1"}, + } + wantAddrs := []resolver.Address{ + hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][0]}, []string{"priority-1"}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][1]}, []string{"priority-1"}), + } + + if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { + t.Errorf("buildPriorityConfig() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" { + t.Errorf("buildPriorityConfig() diff (-got +want) %v", diff) + } +} + +func TestBuildClusterImplConfigForDNS(t *testing.T) { + gotName, gotConfig, gotAddrs := buildClusterImplConfigForDNS(3, testAddressStrs[0]) + wantName := "priority-3" + wantConfig := &clusterimpl.LBConfig{ + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: "pick_first", + }, + } + wantAddrs := []resolver.Address{ + hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-3"}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-3"}), + } + + if diff := cmp.Diff(gotName, wantName); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" { + t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) + } +} + +func TestBuildClusterImplConfigForEDS(t *testing.T) { + gotNames, gotConfigs, gotAddrs := buildClusterImplConfigForEDS( + 2, + xdsclient.EndpointsUpdate{ + Drops: []xdsclient.OverloadDropConfig{ + { + Category: testDropCategory, + Numerator: testDropOverMillion, + Denominator: million, + }, + }, + Localities: []xdsclient.Locality{ + { + Endpoints: testEndpoints[3], + ID: testLocalityIDs[3], + Weight: 80, + Priority: 1, + }, { + Endpoints: testEndpoints[1], + ID: testLocalityIDs[1], + Weight: 80, + Priority: 0, + }, { + Endpoints: testEndpoints[2], + ID: testLocalityIDs[2], + Weight: 20, + Priority: 1, + }, { + Endpoints: testEndpoints[0], + ID: testLocalityIDs[0], + Weight: 20, + Priority: 0, + }, + }, + }, + DiscoveryMechanism{ + Cluster: testClusterName, + MaxConcurrentRequests: newUint32(testMaxRequests), + LoadReportingServerName: newString(testLRSServer), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServiceName, + }, + nil, + ) + + wantNames := []string{ + fmt.Sprintf("priority-%v-%v", 2, 0), + fmt.Sprintf("priority-%v-%v", 2, 1), + } + wantConfigs := map[string]*clusterimpl.LBConfig{ + "priority-2-0": { + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + DropCategories: []clusterimpl.DropConfig{ + { + Category: testDropCategory, + RequestsPerMillion: testDropOverMillion, + }, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: weightedtarget.Name, + Config: &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + assertString(testLocalityIDs[0].ToString): { + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[0], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + assertString(testLocalityIDs[1].ToString): { + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[1], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + }, + }, + }, + }, + "priority-2-1": { + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + DropCategories: []clusterimpl.DropConfig{ + { + Category: testDropCategory, + RequestsPerMillion: testDropOverMillion, + }, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: weightedtarget.Name, + Config: &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + assertString(testLocalityIDs[2].ToString): { + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[2], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + assertString(testLocalityIDs[3].ToString): { + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServerName: testLRSServer, + Locality: &testLocalityIDs[3], + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + }, + }, + }, + }, + } + wantAddrs := []resolver.Address{ + hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}), + hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}), + } + + if diff := cmp.Diff(gotNames, wantNames); diff != "" { + t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotConfigs, wantConfigs); diff != "" { + t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" { + t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) + } + +} + +func TestGroupLocalitiesByPriority(t *testing.T) { + tests := []struct { + name string + localities []xdsclient.Locality + wantPriorities []string + wantLocalities map[string][]xdsclient.Locality + }{ + { + name: "1 locality 1 priority", + localities: []xdsclient.Locality{testLocalitiesP0[0]}, + wantPriorities: []string{"0"}, + wantLocalities: map[string][]xdsclient.Locality{ + "0": {testLocalitiesP0[0]}, + }, + }, + { + name: "2 locality 1 priority", + localities: []xdsclient.Locality{testLocalitiesP0[0], testLocalitiesP0[1]}, + wantPriorities: []string{"0"}, + wantLocalities: map[string][]xdsclient.Locality{ + "0": {testLocalitiesP0[0], testLocalitiesP0[1]}, + }, + }, + { + name: "1 locality in each", + localities: []xdsclient.Locality{testLocalitiesP0[0], testLocalitiesP1[0]}, + wantPriorities: []string{"0", "1"}, + wantLocalities: map[string][]xdsclient.Locality{ + "0": {testLocalitiesP0[0]}, + "1": {testLocalitiesP1[0]}, + }, + }, + { + name: "2 localities in each sorted", + localities: []xdsclient.Locality{ + testLocalitiesP0[0], testLocalitiesP0[1], + testLocalitiesP1[0], testLocalitiesP1[1]}, + wantPriorities: []string{"0", "1"}, + wantLocalities: map[string][]xdsclient.Locality{ + "0": {testLocalitiesP0[0], testLocalitiesP0[1]}, + "1": {testLocalitiesP1[0], testLocalitiesP1[1]}, + }, + }, + { + // The localities are given in order [p1, p0, p1, p0], but the + // returned priority list must be sorted [p0, p1], because the list + // order is the priority order. + name: "2 localities in each needs to sort", + localities: []xdsclient.Locality{ + testLocalitiesP1[1], testLocalitiesP0[1], + testLocalitiesP1[0], testLocalitiesP0[0]}, + wantPriorities: []string{"0", "1"}, + wantLocalities: map[string][]xdsclient.Locality{ + "0": {testLocalitiesP0[1], testLocalitiesP0[0]}, + "1": {testLocalitiesP1[1], testLocalitiesP1[0]}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotPriorities, gotLocalities := groupLocalitiesByPriority(tt.localities) + if diff := cmp.Diff(gotPriorities, tt.wantPriorities); diff != "" { + t.Errorf("groupLocalitiesByPriority() diff(-got +want) %v", diff) + } + if diff := cmp.Diff(gotLocalities, tt.wantLocalities); diff != "" { + t.Errorf("groupLocalitiesByPriority() diff(-got +want) %v", diff) + } + }) + } +} + +func TestDedupSortedIntSlice(t *testing.T) { + tests := []struct { + name string + a []int + want []int + }{ + { + name: "empty", + a: []int{}, + want: []int{}, + }, + { + name: "no dup", + a: []int{0, 1, 2, 3}, + want: []int{0, 1, 2, 3}, + }, + { + name: "with dup", + a: []int{0, 0, 1, 1, 1, 2, 3}, + want: []int{0, 1, 2, 3}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := dedupSortedIntSlice(tt.a); !cmp.Equal(got, tt.want) { + t.Errorf("dedupSortedIntSlice() = %v, want %v, diff %v", got, tt.want, cmp.Diff(got, tt.want)) + } + }) + } +} + +func TestLocalitiesToWeightedTarget(t *testing.T) { + tests := []struct { + name string + localities []xdsclient.Locality + priorityName string + childPolicy *internalserviceconfig.BalancerConfig + lrsServer *string + cluster string + edsService string + wantConfig *weightedtarget.LBConfig + wantAddrs []resolver.Address + }{ + { + name: "roundrobin as child, with LRS", + localities: []xdsclient.Locality{ + { + Endpoints: []xdsclient.Endpoint{ + {Address: "addr-1-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + {Address: "addr-1-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + }, + ID: internal.LocalityID{Zone: "test-zone-1"}, + Weight: 20, + }, + { + Endpoints: []xdsclient.Endpoint{ + {Address: "addr-2-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + {Address: "addr-2-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + }, + ID: internal.LocalityID{Zone: "test-zone-2"}, + Weight: 80, + }, + }, + priorityName: "test-priority", + childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + lrsServer: newString("test-lrs-server"), + cluster: "test-cluster", + edsService: "test-eds-service", + wantConfig: &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: "test-cluster", + EDSServiceName: "test-eds-service", + LoadReportingServerName: "test-lrs-server", + Locality: &internal.LocalityID{Zone: "test-zone-1"}, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: lrs.Name, + Config: &lrs.LBConfig{ + ClusterName: "test-cluster", + EDSServiceName: "test-eds-service", + LoadReportingServerName: "test-lrs-server", + Locality: &internal.LocalityID{Zone: "test-zone-2"}, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + }, + }, + }, + }, + }, + wantAddrs: []resolver.Address{ + hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), + hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), + hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + }, + }, + { + name: "roundrobin as child, no LRS", + localities: []xdsclient.Locality{ + { + Endpoints: []xdsclient.Endpoint{ + {Address: "addr-1-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + {Address: "addr-1-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + }, + ID: internal.LocalityID{Zone: "test-zone-1"}, + Weight: 20, + }, + { + Endpoints: []xdsclient.Endpoint{ + {Address: "addr-2-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + {Address: "addr-2-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy}, + }, + ID: internal.LocalityID{Zone: "test-zone-2"}, + Weight: 80, + }, + }, + priorityName: "test-priority", + childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, + // lrsServer is nil, so LRS policy will not be used. + wantConfig: &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, + }, + assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, + }, + }, + }, + wantAddrs: []resolver.Address{ + hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), + hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), + hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + }, + }, + { + name: "weighted round robin as child, no LRS", + localities: []xdsclient.Locality{ + { + Endpoints: []xdsclient.Endpoint{ + {Address: "addr-1-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 90}, + {Address: "addr-1-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 10}, + }, + ID: internal.LocalityID{Zone: "test-zone-1"}, + Weight: 20, + }, + { + Endpoints: []xdsclient.Endpoint{ + {Address: "addr-2-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 90}, + {Address: "addr-2-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 10}, + }, + ID: internal.LocalityID{Zone: "test-zone-2"}, + Weight: 80, + }, + }, + priorityName: "test-priority", + childPolicy: &internalserviceconfig.BalancerConfig{Name: weightedroundrobin.Name}, + // lrsServer is nil, so LRS policy will not be used. + wantConfig: &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: weightedroundrobin.Name, + }, + }, + assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: weightedroundrobin.Name, + }, + }, + }, + }, + wantAddrs: []resolver.Address{ + hierarchy.Set( + weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-1"}, weightedroundrobin.AddrInfo{Weight: 90}), + []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), + hierarchy.Set( + weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-2"}, weightedroundrobin.AddrInfo{Weight: 10}), + []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), + hierarchy.Set( + weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-1"}, weightedroundrobin.AddrInfo{Weight: 90}), + []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + hierarchy.Set( + weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-2"}, weightedroundrobin.AddrInfo{Weight: 10}), + []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy, tt.lrsServer, tt.cluster, tt.edsService) + if diff := cmp.Diff(got, tt.wantConfig); diff != "" { + t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) + } + if diff := cmp.Diff(got1, tt.wantAddrs, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { + t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) + } + }) + } +} + +func newString(s string) *string { + return &s +} + +func newUint32(i uint32) *uint32 { + return &i +} + +func assertString(f func() (string, error)) string { + s, err := f() + if err != nil { + panic(err.Error()) + } + return s +} diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/type.go b/xds/internal/balancer/clusterresolver/balancerconfig/type.go new file mode 100644 index 000000000000..eb149cd384dd --- /dev/null +++ b/xds/internal/balancer/clusterresolver/balancerconfig/type.go @@ -0,0 +1,95 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package balancerconfig + +import ( + "bytes" + "encoding/json" + "fmt" +) + +// DiscoveryMechanismType is the type of discovery mechanism. +type DiscoveryMechanismType int + +const ( + // DiscoveryMechanismTypeEDS is eds. + DiscoveryMechanismTypeEDS DiscoveryMechanismType = iota // `json:EDS` + // DiscoveryMechanismTypeLogicalDNS is DNS. + DiscoveryMechanismTypeLogicalDNS // `json:LOGICAL_DNS` +) + +// MarshalJSON marshals a DiscoveryMechanismType to a quoted json string. +// +// This is necessary to handle enum (as strings) from JSON. +func (t *DiscoveryMechanismType) MarshalJSON() ([]byte, error) { + buffer := bytes.NewBufferString(`"`) + switch *t { + case DiscoveryMechanismTypeEDS: + buffer.WriteString("EDS") + case DiscoveryMechanismTypeLogicalDNS: + buffer.WriteString("LOGICAL_DNS") + } + buffer.WriteString(`"`) + return buffer.Bytes(), nil +} + +// UnmarshalJSON unmarshals a quoted json string to the DiscoveryMechanismType. +func (t *DiscoveryMechanismType) UnmarshalJSON(b []byte) error { + var s string + err := json.Unmarshal(b, &s) + if err != nil { + return err + } + switch s { + case "EDS": + *t = DiscoveryMechanismTypeEDS + case "LOGICAL_DNS": + *t = DiscoveryMechanismTypeLogicalDNS + default: + return fmt.Errorf("unable to unmarshal string %q to type DiscoveryMechanismType", s) + } + return nil +} + +// DiscoveryMechanism is the discovery mechanism, can be either EDS or DNS. +// +// For DNS, the ClientConn target will be used for name resolution. +// +// For EDS, if EDSServiceName is not empty, it will be used for watching. If +// EDSServiceName is empty, Cluster will be used. +type DiscoveryMechanism struct { + // Cluster is the cluster name. + Cluster string `json:"cluster,omitempty"` + // LoadReportingServerName is the LRS server to send load reports to. If + // not present, load reporting will be disabled. If set to the empty string, + // load reporting will be sent to the same server that we obtained CDS data + // from. + LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"` + // MaxConcurrentRequests is the maximum number of outstanding requests can + // be made to the upstream cluster. Default is 1024. + MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` + // Type is the discovery mechanism type. + Type DiscoveryMechanismType `json:"type,omitempty"` + // EDSServiceName is the EDS service name, as returned in CDS. May be unset + // if not specified in CDS. For type EDS only. + // + // This is used for EDS watch if set. If unset, Cluster is used for EDS + // watch. + EDSServiceName string `json:"edsServiceName,omitempty"` +} diff --git a/xds/internal/balancer/edsbalancer/configbuilder.go b/xds/internal/balancer/edsbalancer/configbuilder.go new file mode 100644 index 000000000000..1e08a05e2048 --- /dev/null +++ b/xds/internal/balancer/edsbalancer/configbuilder.go @@ -0,0 +1,49 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package edsbalancer + +import ( + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +const million = 1000000 + +func buildPriorityConfigJSON(edsResp xdsclient.EndpointsUpdate, c *EDSConfig) ([]byte, []resolver.Address, error) { + var childConfig *internalserviceconfig.BalancerConfig + if c.ChildPolicy != nil { + childConfig = &internalserviceconfig.BalancerConfig{Name: c.ChildPolicy.Name} + } + return balancerconfig.BuildPriorityConfigJSON( + []balancerconfig.PriorityConfig{ + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Cluster: c.ClusterName, + LoadReportingServerName: c.LrsLoadReportingServerName, + MaxConcurrentRequests: c.MaxConcurrentRequests, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + EDSServiceName: c.EDSServiceName, + }, + EDSResp: edsResp, + }, + }, childConfig, + ) +} diff --git a/xds/internal/balancer/edsbalancer/configbuilder_test.go b/xds/internal/balancer/edsbalancer/configbuilder_test.go new file mode 100644 index 000000000000..9425b2363a52 --- /dev/null +++ b/xds/internal/balancer/edsbalancer/configbuilder_test.go @@ -0,0 +1,126 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package edsbalancer + +import ( + "fmt" + "testing" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/balancer/priority" + xdsclient "google.golang.org/grpc/xds/internal/client" + + _ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer + _ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer +) + +const ( + localityCount = 4 + addressPerLocality = 2 +) + +var ( + testLocalityIDs []internal.LocalityID + testEndpoints [][]xdsclient.Endpoint +) + +func init() { + for i := 0; i < localityCount; i++ { + testLocalityIDs = append(testLocalityIDs, internal.LocalityID{Zone: fmt.Sprintf("test-zone-%d", i)}) + var ends []xdsclient.Endpoint + for j := 0; j < addressPerLocality; j++ { + addr := fmt.Sprintf("addr-%d-%d", i, j) + ends = append(ends, xdsclient.Endpoint{ + Address: addr, + HealthStatus: xdsclient.EndpointHealthStatusHealthy, + }) + } + testEndpoints = append(testEndpoints, ends) + } +} + +// TestBuildPriorityConfigJSON is a sanity check that the generated config bytes +// are valid (can be parsed back to a config struct). +// +// The correctness is covered by the unmarshalled version +// TestBuildPriorityConfig. +func TestBuildPriorityConfigJSON(t *testing.T) { + const ( + testClusterName = "cluster-name-for-watch" + testEDSServiceName = "service-name-from-parent" + testLRSServer = "lrs-addr-from-config" + testMaxReq = 314 + testDropCategory = "test-drops" + testDropOverMillion = 1 + ) + for _, lrsServer := range []*string{newString(testLRSServer), newString(""), nil} { + got, _, err := buildPriorityConfigJSON(xdsclient.EndpointsUpdate{ + Drops: []xdsclient.OverloadDropConfig{{ + Category: testDropCategory, + Numerator: testDropOverMillion, + Denominator: million, + }}, + Localities: []xdsclient.Locality{{ + Endpoints: testEndpoints[3], + ID: testLocalityIDs[3], + Weight: 80, + Priority: 1, + }, { + Endpoints: testEndpoints[1], + ID: testLocalityIDs[1], + Weight: 80, + Priority: 0, + }, { + Endpoints: testEndpoints[2], + ID: testLocalityIDs[2], + Weight: 20, + Priority: 1, + }, { + Endpoints: testEndpoints[0], + ID: testLocalityIDs[0], + Weight: 20, + Priority: 0, + }}}, + &EDSConfig{ + ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}, + ClusterName: testClusterName, + EDSServiceName: testEDSServiceName, + MaxConcurrentRequests: newUint32(testMaxReq), + LrsLoadReportingServerName: lrsServer, + }, + ) + if err != nil { + t.Fatalf("buildPriorityConfigJSON(...) failed: %v", err) + } + priorityB := balancer.Get(priority.Name) + if _, err = priorityB.(balancer.ConfigParser).ParseConfig(got); err != nil { + t.Fatalf("ParseConfig(%+v) failed: %v", got, err) + } + } +} + +func newString(s string) *string { + return &s +} + +func newUint32(i uint32) *uint32 { + return &i +}