diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 3c371cd41e2bd..e5c85b0ac35df 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -21,6 +21,7 @@ Bug Fixes *Changes expected to improve the state of the world and are unlikely to have negative effects* * active http health checks: properly handles HTTP/2 GOAWAY frames from the upstream. Previously a GOAWAY frame due to a graceful listener drain could cause improper failed health checks due to streams being refused by the upstream on a connection that is going away. To revert to old GOAWAY handling behavior, set the runtime feature `envoy.reloadable_features.health_check.graceful_goaway_handling` to false. +* upstream: fix handling of moving endpoints between priorities when active health checks are enabled. Previously moving to a higher numbered priority was a NOOP, and moving to a lower numbered priority caused an abort. Removed Config or Runtime ------------------------- diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index b9de4977659b5..74582b1c41a6e 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -211,7 +211,7 @@ using HostVector = std::vector; using HealthyHostVector = Phantom; using DegradedHostVector = Phantom; using ExcludedHostVector = Phantom; -using HostMap = absl::node_hash_map; +using HostMap = absl::flat_hash_map; using HostVectorSharedPtr = std::shared_ptr; using HostVectorConstSharedPtr = std::shared_ptr; diff --git a/source/common/upstream/eds.cc b/source/common/upstream/eds.cc index 26fc51d58cfbe..e82c854343198 100644 --- a/source/common/upstream/eds.cc +++ b/source/common/upstream/eds.cc @@ -47,7 +47,8 @@ EdsClusterImpl::EdsClusterImpl( void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}); } void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) { - absl::node_hash_map updated_hosts; + absl::flat_hash_map updated_hosts; + absl::flat_hash_set all_new_hosts; PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb); for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) { parent_.validateEndpointsForZoneAwareRouting(locality_lb_endpoint); @@ -55,10 +56,11 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h priority_state_manager.initializePriorityFor(locality_lb_endpoint); for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { - priority_state_manager.registerHostForPriority( - lb_endpoint.endpoint().hostname(), - parent_.resolveProtoAddress(lb_endpoint.endpoint().address()), locality_lb_endpoint, - lb_endpoint, parent_.time_source_); + auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address()); + priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address, + locality_lb_endpoint, lb_endpoint, + parent_.time_source_); + all_new_hosts.emplace(address->asString()); } } @@ -79,13 +81,13 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h if (priority_state[i].first != nullptr) { cluster_rebuilt |= parent_.updateHostsPerLocality( i, overprovisioning_factor, *priority_state[i].first, parent_.locality_weights_map_[i], - priority_state[i].second, priority_state_manager, updated_hosts); + priority_state[i].second, priority_state_manager, updated_hosts, all_new_hosts); } else { // If the new update contains a priority with no hosts, call the update function with an empty // set of hosts. cluster_rebuilt |= parent_.updateHostsPerLocality( i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map, - priority_state_manager, updated_hosts); + priority_state_manager, updated_hosts, all_new_hosts); } } @@ -98,7 +100,7 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h } cluster_rebuilt |= parent_.updateHostsPerLocality( i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map, - priority_state_manager, updated_hosts); + priority_state_manager, updated_hosts, all_new_hosts); } parent_.all_hosts_ = std::move(updated_hosts); @@ -236,7 +238,8 @@ bool EdsClusterImpl::updateHostsPerLocality( const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, PriorityStateManager& priority_state_manager, - absl::node_hash_map& updated_hosts) { + absl::flat_hash_map& updated_hosts, + const absl::flat_hash_set& all_new_hosts) { const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor); HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts())); @@ -252,8 +255,9 @@ bool EdsClusterImpl::updateHostsPerLocality( // performance implications, since this has the knock on effect that we rebuild the load balancers // and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList // about this. In the future we may need to do better here. - const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, - hosts_removed, updated_hosts, all_hosts_); + const bool hosts_updated = + updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed, + updated_hosts, all_hosts_, all_new_hosts); if (hosts_updated || host_set.overprovisioningFactor() != overprovisioning_factor || locality_weights_map != new_locality_weights_map) { ASSERT(std::all_of(current_hosts_copy->begin(), current_hosts_copy->end(), diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index 4ab24c38788ab..224f0984e420c 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -52,8 +52,8 @@ class EdsClusterImpl bool updateHostsPerLocality(const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, - PriorityStateManager& priority_state_manager, - absl::node_hash_map& updated_hosts); + PriorityStateManager& priority_state_manager, HostMap& updated_hosts, + const absl::flat_hash_set& all_new_hosts); bool validateUpdateSize(int num_resources); // ClusterImplBase diff --git a/source/common/upstream/strict_dns_cluster.cc b/source/common/upstream/strict_dns_cluster.cc index 70f1dd37e00bc..a7f528b18f6be 100644 --- a/source/common/upstream/strict_dns_cluster.cc +++ b/source/common/upstream/strict_dns_cluster.cc @@ -118,9 +118,10 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { if (status == Network::DnsResolver::ResolutionStatus::Success) { parent_.info_->stats().update_success_.inc(); - absl::node_hash_map updated_hosts; + HostMap updated_hosts; HostVector new_hosts; std::chrono::seconds ttl_refresh_rate = std::chrono::seconds::max(); + absl::flat_hash_set all_new_hosts; for (const auto& resp : response) { // TODO(mattklein123): Currently the DNS interface does not consider port. We need to // make a new address that has port in it. We need to both support IPv6 as well as @@ -135,14 +136,14 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { lb_endpoint_.load_balancing_weight().value(), locality_lb_endpoint_.locality(), lb_endpoint_.endpoint().health_check_config(), locality_lb_endpoint_.priority(), lb_endpoint_.health_status(), parent_.time_source_)); - + all_new_hosts.emplace(new_hosts.back()->address()->asString()); ttl_refresh_rate = min(ttl_refresh_rate, resp.ttl_); } HostVector hosts_added; HostVector hosts_removed; if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, - updated_hosts, all_hosts_)) { + updated_hosts, all_hosts_, all_new_hosts)) { ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_); ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) { return host->priority() == locality_lb_endpoint_.priority(); diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 63234cf67129e..c4badf0f937ac 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -1384,12 +1384,11 @@ void PriorityStateManager::updateClusterPrioritySet( } } -bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, - HostVector& current_priority_hosts, - HostVector& hosts_added_to_current_priority, - HostVector& hosts_removed_from_current_priority, - HostMap& updated_hosts, - const HostMap& all_hosts) { +bool BaseDynamicClusterImpl::updateDynamicHostList( + const HostVector& new_hosts, HostVector& current_priority_hosts, + HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority, + HostMap& updated_hosts, const HostMap& all_hosts, + const absl::flat_hash_set& all_new_hosts) { uint64_t max_host_weight = 1; // Did hosts change? @@ -1414,8 +1413,10 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, // do the same thing. // Keep track of hosts we see in new_hosts that we are able to match up with an existing host. - absl::node_hash_set existing_hosts_for_current_priority( + absl::flat_hash_set existing_hosts_for_current_priority( current_priority_hosts.size()); + // Keep track of hosts we're adding (or replacing) + absl::flat_hash_set new_hosts_for_current_priority(new_hosts.size()); HostVector final_hosts; for (const HostSharedPtr& host : new_hosts) { if (updated_hosts.count(host->address()->asString())) { @@ -1499,6 +1500,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, final_hosts.push_back(existing_host->second); updated_hosts[existing_host->second->address()->asString()] = existing_host->second; } else { + new_hosts_for_current_priority.emplace(host->address()->asString()); if (host->weight() > max_host_weight) { max_host_weight = host->weight(); } @@ -1555,7 +1557,18 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) { erase_from = std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(), - [&updated_hosts, &final_hosts, &max_host_weight](const HostSharedPtr& p) { + [&all_new_hosts, &new_hosts_for_current_priority, &updated_hosts, + &final_hosts, &max_host_weight](const HostSharedPtr& p) { + if (all_new_hosts.contains(p->address()->asString()) && + !new_hosts_for_current_priority.contains(p->address()->asString())) { + // If the address is being completely deleted from this priority, but is + // referenced from another priority, then we assume that the other + // priority will perform an in-place update to re-use the existing Host. + // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but + // instead remove it immediately from this priority. + return false; + } + if (!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) || p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) { if (p->weight() > max_host_weight) { diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 3ff38c4b770d6..1c9fb7fa99618 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -913,13 +913,15 @@ class BaseDynamicClusterImpl : public ClusterImplBase { * priority. * @param updated_hosts is used to aggregate the new state of all hosts across priority, and will * be updated with the hosts that remain in this priority after the update. - * @param all_hosts all known hosts prior to this host update. + * @param all_hosts all known hosts prior to this host update across all priorities. + * @param all_new_hosts addresses of all hosts in the new configuration across all priorities. * @return whether the hosts for the priority changed. */ bool updateDynamicHostList(const HostVector& new_hosts, HostVector& current_priority_hosts, HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority, - HostMap& updated_hosts, const HostMap& all_hosts); + HostMap& updated_hosts, const HostMap& all_hosts, + const absl::flat_hash_set& all_new_hosts); }; /** diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 673bffa677055..7565936db2734 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -94,19 +94,22 @@ void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added, void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) { Upstream::HostVector new_hosts; + absl::flat_hash_set all_new_hosts; for (const ClusterSlot& slot : *slots) { new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_)); + all_new_hosts.emplace(slot.primary()->asString()); for (auto const& replica : slot.replicas()) { new_hosts.emplace_back(new RedisHost(info(), "", replica, *this, false, time_source_)); + all_new_hosts.emplace(replica->asString()); } } - absl::node_hash_map updated_hosts; + Upstream::HostMap updated_hosts; Upstream::HostVector hosts_added; Upstream::HostVector hosts_removed; const bool host_updated = updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, - updated_hosts, all_hosts_); + updated_hosts, all_hosts_, all_new_hosts); const bool slot_updated = lb_factory_ ? lb_factory_->onClusterSlotUpdate(std::move(slots), updated_hosts) : false; diff --git a/test/common/upstream/eds_test.cc b/test/common/upstream/eds_test.cc index 0cc9f5f9dd464..25d5e26d38a66 100644 --- a/test/common/upstream/eds_test.cc +++ b/test/common/upstream/eds_test.cc @@ -837,7 +837,7 @@ TEST_F(EdsTest, EndpointRemovalClusterDrainOnHostRemoval) { } // Verifies that if an endpoint is moved to a new priority, the active hc status is preserved. -TEST_F(EdsTest, EndpointMovedToNewPriority) { +TEST_F(EdsTest, EndpointMovedToNewPriorityWithDrain) { envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; cluster_load_assignment.set_cluster_name("fare"); resetClusterDrainOnHostRemoval(); @@ -899,6 +899,7 @@ TEST_F(EdsTest, EndpointMovedToNewPriority) { // The endpoint was healthy in the original priority, so moving it // around should preserve that. EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); } { @@ -911,6 +912,7 @@ TEST_F(EdsTest, EndpointMovedToNewPriority) { // The endpoint was healthy in the original priority, so moving it // around should preserve that. EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); } // Moves all the endpoints to priority 1. @@ -932,13 +934,15 @@ TEST_F(EdsTest, EndpointMovedToNewPriority) { // The endpoints were healthy, so moving them around should preserve that. EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); } } // Verifies that if an endpoint is moved between priorities, the health check value // of the host is preserved -TEST_F(EdsTest, EndpointMoved) { +TEST_F(EdsTest, EndpointMovedWithDrain) { envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; cluster_load_assignment.set_cluster_name("fare"); resetClusterDrainOnHostRemoval(); @@ -1008,6 +1012,7 @@ TEST_F(EdsTest, EndpointMoved) { // The endpoint was healthy in the original priority, so moving it // around should preserve that. EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); } { @@ -1021,6 +1026,296 @@ TEST_F(EdsTest, EndpointMoved) { // The endpoint was healthy in the original priority, so moving it // around should preserve that. EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } +} + +// Verifies that if an endpoint is moved to a new priority, the active hc status is preserved. +TEST_F(EdsTest, EndpointMovedToNewPriority) { + envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + resetCluster(); + + auto health_checker = std::make_shared(); + EXPECT_CALL(*health_checker, start()); + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2); + cluster_->setHealthChecker(health_checker); + + auto add_endpoint = [&cluster_load_assignment](int port, int priority) { + auto* endpoints = cluster_load_assignment.add_endpoints(); + endpoints->set_priority(priority); + + auto* socket_address = endpoints->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("1.2.3.4"); + socket_address->set_port_value(port); + }; + + add_endpoint(80, 0); + add_endpoint(81, 0); + + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + + // Mark the hosts as healthy + for (auto& host : hosts) { + EXPECT_TRUE(host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + host->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + host->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC); + } + } + + // Moves the endpoints between priorities + cluster_load_assignment.clear_endpoints(); + add_endpoint(81, 0); + add_endpoint(80, 1); + + // Verify that no hosts gets added or removed to/from the PrioritySet. + cluster_->prioritySet().addMemberUpdateCb([&](const auto& added, const auto& removed) { + EXPECT_TRUE(added.empty()); + EXPECT_TRUE(removed.empty()); + }); + + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + // assert that it didn't move + EXPECT_EQ(hosts[0]->address()->asString(), "1.2.3.4:81"); + + // The endpoint was healthy in the original priority, so moving it + // around should preserve that. + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[1]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + // assert that it moved + EXPECT_EQ(hosts[0]->address()->asString(), "1.2.3.4:80"); + + // The endpoint was healthy in the original priority, so moving it + // around should preserve that. + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } + + // Moves all the endpoints to priority 1. + cluster_load_assignment.clear_endpoints(); + add_endpoint(80, 1); + add_endpoint(81, 1); + + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + // Priority 0 should now be empty. + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 0); + } + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[1]->hosts(); + EXPECT_EQ(hosts.size(), 2); + + // The endpoints were healthy, so moving them around should preserve that. + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } +} + +// Verifies that if an endpoint is moved between priorities, the health check value +// of the host is preserved +TEST_F(EdsTest, EndpointMoved) { + envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + resetCluster(); + + auto health_checker = std::make_shared(); + EXPECT_CALL(*health_checker, start()); + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2); + cluster_->setHealthChecker(health_checker); + + auto add_endpoint = [&cluster_load_assignment](int port, int priority) { + auto* endpoints = cluster_load_assignment.add_endpoints(); + endpoints->set_priority(priority); + + auto* socket_address = endpoints->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("1.2.3.4"); + socket_address->set_port_value(port); + }; + + add_endpoint(80, 0); + add_endpoint(81, 1); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_EQ(0, hosts[0]->priority()); + // Mark the host as healthy and remove the pending active hc flag. + hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + hosts[0]->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC); + } + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[1]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_EQ(1, hosts[0]->priority()); + // Mark the host as healthy and remove the pending active hc flag. + hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + hosts[0]->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC); + } + + // Moves the endpoints between priorities + cluster_load_assignment.clear_endpoints(); + add_endpoint(81, 0); + add_endpoint(80, 1); + // Verify that no hosts gets added or removed to/from the PrioritySet. + cluster_->prioritySet().addMemberUpdateCb([&](const auto& added, const auto& removed) { + EXPECT_TRUE(added.empty()); + EXPECT_TRUE(removed.empty()); + }); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + // assert that it moved + EXPECT_EQ(hosts[0]->address()->asString(), "1.2.3.4:81"); + EXPECT_EQ(0, hosts[0]->priority()); + + // The endpoint was healthy in the original priority, so moving it + // around should preserve that. + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[1]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + // assert that it moved + EXPECT_EQ(hosts[0]->address()->asString(), "1.2.3.4:80"); + EXPECT_EQ(1, hosts[0]->priority()); + + // The endpoint was healthy in the original priority, so moving it + // around should preserve that. + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(hosts[0]->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } +} + +// Verifies that if an endpoint is moved to a new priority and has its health check address altered +// then nothing bad happens +TEST_F(EdsTest, EndpointMovedToNewPriorityWithHealthAddressChange) { + envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + resetCluster(); + + auto health_checker = std::make_shared(); + EXPECT_CALL(*health_checker, start()); + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2); + cluster_->setHealthChecker(health_checker); + + auto add_endpoint = [&cluster_load_assignment](int port, int priority, int health_port) { + auto* endpoints = cluster_load_assignment.add_endpoints(); + endpoints->set_priority(priority); + auto* endpoint = endpoints->add_lb_endpoints()->mutable_endpoint(); + + auto* socket_address = endpoint->mutable_address()->mutable_socket_address(); + socket_address->set_address("1.2.3.4"); + socket_address->set_port_value(port); + + endpoint->mutable_health_check_config()->set_port_value(health_port); + }; + + add_endpoint(80, 0, 80); + add_endpoint(81, 1, 81); + + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + hosts[0]->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC); + } + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[1]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + hosts[0]->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC); + } + + cluster_load_assignment.clear_endpoints(); + add_endpoint(80, 0, 80); + add_endpoint(81, 0, 82); + + // Changing a health check endpoint at the same time as priority is an add and immediate remove + cluster_->prioritySet().addMemberUpdateCb([&](const auto& added, const auto& removed) { + EXPECT_EQ(added.size(), 1); + EXPECT_EQ(removed.size(), 1); + }); + + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + + EXPECT_EQ(hosts[1]->address()->asString(), "1.2.3.4:81"); + EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC); + hosts[1]->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC); + } + + cluster_load_assignment.clear_endpoints(); + add_endpoint(80, 0, 80); + add_endpoint(81, 1, 83); + + // Changing a health check endpoint at the same time as priority is an add and immediate remove + cluster_->prioritySet().addMemberUpdateCb([&](const auto& added, const auto& removed) { + EXPECT_EQ(added.size(), 1); + EXPECT_EQ(removed.size(), 1); + }); + + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 1); + } + + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[1]->hosts(); + EXPECT_EQ(hosts.size(), 1); + + EXPECT_EQ(hosts[0]->address()->asString(), "1.2.3.4:81"); + EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); } } diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index 80a84ac10b084..3330fde17bf0e 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -977,34 +977,75 @@ TEST_F(RedisClusterTest, HostRemovalAfterHcFail) { cluster_->initialize([&]() -> void { initialized_.ready(); }); EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); - expectClusterSlotResponse(singleSlotPrimaryReplica("127.0.0.1", "127.0.0.2", 22120)); + expectClusterSlotResponse(twoSlotsPrimariesWithReplica()); - // Verify that both hosts are initially marked with FAILED_ACTIVE_HC, then + // Verify that all hosts are initially marked with FAILED_ACTIVE_HC, then // clear the flag to simulate that these hosts have been successfully health // checked. { EXPECT_CALL(membership_updated_, ready()); const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); - EXPECT_EQ(2UL, hosts.size()); + EXPECT_EQ(4UL, hosts.size()); - for (size_t i = 0; i < 2; ++i) { + for (size_t i = 0; i < 4; ++i) { EXPECT_TRUE(hosts[i]->healthFlagGet(Upstream::Host::HealthFlag::FAILED_ACTIVE_HC)); hosts[i]->healthFlagClear(Upstream::Host::HealthFlag::FAILED_ACTIVE_HC); hosts[i]->healthFlagClear(Upstream::Host::HealthFlag::PENDING_ACTIVE_HC); health_checker->runCallbacks(hosts[i], Upstream::HealthTransition::Changed); } - expectHealthyHosts(std::list({"127.0.0.1:22120", "127.0.0.2:22120"})); + expectHealthyHosts(std::list( + {"127.0.0.1:22120", "127.0.0.3:22120", "127.0.0.2:22120", "127.0.0.4:22120"})); } - // Failed HC - EXPECT_CALL(membership_updated_, ready()); - EXPECT_CALL(*cluster_callback_, onHostHealthUpdate()); - const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); - hosts[1]->healthFlagSet(Upstream::Host::HealthFlag::FAILED_ACTIVE_HC); - health_checker->runCallbacks(hosts[1], Upstream::HealthTransition::Changed); + // Fail a HC for one of the hosts + { + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(*cluster_callback_, onHostHealthUpdate()); + const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + hosts[2]->healthFlagSet(Upstream::Host::HealthFlag::FAILED_ACTIVE_HC); + health_checker->runCallbacks(hosts[2], Upstream::HealthTransition::Changed); + + EXPECT_THAT(cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size(), 4U); + EXPECT_THAT(cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size(), 3U); + } - EXPECT_THAT(2U, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); - EXPECT_THAT(1U, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + // Remove 2nd shard. + { + expectRedisResolve(); + EXPECT_CALL(membership_updated_, ready()); + resolve_timer_->invokeCallback(); + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryReplica("127.0.0.1", "127.0.0.3", 22120)); + + const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + + // We expect the host that failed health checks to be instantly removed, + // but the other should remain until its health check fails too. + EXPECT_THAT(cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size(), 3U); + expectHealthyHosts( + std::list({"127.0.0.1:22120", "127.0.0.3:22120", "127.0.0.4:22120"})); + EXPECT_TRUE(hosts[2]->healthFlagGet(Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + } + + /* + // TODO(#14630) This part of the test doesn't pass, as removal of PENDING_DYNAMIC_REMOVAL hosts + // does not seem to be implemented for redis clusters at present. + + // Fail the HC for the remaining removed host + { + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(*cluster_callback_, onHostHealthUpdate()); + const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_TRUE(hosts[2]->healthFlagGet(Upstream::Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)); + hosts[2]->healthFlagSet(Upstream::Host::HealthFlag::FAILED_ACTIVE_HC); + health_checker->runCallbacks(hosts[2], Upstream::HealthTransition::Changed); + + // The pending removal host should also have been removed now + EXPECT_THAT(cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size(), 2U); + expectHealthyHosts(std::list( + {"127.0.0.1:22120", "127.0.0.3:22120"})); + } + */ } } // namespace Redis