Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/envoy/admin/v2alpha/clusters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ message HostHealthStatus {
// The host is currently being marked as degraded through active health checking.
bool failed_active_degraded_check = 4;

// The host has been removed from service discovery, but is being stabilized due to active
// health checking.
bool pending_dynamic_removal = 5;

// Health status as reported by EDS. Note: only HEALTHY and UNHEALTHY are currently supported
// here.
// TODO(mrice32): pipe through remaining EDS health status possibilities.
Expand Down
3 changes: 2 additions & 1 deletion include/envoy/upstream/health_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class HealthChecker {
* @param changed_state supplies whether the health check resulted in a host moving from healthy
* to not healthy or vice versa.
*/
typedef std::function<void(HostSharedPtr host, HealthTransition changed_state)> HostStatusCb;
typedef std::function<void(const HostSharedPtr& host, HealthTransition changed_state)>
HostStatusCb;

/**
* Install a callback that will be invoked every time a health check round is completed for
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/upstream/outlier_detection.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Detector {
/**
* Outlier detection change state callback.
*/
typedef std::function<void(HostSharedPtr host)> ChangeStateCb;
typedef std::function<void(const HostSharedPtr& host)> ChangeStateCb;

/**
* Add a changed state callback to the detector. The callback will be called whenever any host
Expand Down
5 changes: 4 additions & 1 deletion include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ class Host : virtual public HostDescription {
/* The host is currently marked as degraded through active health checking. */ \
m(DEGRADED_ACTIVE_HC, 0x08) \
/* The host is currently marked as degraded by EDS. */ \
m(DEGRADED_EDS_HEALTH, 0x10)
m(DEGRADED_EDS_HEALTH, 0x10) \
/* The host is pending removal from discovery but is stabilized due to */ \
/* active HC. */ \
m(PENDING_DYNAMIC_REMOVAL, 0x20)
// clang-format on

#define DECLARE_ENUM(name, value) name = value,
Expand Down
46 changes: 46 additions & 0 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,52 @@ void EdsClusterImpl::onAssignmentTimeout() {
info_->stats().assignment_stale_.inc();
}

void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) {
// Here we will see if we have a host that has been marked for deletion by service discovery
// but has been stabilized due to passing active health checking. If such a host is now
// failing active health checking we can remove it during this health check update.
HostSharedPtr host_to_exclude = host;
if (host_to_exclude != nullptr &&
host_to_exclude->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) &&
host_to_exclude->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)) {
// Empty for clarity.
} else {
// Do not exclude and remove the host during the update.
host_to_exclude = nullptr;
}

const auto& host_sets = prioritySet().hostSetsPerPriority();
for (size_t priority = 0; priority < host_sets.size(); ++priority) {
const auto& host_set = host_sets[priority];

// Filter current hosts in case we need to exclude a host.
HostVectorSharedPtr hosts_copy(new HostVector());
std::copy_if(host_set->hosts().begin(), host_set->hosts().end(),
std::back_inserter(*hosts_copy),
[&host_to_exclude](const HostSharedPtr& host) { return host_to_exclude != host; });

// Setup a hosts to remove vector in case we need to exclude a host.
HostVector hosts_to_remove;
if (hosts_copy->size() != host_set->hosts().size()) {
ASSERT(hosts_copy->size() == host_set->hosts().size() - 1);
hosts_to_remove.emplace_back(host_to_exclude);
}

// Filter hosts per locality in case we need to exclude a host.
HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().filter(
{[&host_to_exclude](const Host& host) { return &host != host_to_exclude.get(); }})[0];

prioritySet().updateHosts(priority,
HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
host_set->localityWeights(), {}, hosts_to_remove, absl::nullopt);
}

if (host_to_exclude != nullptr) {
ASSERT(all_hosts_.find(host_to_exclude->address()->asString()) != all_hosts_.end());
all_hosts_.erase(host_to_exclude->address()->asString());
}
}

bool EdsClusterImpl::updateHostsPerLocality(
const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts,
LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map,
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba
std::unordered_map<std::string, HostSharedPtr>& updated_hosts);

// ClusterImplBase
void reloadHealthyHostsHelper(const HostSharedPtr& host) override;
void startPreInit() override;
void onAssignmentTimeout();

