diff --git a/docs/root/intro/arch_overview/upstream/aggregate_cluster.rst b/docs/root/intro/arch_overview/upstream/aggregate_cluster.rst index ea94d2493fb15..b939eff9b079b 100644 --- a/docs/root/intro/arch_overview/upstream/aggregate_cluster.rst +++ b/docs/root/intro/arch_overview/upstream/aggregate_cluster.rst @@ -106,6 +106,27 @@ cannot locate that specific endpoint at the aggregate level. As a result, Statef aggregate clusters, because the final cluster choice is made without direct knowledge of the specific endpoint which doesn't exist at the top level. +Circuit Breakers +^^^^^^^^^^^^^^^^ + +In general, an aggregate cluster should be thought of as a cluster that groups the endpoints of the underlying clusters +together for load balancing purposes only, not for circuit breaking. Circuit breaking is handled at the level of the underlying clusters, +not at the level of the aggregate cluster itself. This allows the aggregate cluster to maintain its failover capabilities +whilst respecting the circuit breaker limits of each underlying cluster. This is intentional, as the underlying clusters +are accessible through multiple paths (directly or via the aggregate cluster), and configuring aggregate cluster circuit +breakers would effectively double the circuit breaker limits, rendering them useless. + +When the configured limit is reached on an underlying cluster, only that underlying cluster's circuit breaker opens. +When an underlying cluster's circuit breaker opens, requests routed through the aggregate cluster to that underlying cluster +will be rejected. With the exception of max_retries (described below), the aggregate cluster's circuit breakers remain closed at all times, regardless of whether the circuit +breaker limits on the underlying clusters are reached or not. 
+
+As an exception, the only circuit breaker configured at the aggregate cluster level is :ref:`max_retries <envoy_v3_api_field_config.cluster.v3.CircuitBreakers.Thresholds.max_retries>`
+because when Envoy processes a retry request, it needs to determine whether the retry limit has been exceeded before
+the aggregate cluster is able to choose the underlying cluster to use. When the configured limit is reached the aggregate
+cluster's circuit breaker opens, and subsequent requests to the aggregate cluster path cannot retry, even though the
+underlying cluster's retry budget is still available.
+
 Load Balancing Example
 ----------------------
 
diff --git a/test/extensions/clusters/aggregate/BUILD b/test/extensions/clusters/aggregate/BUILD
index 41bc27f105e8f..b6ff2718e79c7 100644
--- a/test/extensions/clusters/aggregate/BUILD
+++ b/test/extensions/clusters/aggregate/BUILD
@@ -83,5 +83,7 @@ envoy_extension_cc_test(
         "//test/test_common:resources_lib",
         "//test/test_common:utility_lib",
         "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
+        "@envoy_api//envoy/extensions/clusters/aggregate/v3:pkg_cc_proto",
+        "@envoy_api//envoy/extensions/upstreams/http/v3:pkg_cc_proto",
     ],
 )
diff --git a/test/extensions/clusters/aggregate/cluster_integration_test.cc b/test/extensions/clusters/aggregate/cluster_integration_test.cc
index 1a76054f9aee4..5d8af67437583 100644
--- a/test/extensions/clusters/aggregate/cluster_integration_test.cc
+++ b/test/extensions/clusters/aggregate/cluster_integration_test.cc
@@ -1,4 +1,6 @@
 #include "envoy/config/cluster/v3/cluster.pb.h"
+#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
+#include "envoy/extensions/upstreams/http/v3/http_protocol_options.pb.h"
 #include "envoy/grpc/status.h"
 #include "envoy/stats/scope.h"
 
@@ -29,6 +31,84 @@ const char SecondClusterName[] = "cluster_2";
 const int FirstUpstreamIndex = 2;
 const int SecondUpstreamIndex = 3;
 
