Skip to content

Commit

Permalink
xds: clusterresolver e2e test cleanup (#6391)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Jun 23, 2023
1 parent 10f5b50 commit dd931c8
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 95 deletions.
31 changes: 25 additions & 6 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,23 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster {

// LocalityOptions contains options to configure a Locality.
type LocalityOptions struct {
// Ports is a set of ports on "localhost" belonging to this locality.
// NOTE(review): appears superseded by Backends — EndpointResourceWithOptions
// iterates only Backends; confirm whether any caller still sets Ports.
Ports []uint32
// Name is the unique locality name, used as the locality's sub-zone.
Name string
// Weight is the weight of the locality, used for load balancing.
Weight uint32
// Backends is a set of backends belonging to this locality. All backends
// are assumed to be running on "localhost".
Backends []BackendOptions
}

// BackendOptions contains options to configure individual backends in a
// locality.
type BackendOptions struct {
// Port number on which the backend is accepting connections. All backends
// are expected to run on localhost, hence host name is not stored here.
Port uint32
// Health status of the backend. Default is UNKNOWN which is treated the
// same as HEALTHY.
HealthStatus v3corepb.HealthStatus
}

// EndpointOptions contains options to configure an Endpoint (or
Expand All @@ -550,13 +563,17 @@ type EndpointOptions struct {

// DefaultEndpoint returns a basic xds Endpoint resource.
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var bOpts []BackendOptions
for _, p := range ports {
bOpts = append(bOpts, BackendOptions{Port: p})
}
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
Host: host,
Localities: []LocalityOptions{
{
Ports: ports,
Weight: 1,
Backends: bOpts,
Weight: 1,
},
},
})
Expand All @@ -568,16 +585,18 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, locality := range opts.Localities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range locality.Ports {
for _, b := range locality.Backends {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Port},
},
}},
}},
HealthStatus: b.HealthStatus,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
Expand Down
8 changes: 4 additions & 4 deletions test/xds/xds_client_custom_lb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ func (s) TestWrrLocality(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Ports: []uint32{port1, port2},
Weight: 1,
Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}},
Weight: 1,
},
{
Ports: []uint32{port3, port4, port5},
Weight: 2,
Backends: []e2e.BackendOptions{{Port: port3}, {Port: port4}, {Port: port5}},
Weight: 2,
},
},
})},
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Ports: []uint32{testutils.ParsePort(t, server.Address)},
Weight: 1,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
Weight: 1,
},
},
DropPercents: map[string]int{"test-drop-everything": 100},
Expand Down
189 changes: 106 additions & 83 deletions xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,57 +101,16 @@ func startTestServiceBackends(t *testing.T, numBackends int) ([]*stubserver.Stub
}
}

// endpointResource builds an EDS (ClusterLoadAssignment) resource for the
// given cluster name and localities. Every backend in every locality is
// assumed to be listening on "localhost".
func endpointResource(clusterName string, localities []localityInfo) *v3endpointpb.ClusterLoadAssignment {
	var lbLocalities []*v3endpointpb.LocalityLbEndpoints
	for _, loc := range localities {
		var lbEndpoints []*v3endpointpb.LbEndpoint
		for idx, p := range loc.ports {
			sockAddr := &v3corepb.SocketAddress{
				Protocol:      v3corepb.SocketAddress_TCP,
				Address:       "localhost",
				PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: p},
			}
			lbEndpoint := &v3endpointpb.LbEndpoint{
				HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{
					Endpoint: &v3endpointpb.Endpoint{
						Address: &v3corepb.Address{
							Address: &v3corepb.Address_SocketAddress{SocketAddress: sockAddr},
						},
					},
				},
			}
			// Health statuses are optional: only positions covered by the
			// healthStatus slice get an explicit status; the rest keep the
			// proto zero value.
			if idx < len(loc.healthStatus) {
				lbEndpoint.HealthStatus = loc.healthStatus[idx]
			}
			lbEndpoints = append(lbEndpoints, lbEndpoint)
		}
		lbLocalities = append(lbLocalities, &v3endpointpb.LocalityLbEndpoints{
			Locality:            &v3corepb.Locality{SubZone: loc.name},
			LbEndpoints:         lbEndpoints,
			LoadBalancingWeight: wrapperspb.UInt32(loc.weight),
		})
	}
	return &v3endpointpb.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints:   lbLocalities,
	}
}