Expand Down
22 changes: 17 additions & 5 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ void ClusterImplBase::finishInitialization() {
initialization_complete_callback_ = nullptr;

if (health_checker_ != nullptr) {
reloadHealthyHosts();
reloadHealthyHosts(nullptr);
}

if (snapped_callback != nullptr) {
Expand All @@ -790,11 +790,11 @@ void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_chec
health_checker_ = health_checker;
health_checker_->start();
health_checker_->addHostCheckCompleteCb(
[this](HostSharedPtr, HealthTransition changed_state) -> void {
[this](const HostSharedPtr& host, HealthTransition changed_state) -> void {
// If we get a health check completion that resulted in a state change, signal to
// update the host sets on all threads.
if (changed_state == HealthTransition::Changed) {
reloadHealthyHosts();
reloadHealthyHosts(host);
}
});
}
Expand All @@ -805,10 +805,11 @@ void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outli
}

outlier_detector_ = outlier_detector;
outlier_detector_->addChangedStateCb([this](HostSharedPtr) -> void { reloadHealthyHosts(); });
outlier_detector_->addChangedStateCb(
[this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); });
}

void ClusterImplBase::reloadHealthyHosts() {
void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
// Every time a host changes Health Check state we cause a full healthy host recalculation which
// for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
// can also block worker threads by doing this repeatedly. There is no reason to do this
Expand All @@ -818,6 +819,10 @@ void ClusterImplBase::reloadHealthyHosts() {
return;
}

reloadHealthyHostsHelper(host);
}

void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
const auto& host_sets = prioritySet().hostSetsPerPriority();
for (size_t priority = 0; priority < host_sets.size(); ++priority) {
const auto& host_set = host_sets[priority];
Expand Down Expand Up @@ -1129,6 +1134,12 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
auto existing_host = all_hosts.find(host->address()->asString());
const bool existing_host_found = existing_host != all_hosts.end();

// Clear any pending deletion flag on an existing host in case it came back while it was
// being stabilized. We will set it again below if needed.
if (existing_host_found) {
existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
}

// Check if in-place host update should be skipped, i.e. when the following criteria are met
// (currently there is only one criterion, but we might add more in the future):
// - The cluster health checker is activated and a new host is matched with the existing one,
Expand Down Expand Up @@ -1233,6 +1244,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,

final_hosts.push_back(*i);
updated_hosts[(*i)->address()->asString()] = *i;
(*i)->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
i = current_priority_hosts.erase(i);
} else {
i++;
Expand Down
21 changes: 11 additions & 10 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,22 @@ class HostImpl : public HostDescriptionImpl,
outlier_detector_ = std::move(outlier_detector);
}
/**
 * Derive the coarse health of this host from its health flags.
 * @return Unhealthy if any FAILED_* flag is set, otherwise Degraded if any DEGRADED_* flag is
 *         set, otherwise Healthy. A host that only carries PENDING_DYNAMIC_REMOVAL is still
 *         reported as Healthy.
 */
Host::Health health() const override {
  // If any of the unhealthy flags are set, host is unhealthy.
  if (healthFlagGet(HealthFlag::FAILED_ACTIVE_HC) ||
      healthFlagGet(HealthFlag::FAILED_OUTLIER_CHECK) ||
      healthFlagGet(HealthFlag::FAILED_EDS_HEALTH)) {
    return Host::Health::Unhealthy;
  }

  // If any of the degraded flags are set, host is degraded.
  if (healthFlagGet(HealthFlag::DEGRADED_ACTIVE_HC) ||
      healthFlagGet(HealthFlag::DEGRADED_EDS_HEALTH)) {
    return Host::Health::Degraded;
  }

  // The host must have no flags or be pending removal. PENDING_DYNAMIC_REMOVAL is bookkeeping
  // only and does not by itself make the host unhealthy or degraded.
  ASSERT(health_flags_ == 0 || healthFlagGet(HealthFlag::PENDING_DYNAMIC_REMOVAL));
  return Host::Health::Healthy;
}

uint32_t weight() const override { return weight_; }
Expand Down Expand Up @@ -401,7 +402,6 @@ typedef std::unique_ptr<HostSetImpl> HostSetImplPtr;
/**
* A class for management of the set of hosts in a given cluster.
*/

