Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/envoy/admin/v2alpha/clusters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ message HostHealthStatus {
// The host is currently being marked as degraded through active health checking.
bool failed_active_degraded_check = 4;

// The host has been removed from service discovery, but is being stabilized due to active
// health checking.
bool pending_dynamic_removal = 5;

// Health status as reported by EDS. Note: only HEALTHY and UNHEALTHY are currently supported
// here.
// TODO(mrice32): pipe through remaining EDS health status possibilities.
Expand Down
3 changes: 2 additions & 1 deletion include/envoy/upstream/health_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class HealthChecker {
* @param changed_state supplies whether the health check resulted in a host moving from healthy
* to not healthy or vice versa.
*/
// The host is handed to the callback by const reference so that invoking the callback does not
// copy the shared pointer.
typedef std::function<void(const HostSharedPtr& host, HealthTransition changed_state)>
    HostStatusCb;

/**
* Install a callback that will be invoked every time a health check round is completed for
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/upstream/outlier_detection.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Detector {
/**
* Outlier detection change state callback.
*/
// The host is handed to the callback by const reference so that invoking the callback does not
// copy the shared pointer.
typedef std::function<void(const HostSharedPtr& host)> ChangeStateCb;

/**
* Add a changed state callback to the detector. The callback will be called whenever any host
Expand Down
5 changes: 4 additions & 1 deletion include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ class Host : virtual public HostDescription {
/* The host is currently marked as degraded through active health checking. */ \
m(DEGRADED_ACTIVE_HC, 0x08) \
/* The host is currently marked as degraded by EDS. */ \
m(DEGRADED_EDS_HEALTH, 0x10)
m(DEGRADED_EDS_HEALTH, 0x10) \
/* The host is pending removal from discovery but is stabilized due to */ \
/* active HC. */ \
m(PENDING_DYNAMIC_REMOVAL, 0x20)
// clang-format on

#define DECLARE_ENUM(name, value) name = value,
Expand Down
46 changes: 46 additions & 0 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,52 @@ void EdsClusterImpl::onAssignmentTimeout() {
info_->stats().assignment_stale_.inc();
}

void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) {
// Here we will see if we have a host that has been marked for deletion by service discovery
// but has been stabilized due to passing active health checking. If such a host is now
// failing active health checking we can remove it during this health check update.
HostSharedPtr host_to_exclude = host;
if (host_to_exclude != nullptr &&
host_to_exclude->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) &&
host_to_exclude->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)) {
// Empty for clarity.
} else {
// Do not exclude and remove the host during the update.
host_to_exclude = nullptr;
}

const auto& host_sets = prioritySet().hostSetsPerPriority();
for (size_t priority = 0; priority < host_sets.size(); ++priority) {
const auto& host_set = host_sets[priority];

// Filter current hosts in case we need to exclude a host.
HostVectorSharedPtr hosts_copy(new HostVector());
std::copy_if(host_set->hosts().begin(), host_set->hosts().end(),
std::back_inserter(*hosts_copy),
[&host_to_exclude](const HostSharedPtr& host) { return host_to_exclude != host; });

// Setup a hosts to remove vector in case we need to exclude a host.
HostVector hosts_to_remove;
if (hosts_copy->size() != host_set->hosts().size()) {
ASSERT(hosts_copy->size() == host_set->hosts().size() - 1);
hosts_to_remove.emplace_back(host_to_exclude);
}

// Filter hosts per locality in case we need to exclude a host.
HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().filter(
{[&host_to_exclude](const Host& host) { return &host != host_to_exclude.get(); }})[0];

prioritySet().updateHosts(priority,
HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
host_set->localityWeights(), {}, hosts_to_remove, absl::nullopt);
}

if (host_to_exclude != nullptr) {
ASSERT(all_hosts_.find(host_to_exclude->address()->asString()) != all_hosts_.end());
all_hosts_.erase(host_to_exclude->address()->asString());
}
}

bool EdsClusterImpl::updateHostsPerLocality(
const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts,
LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map,
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba
std::unordered_map<std::string, HostSharedPtr>& updated_hosts);

// ClusterImplBase
void reloadHealthyHostsHelper(const HostSharedPtr& host) override;
void startPreInit() override;
void onAssignmentTimeout();