+// Circuit breaker thresholds applied to a cluster by setCircuitBreakerLimits(). Each with*()
+// setter overrides one limit and returns *this so that overrides can be chained.
+struct CircuitBreakerLimits {
+  uint32_t max_connections = 1024;
+  uint32_t max_requests = 1024;
+  uint32_t max_pending_requests = 1024;
+  uint32_t max_retries = 3;
+  uint32_t max_connection_pools = std::numeric_limits<uint32_t>::max();
+
+  CircuitBreakerLimits& withMaxConnections(uint32_t max_connections) {
+    this->max_connections = max_connections;
+    return *this;
+  }
+
+  CircuitBreakerLimits& withMaxRequests(uint32_t max_requests) {
+    this->max_requests = max_requests;
+    return *this;
+  }
+
+  CircuitBreakerLimits& withMaxPendingRequests(uint32_t max_pending_requests) {
+    this->max_pending_requests = max_pending_requests;
+    return *this;
+  }
+
+  CircuitBreakerLimits& withMaxRetries(uint32_t max_retries) {
+    this->max_retries = max_retries;
+    return *this;
+  }
+};
+
+// Adds a DEFAULT-priority circuit breaker threshold carrying `limits` to `cluster`, with
+// track_remaining enabled (the tests in this file read the remaining_* gauges).
+void setCircuitBreakerLimits(envoy::config::cluster::v3::Cluster& cluster,
+                             const CircuitBreakerLimits& limits) {
+  auto* cluster_circuit_breakers = cluster.mutable_circuit_breakers();
+
+  auto* cluster_circuit_breakers_threshold_default = cluster_circuit_breakers->add_thresholds();
+  cluster_circuit_breakers_threshold_default->set_priority(
+      envoy::config::core::v3::RoutingPriority::DEFAULT);
+
+  cluster_circuit_breakers_threshold_default->mutable_max_connections()->set_value(
+      limits.max_connections);
+  cluster_circuit_breakers_threshold_default->mutable_max_pending_requests()->set_value(
+      limits.max_pending_requests);
+  cluster_circuit_breakers_threshold_default->mutable_max_requests()->set_value(
+      limits.max_requests);
+  cluster_circuit_breakers_threshold_default->mutable_max_retries()->set_value(limits.max_retries);
+  cluster_circuit_breakers_threshold_default->mutable_max_connection_pools()->set_value(
+      limits.max_connection_pools);
+  cluster_circuit_breakers_threshold_default->set_track_remaining(true);
+}
+
+// Packs explicit HTTP/2 upstream protocol options into `cluster` with max_concurrent_streams set
+// to `max_concurrent_streams`.
+void setMaxConcurrentStreams(envoy::config::cluster::v3::Cluster& cluster,
+                             uint32_t max_concurrent_streams) {
+  envoy::extensions::upstreams::http::v3::HttpProtocolOptions http_protocol_options;
+  http_protocol_options.mutable_explicit_http_config()
+      ->mutable_http2_protocol_options()
+      ->mutable_max_concurrent_streams()
+      ->set_value(max_concurrent_streams);
+  (*cluster.mutable_typed_extension_protocol_options())
+      ["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]
+          .PackFrom(http_protocol_options);
+}
+
+// Replaces the aggregate cluster's list of underlying clusters with just cluster_1.
+void reduceAggregateClustersListToOneCluster(
+    envoy::config::cluster::v3::Cluster& aggregate_cluster) {
+  auto* aggregate_cluster_type = aggregate_cluster.mutable_cluster_type();
+  auto* aggregate_cluster_typed_config = aggregate_cluster_type->mutable_typed_config();
+  envoy::extensions::clusters::aggregate::v3::ClusterConfig new_aggregate_cluster_typed_config;
+  aggregate_cluster_typed_config->UnpackTo(&new_aggregate_cluster_typed_config);
+  new_aggregate_cluster_typed_config.clear_clusters();
+  new_aggregate_cluster_typed_config.add_clusters("cluster_1");
+  aggregate_cluster_typed_config->PackFrom(new_aggregate_cluster_typed_config);
+}
+
 const std::string& config() {
   CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF(
 admin:
@@ -303,5 +383,565 @@ TEST_P(AggregateIntegrationTest, PreviousPrioritiesRetryPredicate) {
   cleanupUpstreamAndDownstream();
 }
 
+// Tests the max_connections circuit breaker behaviour on the aggregate cluster and its underlying
+// cluster1. When connections reach the configured limit, only the underlying cluster1's circuit
+// breaker opens, the aggregate cluster's circuit breaker is completely unaffected. When cluster1's
+// circuit breaker opens, new connections are rejected, and the overflow counter increases. After
+// closing the connection, cluster1's circuit breaker closes again.
+TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) {
+  setDownstreamProtocol(Http::CodecType::HTTP2);
+
+  config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
+    auto* static_resources = bootstrap.mutable_static_resources();
+    auto* aggregate_cluster = static_resources->mutable_clusters(1);
+
+    reduceAggregateClustersListToOneCluster(*aggregate_cluster);
+    setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{}.withMaxConnections(1));
+    setMaxConcurrentStreams(*aggregate_cluster, 1);
+  });
+
+  initialize();
+
+  setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{}.withMaxConnections(1));
+  setMaxConcurrentStreams(cluster1_, 1);
+
+  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {}));
+  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
+                                                             {cluster1_}, {cluster1_}, {}, "56");
+  test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3);
+
+  // initial circuit breaker states:
+  // the aggregate cluster circuit breaker is closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0);
+  // the cluster1 circuit breaker is closed
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 0);
+
+  codec_client_ = makeHttpConnection(lookupPort("http"));
+
+  // send a first request to the aggregate cluster
+  auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest(
+      Http::TestRequestHeaderMapImpl{{":method", "GET"},
+                                     {":path", "/aggregatecluster"},
+                                     {":scheme", "http"},
+                                     {":authority", "host"}});
+
+  // wait for the first request to arrive at cluster1
+  waitForNextUpstreamRequest(FirstUpstreamIndex);
+
+  // after the first request arrives at cluster1 (there is now a single active upstream connection)
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0);
+  // the cluster1 circuit breaker opens
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 0);
+
+  // send a second request to the aggregate cluster
+  auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest(
+      Http::TestRequestHeaderMapImpl{{":method", "GET"},
+                                     {":path", "/aggregatecluster"},
+                                     {":scheme", "http"},
+                                     {":authority", "host"}});
+
+  // the second request is rejected because the cluster1 circuit breaker is open
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0);
+  // the cluster1 circuit breaker remains open and overflows
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 1);
+
+  // the requests to the aggregate cluster route affect the cluster1 circuit breaker state
+  // send a third request directly to cluster1 to confirm this
+  auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{
+      {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}});
+
+  // the third request is rejected because the cluster1 circuit breaker is already open
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0);
+  // the cluster1 circuit breaker remains open and overflows again
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 2);
+
+  // respond to the first request to the aggregate cluster
+  upstream_request_->encodeHeaders(default_response_headers_, true);
+  ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));
+
+  // the first request completes successfully
+  ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream());
+  EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue());
+
+  ASSERT_TRUE(fake_upstream_connection_->close());
+  ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
+
+  // after completing the first request and closing the connection
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0);
+  // the cluster1 circuit breaker closes
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1);
+  // the overflow may be greater than 2 because after completing the first request
+  // the queued pending requests will attempt to reuse the connection
+  test_server_->waitForCounterGe("cluster.cluster_1.upstream_cx_overflow", 2);
+
+  cleanupUpstreamAndDownstream();
+}
+
+// Tests the max_requests circuit breaker behaviour on the aggregate cluster and its underlying
+// cluster1. When requests reach the configured limit, only the underlying cluster1's circuit
+// breaker opens, the aggregate cluster's circuit breaker is completely unaffected. When
+// cluster1's circuit breaker opens, new requests are rejected, and the overflow counter increases.
+// After completing the request, cluster1's circuit breaker closes again.
+TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) {
+  setDownstreamProtocol(Http::CodecType::HTTP2);
+
+  config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
+    auto* static_resources = bootstrap.mutable_static_resources();
+    auto* aggregate_cluster = static_resources->mutable_clusters(1);
+
+    reduceAggregateClustersListToOneCluster(*aggregate_cluster);
+    setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{}.withMaxRequests(1));
+  });
+
+  initialize();
+
+  setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{}.withMaxRequests(1));
+
+  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {}));
+  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
+                                                             {cluster1_}, {cluster1_}, {}, "56");
+  test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3);
+
+  // initial circuit breaker states:
+  // the aggregate cluster circuit breaker is closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker is closed
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 0);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 1);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0);
+
+  codec_client_ = makeHttpConnection(lookupPort("http"));
+
+  // send a first request to the aggregate cluster
+  auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest(
+      Http::TestRequestHeaderMapImpl{{":method", "GET"},
+                                     {":path", "/aggregatecluster"},
+                                     {":scheme", "http"},
+                                     {":authority", "host"}});
+
+  // wait for the first request to arrive at cluster1
+  waitForNextUpstreamRequest(FirstUpstreamIndex);
+
+  // after the first request arrives at cluster1 (there is now a single active upstream request)
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker opens
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0);
+
+  // send a second request to the aggregate cluster
+  auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest(
+      Http::TestRequestHeaderMapImpl{{":method", "GET"},
+                                     {":path", "/aggregatecluster"},
+                                     {":scheme", "http"},
+                                     {":authority", "host"}});
+
+  ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream());
+  // the second request to the aggregate cluster is rejected
+  EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue());
+
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker remains open and overflows
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1);
+
+  // the requests to the aggregate cluster route affect the cluster1 circuit breaker state
+  // send a third request directly to cluster1 to confirm this
+  auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{
+      {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}});
+
+  ASSERT_TRUE(cluster1_response1->waitForEndStream());
+  // the third request to cluster1 is rejected
+  EXPECT_EQ("503", cluster1_response1->headers().getStatusValue());
+
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker remains open and overflows again
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2);
+
+  // respond to the first request to the aggregate cluster
+  upstream_request_->encodeHeaders(default_response_headers_, true);
+  ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));
+  // the first request completes successfully
+  ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream());
+  EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue());
+
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0);
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq",
+                               1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker closes
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 0);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 1);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2);
+
+  cleanupUpstreamAndDownstream();
+}
+
+// Tests the max_pending_requests circuit breaker behaviour on the aggregate cluster and its
+// underlying cluster1. When pending requests reach the configured limit, only the underlying
+// cluster1's circuit breaker opens, the aggregate cluster's circuit breaker is completely
+// unaffected. When cluster1's circuit breaker opens, additional pending requests are rejected, and
+// the overflow counter increases. After processing the pending requests, cluster1's circuit breaker
+// closes again.
+TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) {
+  setDownstreamProtocol(Http::CodecType::HTTP2);
+
+  config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
+    auto* static_resources = bootstrap.mutable_static_resources();
+    auto* aggregate_cluster = static_resources->mutable_clusters(1);
+
+    reduceAggregateClustersListToOneCluster(*aggregate_cluster);
+    setCircuitBreakerLimits(*aggregate_cluster,
+                            CircuitBreakerLimits{}.withMaxConnections(1).withMaxPendingRequests(1));
+    setMaxConcurrentStreams(*aggregate_cluster, 1);
+  });
+
+  initialize();
+
+  setCircuitBreakerLimits(cluster1_,
+                          CircuitBreakerLimits{}.withMaxConnections(1).withMaxPendingRequests(1));
+  setMaxConcurrentStreams(cluster1_, 1);
+
+  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {}));
+  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
+                                                             {cluster1_}, {cluster1_}, {}, "56");
+  test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3);
+
+  // initial circuit breaker states:
+  // the aggregate cluster circuit breaker is closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open",
+                               0);
+  test_server_->waitForGaugeEq(
+      "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker is closed
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 0);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 1);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0);
+
+  codec_client_ = makeHttpConnection(lookupPort("http"));
+
+  // send a first request to the aggregate cluster
+  auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest(
+      Http::TestRequestHeaderMapImpl{{":method", "GET"},
+                                     {":path", "/aggregatecluster"},
+                                     {":scheme", "http"},
+                                     {":authority", "host"}});
+
+  // wait for the first request to arrive at cluster1
+  waitForNextUpstreamRequest(FirstUpstreamIndex);
+
+  // send a second request to the aggregate cluster (this is the first pending request)
+  auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest(
+      Http::TestRequestHeaderMapImpl{{":method", "GET"},
+                                     {":path", "/aggregatecluster"},
+                                     {":scheme", "http"},
+                                     {":authority", "host"}});
+
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open",
+                               0);
+  test_server_->waitForGaugeEq(
+      "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker opens
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0);
+
+  // send a third request to the aggregate cluster (this is the second pending request)
+  auto aggregate_cluster_response3 = codec_client_->makeHeaderOnlyRequest(
+      Http::TestRequestHeaderMapImpl{{":method", "GET"},
+                                     {":path", "/aggregatecluster"},
+                                     {":scheme", "http"},
+                                     {":authority", "host"}});
+
+  // the third request fails immediately because the cluster1 circuit breaker is open
+  ASSERT_TRUE(aggregate_cluster_response3->waitForEndStream());
+  EXPECT_EQ("503", aggregate_cluster_response3->headers().getStatusValue());
+
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open",
+                               0);
+  test_server_->waitForGaugeEq(
+      "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker remains open and overflows
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1);
+
+  // the pending requests to the aggregate cluster route affect the cluster1 circuit breaker state
+  // send a fourth request directly to cluster1 to confirm this
+  auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{
+      {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}});
+
+  // the fourth request fails immediately because the cluster1 circuit breaker is open
+  ASSERT_TRUE(cluster1_response1->waitForEndStream());
+  EXPECT_EQ("503", cluster1_response1->headers().getStatusValue());
+
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open",
+                               0);
+  test_server_->waitForGaugeEq(
+      "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker remains open and overflows again
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2);
+
+  // respond to the first request to the aggregate cluster
+  upstream_request_->encodeHeaders(default_response_headers_, true);
+  ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream());
+  EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue());
+
+  // wait for the second request to arrive at cluster1
+  waitForNextUpstreamRequest(FirstUpstreamIndex);
+
+  // respond to the second request to the aggregate cluster
+  upstream_request_->encodeHeaders(default_response_headers_, true);
+  ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream());
+  EXPECT_EQ("200", aggregate_cluster_response2->headers().getStatusValue());
+
+  // the aggregate cluster circuit breaker remains closed
+  test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open",
+                               0);
+  test_server_->waitForGaugeEq(
+      "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1);
+  test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0);
+  // the cluster1 circuit breaker closes
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 0);
+  test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 1);
+  test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2);
+
+  cleanupUpstreamAndDownstream();
+}
+
+// Tests the max_retries circuit breaker behaviour on the aggregate cluster and its underlying
+// cluster1. Unlike the other circuit breakers, the aggregate cluster's retry circuit breaker opens
+// when retries exceed its configured limit. cluster1's retry circuit breaker opens independently
+// when direct requests to cluster1 exceed its configured limit. When either circuit breaker opens,
+// additional retries to that cluster are prevented, and overflow counters increase. The two circuit
+// breakers operate independently.
+TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* listener = static_resources->mutable_listeners(0); + auto* filter_chain = listener->mutable_filter_chains(0); + auto* filter = filter_chain->mutable_filters(0); + envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager + http_connection_manager; + filter->mutable_typed_config()->UnpackTo(&http_connection_manager); + auto* virtual_host = http_connection_manager.mutable_route_config()->mutable_virtual_hosts(0); + auto* aggregate_cluster_route = virtual_host->mutable_routes(2); + auto* cluster1_route = virtual_host->mutable_routes(0); + aggregate_cluster_route->mutable_route()->mutable_retry_policy()->clear_retry_priority(); + // adjust the retry policy on both the /aggregatecluster and /cluster1 routes + aggregate_cluster_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign( + "5xx"); + cluster1_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign("5xx"); + filter->mutable_typed_config()->PackFrom(http_connection_manager); + + auto* aggregate_cluster = static_resources->mutable_clusters(1); + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{}.withMaxRetries(1)); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{}.withMaxRetries(1)); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + 
test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send a first request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the first request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the first request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // wait for the first request retry to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + auto first_request_retry = std::move(upstream_request_); + + // the aggregate_cluster the circuit breaker opens + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 0); + // the cluster1 circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + 
test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send a second request to the aggregate cluster + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the second request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the second request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // the aggregate_cluster the circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send a third request directly to cluster1 + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // wait for the third request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the third request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // wait for the third request retry to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + auto third_request_retry = 
std::move(upstream_request_); + + // the aggregate_cluster the circuit breaker remains open + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send a fourth request directly to cluster1 + auto cluster1_response2 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // wait for the fourth request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + // respond to the fourth request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // the aggregate_cluster the circuit breaker remains open + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 1); + + // respond to the third request to cluster1 + 
third_request_retry->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(cluster1_response1->waitForEndStream()); + EXPECT_EQ("200", cluster1_response1->headers().getStatusValue()); + // handle the response to the fourth request to cluster1 + ASSERT_TRUE(cluster1_response2->waitForEndStream()); + EXPECT_EQ("503", cluster1_response2->headers().getStatusValue()); + // respond to the first request to the aggregate cluster + first_request_retry->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + // handle the response to the second request to the aggregate cluster + ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); + EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue()); + + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 1); + + cleanupUpstreamAndDownstream(); +} + } // namespace } // namespace Envoy