diff --git a/test/extensions/clusters/aggregate/cluster_integration_test.cc b/test/extensions/clusters/aggregate/cluster_integration_test.cc index 1a76054f9aee4..99ab092fe636b 100644 --- a/test/extensions/clusters/aggregate/cluster_integration_test.cc +++ b/test/extensions/clusters/aggregate/cluster_integration_test.cc @@ -1,8 +1,11 @@ #include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" +#include "envoy/extensions/upstreams/http/v3/http_protocol_options.pb.h" #include "envoy/grpc/status.h" #include "envoy/stats/scope.h" #include "source/common/config/protobuf_link_hacks.h" +#include "source/common/network/socket_option_factory.h" #include "source/common/protobuf/protobuf.h" #include "source/common/protobuf/utility.h" @@ -28,6 +31,13 @@ const char SecondClusterName[] = "cluster_2"; // Index in fake_upstreams_ const int FirstUpstreamIndex = 2; const int SecondUpstreamIndex = 3; +struct CircuitBreakerLimits { + uint32_t max_connections = 1024; + uint32_t max_requests = 1024; + uint32_t max_pending_requests = 1024; + uint32_t max_retries = 3; + uint32_t max_connection_pools = std::numeric_limits<uint32_t>::max(); +}; const std::string& config() { CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF( @@ -178,6 +188,50 @@ class AggregateIntegrationTest xds_stream_->startGrpcStream(); } + void setCircuitBreakerLimits(envoy::config::cluster::v3::Cluster& cluster, + const CircuitBreakerLimits& limits) { + auto* cluster_circuit_breakers = cluster.mutable_circuit_breakers(); + + auto* cluster_circuit_breakers_threshold_default = cluster_circuit_breakers->add_thresholds(); + cluster_circuit_breakers_threshold_default->set_priority( + envoy::config::core::v3::RoutingPriority::DEFAULT); + + cluster_circuit_breakers_threshold_default->mutable_max_connections()->set_value( + limits.max_connections); + cluster_circuit_breakers_threshold_default->mutable_max_pending_requests()->set_value( + limits.max_pending_requests); + 
cluster_circuit_breakers_threshold_default->mutable_max_requests()->set_value( + limits.max_requests); + cluster_circuit_breakers_threshold_default->mutable_max_retries()->set_value( + limits.max_retries); + cluster_circuit_breakers_threshold_default->mutable_max_connection_pools()->set_value( + limits.max_connection_pools); + cluster_circuit_breakers_threshold_default->set_track_remaining(true); + } + + void setMaxConcurrentStreams(envoy::config::cluster::v3::Cluster& cluster, + uint32_t max_concurrent_streams) { + envoy::extensions::upstreams::http::v3::HttpProtocolOptions http_protocol_options; + http_protocol_options.mutable_explicit_http_config() + ->mutable_http2_protocol_options() + ->mutable_max_concurrent_streams() + ->set_value(max_concurrent_streams); + (*cluster.mutable_typed_extension_protocol_options()) + ["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"] + .PackFrom(http_protocol_options); + } + + void + reduceAggregateClustersListToOneCluster(envoy::config::cluster::v3::Cluster& aggregate_cluster) { + auto* aggregate_cluster_type = aggregate_cluster.mutable_cluster_type(); + auto* aggregate_cluster_typed_config = aggregate_cluster_type->mutable_typed_config(); + envoy::extensions::clusters::aggregate::v3::ClusterConfig new_aggregate_cluster_typed_config; + aggregate_cluster_typed_config->UnpackTo(&new_aggregate_cluster_typed_config); + new_aggregate_cluster_typed_config.clear_clusters(); + new_aggregate_cluster_typed_config.add_clusters("cluster_1"); + aggregate_cluster_typed_config->PackFrom(new_aggregate_cluster_typed_config); + } + const bool deferred_cluster_creation_; envoy::config::cluster::v3::Cluster cluster1_; envoy::config::cluster::v3::Cluster cluster2_; @@ -303,5 +357,565 @@ TEST_P(AggregateIntegrationTest, PreviousPrioritiesRetryPredicate) { cleanupUpstreamAndDownstream(); } +// Tests the max_connections circuit breaker behaviour on the aggregate cluster and its underlying +// cluster1. 
When connections reach the configured limit, only the underlying cluster1's circuit +// breaker opens, the aggregate cluster's circuit breaker is completely unaffected. When cluster1's +// circuit breaker opens, new connections are rejected, and the overflow counter increases. After +// closing the connection, cluster1's circuit breaker closes again. +TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* aggregate_cluster = static_resources->mutable_clusters(1); + + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_connections = 1}); + setMaxConcurrentStreams(*aggregate_cluster, 1); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_connections = 1}); + setMaxConcurrentStreams(cluster1_, 1); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 0); 
+ + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send a first request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the first request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // after the first request arrives at cluster1 (there is now a single active upstream connection) + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 0); + + // send a second request to the aggregate cluster + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // the second request is rejected because the cluster1 circuit breaker is open + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker remains open and overflows + 
test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 1); + + // the requests to the aggregate cluster route affect the cluster1 circuit breaker state + // send a third request directly to cluster1 to confirm this + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // the third request is rejected because the cluster1 circuit breaker is already open + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker remains open and overflows again + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 2); + + // respond to the first request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + // the first request completes successfully + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + + ASSERT_TRUE(fake_upstream_connection_->close()); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + + // after completing the first request and closing the connection + // the aggregate cluster circuit breaker remains closed 
+ test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker closes + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1); + // the overflow may be greater than 2 because after completing the first request + // the queued pending requests will attempt to reuse the connection + test_server_->waitForCounterGe("cluster.cluster_1.upstream_cx_overflow", 2); + + cleanupUpstreamAndDownstream(); +} + +// Tests the max_requests circuit breaker behaviour on the aggregate cluster and its underlying +// cluster1. When requests reach the configured limit, only the underlying cluster1's circuit +// breaker opens, the aggregate cluster's circuit breaker is completely unaffected. When +// cluster1's circuit breaker opens, new requests are rejected, and the overflow counter increases. +// After completing the request, cluster1's circuit breaker closes again. 
+TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* aggregate_cluster = static_resources->mutable_clusters(1); + + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_requests = 1}); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_requests = 1}); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send a first request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the first request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + 
// after the first request arrives at cluster1 (there is now a single active upstream request) + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + // send a second request to the aggregate cluster + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); + // the second request to the aggregate cluster is rejected + EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1); + + // the requests to the aggregate cluster route affect the cluster1 circuit 
breaker state + // send a third request directly to cluster1 to confirm this + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + ASSERT_TRUE(cluster1_response1->waitForEndStream()); + // the third request to cluster1 is rejected + EXPECT_EQ("503", cluster1_response1->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows again + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + // respond to the first request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + // the first request completes successfully + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker closes + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 
0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + cleanupUpstreamAndDownstream(); +} + +// Tests the max_pending_requests circuit breaker behaviour on the aggregate cluster and its +// underlying cluster1. When pending requests reach the configured limit, only the underlying +// cluster1's circuit breaker opens, the aggregate cluster's circuit breaker is completely +// unaffected. When cluster1's circuit breaker opens, additional pending requests are rejected, and +// the overflow counter increases. After processing the pending requests, cluster1's circuit breaker +// closes again. +TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* aggregate_cluster = static_resources->mutable_clusters(1); + + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, + CircuitBreakerLimits{.max_connections = 1, .max_pending_requests = 1}); + setMaxConcurrentStreams(*aggregate_cluster, 1); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, + CircuitBreakerLimits{.max_connections = 1, .max_pending_requests = 1}); + setMaxConcurrentStreams(cluster1_, 1); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + 
"cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send a first request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the first request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // send a second request to the aggregate cluster (this is the first pending request) + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + // send a third request to the aggregate cluster (this is the 
second pending request) + auto aggregate_cluster_response3 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // the third request fails immediately because the cluster1 circuit breaker is open + ASSERT_TRUE(aggregate_cluster_response3->waitForEndStream()); + EXPECT_EQ("503", aggregate_cluster_response3->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1); + + // the pending requests to the aggregate cluster route affect the cluster1 circuit breaker state + // send a fourth request directly to cluster1 to confirm this + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // the fourth request fails immediately because the cluster1 circuit breaker is open + ASSERT_TRUE(cluster1_response1->waitForEndStream()); + EXPECT_EQ("503", cluster1_response1->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + 
"cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows again + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + // respond to the first request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + + // wait for the second request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // respond to the second request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response2->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker closes + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + cleanupUpstreamAndDownstream(); +} + +// Tests the max_retries circuit breaker behaviour on the aggregate cluster and its 
underlying +// cluster1. Unlike the other circuit breakers, the aggregate cluster's retry circuit breaker opens +// when retries exceed its configured limit. cluster1's retry circuit breaker opens independently +// when direct requests to cluster1 exceed its configured limit. When either circuit breaker opens, +// additional retries to that cluster are prevented, and overflow counters increase. The two circuit +// breakers operate independently. +TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* listener = static_resources->mutable_listeners(0); + auto* filter_chain = listener->mutable_filter_chains(0); + auto* filter = filter_chain->mutable_filters(0); + envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager + http_connection_manager; + filter->mutable_typed_config()->UnpackTo(&http_connection_manager); + auto* virtual_host = http_connection_manager.mutable_route_config()->mutable_virtual_hosts(0); + auto* aggregate_cluster_route = virtual_host->mutable_routes(2); + auto* cluster1_route = virtual_host->mutable_routes(0); + aggregate_cluster_route->mutable_route()->mutable_retry_policy()->clear_retry_priority(); + // adjust the retry policy on both the /aggregatecluster and /cluster1 routes + aggregate_cluster_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign( + "5xx"); + cluster1_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign("5xx"); + filter->mutable_typed_config()->PackFrom(http_connection_manager); + + auto* aggregate_cluster = static_resources->mutable_clusters(1); + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_retries = 1}); + }); + + initialize(); + + 
setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_retries = 1}); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send a first request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the first request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the first request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // wait for the first request retry to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + auto& first_request_retry = *upstream_request_; + + // the aggregate cluster circuit breaker opens + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + 
test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 0); + // the cluster1 circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send a second request to the aggregate cluster + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the second request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the second request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // the aggregate cluster circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send a third request directly to cluster1 + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, 
{":authority", "host"}}); + + // wait for the third request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the third request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // wait for the third request retry to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + auto& third_request_retry = *upstream_request_; + + // the aggregate cluster circuit breaker remains open + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send a fourth request directly to cluster1 + auto cluster1_response2 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // wait for the fourth request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the fourth request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // the aggregate cluster circuit breaker remains open + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + 
test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 1); + + // respond to the third request to cluster1 + third_request_retry.encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(cluster1_response1->waitForEndStream()); + EXPECT_EQ("200", cluster1_response1->headers().getStatusValue()); + // handle the response to the fourth request to cluster1 + ASSERT_TRUE(cluster1_response2->waitForEndStream()); + EXPECT_EQ("503", cluster1_response2->headers().getStatusValue()); + // respond to the first request to the aggregate cluster + first_request_retry.encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + // handle the response to the second request to the aggregate cluster + ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); + EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue()); + + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + 
test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 1); + + cleanupUpstreamAndDownstream(); +} + } // namespace } // namespace Envoy