Expand Down
22 changes: 17 additions & 5 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ void ClusterImplBase::finishInitialization() {
initialization_complete_callback_ = nullptr;

if (health_checker_ != nullptr) {
reloadHealthyHosts();
reloadHealthyHosts(nullptr);
}

if (snapped_callback != nullptr) {
Expand All @@ -790,11 +790,11 @@ void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_chec
health_checker_ = health_checker;
health_checker_->start();
health_checker_->addHostCheckCompleteCb(
[this](HostSharedPtr, HealthTransition changed_state) -> void {
[this](const HostSharedPtr& host, HealthTransition changed_state) -> void {
// If we get a health check completion that resulted in a state change, signal to
// update the host sets on all threads.
if (changed_state == HealthTransition::Changed) {
reloadHealthyHosts();
reloadHealthyHosts(host);
}
});
}
Expand All @@ -805,10 +805,11 @@ void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outli
}

outlier_detector_ = outlier_detector;
outlier_detector_->addChangedStateCb([this](HostSharedPtr) -> void { reloadHealthyHosts(); });
outlier_detector_->addChangedStateCb(
[this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); });
}

void ClusterImplBase::reloadHealthyHosts() {
void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
// Every time a host changes Health Check state we cause a full healthy host recalculation which
// for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
// can also block worker threads by doing this repeatedly. There is no reason to do this
Expand All @@ -818,6 +819,10 @@ void ClusterImplBase::reloadHealthyHosts() {
return;
}

reloadHealthyHostsHelper(host);
}

void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
const auto& host_sets = prioritySet().hostSetsPerPriority();
for (size_t priority = 0; priority < host_sets.size(); ++priority) {
const auto& host_set = host_sets[priority];
Expand Down Expand Up @@ -1129,6 +1134,12 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
auto existing_host = all_hosts.find(host->address()->asString());
const bool existing_host_found = existing_host != all_hosts.end();

// Clear any pending deletion flag on an existing host in case it came back while it was
// being stabilized. We will set it again below if needed.
if (existing_host_found) {
existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
}

// Check if in-place host update should be skipped, i.e. when the following criteria are met
// (currently there is only one criterion, but we might add more in the future):
// - The cluster health checker is activated and a new host is matched with the existing one,
Expand Down Expand Up @@ -1233,6 +1244,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,

final_hosts.push_back(*i);
updated_hosts[(*i)->address()->asString()] = *i;
(*i)->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
i = current_priority_hosts.erase(i);
} else {
i++;
Expand Down
21 changes: 11 additions & 10 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,22 @@ class HostImpl : public HostDescriptionImpl,
outlier_detector_ = std::move(outlier_detector);
}
/**
 * Derive the coarse health of this host from its health flags.
 *
 * Precedence is: any FAILED_* flag makes the host Unhealthy; otherwise any DEGRADED_* flag makes
 * it Degraded; otherwise it is Healthy.
 *
 * @return Host::Health the health implied by the currently set flags.
 */
Host::Health health() const override {
  // If any of the unhealthy flags are set, host is unhealthy.
  if (healthFlagGet(HealthFlag::FAILED_ACTIVE_HC) ||
      healthFlagGet(HealthFlag::FAILED_OUTLIER_CHECK) ||
      healthFlagGet(HealthFlag::FAILED_EDS_HEALTH)) {
    return Host::Health::Unhealthy;
  }

  // If any of the degraded flags are set, host is degraded.
  if (healthFlagGet(HealthFlag::DEGRADED_ACTIVE_HC) ||
      healthFlagGet(HealthFlag::DEGRADED_EDS_HEALTH)) {
    return Host::Health::Degraded;
  }

  // PENDING_DYNAMIC_REMOVAL on its own does not affect health: a host that is only pending
  // removal is still reported as healthy. At this point the host must therefore have no flags
  // at all or be pending removal.
  ASSERT(health_flags_ == 0 || healthFlagGet(HealthFlag::PENDING_DYNAMIC_REMOVAL));
  return Host::Health::Healthy;
}

uint32_t weight() const override { return weight_; }
Expand Down Expand Up @@ -401,7 +402,6 @@ typedef std::unique_ptr<HostSetImpl> HostSetImplPtr;
/**
* A class for management of the set of hosts in a given cluster.
*/

