diff --git a/api/envoy/admin/v2alpha/clusters.proto b/api/envoy/admin/v2alpha/clusters.proto index be7011be03036..a910f35dc470e 100644 --- a/api/envoy/admin/v2alpha/clusters.proto +++ b/api/envoy/admin/v2alpha/clusters.proto @@ -78,6 +78,10 @@ message HostHealthStatus { // The host is currently being marked as degraded through active health checking. bool failed_active_degraded_check = 4; + // The host has been removed from service discovery, but is being stabilized due to active + // health checking. + bool pending_dynamic_removal = 5; + // Health status as reported by EDS. Note: only HEALTHY and UNHEALTHY are currently supported // here. // TODO(mrice32): pipe through remaining EDS health status possibilities. diff --git a/include/envoy/upstream/health_checker.h b/include/envoy/upstream/health_checker.h index a625bd5a1bd75..9d8041235366d 100644 --- a/include/envoy/upstream/health_checker.h +++ b/include/envoy/upstream/health_checker.h @@ -40,7 +40,8 @@ class HealthChecker { * @param changed_state supplies whether the health check resulted in a host moving from healthy * to not healthy or vice versa. */ - typedef std::function HostStatusCb; + typedef std::function + HostStatusCb; /** * Install a callback that will be invoked every time a health check round is completed for diff --git a/include/envoy/upstream/outlier_detection.h b/include/envoy/upstream/outlier_detection.h index d9b1c9b4c8786..d5ee2fbcd938c 100644 --- a/include/envoy/upstream/outlier_detection.h +++ b/include/envoy/upstream/outlier_detection.h @@ -98,7 +98,7 @@ class Detector { /** * Outlier detection change state callback. */ - typedef std::function ChangeStateCb; + typedef std::function ChangeStateCb; /** * Add a changed state callback to the detector. 
The callback will be called whenever any host diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 25f431860fbc9..9841ec7d09328 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -51,7 +51,10 @@ class Host : virtual public HostDescription { /* The host is currently marked as degraded through active health checking. */ \ m(DEGRADED_ACTIVE_HC, 0x08) \ /* The host is currently marked as degraded by EDS. */ \ - m(DEGRADED_EDS_HEALTH, 0x10) + m(DEGRADED_EDS_HEALTH, 0x10) \ + /* The host is pending removal from discovery but is stabilized due to */ \ + /* active HC. */ \ + m(PENDING_DYNAMIC_REMOVAL, 0x20) // clang-format on #define DECLARE_ENUM(name, value) name = value, diff --git a/source/common/upstream/eds.cc b/source/common/upstream/eds.cc index 98a9654e838a5..c76247c92dd0e 100644 --- a/source/common/upstream/eds.cc +++ b/source/common/upstream/eds.cc @@ -150,6 +150,52 @@ void EdsClusterImpl::onAssignmentTimeout() { info_->stats().assignment_stale_.inc(); } +void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) { + // Here we will see if we have a host that has been marked for deletion by service discovery + // but has been stabilized due to passing active health checking. If such a host is now + // failing active health checking we can remove it during this health check update. + HostSharedPtr host_to_exclude = host; + if (host_to_exclude != nullptr && + host_to_exclude->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) && + host_to_exclude->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)) { + // Empty for clarity. + } else { + // Do not exclude and remove the host during the update. + host_to_exclude = nullptr; + } + + const auto& host_sets = prioritySet().hostSetsPerPriority(); + for (size_t priority = 0; priority < host_sets.size(); ++priority) { + const auto& host_set = host_sets[priority]; + + // Filter current hosts in case we need to exclude a host. 
+ HostVectorSharedPtr hosts_copy(new HostVector()); + std::copy_if(host_set->hosts().begin(), host_set->hosts().end(), + std::back_inserter(*hosts_copy), + [&host_to_exclude](const HostSharedPtr& host) { return host_to_exclude != host; }); + + // Setup a hosts to remove vector in case we need to exclude a host. + HostVector hosts_to_remove; + if (hosts_copy->size() != host_set->hosts().size()) { + ASSERT(hosts_copy->size() == host_set->hosts().size() - 1); + hosts_to_remove.emplace_back(host_to_exclude); + } + + // Filter hosts per locality in case we need to exclude a host. + HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().filter( + {[&host_to_exclude](const Host& host) { return &host != host_to_exclude.get(); }})[0]; + + prioritySet().updateHosts(priority, + HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy), + host_set->localityWeights(), {}, hosts_to_remove, absl::nullopt); + } + + if (host_to_exclude != nullptr) { + ASSERT(all_hosts_.find(host_to_exclude->address()->asString()) != all_hosts_.end()); + all_hosts_.erase(host_to_exclude->address()->asString()); + } +} + bool EdsClusterImpl::updateHostsPerLocality( const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index b2b3139031dae..ac2a95411b489 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -52,6 +52,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba std::unordered_map& updated_hosts); // ClusterImplBase + void reloadHealthyHostsHelper(const HostSharedPtr& host) override; void startPreInit() override; void onAssignmentTimeout(); diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 0e1cf05619b7f..b385350e1b1a1 100644 --- 
a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -777,7 +777,7 @@ void ClusterImplBase::finishInitialization() { initialization_complete_callback_ = nullptr; if (health_checker_ != nullptr) { - reloadHealthyHosts(); + reloadHealthyHosts(nullptr); } if (snapped_callback != nullptr) { @@ -790,11 +790,11 @@ void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_chec health_checker_ = health_checker; health_checker_->start(); health_checker_->addHostCheckCompleteCb( - [this](HostSharedPtr, HealthTransition changed_state) -> void { + [this](const HostSharedPtr& host, HealthTransition changed_state) -> void { // If we get a health check completion that resulted in a state change, signal to // update the host sets on all threads. if (changed_state == HealthTransition::Changed) { - reloadHealthyHosts(); + reloadHealthyHosts(host); } }); } @@ -805,10 +805,11 @@ void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outli } outlier_detector_ = outlier_detector; - outlier_detector_->addChangedStateCb([this](HostSharedPtr) -> void { reloadHealthyHosts(); }); + outlier_detector_->addChangedStateCb( + [this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); }); } -void ClusterImplBase::reloadHealthyHosts() { +void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) { // Every time a host changes Health Check state we cause a full healthy host recalculation which // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this // can also block worker threads by doing this repeatedly. 
There is no reason to do this @@ -818,6 +819,10 @@ void ClusterImplBase::reloadHealthyHosts() { return; } + reloadHealthyHostsHelper(host); +} + +void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) { const auto& host_sets = prioritySet().hostSetsPerPriority(); for (size_t priority = 0; priority < host_sets.size(); ++priority) { const auto& host_set = host_sets[priority]; @@ -1129,6 +1134,12 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, auto existing_host = all_hosts.find(host->address()->asString()); const bool existing_host_found = existing_host != all_hosts.end(); + // Clear any pending deletion flag on an existing host in case it came back while it was + // being stabilized. We will set it again below if needed. + if (existing_host_found) { + existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); + } + // Check if in-place host update should be skipped, i.e. when the following criteria are met // (currently there is only one criterion, but we might add more in the future): // - The cluster health checker is activated and a new host is matched with the existing one, @@ -1233,6 +1244,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, final_hosts.push_back(*i); updated_hosts[(*i)->address()->asString()] = *i; + (*i)->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); i = current_priority_hosts.erase(i); } else { i++; diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index cb10291ea233f..5bbf793e7bf4b 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -202,10 +202,6 @@ class HostImpl : public HostDescriptionImpl, outlier_detector_ = std::move(outlier_detector); } Host::Health health() const override { - if (!health_flags_) { - return Host::Health::Healthy; - } - // If any of the unhealthy flags are set, host is unhealthy. 
if (healthFlagGet(HealthFlag::FAILED_ACTIVE_HC) || healthFlagGet(HealthFlag::FAILED_OUTLIER_CHECK) || @@ -213,10 +209,15 @@ class HostImpl : public HostDescriptionImpl, return Host::Health::Unhealthy; } - // Only possible option at this point is that the host is degraded. - ASSERT(healthFlagGet(HealthFlag::DEGRADED_ACTIVE_HC) || - healthFlagGet(HealthFlag::DEGRADED_EDS_HEALTH)); - return Host::Health::Degraded; + // If any of the degraded flags are set, host is degraded. + if (healthFlagGet(HealthFlag::DEGRADED_ACTIVE_HC) || + healthFlagGet(HealthFlag::DEGRADED_EDS_HEALTH)) { + return Host::Health::Degraded; + } + + // The host must have no flags or be pending removal. + ASSERT(health_flags_ == 0 || healthFlagGet(HealthFlag::PENDING_DYNAMIC_REMOVAL)); + return Host::Health::Healthy; } uint32_t weight() const override { return weight_; } @@ -401,7 +402,6 @@ typedef std::unique_ptr HostSetImplPtr; /** * A class for management of the set of hosts in a given cluster. */ - class PrioritySetImpl : public PrioritySet { public: PrioritySetImpl() : batch_update_(false) {} @@ -698,7 +698,8 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable initialization_complete_callback_; diff --git a/source/server/http/admin.cc b/source/server/http/admin.cc index 8fb1fde7d74e5..daf4818a74cb0 100644 --- a/source/server/http/admin.cc +++ b/source/server/http/admin.cc @@ -182,6 +182,10 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host, health_status.set_failed_active_degraded_check( host.healthFlagGet(Upstream::Host::HealthFlag::DEGRADED_ACTIVE_HC)); break; + case Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL: + health_status.set_pending_dynamic_removal( + host.healthFlagGet(Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + break; } } } // namespace diff --git a/test/common/upstream/eds_test.cc b/test/common/upstream/eds_test.cc index be66f4f013218..acbebded4e96b 100644 --- a/test/common/upstream/eds_test.cc +++ 
b/test/common/upstream/eds_test.cc @@ -478,6 +478,112 @@ TEST_F(EdsTest, EndpointHealthStatus) { EXPECT_EQ(rebuild_container + 1, stats_.counter("cluster.name.update_no_rebuild").value()); } +// Verify that a host is removed if it is removed from discovery, stabilized, and then later +// fails active HC. +TEST_F(EdsTest, EndpointRemovalAfterHcFail) { + envoy::api::v2::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + + auto health_checker = std::make_shared(); + EXPECT_CALL(*health_checker, start()); + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2); + cluster_->setHealthChecker(health_checker); + + auto add_endpoint = [&cluster_load_assignment](int port) { + auto* endpoints = cluster_load_assignment.add_endpoints(); + + auto* socket_address = endpoints->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("1.2.3.4"); + socket_address->set_port_value(port); + }; + + add_endpoint(80); + add_endpoint(81); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + + // Mark the hosts as healthy + hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + } + + // Remove endpoints and add back the port 80 one. Both hosts should be present due to + // being stabilized, but one of them should be marked pending removal. 
+ cluster_load_assignment.clear_endpoints(); + add_endpoint(80); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } + + // Add both hosts back, make sure pending removal is gone. + cluster_load_assignment.clear_endpoints(); + add_endpoint(80); + add_endpoint(81); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } + + // Remove endpoints and add back the port 80 one. Both hosts should be present due to + // being stabilized, but one of them should be marked pending removal. + cluster_load_assignment.clear_endpoints(); + add_endpoint(80); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + HostSharedPtr not_removed_host; + HostSharedPtr removed_host; + { + EXPECT_EQ(2, + cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get()[0].size()); + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + + // Mark the host is failing active HC and then run callbacks. 
+ not_removed_host = hosts[0]; + removed_host = hosts[1]; + hosts[1]->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); + health_checker->runCallbacks(hosts[1], HealthTransition::Changed); + } + + { + EXPECT_EQ(1, + cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get()[0].size()); + EXPECT_EQ(1, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + } + + // Add back 81. Verify that we have a new host. This will show that the all_hosts_ was updated + // correctly. + cluster_load_assignment.clear_endpoints(); + add_endpoint(80); + add_endpoint(81); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + EXPECT_EQ(not_removed_host, hosts[0]); + EXPECT_EQ(removed_host->address()->asString(), hosts[1]->address()->asString()); + EXPECT_NE(removed_host, hosts[1]); + } +} + // Verify that a host is removed when it is still passing active HC, but has been previously // told by the EDS server to fail health check. TEST_F(EdsTest, EndpointRemovalEdsFailButActiveHcSuccess) { diff --git a/test/common/upstream/upstream_impl_test.cc b/test/common/upstream/upstream_impl_test.cc index 64864f91f29fc..3f1218f2f4d3b 100644 --- a/test/common/upstream/upstream_impl_test.cc +++ b/test/common/upstream/upstream_impl_test.cc @@ -410,6 +410,76 @@ TEST_F(StrictDnsClusterImplTest, HostRemovalActiveHealthSkipped) { EXPECT_EQ(1UL, hosts.size()); } +// Verify that a host is not removed if it is removed from DNS but still passing active health +// checking. 
+TEST_F(StrictDnsClusterImplTest, HostRemovalAfterHcFail) { + const std::string yaml = R"EOF( + name: name + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + hosts: [{ socket_address: { address: foo.bar.com, port_value: 443 }}] + )EOF"; + + ResolverData resolver(*dns_resolver_, dispatcher_); + envoy::api::v2::Cluster cluster_config = parseClusterFromV2Yaml(yaml); + Envoy::Stats::ScopePtr scope = stats_.createScope(fmt::format( + "cluster.{}.", cluster_config.alt_stat_name().empty() ? cluster_config.name() + : cluster_config.alt_stat_name())); + Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context( + admin_, ssl_context_manager_, *scope, cm_, local_info_, dispatcher_, random_, stats_, + singleton_manager_, tls_, *api_); + StrictDnsClusterImpl cluster(cluster_config, runtime_, dns_resolver_, factory_context, + std::move(scope), false); + std::shared_ptr health_checker(new MockHealthChecker()); + EXPECT_CALL(*health_checker, start()); + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)); + cluster.setHealthChecker(health_checker); + ReadyWatcher initialized; + cluster.initialize([&initialized]() { initialized.ready(); }); + + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)); + EXPECT_CALL(*resolver.timer_, enableTimer(_)).Times(2); + resolver.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + + // Verify that both endpoints are initially marked with FAILED_ACTIVE_HC, then + // clear the flag to simulate that these endpoints have been successfully health + // checked. 
+ { + const auto& hosts = cluster.prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(2UL, hosts.size()); + + for (size_t i = 0; i < 2; ++i) { + EXPECT_TRUE(hosts[i]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + hosts[i]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + if (i == 1) { + EXPECT_CALL(initialized, ready()); + } + health_checker->runCallbacks(hosts[i], HealthTransition::Changed); + } + } + + // Re-resolve the DNS name with only one record, we should still have 2 hosts. + resolver.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1"})); + + { + const auto& hosts = cluster.prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(2UL, hosts.size()); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + + hosts[1]->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); + health_checker->runCallbacks(hosts[1], HealthTransition::Changed); + } + + // Unlike EDS we will not remove if HC is failing but will wait until the next polling interval. + // This may change in the future. + { + const auto& hosts = cluster.prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(2UL, hosts.size()); + } +} + TEST_F(StrictDnsClusterImplTest, LoadAssignmentBasic) { // gmock matches in LIFO order which is why these are swapped. 
ResolverData resolver3(*dns_resolver_, dispatcher_); diff --git a/test/server/http/admin_test.cc b/test/server/http/admin_test.cc index 21ece6c4336a7..98a2620c5331a 100644 --- a/test/server/http/admin_test.cc +++ b/test/server/http/admin_test.cc @@ -1059,6 +1059,8 @@ TEST_P(AdminInstanceTest, ClustersJson) { .WillByDefault(Return(true)); ON_CALL(*host, healthFlagGet(Upstream::Host::HealthFlag::DEGRADED_EDS_HEALTH)) .WillByDefault(Return(true)); + ON_CALL(*host, healthFlagGet(Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)) + .WillByDefault(Return(true)); ON_CALL(host->outlier_detector_, successRate()).WillByDefault(Return(43.2)); ON_CALL(*host, weight()).WillByDefault(Return(5)); @@ -1118,7 +1120,8 @@ TEST_P(AdminInstanceTest, ClustersJson) { "eds_health_status": "DEGRADED", "failed_active_health_check": true, "failed_outlier_check": true, - "failed_active_degraded_check": true + "failed_active_degraded_check": true, + "pending_dynamic_removal": true }, "success_rate": { "value": 43.2