From feb8a5359054841ff322befbb11adb68ca410a33 Mon Sep 17 00:00:00 2001 From: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Date: Wed, 16 Apr 2025 00:06:07 -0700 Subject: [PATCH 1/4] aggregate cluster circuit breaker integration tests Co-authored-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> Signed-off-by: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Signed-off-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> --- .../aggregate/cluster_integration_test.cc | 619 ++++++++++++++++++ 1 file changed, 619 insertions(+) diff --git a/test/extensions/clusters/aggregate/cluster_integration_test.cc b/test/extensions/clusters/aggregate/cluster_integration_test.cc index 1a76054f9aee4..8df348eb368e0 100644 --- a/test/extensions/clusters/aggregate/cluster_integration_test.cc +++ b/test/extensions/clusters/aggregate/cluster_integration_test.cc @@ -1,8 +1,11 @@ #include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" +#include "envoy/extensions/upstreams/http/v3/http_protocol_options.pb.h" #include "envoy/grpc/status.h" #include "envoy/stats/scope.h" #include "source/common/config/protobuf_link_hacks.h" +#include "source/common/network/socket_option_factory.h" #include "source/common/protobuf/protobuf.h" #include "source/common/protobuf/utility.h" @@ -28,6 +31,13 @@ const char SecondClusterName[] = "cluster_2"; // Index in fake_upstreams_ const int FirstUpstreamIndex = 2; const int SecondUpstreamIndex = 3; +struct CircuitBreakerLimits { + uint32_t max_connections = 1024; + uint32_t max_requests = 1024; + uint32_t max_pending_requests = 1024; + uint32_t max_retries = 3; + uint32_t max_connection_pools = std::numeric_limits::max(); +}; const std::string& config() { CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF( @@ -178,6 +188,50 @@ class AggregateIntegrationTest xds_stream_->startGrpcStream(); } + void setCircuitBreakerLimits(envoy::config::cluster::v3::Cluster& cluster, + const CircuitBreakerLimits& limits) { + auto* cluster_circuit_breakers = cluster.mutable_circuit_breakers(); + + auto* cluster_circuit_breakers_threshold_default = cluster_circuit_breakers->add_thresholds(); + cluster_circuit_breakers_threshold_default->set_priority( + envoy::config::core::v3::RoutingPriority::DEFAULT); + + cluster_circuit_breakers_threshold_default->mutable_max_connections()->set_value( + limits.max_connections); + cluster_circuit_breakers_threshold_default->mutable_max_pending_requests()->set_value( + limits.max_pending_requests); + cluster_circuit_breakers_threshold_default->mutable_max_requests()->set_value( + limits.max_requests); + cluster_circuit_breakers_threshold_default->mutable_max_retries()->set_value( + limits.max_retries); + cluster_circuit_breakers_threshold_default->mutable_max_connection_pools()->set_value( + limits.max_connection_pools); + cluster_circuit_breakers_threshold_default->set_track_remaining(true); + } + + void setMaxConcurrentStreams(envoy::config::cluster::v3::Cluster& cluster, + uint32_t max_concurrent_streams) { + envoy::extensions::upstreams::http::v3::HttpProtocolOptions http_protocol_options; + http_protocol_options.mutable_explicit_http_config() + ->mutable_http2_protocol_options() + ->mutable_max_concurrent_streams() + ->set_value(max_concurrent_streams); + (*cluster.mutable_typed_extension_protocol_options()) + ["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"] + .PackFrom(http_protocol_options); + } + + void reduceAggregateClustersListToOneCluster( + envoy::config::cluster::v3::Cluster& aggregate_cluster) { + auto* aggregate_cluster_type = aggregate_cluster.mutable_cluster_type(); + auto* aggregate_cluster_typed_config = aggregate_cluster_type->mutable_typed_config(); + envoy::extensions::clusters::aggregate::v3::ClusterConfig new_aggregate_cluster_typed_config; + aggregate_cluster_typed_config->UnpackTo(&new_aggregate_cluster_typed_config); + new_aggregate_cluster_typed_config.clear_clusters(); + new_aggregate_cluster_typed_config.add_clusters("cluster_1"); + aggregate_cluster_typed_config->PackFrom(new_aggregate_cluster_typed_config); + } + const bool deferred_cluster_creation_; envoy::config::cluster::v3::Cluster cluster1_; envoy::config::cluster::v3::Cluster cluster2_; @@ -303,5 +357,570 @@ TEST_P(AggregateIntegrationTest, PreviousPrioritiesRetryPredicate) { cleanupUpstreamAndDownstream(); } +// Tests the max_connections circuit breaker behaviour on the aggregate cluster and its underlying cluster1. +// On creating new connections up to and over the specified limit, the aggregate cluster circuit breaker state is unaffected, +// whereas the underlying cluster1 circuit breaker closes, prevents new upstream connections, and overflows. +TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* aggregate_cluster = static_resources->mutable_clusters(1); + + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_connections = 1}); + setMaxConcurrentStreams(*aggregate_cluster, 1U); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_connections = 1}); + setMaxConcurrentStreams(cluster1_, 1U); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send the 1st request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the 1st request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // after the 1st request arrives at cluster1 [there is now a single active upstream connection] + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 0); + + // send a 2nd request to the aggregate cluster + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // the 2nd request is rejected because the cluster1 circuit breaker is open + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 1); + + // the requests to the aggregate cluster route affect the cluster1 circuit breaker state + // send a 3rd request directly to cluster1 + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // the 3rd request is rejected because the cluster1 circuit breaker is already open + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker remains open and overflows again + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 2); + + // respond to the 1st request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + // the 1st request completes successfully + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + + ASSERT_TRUE(fake_upstream_connection_->close()); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + + // after completing the first request and closing the connection + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_cx_overflow", 0); + // the cluster1 circuit breaker closes + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1); + test_server_->waitForCounterGe("cluster.cluster_1.upstream_cx_overflow", 2); + // the overflow may be greater than 2 because after completing the 1st request + // the queued pending requests will attempt to reuse the connection + + cleanupUpstreamAndDownstream(); +} + +// Tests the max_requests circuit breaker behaviour on the aggregate cluster and its underlying cluster1. +// On sending new requests up to and over the specified limit, the aggregate cluster circuit breaker state is unaffected, +// whereas the underlying cluster1 circuit breaker closes, queues subsequent upstream requests, and overflows. +TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* aggregate_cluster = static_resources->mutable_clusters(1); + + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_requests = 1}); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_requests = 1}); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send the 1st request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the 1st request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // after the 1st request arrives at cluster1 [there is now a single active upstream request] + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + // send a 2nd request to the aggregate cluster + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); + + // the 2nd request to the aggregate cluster is rejected + EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1); + + // the requests to the aggregate cluster route affect the cluster1 circuit breaker state + // send a 3rd request directly to cluster1 + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + ASSERT_TRUE(cluster1_response1->waitForEndStream()); + + // the 3rd request to cluster1 is rejected + EXPECT_EQ("503", cluster1_response1->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows again + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + // respond to the 1st request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + // the 1st request completes successfully + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", + 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker closes + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + cleanupUpstreamAndDownstream(); +} + +// Tests the max_pending_requests circuit breaker behaviour on the aggregate cluster and its underlying cluster1. +// On queuing new pending requests up to and over the specified limit, the aggregate cluster circuit breaker state is unaffected, +// whereas the underlying cluster1 circuit breaker closes, rejects subsequent pending requests, and overflows. +TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* aggregate_cluster = static_resources->mutable_clusters(1); + + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, + CircuitBreakerLimits{.max_connections = 1, .max_pending_requests = 1}); + setMaxConcurrentStreams(*aggregate_cluster, 1U); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, + CircuitBreakerLimits{.max_connections = 1, .max_pending_requests = 1}); + setMaxConcurrentStreams(cluster1_, 1U); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send the 1st request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the 1st request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // send the 2nd request to the aggregate cluster [this is the 1st pending request] + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); + + // send the 3rd request to the aggregate cluster [this is the 2nd pending request] + auto aggregate_cluster_response3 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // the 3rd request fails immediately because the cluster1 circuit breaker is open + ASSERT_TRUE(aggregate_cluster_response3->waitForEndStream()); + EXPECT_EQ("503", aggregate_cluster_response3->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1); + + // send the 4th request directly to cluster1 + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // the 4th request fails immediately because the cluster1 circuit breaker is open + ASSERT_TRUE(cluster1_response1->waitForEndStream()); + EXPECT_EQ("503", cluster1_response1->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker remains open and overflows again + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + // respond to the 1st request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + + // wait for the 2nd request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // respond to the 2nd request to the aggregate cluster + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response2->headers().getStatusValue()); + + // the aggregate cluster circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_pending_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_pending_overflow", 0); + // the cluster1 circuit breaker closes + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_pending_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); + + cleanupUpstreamAndDownstream(); +} + +// Tests the max_retries circuit breaker behaviour on the aggregate cluster and its underlying cluster1. +// On attempting retries up to and over the specified limit, the aggregate cluster circuit breaker opens +// (when the aggregate cluster route is used), preventing further retries, and overflows. +// The underlying cluster1 circuit breaker operates independently, opening only when direct requests +// to cluster1 exceed its own specified limit. Therefore the circuit breakers for the aggregate cluster +// and its underlying cluster(s) operate independently. +TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { + setDownstreamProtocol(Http::CodecType::HTTP2); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* static_resources = bootstrap.mutable_static_resources(); + auto* listener = static_resources->mutable_listeners(0); + auto* filter_chain = listener->mutable_filter_chains(0); + auto* filter = filter_chain->mutable_filters(0); + envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager + http_connection_manager; + filter->mutable_typed_config()->UnpackTo(&http_connection_manager); + auto* virtual_host = http_connection_manager.mutable_route_config()->mutable_virtual_hosts(0); + auto* aggregate_cluster_route = virtual_host->mutable_routes(2); + auto* cluster1_route = virtual_host->mutable_routes(0); + aggregate_cluster_route->mutable_route()->mutable_retry_policy()->clear_retry_priority(); + // adjust the retry policy on both the /aggregatecluster and /cluster1 routes + aggregate_cluster_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign( + "5xx"); + cluster1_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign( + "5xx"); + filter->mutable_typed_config()->PackFrom(http_connection_manager); + + auto* aggregate_cluster = static_resources->mutable_clusters(1); + reduceAggregateClustersListToOneCluster(*aggregate_cluster); + setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_retries = 1}); + }); + + initialize(); + + setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_retries = 1}); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "56"); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // initial circuit breaker states: + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 0); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // send the 1st request to the aggregate cluster + auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the 1st request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // respond to the 1st request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // wait for the 1st request retry to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + auto& first_request_retry = *upstream_request_; + + // the aggregate_cluster the circuit breaker opens + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 0); + // the cluster1 circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send the 2nd request to the aggregate cluster + auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/aggregatecluster"}, + {":scheme", "http"}, + {":authority", "host"}}); + + // wait for the 2nd request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // respond to the 2nd request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // the aggregate_cluster the circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker remains closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send the 3rd request directly to cluster1 + auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // wait for the 3rd request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // respond to the 3rd request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // wait for the 3rd request retry to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + auto& third_request_retry = *upstream_request_; + + // the aggregate_cluster the circuit breaker remains open + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker opens + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); + + // send the 4th request directly to cluster1 + auto cluster1_response2 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); + + // wait for the 4th request to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); + + // respond to the 4th request with a 503 to trigger a retry + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); + + // the aggregate_cluster the circuit breaker remains open + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 1); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker remains open and overflows + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 1); + + // respond to the 3rd request to cluster1 + third_request_retry.encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(cluster1_response1->waitForEndStream()); + EXPECT_EQ("200", cluster1_response1->headers().getStatusValue()); + + // respond to the 4th request to cluster1 + ASSERT_TRUE(cluster1_response2->waitForEndStream()); + EXPECT_EQ("503", cluster1_response2->headers().getStatusValue()); + + // respond to the 1st request to the aggregate cluster + first_request_retry.encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); + EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); + + // respond to the 2nd request to the aggregate cluster + ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); + EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue()); + + // the aggregate cluster circuit breaker is closed + test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_retry_open", + 0); + test_server_->waitForGaugeEq( + "cluster.aggregate_cluster.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.aggregate_cluster.upstream_rq_retry_overflow", 1); + // the cluster1 circuit breaker is closed + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.rq_retry_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); + test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 1); + + cleanupUpstreamAndDownstream(); +} + } // namespace } // namespace Envoy From 9c02cb7b324f8bf574b7f066144741a0dffeded7 Mon Sep 17 00:00:00 2001 From: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Date: Wed, 16 Apr 2025 01:03:23 -0700 Subject: [PATCH 2/4] adjustments Co-authored-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> Signed-off-by: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Signed-off-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> --- .../aggregate/cluster_integration_test.cc | 150 +++++++++--------- 1 file changed, 72 insertions(+), 78 deletions(-) diff --git a/test/extensions/clusters/aggregate/cluster_integration_test.cc b/test/extensions/clusters/aggregate/cluster_integration_test.cc index 8df348eb368e0..990e39e3562c2 100644 --- a/test/extensions/clusters/aggregate/cluster_integration_test.cc +++ b/test/extensions/clusters/aggregate/cluster_integration_test.cc @@ -221,8 +221,8 @@ class AggregateIntegrationTest .PackFrom(http_protocol_options); } - void reduceAggregateClustersListToOneCluster( - envoy::config::cluster::v3::Cluster& aggregate_cluster) { + void + reduceAggregateClustersListToOneCluster(envoy::config::cluster::v3::Cluster& aggregate_cluster) { auto* aggregate_cluster_type = aggregate_cluster.mutable_cluster_type(); auto* aggregate_cluster_typed_config = aggregate_cluster_type->mutable_typed_config(); envoy::extensions::clusters::aggregate::v3::ClusterConfig new_aggregate_cluster_typed_config; @@ -357,9 +357,11 @@ TEST_P(AggregateIntegrationTest, PreviousPrioritiesRetryPredicate) { cleanupUpstreamAndDownstream(); } -// Tests the max_connections circuit breaker behaviour on the aggregate cluster and its underlying cluster1. -// On creating new connections up to and over the specified limit, the aggregate cluster circuit breaker state is unaffected, -// whereas the underlying cluster1 circuit breaker closes, prevents new upstream connections, and overflows. +// Tests the max_connections circuit breaker behaviour on the aggregate cluster and its underlying +// cluster1. When connections reach the configured limit, only the underlying cluster1's circuit +// breaker opens. The aggregate cluster's circuit breaker remains unaffected. When cluster1's +// circuit breaker opens, new connections are rejected, and the overflow counter increases. After +// closing the connection, cluster1's circuit breaker closes again. TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { setDownstreamProtocol(Http::CodecType::HTTP2); @@ -395,17 +397,17 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the 1st request to the aggregate cluster + // send the first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, {":scheme", "http"}, {":authority", "host"}}); - // wait for the 1st request to arrive at cluster1 + // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // after the 1st request arrives at cluster1 [there is now a single active upstream connection] + // after the first request arrives at cluster1 [there is now a single active upstream connection] // the aggregate cluster circuit breaker remains closed test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", @@ -416,14 +418,14 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 0); - // send a 2nd request to the aggregate cluster + // send a second request to the aggregate cluster auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, {":scheme", "http"}, {":authority", "host"}}); - // the 2nd request is rejected because the cluster1 circuit breaker is open + // the second request is rejected because the cluster1 circuit breaker is open // the aggregate cluster circuit breaker remains closed test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", @@ -439,7 +441,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); - // the 3rd request is rejected because the cluster1 circuit breaker is already open + // the third request is rejected because the cluster1 circuit breaker is already open // the aggregate cluster circuit breaker remains closed test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", @@ -450,11 +452,11 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 2); - // respond to the 1st request to the aggregate cluster + // respond to the first request to the aggregate cluster upstream_request_->encodeHeaders(default_response_headers_, true); ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - // the 1st request completes successfully + // the first request completes successfully ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); @@ -471,15 +473,17 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0); test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1); test_server_->waitForCounterGe("cluster.cluster_1.upstream_cx_overflow", 2); - // the overflow may be greater than 2 because after completing the 1st request + // the overflow may be greater than 2 because after completing the first request // the queued pending requests will attempt to reuse the connection cleanupUpstreamAndDownstream(); } -// Tests the max_requests circuit breaker behaviour on the aggregate cluster and its underlying cluster1. -// On sending new requests up to and over the specified limit, the aggregate cluster circuit breaker state is unaffected, -// whereas the underlying cluster1 circuit breaker closes, queues subsequent upstream requests, and overflows. +// Tests the max_requests circuit breaker behaviour on the aggregate cluster and its underlying +// cluster1. When requests reach the configured limit, only the underlying cluster1's circuit +// breaker opens. The aggregate cluster's circuit breaker remains unaffected. When +// cluster1's circuit breaker opens, new requests are rejected, and the overflow counter increases. +// After completing the request, cluster1's circuit breaker closes again. TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { setDownstreamProtocol(Http::CodecType::HTTP2); @@ -513,17 +517,17 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the 1st request to the aggregate cluster + // send the first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, {":scheme", "http"}, {":authority", "host"}}); - // wait for the 1st request to arrive at cluster1 + // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // after the 1st request arrives at cluster1 [there is now a single active upstream request] + // after the first request arrives at cluster1 [there is now a single active upstream request] // the aggregate cluster circuit breaker remains closed test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", @@ -534,7 +538,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); - // send a 2nd request to the aggregate cluster + // send a second request to the aggregate cluster auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -542,8 +546,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { {":authority", "host"}}); ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); - - // the 2nd request to the aggregate cluster is rejected + // the second request to the aggregate cluster is rejected EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue()); // the aggregate cluster circuit breaker remains closed @@ -562,8 +565,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); ASSERT_TRUE(cluster1_response1->waitForEndStream()); - - // the 3rd request to cluster1 is rejected + // the third request to cluster1 is rejected EXPECT_EQ("503", cluster1_response1->headers().getStatusValue()); // the aggregate cluster circuit breaker remains closed @@ -576,11 +578,10 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_rq", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); - // respond to the 1st request to the aggregate cluster + // respond to the first request to the aggregate cluster upstream_request_->encodeHeaders(default_response_headers_, true); ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - - // the 1st request completes successfully + // the first request completes successfully ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); @@ -597,12 +598,15 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { cleanupUpstreamAndDownstream(); } -// Tests the max_pending_requests circuit breaker behaviour on the aggregate cluster and its underlying cluster1. -// On queuing new pending requests up to and over the specified limit, the aggregate cluster circuit breaker state is unaffected, -// whereas the underlying cluster1 circuit breaker closes, rejects subsequent pending requests, and overflows. +// Tests the max_pending_requests circuit breaker behaviour on the aggregate cluster and its +// underlying cluster1. When pending requests reach the configured limit, only the underlying +// cluster1's circuit breaker opens. The aggregate cluster's circuit breaker remains unaffected. +// When cluster1's circuit breaker opens, additional pending requests are rejected, and the overflow +// counter increases. After processing the pending requests, cluster1's circuit breaker closes +// again. TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { setDownstreamProtocol(Http::CodecType::HTTP2); - + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { auto* static_resources = bootstrap.mutable_static_resources(); auto* aggregate_cluster = static_resources->mutable_clusters(1); @@ -638,17 +642,17 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the 1st request to the aggregate cluster + // send the first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, {":scheme", "http"}, {":authority", "host"}}); - // wait for the 1st request to arrive at cluster1 + // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // send the 2nd request to the aggregate cluster [this is the 1st pending request] + // send the second request to the aggregate cluster [this is the first pending request] auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -666,14 +670,14 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); - // send the 3rd request to the aggregate cluster [this is the 2nd pending request] + // send the third request to the aggregate cluster [this is the second pending request] auto aggregate_cluster_response3 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, {":scheme", "http"}, {":authority", "host"}}); - // the 3rd request fails immediately because the cluster1 circuit breaker is open + // the third request fails immediately because the cluster1 circuit breaker is open ASSERT_TRUE(aggregate_cluster_response3->waitForEndStream()); EXPECT_EQ("503", aggregate_cluster_response3->headers().getStatusValue()); @@ -688,11 +692,11 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1); - // send the 4th request directly to cluster1 + // send the fourth request directly to cluster1 auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); - // the 4th request fails immediately because the cluster1 circuit breaker is open + // the fourth request fails immediately because the cluster1 circuit breaker is open ASSERT_TRUE(cluster1_response1->waitForEndStream()); EXPECT_EQ("503", cluster1_response1->headers().getStatusValue()); @@ -707,15 +711,15 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 2); - // respond to the 1st request to the aggregate cluster + // respond to the first request to the aggregate cluster upstream_request_->encodeHeaders(default_response_headers_, true); ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); - // wait for the 2nd request to arrive at cluster1 + // wait for the second request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // respond to the 2nd request to the aggregate cluster + // respond to the second request to the aggregate cluster upstream_request_->encodeHeaders(default_response_headers_, true); ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); EXPECT_EQ("200", aggregate_cluster_response2->headers().getStatusValue()); @@ -734,12 +738,12 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { cleanupUpstreamAndDownstream(); } -// Tests the max_retries circuit breaker behaviour on the aggregate cluster and its underlying cluster1. -// On attempting retries up to and over the specified limit, the aggregate cluster circuit breaker opens -// (when the aggregate cluster route is used), preventing further retries, and overflows. -// The underlying cluster1 circuit breaker operates independently, opening only when direct requests -// to cluster1 exceed its own specified limit. Therefore the circuit breakers for the aggregate cluster -// and its underlying cluster(s) operate independently. +// Tests the max_retries circuit breaker behaviour on the aggregate cluster and its underlying +// cluster1. Unlike the other circuit breakers, the aggregate cluster's retry circuit breaker opens +// when retries to the aggregate cluster exceed its configured limit. cluster1's retry circuit +// breaker opens independently when direct requests to cluster1 exceed its configured limit. When +// either circuit breaker opens, additional retries to that cluster are prevented, and overflow +// counters increase. The two circuit breakers operate independently. TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { setDownstreamProtocol(Http::CodecType::HTTP2); @@ -758,8 +762,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { // adjust the retry policy on both the /aggregatecluster and /cluster1 routes aggregate_cluster_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign( "5xx"); - cluster1_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign( - "5xx"); + cluster1_route->mutable_route()->mutable_retry_policy()->mutable_retry_on()->assign("5xx"); filter->mutable_typed_config()->PackFrom(http_connection_manager); auto* aggregate_cluster = static_resources->mutable_clusters(1); @@ -790,22 +793,20 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the 1st request to the aggregate cluster + // send the first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, {":scheme", "http"}, {":authority", "host"}}); - // wait for the 1st request to arrive at cluster1 + // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - - // respond to the 1st request with a 503 to trigger a retry + // respond to the first request with a 503 to trigger a retry upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); - - // wait for the 1st request retry to arrive at cluster1 - waitForNextUpstreamRequest(FirstUpstreamIndex); + // wait for the first request retry to arrive at cluster1 + waitForNextUpstreamRequest(FirstUpstreamIndex); auto& first_request_retry = *upstream_request_; // the aggregate_cluster the circuit breaker opens @@ -819,17 +820,16 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); - // send the 2nd request to the aggregate cluster + // send the second request to the aggregate cluster auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, {":scheme", "http"}, {":authority", "host"}}); - // wait for the 2nd request to arrive at cluster1 + // wait for the second request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - - // respond to the 2nd request with a 503 to trigger a retry + // respond to the second request with a 503 to trigger a retry upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); // the aggregate_cluster the circuit breaker remains open and overflows @@ -843,19 +843,17 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); - // send the 3rd request directly to cluster1 + // send the third request directly to cluster1 auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); - // wait for the 3rd request to arrive at cluster1 + // wait for the third request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - - // respond to the 3rd request with a 503 to trigger a retry + // respond to the third request with a 503 to trigger a retry upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); - // wait for the 3rd request retry to arrive at cluster1 + // wait for the third request retry to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - auto& third_request_retry = *upstream_request_; // the aggregate_cluster the circuit breaker remains open @@ -869,14 +867,13 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); - // send the 4th request directly to cluster1 + // send the fourth request directly to cluster1 auto cluster1_response2 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); - // wait for the 4th request to arrive at cluster1 + // wait for the fourth request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - - // respond to the 4th request with a 503 to trigger a retry + // respond to the fourth request with a 503 to trigger a retry upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); // the aggregate_cluster the circuit breaker remains open @@ -890,21 +887,18 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 1); - // respond to the 3rd request to cluster1 + // respond to the third request to cluster1 third_request_retry.encodeHeaders(default_response_headers_, true); ASSERT_TRUE(cluster1_response1->waitForEndStream()); EXPECT_EQ("200", cluster1_response1->headers().getStatusValue()); - - // respond to the 4th request to cluster1 + // respond to the fourth request to cluster1 ASSERT_TRUE(cluster1_response2->waitForEndStream()); EXPECT_EQ("503", cluster1_response2->headers().getStatusValue()); - - // respond to the 1st request to the aggregate cluster + // respond to the first request to the aggregate cluster first_request_retry.encodeHeaders(default_response_headers_, true); ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); - - // respond to the 2nd request to the aggregate cluster + // respond to the second request to the aggregate cluster ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue()); From ecb427100eb527dea0d3be2f137968382aa7dadd Mon Sep 17 00:00:00 2001 From: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Date: Wed, 16 Apr 2025 01:12:13 -0700 Subject: [PATCH 3/4] adjustments Co-authored-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> Signed-off-by: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Signed-off-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> --- .../aggregate/cluster_integration_test.cc | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/test/extensions/clusters/aggregate/cluster_integration_test.cc b/test/extensions/clusters/aggregate/cluster_integration_test.cc index 990e39e3562c2..faa78306fbb24 100644 --- a/test/extensions/clusters/aggregate/cluster_integration_test.cc +++ b/test/extensions/clusters/aggregate/cluster_integration_test.cc @@ -359,7 +359,7 @@ TEST_P(AggregateIntegrationTest, PreviousPrioritiesRetryPredicate) { // Tests the max_connections circuit breaker behaviour on the aggregate cluster and its underlying // cluster1. When connections reach the configured limit, only the underlying cluster1's circuit -// breaker opens. The aggregate cluster's circuit breaker remains unaffected. When cluster1's +// breaker opens, the aggregate cluster's circuit breaker is completely unaffected. When cluster1's // circuit breaker opens, new connections are rejected, and the overflow counter increases. After // closing the connection, cluster1's circuit breaker closes again. TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { @@ -479,9 +479,9 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { cleanupUpstreamAndDownstream(); } -// Tests the max_requests circuit breaker behaviour on the aggregate cluster and its underlying +// Tests the max_requests circuit breaker behaviour on the aggregate cluster and its underlying // cluster1. When requests reach the configured limit, only the underlying cluster1's circuit -// breaker opens. The aggregate cluster's circuit breaker remains unaffected. When +// breaker opens, the aggregate cluster's circuit breaker is completely unaffected. When // cluster1's circuit breaker opens, new requests are rejected, and the overflow counter increases. // After completing the request, cluster1's circuit breaker closes again. TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { @@ -600,10 +600,10 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { // Tests the max_pending_requests circuit breaker behaviour on the aggregate cluster and its // underlying cluster1. When pending requests reach the configured limit, only the underlying -// cluster1's circuit breaker opens. The aggregate cluster's circuit breaker remains unaffected. -// When cluster1's circuit breaker opens, additional pending requests are rejected, and the overflow -// counter increases. After processing the pending requests, cluster1's circuit breaker closes -// again. +// cluster1's circuit breaker opens, the aggregate cluster's circuit breaker is completely +// unaffected. When cluster1's circuit breaker opens, additional pending requests are rejected, and +// the overflow counter increases. After processing the pending requests, cluster1's circuit breaker +// closes again. TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { setDownstreamProtocol(Http::CodecType::HTTP2); @@ -652,7 +652,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // send the second request to the aggregate cluster [this is the first pending request] + // send the second request to the aggregate cluster (this is the first pending request) auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -670,7 +670,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); - // send the third request to the aggregate cluster [this is the second pending request] + // send the third request to the aggregate cluster (this is the second pending request) auto aggregate_cluster_response3 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -740,10 +740,10 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { // Tests the max_retries circuit breaker behaviour on the aggregate cluster and its underlying // cluster1. Unlike the other circuit breakers, the aggregate cluster's retry circuit breaker opens -// when retries to the aggregate cluster exceed its configured limit. cluster1's retry circuit -// breaker opens independently when direct requests to cluster1 exceed its configured limit. When -// either circuit breaker opens, additional retries to that cluster are prevented, and overflow -// counters increase. The two circuit breakers operate independently. +// when retries exceed its configured limit. cluster1's retry circuit breaker opens independently +// when direct requests to cluster1 exceed its configured limit. When either circuit breaker opens, +// additional retries to that cluster are prevented, and overflow counters increase. The two circuit +// breakers operate independently. TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { setDownstreamProtocol(Http::CodecType::HTTP2); From c76dddec85d45a29bd917911ccca930569a8efc6 Mon Sep 17 00:00:00 2001 From: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Date: Wed, 16 Apr 2025 04:28:26 -0700 Subject: [PATCH 4/4] adjust from feedback Co-authored-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> Signed-off-by: Baz Murphy <61154071+bazmurphy@users.noreply.github.com> Signed-off-by: Saadia El fekak <74792703+SaadiaELF@users.noreply.github.com> --- .../aggregate/cluster_integration_test.cc | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/test/extensions/clusters/aggregate/cluster_integration_test.cc b/test/extensions/clusters/aggregate/cluster_integration_test.cc index faa78306fbb24..99ab092fe636b 100644 --- a/test/extensions/clusters/aggregate/cluster_integration_test.cc +++ b/test/extensions/clusters/aggregate/cluster_integration_test.cc @@ -371,13 +371,13 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { reduceAggregateClustersListToOneCluster(*aggregate_cluster); setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_connections = 1}); - setMaxConcurrentStreams(*aggregate_cluster, 1U); + setMaxConcurrentStreams(*aggregate_cluster, 1); }); initialize(); setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_connections = 1}); - setMaxConcurrentStreams(cluster1_, 1U); + setMaxConcurrentStreams(cluster1_, 1); EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); sendDiscoveryResponse(Config::TypeUrl::get().Cluster, @@ -397,7 +397,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the first request to the aggregate cluster + // send a first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -407,7 +407,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // after the first request arrives at cluster1 [there is now a single active upstream connection] + // after the first request arrives at cluster1 (there is now a single active upstream connection) // the aggregate cluster circuit breaker remains closed test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.cx_open", 0); test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_cx", @@ -437,7 +437,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { test_server_->waitForCounterEq("cluster.cluster_1.upstream_cx_overflow", 1); // the requests to the aggregate cluster route affect the cluster1 circuit breaker state - // send a 3rd request directly to cluster1 + // send a third request directly to cluster1 to confirm this auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); @@ -472,9 +472,9 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) { // the cluster1 circuit breaker closes test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_open", 0); test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_cx", 1); - test_server_->waitForCounterGe("cluster.cluster_1.upstream_cx_overflow", 2); // the overflow may be greater than 2 because after completing the first request // the queued pending requests will attempt to reuse the connection + test_server_->waitForCounterGe("cluster.cluster_1.upstream_cx_overflow", 2); cleanupUpstreamAndDownstream(); } @@ -517,7 +517,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the first request to the aggregate cluster + // send a first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -527,7 +527,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // after the first request arrives at cluster1 [there is now a single active upstream request] + // after the first request arrives at cluster1 (there is now a single active upstream request) // the aggregate cluster circuit breaker remains closed test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.rq_open", 0); test_server_->waitForGaugeEq("cluster.aggregate_cluster.circuit_breakers.default.remaining_rq", @@ -560,7 +560,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) { test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1); // the requests to the aggregate cluster route affect the cluster1 circuit breaker state - // send a 3rd request directly to cluster1 + // send a third request directly to cluster1 to confirm this auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); @@ -614,14 +614,14 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { reduceAggregateClustersListToOneCluster(*aggregate_cluster); setCircuitBreakerLimits(*aggregate_cluster, CircuitBreakerLimits{.max_connections = 1, .max_pending_requests = 1}); - setMaxConcurrentStreams(*aggregate_cluster, 1U); + setMaxConcurrentStreams(*aggregate_cluster, 1); }); initialize(); setCircuitBreakerLimits(cluster1_, CircuitBreakerLimits{.max_connections = 1, .max_pending_requests = 1}); - setMaxConcurrentStreams(cluster1_, 1U); + setMaxConcurrentStreams(cluster1_, 1); EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); sendDiscoveryResponse(Config::TypeUrl::get().Cluster, @@ -642,7 +642,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the first request to the aggregate cluster + // send a first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -652,7 +652,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { // wait for the first request to arrive at cluster1 waitForNextUpstreamRequest(FirstUpstreamIndex); - // send the second request to the aggregate cluster (this is the first pending request) + // send a second request to the aggregate cluster (this is the first pending request) auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -670,7 +670,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 0); - // send the third request to the aggregate cluster (this is the second pending request) + // send a third request to the aggregate cluster (this is the second pending request) auto aggregate_cluster_response3 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -692,7 +692,8 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_pending", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_pending_overflow", 1); - // send the fourth request directly to cluster1 + // the pending requests to the aggregate cluster route affect the cluster1 circuit breaker state + // send a fourth request directly to cluster1 to confirm this auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); @@ -793,7 +794,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { codec_client_ = makeHttpConnection(lookupPort("http")); - // send the first request to the aggregate cluster + // send a first request to the aggregate cluster auto aggregate_cluster_response1 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -820,7 +821,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); - // send the second request to the aggregate cluster + // send a second request to the aggregate cluster auto aggregate_cluster_response2 = codec_client_->makeHeaderOnlyRequest( Http::TestRequestHeaderMapImpl{{":method", "GET"}, {":path", "/aggregatecluster"}, @@ -843,7 +844,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 1); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); - // send the third request directly to cluster1 + // send a third request directly to cluster1 auto cluster1_response1 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); @@ -867,7 +868,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.remaining_retries", 0); test_server_->waitForCounterEq("cluster.cluster_1.upstream_rq_retry_overflow", 0); - // send the fourth request directly to cluster1 + // send a fourth request directly to cluster1 auto cluster1_response2 = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ {":method", "GET"}, {":path", "/cluster1"}, {":scheme", "http"}, {":authority", "host"}}); @@ -891,14 +892,14 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) { third_request_retry.encodeHeaders(default_response_headers_, true); ASSERT_TRUE(cluster1_response1->waitForEndStream()); EXPECT_EQ("200", cluster1_response1->headers().getStatusValue()); - // respond to the fourth request to cluster1 + // handle the response to the fourth request to cluster1 ASSERT_TRUE(cluster1_response2->waitForEndStream()); EXPECT_EQ("503", cluster1_response2->headers().getStatusValue()); // respond to the first request to the aggregate cluster first_request_retry.encodeHeaders(default_response_headers_, true); ASSERT_TRUE(aggregate_cluster_response1->waitForEndStream()); EXPECT_EQ("200", aggregate_cluster_response1->headers().getStatusValue()); - // respond to the second request to the aggregate cluster + // handle the response to the second request to the aggregate cluster ASSERT_TRUE(aggregate_cluster_response2->waitForEndStream()); EXPECT_EQ("503", aggregate_cluster_response2->headers().getStatusValue());