diff --git a/source/common/upstream/health_checker_base_impl.cc b/source/common/upstream/health_checker_base_impl.cc index 21e68d6b518b7..5858fdbadc31e 100644 --- a/source/common/upstream/health_checker_base_impl.cc +++ b/source/common/upstream/health_checker_base_impl.cc @@ -40,8 +40,11 @@ HealthCheckerImplBase::HealthCheckerImplBase(const Cluster& cluster, } HealthCheckerImplBase::~HealthCheckerImplBase() { - // Make sure that any sessions that were deferred deleted are cleared before we destruct. - dispatcher_.clearDeferredDeleteList(); + // ASSERTs inside the session destructor check to make sure we have been previously deferred + // deleted. Unify that logic here before actual destruction happens. + for (auto& session : active_sessions_) { + session.second->onDeferredDeleteBase(); + } } void HealthCheckerImplBase::decHealthy() { @@ -226,12 +229,9 @@ HealthCheckerImplBase::ActiveHealthCheckSession::ActiveHealthCheckSession( } HealthCheckerImplBase::ActiveHealthCheckSession::~ActiveHealthCheckSession() { - if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) { - parent_.decHealthy(); - } - if (host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) { - parent_.decDegraded(); - } + // Make sure onDeferredDeleteBase() has been called. We should not reference our parent at this + // point since we may have been deferred deleted. + ASSERT(interval_timer_ == nullptr && timeout_timer_ == nullptr); } void HealthCheckerImplBase::ActiveHealthCheckSession::onDeferredDeleteBase() { @@ -239,6 +239,12 @@ void HealthCheckerImplBase::ActiveHealthCheckSession::onDeferredDeleteBase() { // implementation specific state is destroyed. 
interval_timer_.reset(); timeout_timer_.reset(); + if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) { + parent_.decHealthy(); + } + if (host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) { + parent_.decDegraded(); + } onDeferredDelete(); } diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 1c254b9321ddb..5891ac1d7417f 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -154,7 +154,6 @@ HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSessio local_address_(std::make_shared("127.0.0.1")) {} HttpHealthCheckerImpl::HttpActiveHealthCheckSession::~HttpActiveHealthCheckSession() { - onDeferredDelete(); ASSERT(client_ == nullptr); } @@ -356,7 +355,6 @@ TcpHealthCheckerImpl::TcpHealthCheckerImpl(const Cluster& cluster, receive_bytes_(TcpHealthCheckMatcher::loadProtoBytes(config.tcp_health_check().receive())) {} TcpHealthCheckerImpl::TcpActiveHealthCheckSession::~TcpActiveHealthCheckSession() { - onDeferredDelete(); ASSERT(client_ == nullptr); } @@ -464,7 +462,6 @@ GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSessio : ActiveHealthCheckSession(parent, host), parent_(parent) {} GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() { - onDeferredDelete(); ASSERT(client_ == nullptr); } diff --git a/source/extensions/health_checkers/redis/redis.cc b/source/extensions/health_checkers/redis/redis.cc index b6342316499ae..9f130824639c8 100644 --- a/source/extensions/health_checkers/redis/redis.cc +++ b/source/extensions/health_checkers/redis/redis.cc @@ -25,7 +25,6 @@ RedisHealthChecker::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession : ActiveHealthCheckSession(parent, host), parent_(parent) {} RedisHealthChecker::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() { - onDeferredDelete(); ASSERT(current_request_ == nullptr); 
ASSERT(client_ == nullptr); } diff --git a/test/common/upstream/hds_test.cc b/test/common/upstream/hds_test.cc index 85d40a3f430c5..e8268ffb6bbc9 100644 --- a/test/common/upstream/hds_test.cc +++ b/test/common/upstream/hds_test.cc @@ -217,7 +217,6 @@ TEST_F(HdsTest, TestProcessMessageHealthChecks) { // Check Correctness EXPECT_EQ(hds_delegate_->hdsClusters()[0]->healthCheckers().size(), 2); EXPECT_EQ(hds_delegate_->hdsClusters()[1]->healthCheckers().size(), 3); - EXPECT_CALL(dispatcher_, clearDeferredDeleteList()).Times(5); } // Tests OnReceiveMessage given a minimal HealthCheckSpecifier message @@ -325,8 +324,6 @@ TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) { .socket_address() .port_value(), 1234); - - EXPECT_CALL(dispatcher_, clearDeferredDeleteList()); } } // namespace Upstream diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 2d86d68c3b699..7ea5c824b92aa 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -81,7 +81,6 @@ TEST(HealthCheckerFactoryTest, CreateGrpc) { Event::MockDispatcher dispatcher; AccessLog::MockAccessLogManager log_manager; - EXPECT_CALL(dispatcher, clearDeferredDeleteList()); EXPECT_NE(nullptr, dynamic_cast( HealthCheckerFactory::create(createGrpcHealthCheckConfig(), cluster, runtime, random, dispatcher, log_manager) diff --git a/test/config/utility.cc b/test/config/utility.cc index a27970824b5b9..d2ceba53da4d1 100644 --- a/test/config/utility.cc +++ b/test/config/utility.cc @@ -316,7 +316,8 @@ void ConfigHelper::finalize(const std::vector& ports) { *cluster->mutable_transport_socket(), tls_config); } } - ASSERT(port_idx == ports.size() || eds_hosts || custom_cluster); + ASSERT(port_idx == ports.size() || eds_hosts || custom_cluster || + bootstrap_.dynamic_resources().has_cds_config()); if (!connect_timeout_set_) { #ifdef __APPLE__ @@ -598,6 +599,23 @@ void 
ConfigHelper::addConfigModifier(HttpModifierFunction function) { }); } +CdsHelper::CdsHelper() : cds_path_(TestEnvironment::writeStringToFileForTest("cds.pb_text", "")) {} + +void CdsHelper::setCds(const std::vector& clusters) { + // Write to file the DiscoveryResponse and trigger inotify watch. + envoy::api::v2::DiscoveryResponse cds_response; + cds_response.set_version_info(std::to_string(cds_version_++)); + cds_response.set_type_url(Config::TypeUrl::get().Cluster); + for (const auto& cluster : clusters) { + cds_response.add_resources()->PackFrom(cluster); + } + // Past the initial write, need move semantics to trigger inotify move event that the + // FilesystemSubscriptionImpl is subscribed to. + std::string path = + TestEnvironment::writeStringToFileForTest("cds.update.pb_text", cds_response.DebugString()); + TestUtility::renameFile(path, cds_path_); +} + EdsHelper::EdsHelper() : eds_path_(TestEnvironment::writeStringToFileForTest("eds.pb_text", "")) { // cluster.cluster_0.update_success will be incremented on the initial // load when Envoy comes up. @@ -605,8 +623,7 @@ EdsHelper::EdsHelper() : eds_path_(TestEnvironment::writeStringToFileForTest("ed } void EdsHelper::setEds( - const std::vector& cluster_load_assignments, - IntegrationTestServerStats& server_stats) { + const std::vector& cluster_load_assignments) { // Write to file the DiscoveryResponse and trigger inotify watch. envoy::api::v2::DiscoveryResponse eds_response; eds_response.set_version_info(std::to_string(eds_version_++)); @@ -619,10 +636,16 @@ void EdsHelper::setEds( std::string path = TestEnvironment::writeStringToFileForTest("eds.update.pb_text", eds_response.DebugString()); TestUtility::renameFile(path, eds_path_); +} + +void EdsHelper::setEdsAndWait( + const std::vector& cluster_load_assignments, + IntegrationTestServerStats& server_stats) { + setEds(cluster_load_assignments); // Make sure Envoy has consumed the update now that it is running. 
- server_stats.waitForCounterGe("cluster.cluster_0.update_success", ++update_successes_); + ++update_successes_; + server_stats.waitForCounterGe("cluster.cluster_0.update_success", update_successes_); RELEASE_ASSERT( update_successes_ == server_stats.counter("cluster.cluster_0.update_success")->value(), ""); } - } // namespace Envoy diff --git a/test/config/utility.h b/test/config/utility.h index e93d5d579f9c1..1d10699134f52 100644 --- a/test/config/utility.h +++ b/test/config/utility.h @@ -178,14 +178,29 @@ class ConfigHelper { bool finalized_{false}; }; +class CdsHelper { +public: + CdsHelper(); + + // Set CDS contents on filesystem. + void setCds(const std::vector& cluster); + const std::string& cds_path() const { return cds_path_; } + +private: + const std::string cds_path_; + uint32_t cds_version_{}; +}; + // Common code for tests that deliver EDS update via the filesystem. class EdsHelper { public: EdsHelper(); // Set EDS contents on filesystem and wait for Envoy to pick this up. - void setEds(const std::vector& cluster_load_assignments, - IntegrationTestServerStats& server_stats); + void setEds(const std::vector& cluster_load_assignments); + void + setEdsAndWait(const std::vector& cluster_load_assignments, + IntegrationTestServerStats& server_stats); const std::string& eds_path() const { return eds_path_; } private: diff --git a/test/extensions/health_checkers/redis/config_test.cc b/test/extensions/health_checkers/redis/config_test.cc index 0985aa90a92a9..b05edf269d830 100644 --- a/test/extensions/health_checkers/redis/config_test.cc +++ b/test/extensions/health_checkers/redis/config_test.cc @@ -107,7 +107,6 @@ TEST(HealthCheckerFactoryTest, CreateRedisViaUpstreamHealthCheckerFactory) { Event::MockDispatcher dispatcher; AccessLog::MockAccessLogManager log_manager; - EXPECT_CALL(dispatcher, clearDeferredDeleteList()); EXPECT_NE(nullptr, dynamic_cast( Upstream::HealthCheckerFactory::create( Upstream::parseHealthCheckFromV2Yaml(yaml), cluster, runtime, random, 
diff --git a/test/integration/eds_integration_test.cc b/test/integration/eds_integration_test.cc index 1b14651fa5f52..a87154a48c951 100644 --- a/test/integration/eds_integration_test.cc +++ b/test/integration/eds_integration_test.cc @@ -22,7 +22,8 @@ class EdsIntegrationTest : public testing::TestWithParam overprovisioning_factor = absl::nullopt) { + absl::optional overprovisioning_factor = absl::nullopt, + bool await_update = true) { ASSERT(total_endpoints >= healthy_endpoints + degraded_endpoints); envoy::api::v2::ClusterLoadAssignment cluster_load_assignment; cluster_load_assignment.set_cluster_name("cluster_0"); @@ -45,42 +46,108 @@ class EdsIntegrationTest : public testing::TestWithParammutable_cds_config()->set_path(cds_helper_.cds_path()); + bootstrap.mutable_static_resources()->clear_clusters(); + }); + + // Set validate_clusters to false to allow us to reference a CDS cluster. config_helper_.addConfigModifier( - [this, http_active_hc](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { - // Switch predefined cluster_0 to EDS filesystem sourcing. - auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0); - cluster_0->mutable_hosts()->Clear(); - cluster_0->set_type(envoy::api::v2::Cluster::EDS); - auto* eds_cluster_config = cluster_0->mutable_eds_cluster_config(); - eds_cluster_config->mutable_eds_config()->set_path(eds_helper_.eds_path()); - if (http_active_hc) { - auto* health_check = cluster_0->add_health_checks(); - health_check->mutable_timeout()->set_seconds(30); - // TODO(mattklein123): Consider using simulated time here. 
- health_check->mutable_interval()->CopyFrom( - Protobuf::util::TimeUtil::MillisecondsToDuration(100)); - health_check->mutable_no_traffic_interval()->CopyFrom( - Protobuf::util::TimeUtil::MillisecondsToDuration(100)); - health_check->mutable_unhealthy_threshold()->set_value(1); - health_check->mutable_healthy_threshold()->set_value(1); - health_check->mutable_http_health_check()->set_path("/healthcheck"); - } - }); + [](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& + hcm) { hcm.mutable_route_config()->mutable_validate_clusters()->set_value(false); }); + + cluster_.mutable_connect_timeout()->CopyFrom( + Protobuf::util::TimeUtil::MillisecondsToDuration(100)); + cluster_.set_name("cluster_0"); + cluster_.mutable_hosts()->Clear(); + cluster_.set_type(envoy::api::v2::Cluster::EDS); + auto* eds_cluster_config = cluster_.mutable_eds_cluster_config(); + eds_cluster_config->mutable_eds_config()->set_path(eds_helper_.eds_path()); + if (http_active_hc) { + auto* health_check = cluster_.add_health_checks(); + health_check->mutable_timeout()->set_seconds(30); + // TODO(mattklein123): Consider using simulated time here. 
+ health_check->mutable_interval()->CopyFrom( + Protobuf::util::TimeUtil::MillisecondsToDuration(100)); + health_check->mutable_no_traffic_interval()->CopyFrom( + Protobuf::util::TimeUtil::MillisecondsToDuration(100)); + health_check->mutable_unhealthy_threshold()->set_value(1); + health_check->mutable_healthy_threshold()->set_value(1); + health_check->mutable_http_health_check()->set_path("/healthcheck"); + health_check->mutable_http_health_check()->set_use_http2(use_http2_hc_); + } + setEndpoints(0, 0, 0, true, absl::nullopt, false); + cds_helper_.setCds({cluster_}); initialize(); - setEndpoints(0, 0, 0); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); } + bool use_http2_hc_{}; EdsHelper eds_helper_; + CdsHelper cds_helper_; + envoy::api::v2::Cluster cluster_; }; INSTANTIATE_TEST_SUITE_P(IpVersions, EdsIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); +// Verifies that a new cluster can be warmed when using HTTP/2 health checking. Regression test +// for issue #6951. +TEST_P(EdsIntegrationTest, Http2HcClusterRewarming) { + use_http2_hc_ = true; + initializeTest(true); + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + setEndpoints(1, 0, 0, false); + EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); + EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); + + // Wait for the first HC and verify the host is healthy. This should warm the initial cluster. + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, true); + test_server_->waitForGaugeEq("cluster.cluster_0.membership_healthy", 1); + EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); + + // Trigger a CDS update. This should cause a new cluster to require warming, blocked on the host + // being health checked. 
+ cluster_.mutable_circuit_breakers()->add_thresholds()->mutable_max_connections()->set_value(100); + cds_helper_.setCds({cluster_}); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + EXPECT_EQ(1, test_server_->gauge("cluster_manager.warming_clusters")->value()); + + // We need to do a bunch of work to get hold of the second HC connection. + FakeHttpConnectionPtr fake_upstream_connection; + auto result = fake_upstreams_[0]->waitForHttpConnection( + *dispatcher_, fake_upstream_connection, TestUtility::DefaultTimeout, max_request_headers_kb_); + RELEASE_ASSERT(result, result.message()); + + FakeStreamPtr upstream_request; + result = fake_upstream_connection->waitForNewStream(*dispatcher_, upstream_request); + RELEASE_ASSERT(result, result.message()); + // Wait for the stream to be completely received. + result = upstream_request->waitForEndStream(*dispatcher_); + RELEASE_ASSERT(result, result.message()); + + // Respond to the health check. This will cause the previous cluster to be destroyed inline as + // part of processing the response. + upstream_request->encodeHeaders(Http::TestHeaderMapImpl{{":status", "503"}}, true); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + EXPECT_EQ(0, test_server_->gauge("cluster_manager.warming_clusters")->value()); +} + // Verify that a host stabilized via active health checking which is first removed from EDS and // then fails health checking is removed. TEST_P(EdsIntegrationTest, RemoveAfterHcFail) { @@ -111,11 +178,7 @@ TEST_P(EdsIntegrationTest, RemoveAfterHcFail) { // Verifies that endpoints are ignored until health checked when configured to. TEST_P(EdsIntegrationTest, EndpointWarmingSuccessfulHc) { - config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { - // Switch predefined cluster_0 to EDS filesystem sourcing. 
- auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0); - cluster_0->mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true); - }); + cluster_.mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true); // Endpoints are initially excluded. initializeTest(true); @@ -138,11 +201,7 @@ TEST_P(EdsIntegrationTest, EndpointWarmingSuccessfulHc) { // Verifies that endpoints are ignored until health checked when configured to when the first // health check fails. TEST_P(EdsIntegrationTest, EndpointWarmingFailedHc) { - config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { - // Switch predefined cluster_0 to EDS filesystem sourcing. - auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0); - cluster_0->mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true); - }); + cluster_.mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true); // Endpoints are initially excluded. initializeTest(true); @@ -255,7 +314,7 @@ TEST_P(EdsIntegrationTest, BatchMemberUpdateCb) { auto* endpoint = locality_lb_endpoints->add_lb_endpoints(); setUpstreamAddress(1, *endpoint); - eds_helper_.setEds({cluster_load_assignment}, *test_server_); + eds_helper_.setEdsAndWait({cluster_load_assignment}, *test_server_); EXPECT_EQ(1, member_update_count); } diff --git a/test/integration/load_stats_integration_test.cc b/test/integration/load_stats_integration_test.cc index 0d81db97f07a0..3d2ddb24172c8 100644 --- a/test/integration/load_stats_integration_test.cc +++ b/test/integration/load_stats_integration_test.cc @@ -93,7 +93,7 @@ class LoadStatsIntegrationTest : public testing::TestWithParam