diff --git a/source/common/upstream/health_checker_base_impl.cc b/source/common/upstream/health_checker_base_impl.cc index 24cfabac7b541..71d2528473f52 100644 --- a/source/common/upstream/health_checker_base_impl.cc +++ b/source/common/upstream/health_checker_base_impl.cc @@ -38,6 +38,11 @@ HealthCheckerImplBase::HealthCheckerImplBase(const Cluster& cluster, }); } +HealthCheckerImplBase::~HealthCheckerImplBase() { + // Make sure that any sessions that were deferred deleted are cleared before we destruct. + dispatcher_.clearDeferredDeleteList(); +} + void HealthCheckerImplBase::decHealthy() { ASSERT(local_process_healthy_ > 0); local_process_healthy_--; @@ -135,6 +140,9 @@ void HealthCheckerImplBase::onClusterMemberUpdate(const HostVector& hosts_added, for (const HostSharedPtr& host : hosts_removed) { auto session_iter = active_sessions_.find(host); ASSERT(active_sessions_.end() != session_iter); + // This deletion can happen inline in response to a host failure, so we deferred delete. + session_iter->second->onDeferredDeleteBase(); + dispatcher_.deferredDelete(std::move(session_iter->second)); active_sessions_.erase(session_iter); } } @@ -220,6 +228,14 @@ HealthCheckerImplBase::ActiveHealthCheckSession::~ActiveHealthCheckSession() { } } +void HealthCheckerImplBase::ActiveHealthCheckSession::onDeferredDeleteBase() { + // The session is about to be deferred deleted. Make sure all timers are gone and any + // implementation specific state is destroyed. + interval_timer_.reset(); + timeout_timer_.reset(); + onDeferredDelete(); +} + void HealthCheckerImplBase::ActiveHealthCheckSession::handleSuccess(bool degraded) { // If we are healthy, reset the # of unhealthy to zero. num_unhealthy_ = 0; @@ -310,8 +326,14 @@ HealthTransition HealthCheckerImplBase::ActiveHealthCheckSession::setUnhealthy( void HealthCheckerImplBase::ActiveHealthCheckSession::handleFailure( envoy::data::core::v2alpha::HealthCheckFailureType type) { HealthTransition changed_state = setUnhealthy(type); - timeout_timer_->disableTimer(); - interval_timer_->enableTimer(parent_.interval(HealthState::Unhealthy, changed_state)); + // It's possible that the previous call caused this session to be deferred deleted. + if (timeout_timer_ != nullptr) { + timeout_timer_->disableTimer(); + } + + if (interval_timer_ != nullptr) { + interval_timer_->enableTimer(parent_.interval(HealthState::Unhealthy, changed_state)); + } } void HealthCheckerImplBase::ActiveHealthCheckSession::onIntervalBase() { diff --git a/source/common/upstream/health_checker_base_impl.h b/source/common/upstream/health_checker_base_impl.h index 2fd1d7f56a91f..8c683428a350f 100644 --- a/source/common/upstream/health_checker_base_impl.h +++ b/source/common/upstream/health_checker_base_impl.h @@ -46,11 +46,12 @@ class HealthCheckerImplBase : public HealthChecker, void start() override; protected: - class ActiveHealthCheckSession { + class ActiveHealthCheckSession : public Event::DeferredDeletable { public: virtual ~ActiveHealthCheckSession(); HealthTransition setUnhealthy(envoy::data::core::v2alpha::HealthCheckFailureType type); void start() { onIntervalBase(); } + void onDeferredDeleteBase(); protected: ActiveHealthCheckSession(HealthCheckerImplBase& parent, HostSharedPtr host); @@ -66,6 +67,7 @@ class HealthCheckerImplBase : public HealthChecker, void onIntervalBase(); virtual void onTimeout() PURE; void onTimeoutBase(); + virtual void onDeferredDelete() PURE; HealthCheckerImplBase& parent_; Event::TimerPtr interval_timer_; @@ -80,6 +82,7 @@ class HealthCheckerImplBase : public HealthChecker, HealthCheckerImplBase(const Cluster& cluster, const envoy::api::v2::core::HealthCheck& config, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Runtime::RandomGenerator& random, HealthCheckEventLoggerPtr&& event_logger); + ~HealthCheckerImplBase(); virtual ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) PURE; virtual envoy::data::core::v2alpha::HealthCheckerType healthCheckerType() const PURE; diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 09b81c262a975..00a264ef31374 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -154,6 +154,11 @@ HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSessio local_address_(std::make_shared("127.0.0.1")) {} HttpHealthCheckerImpl::HttpActiveHealthCheckSession::~HttpActiveHealthCheckSession() { + onDeferredDelete(); + ASSERT(client_ == nullptr); +} + +void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onDeferredDelete() { if (client_) { // If there is an active request it will get reset, so make sure we ignore the reset. expect_reset_ = true; @@ -349,6 +354,11 @@ TcpHealthCheckerImpl::TcpHealthCheckerImpl(const Cluster& cluster, receive_bytes_(TcpHealthCheckMatcher::loadProtoBytes(config.tcp_health_check().receive())) {} TcpHealthCheckerImpl::TcpActiveHealthCheckSession::~TcpActiveHealthCheckSession() { + onDeferredDelete(); + ASSERT(client_ == nullptr); +} + +void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onDeferredDelete() { if (client_) { client_->close(Network::ConnectionCloseType::NoFlush); } @@ -448,6 +458,11 @@ GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSessio : ActiveHealthCheckSession(parent, host), parent_(parent) {} GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() { + onDeferredDelete(); + ASSERT(client_ == nullptr); +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onDeferredDelete() { if (client_) { // If there is an active request it will get reset, so make sure we ignore the reset. expect_reset_ = true; diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index 450cc734e1178..a1d7b6fcf4071 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -75,6 +75,7 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { // ActiveHealthCheckSession void onInterval() override; void onTimeout() override; + void onDeferredDelete() final; // Http::StreamDecoder void decode100ContinueHeaders(Http::HeaderMapPtr&&) override {} @@ -246,6 +247,7 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase { // ActiveHealthCheckSession void onInterval() override; void onTimeout() override; + void onDeferredDelete() final; TcpHealthCheckerImpl& parent_; Network::ClientConnectionPtr client_; @@ -292,6 +294,7 @@ class GrpcHealthCheckerImpl : public HealthCheckerImplBase { // ActiveHealthCheckSession void onInterval() override; void onTimeout() override; + void onDeferredDelete() final; // Http::StreamDecoder void decode100ContinueHeaders(Http::HeaderMapPtr&&) override {} diff --git a/source/extensions/health_checkers/redis/redis.cc b/source/extensions/health_checkers/redis/redis.cc index 615c99c92f499..b6342316499ae 100644 --- a/source/extensions/health_checkers/redis/redis.cc +++ b/source/extensions/health_checkers/redis/redis.cc @@ -25,6 +25,12 @@ RedisHealthChecker::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession : ActiveHealthCheckSession(parent, host), parent_(parent) {} RedisHealthChecker::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() { + onDeferredDelete(); + ASSERT(current_request_ == nullptr); + ASSERT(client_ == nullptr); +} + +void RedisHealthChecker::RedisActiveHealthCheckSession::onDeferredDelete() { if (current_request_) { current_request_->cancel(); current_request_ = nullptr; diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h index 791171d2d83e2..f43aabe7622a6 100644 --- a/source/extensions/health_checkers/redis/redis.h +++ b/source/extensions/health_checkers/redis/redis.h @@ -50,9 +50,11 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { public Network::ConnectionCallbacks { RedisActiveHealthCheckSession(RedisHealthChecker& parent, const Upstream::HostSharedPtr& host); ~RedisActiveHealthCheckSession(); + // ActiveHealthCheckSession void onInterval() override; void onTimeout() override; + void onDeferredDelete() final; // Extensions::NetworkFilters::Common::Redis::Client::Config bool disableOutlierEvents() const override { return true; } diff --git a/test/common/upstream/hds_test.cc b/test/common/upstream/hds_test.cc index e8268ffb6bbc9..85d40a3f430c5 100644 --- a/test/common/upstream/hds_test.cc +++ b/test/common/upstream/hds_test.cc @@ -217,6 +217,7 @@ TEST_F(HdsTest, TestProcessMessageHealthChecks) { // Check Correctness EXPECT_EQ(hds_delegate_->hdsClusters()[0]->healthCheckers().size(), 2); EXPECT_EQ(hds_delegate_->hdsClusters()[1]->healthCheckers().size(), 3); + EXPECT_CALL(dispatcher_, clearDeferredDeleteList()).Times(5); } // Tests OnReceiveMessage given a minimal HealthCheckSpecifier message @@ -324,6 +325,8 @@ TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) { .socket_address() .port_value(), 1234); + + EXPECT_CALL(dispatcher_, clearDeferredDeleteList()); } } // namespace Upstream diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index f098a43137d96..5aabd795d0beb 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -70,7 +70,7 @@ TEST(HealthCheckerFactoryTest, GrpcHealthCheckHTTP2NotConfiguredException) { "fake_cluster cluster must support HTTP/2 for gRPC healthchecking"); } -TEST(HealthCheckerFactoryTest, createGrpc) { +TEST(HealthCheckerFactoryTest, CreateGrpc) { NiceMock cluster; EXPECT_CALL(*cluster.info_, features()) @@ -81,6 +81,7 @@ TEST(HealthCheckerFactoryTest, createGrpc) { Event::MockDispatcher dispatcher; AccessLog::MockAccessLogManager log_manager; + EXPECT_CALL(dispatcher, clearDeferredDeleteList()); EXPECT_NE(nullptr, dynamic_cast( HealthCheckerFactory::create(createGrpcHealthCheckConfig(), cluster, runtime, random, dispatcher, log_manager) @@ -1197,6 +1198,28 @@ TEST_F(HttpHealthCheckerImplTest, SuccessStartFailedFailFirstLogError) { expectSuccessStartFailedFailFirst(); } +// Verify that removal during a failure callback works. +TEST_F(HttpHealthCheckerImplTest, HttpFailRemoveHostInCallback) { + setupNoServiceValidationHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + expectSessionCreate(); + expectStreamCreate(0); + EXPECT_CALL(*test_sessions_[0]->timeout_timer_, enableTimer(_)); + health_checker_->start(); + + EXPECT_CALL(*this, onHostStatus(_, HealthTransition::Changed)) + .WillOnce(Invoke([&](HostSharedPtr host, HealthTransition) { + cluster_->prioritySet().getMockHostSet(0)->hosts_ = {}; + cluster_->prioritySet().runUpdateCallbacks(0, {}, {host}); + })); + EXPECT_CALL(*event_logger_, logEjectUnhealthy(_, _, _)); + EXPECT_CALL(*test_sessions_[0]->interval_timer_, enableTimer(_)).Times(0); + EXPECT_CALL(*test_sessions_[0]->timeout_timer_, disableTimer()).Times(0); + EXPECT_CALL(*event_logger_, logUnhealthy(_, _, _, true)); + respond(0, "503", false); +} + TEST_F(HttpHealthCheckerImplTest, HttpFail) { setupNoServiceValidationHC(); cluster_->prioritySet().getMockHostSet(0)->hosts_ = { diff --git a/test/extensions/health_checkers/redis/config_test.cc b/test/extensions/health_checkers/redis/config_test.cc index c595c11029404..0985aa90a92a9 100644 --- a/test/extensions/health_checkers/redis/config_test.cc +++ b/test/extensions/health_checkers/redis/config_test.cc @@ -16,7 +16,7 @@ namespace { typedef Extensions::HealthCheckers::RedisHealthChecker::RedisHealthChecker CustomRedisHealthChecker; -TEST(HealthCheckerFactoryTest, createRedis) { +TEST(HealthCheckerFactoryTest, CreateRedis) { const std::string yaml = R"EOF( timeout: 1s interval: 1s @@ -40,7 +40,7 @@ TEST(HealthCheckerFactoryTest, createRedis) { .get())); } -TEST(HealthCheckerFactoryTest, createRedisWithoutKey) { +TEST(HealthCheckerFactoryTest, CreateRedisWithoutKey) { const std::string yaml = R"EOF( timeout: 1s interval: 1s @@ -63,7 +63,7 @@ TEST(HealthCheckerFactoryTest, createRedisWithoutKey) { .get())); } -TEST(HealthCheckerFactoryTest, createRedisWithLogHCFailure) { +TEST(HealthCheckerFactoryTest, CreateRedisWithLogHCFailure) { const std::string yaml = R"EOF( timeout: 1s interval: 1s @@ -87,7 +87,7 @@ TEST(HealthCheckerFactoryTest, createRedisWithLogHCFailure) { .get())); } -TEST(HealthCheckerFactoryTest, createRedisViaUpstreamHealthCheckerFactory) { +TEST(HealthCheckerFactoryTest, CreateRedisViaUpstreamHealthCheckerFactory) { const std::string yaml = R"EOF( timeout: 1s interval: 1s @@ -106,6 +106,8 @@ TEST(HealthCheckerFactoryTest, createRedisViaUpstreamHealthCheckerFactory) { Runtime::MockRandomGenerator random; Event::MockDispatcher dispatcher; AccessLog::MockAccessLogManager log_manager; + + EXPECT_CALL(dispatcher, clearDeferredDeleteList()); EXPECT_NE(nullptr, dynamic_cast( Upstream::HealthCheckerFactory::create( Upstream::parseHealthCheckFromV2Yaml(yaml), cluster, runtime, random, diff --git a/test/integration/eds_integration_test.cc b/test/integration/eds_integration_test.cc index d48c69be768ee..4bcd9c2bf7c50 100644 --- a/test/integration/eds_integration_test.cc +++ b/test/integration/eds_integration_test.cc @@ -21,7 +21,7 @@ class EdsIntegrationTest : public testing::TestWithParam overprovisioning_factor = absl::nullopt) { ASSERT(total_endpoints >= healthy_endpoints + degraded_endpoints); envoy::api::v2::ClusterLoadAssignment cluster_load_assignment; @@ -36,27 +36,42 @@ class EdsIntegrationTest : public testing::TestWithParamadd_lb_endpoints(); setUpstreamAddress(i, *endpoint); // First N endpoints are degraded, next M are healthy and the remaining endpoints are - // unhealthy. + // unhealthy or unknown depending on remaining_unhealthy. if (i < degraded_endpoints) { endpoint->set_health_status(envoy::api::v2::core::HealthStatus::DEGRADED); } else if (i >= healthy_endpoints + degraded_endpoints) { - endpoint->set_health_status(envoy::api::v2::core::HealthStatus::UNHEALTHY); + endpoint->set_health_status(remaining_unhealthy + ? envoy::api::v2::core::HealthStatus::UNHEALTHY + : envoy::api::v2::core::HealthStatus::UNKNOWN); } } eds_helper_.setEds({cluster_load_assignment}, *test_server_); } - void initialize() override { + void initializeTest(bool http_active_hc) { setUpstreamCount(4); - config_helper_.addConfigModifier([this](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { - // Switch predefined cluster_0 to EDS filesystem sourcing. - auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0); - cluster_0->mutable_hosts()->Clear(); - cluster_0->set_type(envoy::api::v2::Cluster::EDS); - auto* eds_cluster_config = cluster_0->mutable_eds_cluster_config(); - eds_cluster_config->mutable_eds_config()->set_path(eds_helper_.eds_path()); - }); - HttpIntegrationTest::initialize(); + config_helper_.addConfigModifier( + [this, http_active_hc](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + // Switch predefined cluster_0 to EDS filesystem sourcing. + auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0); + cluster_0->mutable_hosts()->Clear(); + cluster_0->set_type(envoy::api::v2::Cluster::EDS); + auto* eds_cluster_config = cluster_0->mutable_eds_cluster_config(); + eds_cluster_config->mutable_eds_config()->set_path(eds_helper_.eds_path()); + if (http_active_hc) { + auto* health_check = cluster_0->add_health_checks(); + health_check->mutable_timeout()->set_seconds(30); + // TODO(mattklein123): Consider using simulated time here. + health_check->mutable_interval()->CopyFrom( + Protobuf::util::TimeUtil::MillisecondsToDuration(100)); + health_check->mutable_no_traffic_interval()->CopyFrom( + Protobuf::util::TimeUtil::MillisecondsToDuration(100)); + health_check->mutable_unhealthy_threshold()->set_value(1); + health_check->mutable_healthy_threshold()->set_value(1); + health_check->mutable_http_health_check()->set_path("/healthcheck"); + } + }); + initialize(); setEndpoints(0, 0, 0); } @@ -66,9 +81,36 @@ class EdsIntegrationTest : public testing::TestWithParamset_allow_unexpected_disconnects(true); + setEndpoints(1, 0, 0, false); + EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); + EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); + + // Wait for the first HC and verify the host is healthy. + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, true); + test_server_->waitForGaugeEq("cluster.cluster_0.membership_healthy", 1); + EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); + + // Clear out the host and verify the host is still healthy. + setEndpoints(0, 0, 0); + EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); + EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); + + // Fail HC and verify the host is gone. + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "503"}}, true); + test_server_->waitForGaugeEq("cluster.cluster_0.membership_healthy", 0); + EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_total")->value()); +} + // Validate that health status updates are consumed from EDS. TEST_P(EdsIntegrationTest, HealthUpdate) { - initialize(); + initializeTest(false); // Initial state, no cluster members. EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.membership_change")->value()); EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_total")->value()); @@ -103,7 +145,7 @@ TEST_P(EdsIntegrationTest, HealthUpdate) { // Validate that overprovisioning_factor update are picked up by Envoy. TEST_P(EdsIntegrationTest, OverprovisioningFactorUpdate) { - initialize(); + initializeTest(false); // Default overprovisioning factor. setEndpoints(4, 4, 0); auto get_and_compare = [this](const uint32_t expected_factor) { @@ -119,13 +161,13 @@ TEST_P(EdsIntegrationTest, OverprovisioningFactorUpdate) { get_and_compare(Envoy::Upstream::kDefaultOverProvisioningFactor); // Use new overprovisioning factor 200. - setEndpoints(4, 4, 0, 200); + setEndpoints(4, 4, 0, true, 200); get_and_compare(200); } // Verifies that EDS update only triggers member update callbacks once per update. TEST_P(EdsIntegrationTest, BatchMemberUpdateCb) { - initialize(); + initializeTest(false); uint32_t member_update_count{}; diff --git a/test/test_listener.cc b/test/test_listener.cc index b6cd05b0e3c17..61e6b6b12bf3c 100644 --- a/test/test_listener.cc +++ b/test/test_listener.cc @@ -11,7 +11,9 @@ void TestListener::OnTestEnd(const ::testing::TestInfo& test_info) { std::string active_singletons = Envoy::Test::Globals::describeActiveSingletons(); RELEASE_ASSERT(active_singletons.empty(), absl::StrCat("FAIL [", test_info.test_suite_name(), ".", test_info.name(), - "]: Active singletons exist:\n", active_singletons)); + "]: Active singletons exist. Something is leaking. Consider " + "commenting out this assert and letting the heap checker run:\n", + active_singletons)); } } // namespace Envoy