class PrioritySetImpl : public PrioritySet {
public:
PrioritySetImpl() : batch_update_(false) {}
Expand Down Expand Up @@ -698,7 +698,8 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u

private:
void finishInitialization();
void reloadHealthyHosts();
void reloadHealthyHosts(const HostSharedPtr& host);
virtual void reloadHealthyHostsHelper(const HostSharedPtr& host);

bool initialization_started_{};
std::function<void()> initialization_complete_callback_;
Expand Down
4 changes: 4 additions & 0 deletions source/server/http/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host,
health_status.set_failed_active_degraded_check(
host.healthFlagGet(Upstream::Host::HealthFlag::DEGRADED_ACTIVE_HC));
break;
case Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL:
health_status.set_pending_dynamic_removal(
host.healthFlagGet(Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
break;
}
}
} // namespace
Expand Down
103 changes: 103 additions & 0 deletions test/common/upstream/eds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,109 @@ TEST_F(EdsTest, EndpointHealthStatus) {
EXPECT_EQ(rebuild_container + 1, stats_.counter("cluster.name.update_no_rebuild").value());
}

// Verify that a host is removed if it is removed from discovery, stabilized, and then later
// fails active HC.
TEST_F(EdsTest, EndpoingRemovalAfterHcFail) {
  envoy::api::v2::ClusterLoadAssignment cluster_load_assignment;
  cluster_load_assignment.set_cluster_name("fare");

  auto health_checker = std::make_shared<MockHealthChecker>();
  EXPECT_CALL(*health_checker, start());
  EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
  cluster_->setHealthChecker(health_checker);

  // Appends one locality containing a single endpoint 1.2.3.4:<port> to the assignment.
  auto add_endpoint = [&cluster_load_assignment](int port) {
    auto* endpoints = cluster_load_assignment.add_endpoints();

    auto* socket_address = endpoints->add_lb_endpoints()
                               ->mutable_endpoint()
                               ->mutable_address()
                               ->mutable_socket_address();
    socket_address->set_address("1.2.3.4");
    socket_address->set_port_value(port);
  };

  add_endpoint(80);
  add_endpoint(81);
  doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);

    // Mark the hosts as healthy
    hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
    hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
  }

  // Remove endpoints and add back the port 80 one. Both hosts should be present due to
  // being stabilized, but one of them should be marked pending removal.
  cluster_load_assignment.clear_endpoints();
  add_endpoint(80);
  doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);
    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
  }

  // Add both hosts back, make sure pending removal is gone.
  cluster_load_assignment.clear_endpoints();
  add_endpoint(80);
  add_endpoint(81);
  doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);
    EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));
  }

  // Remove endpoints and add back the port 80 one. Both hosts should be present due to
  // being stabilized, but one of them should be marked pending removal.
  cluster_load_assignment.clear_endpoints();
  add_endpoint(80);
  doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

  HostSharedPtr not_removed_host;
  HostSharedPtr removed_host;
  {
    EXPECT_EQ(2,
              cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get()[0].size());
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);
    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL));

    // Mark the host as failing active HC and then run callbacks. Keep our own references first,
    // since the callback is expected to drop the failing host from the host set.
    not_removed_host = hosts[0];
    removed_host = hosts[1];
    hosts[1]->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
    health_checker->runCallbacks(hosts[1], HealthTransition::Changed);
  }

  {
    EXPECT_EQ(1,
              cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get()[0].size());
    EXPECT_EQ(1, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size());
  }

  // Add back 81. Verify that we have a new host. This will show that the all_hosts_ was updated
  // correctly.
  cluster_load_assignment.clear_endpoints();
  add_endpoint(80);
  add_endpoint(81);
  doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);
    EXPECT_EQ(not_removed_host, hosts[0]);
    EXPECT_EQ(removed_host->address()->asString(), hosts[1]->address()->asString());
    EXPECT_NE(removed_host, hosts[1]);
  }
}

// Verify that a host is removed when it is still passing active HC, but has been previously
// told by the EDS server to fail health check.
TEST_F(EdsTest, EndpointRemovalEdsFailButActiveHcSuccess) {
Expand Down
Loading