class PrioritySetImpl : public PrioritySet {
public:
PrioritySetImpl() : batch_update_(false) {}
Expand Down Expand Up @@ -698,7 +698,8 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u

private:
void finishInitialization();
void reloadHealthyHosts();
void reloadHealthyHosts(const HostSharedPtr& host);
virtual void reloadHealthyHostsHelper(const HostSharedPtr& host);

bool initialization_started_{};
std::function<void()> initialization_complete_callback_;
Expand Down
4 changes: 4 additions & 0 deletions source/server/http/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host,
health_status.set_failed_active_degraded_check(
host.healthFlagGet(Upstream::Host::HealthFlag::DEGRADED_ACTIVE_HC));
break;
case Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL:
health_status.set_pending_dynamic_removal(
host.healthFlagGet(Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
break;
}
}
} // namespace
Expand Down
106 changes: 106 additions & 0 deletions test/common/upstream/eds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,112 @@ TEST_F(EdsTest, EndpointHealthStatus) {
EXPECT_EQ(rebuild_container + 1, stats_.counter("cluster.name.update_no_rebuild").value());
}

// Verify that a host is removed if it is removed from discovery, stabilized, and then later
// fails active HC. The test also checks that the PENDING_DYNAMIC_REMOVAL flag is set while the
// host is stabilized, cleared if discovery re-adds the host, and that re-adding the address
// after the removal produces a new host object.
TEST_F(EdsTest, EndpoingRemovalAfterHcFail) {
envoy::api::v2::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("fare");

// Install a mock active health checker on the cluster.
// NOTE(review): two addHostCheckCompleteCb() registrations are expected here; only the origin of
// one of them is visible from this test (setHealthChecker) — confirm where the second comes from.
auto health_checker = std::make_shared<MockHealthChecker>();
EXPECT_CALL(*health_checker, start());
EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
cluster_->setHealthChecker(health_checker);

// Helper: appends a new `endpoints` entry to the assignment holding a single lb_endpoint at
// 1.2.3.4:<port>.
auto add_endpoint = [&cluster_load_assignment](int port) {
auto* endpoints = cluster_load_assignment.add_endpoints();

auto* socket_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("1.2.3.4");
socket_address->set_port_value(port);
};

// Initial state: discovery reports both port 80 and port 81.
add_endpoint(80);
add_endpoint(81);
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);

// Mark the hosts as healthy by clearing the failed active HC flag.
hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
}

// Remove endpoints and add back the port 80 one. Both hosts should be present due to
// being stabilized, but one of them should be marked pending removal.
cluster_load_assignment.clear_endpoints();
add_endpoint(80);
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);
EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
}

// Add both hosts back, make sure pending removal is gone.
cluster_load_assignment.clear_endpoints();
add_endpoint(80);
add_endpoint(81);
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);
EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
}

// Remove endpoints and add back the port 80 one. Both hosts should be present due to
// being stabilized, but one of them should be marked pending removal.
cluster_load_assignment.clear_endpoints();
add_endpoint(80);
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

// Keep references to both hosts so their identity can be compared after the removal.
HostSharedPtr not_removed_host;
HostSharedPtr removed_host;
{
EXPECT_EQ(2,
cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get()[0].size());
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);
EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));

// Mark the pending-removal host as failing active HC and then run the health check callbacks
// with a Changed transition for that host.
not_removed_host = hosts[0];
removed_host = hosts[1];
hosts[1]->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
health_checker->runCallbacks(hosts[1], HealthTransition::Changed);
}

{
// Only one host should remain, both in the per-locality view and in the flat host list.
EXPECT_EQ(1,
cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get()[0].size());
EXPECT_EQ(1, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size());
}

// Add back 81. Verify that we have a new host. This shows that all_hosts_ was updated
// correctly.
cluster_load_assignment.clear_endpoints();
add_endpoint(80);
add_endpoint(81);
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);
// The port 80 host is the very same object as before.
EXPECT_EQ(not_removed_host, hosts[0]);
// The port 81 host has the same address as the removed one but is a different object.
EXPECT_EQ(removed_host->address()->asString(), hosts[1]->address()->asString());
EXPECT_NE(removed_host, hosts[1]);
}
}

// Verify that a host is removed when it is still passing active HC, but has been previously
// told by the EDS server to fail health check.
TEST_F(EdsTest, EndpointRemovalEdsFailButActiveHcSuccess) {
Expand Down
Loading