// localityInfo describes one test locality: its name, its load-balancing
// weight, and the localhost ports (plus optional health statuses) of its
// backends.
type localityInfo struct {
// name is used as the locality's sub-zone identifier.
name string
// weight is the locality's load-balancing weight.
weight uint32
// ports lists the localhost ports of the backends in this locality.
ports []uint32
// healthStatus optionally carries one status per entry in ports; backends
// beyond len(healthStatus) are left with the proto zero-value status.
healthStatus []v3corepb.HealthStatus
}

// clientEndpointsResource returns an EDS resource for the specified nodeID,
// service name and localities.
func clientEndpointsResource(nodeID, edsServiceName string, localities []localityInfo) e2e.UpdateOptions {
func clientEndpointsResource(nodeID, edsServiceName string, localities []e2e.LocalityOptions) e2e.UpdateOptions {
return e2e.UpdateOptions{
NodeID: nodeID,
Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpointResource(edsServiceName, localities)},
NodeID: nodeID,
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: edsServiceName,
Host: "localhost",
Localities: localities,
})},
SkipValidation: true,
}
}
Expand All @@ -175,7 +134,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {

// Create xDS resources for consumption by the test. We start off with a
// single backend in a single EDS locality.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}})
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
Expand Down Expand Up @@ -223,7 +186,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {

// Add a backend to the same locality, and ensure RPCs are sent in a
// roundrobin fashion across the two backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:2]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}, {Port: ports[1]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand All @@ -233,7 +200,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {

// Remove the first backend, and ensure all RPCs are sent to the second
// backend.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[1:2]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand All @@ -242,7 +213,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}

// Replace the backend, and ensure all RPCs are sent to the new backend.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[2:3]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -281,9 +256,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {

// Create xDS resources for consumption by the test. We start off with two
// localities, and single backend in each of them.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:1]},
{name: localityName2, weight: 1, ports: ports[1:2]},
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
},
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -332,10 +315,22 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {

// Add another locality with a single backend, and ensure RPCs are being
// weighted roundrobined across the three backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:1]},
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:3]},
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
},
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
{
Name: localityName3,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}},
},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
Expand All @@ -346,9 +341,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {

// Remove the first locality, and ensure RPCs are being weighted
// roundrobined across the remaining two backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:3]},
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
{
Name: localityName3,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}},
},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
Expand All @@ -360,9 +363,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {
// Add a backend to one locality, and ensure weighted roundrobin. Since RPCs
// are roundrobined across localities, locality2's backend will receive
// twice the traffic.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:4]},
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
{
Name: localityName3,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}, {Port: ports[3]}},
},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
Expand All @@ -389,23 +400,31 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
// Create xDS resources for consumption by the test. Two localities with
// six backends each, with two of the six backends being healthy. Both
// UNKNOWN and HEALTHY are considered by gRPC for load balancing.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:6], healthStatus: []v3corepb.HealthStatus{
v3corepb.HealthStatus_UNKNOWN,
v3corepb.HealthStatus_HEALTHY,
v3corepb.HealthStatus_UNHEALTHY,
v3corepb.HealthStatus_DRAINING,
v3corepb.HealthStatus_TIMEOUT,
v3corepb.HealthStatus_DEGRADED,
}},
{name: localityName2, weight: 1, ports: ports[6:12], healthStatus: []v3corepb.HealthStatus{
v3corepb.HealthStatus_UNKNOWN,
v3corepb.HealthStatus_HEALTHY,
v3corepb.HealthStatus_UNHEALTHY,
v3corepb.HealthStatus_DRAINING,
v3corepb.HealthStatus_TIMEOUT,
v3corepb.HealthStatus_DEGRADED,
}},
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{
{Port: ports[0], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
{Port: ports[1], HealthStatus: v3corepb.HealthStatus_HEALTHY},
{Port: ports[2], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
{Port: ports[3], HealthStatus: v3corepb.HealthStatus_DRAINING},
{Port: ports[4], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
{Port: ports[5], HealthStatus: v3corepb.HealthStatus_DEGRADED},
},
},
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{
{Port: ports[6], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
{Port: ports[7], HealthStatus: v3corepb.HealthStatus_HEALTHY},
{Port: ports[8], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
{Port: ports[9], HealthStatus: v3corepb.HealthStatus_DRAINING},
{Port: ports[10], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
{Port: ports[11], HealthStatus: v3corepb.HealthStatus_DEGRADED},
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -520,7 +539,11 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
}

// Add a locality with one backend and ensure RPCs are successful.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit dd931c8

Please sign in to comment.