diff --git a/envoy/upstream/upstream.h b/envoy/upstream/upstream.h index d8b73f39de23f..54b78cc94f2fa 100644 --- a/envoy/upstream/upstream.h +++ b/envoy/upstream/upstream.h @@ -197,6 +197,8 @@ using HealthyHostVector = Phantom; using DegradedHostVector = Phantom; using ExcludedHostVector = Phantom; using HostMap = absl::flat_hash_map; +using HostMapSharedPtr = std::shared_ptr; +using HostMapConstSharedPtr = std::shared_ptr; using HostVectorSharedPtr = std::shared_ptr; using HostVectorConstSharedPtr = std::shared_ptr; @@ -424,6 +426,12 @@ class PrioritySet { */ virtual const std::vector& hostSetsPerPriority() const PURE; + /** + * @return HostMapConstSharedPtr read only cross priority host map that is indexed by host address + * string. + */ + virtual HostMapConstSharedPtr crossPriorityHostMap() const PURE; + /** * Parameter class for updateHosts. */ @@ -447,11 +455,14 @@ class PrioritySet { * @param hosts_added supplies the hosts added since the last update. * @param hosts_removed supplies the hosts removed since the last update. * @param overprovisioning_factor if presents, overwrites the current overprovisioning_factor. + * @param cross_priority_host_map read only cross-priority host map which is created in the main + * thread and shared by all the worker threads. */ - virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_host_params, + virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, - absl::optional overprovisioning_factor) PURE; + absl::optional overprovisioning_factor, + HostMapConstSharedPtr cross_priority_host_map = nullptr) PURE; /** * Callback provided during batch updates that can be used to update hosts. @@ -469,7 +480,7 @@ class PrioritySet { * @param hosts_removed supplies the hosts removed since the last update.
* @param overprovisioning_factor if presents, overwrites the current overprovisioning_factor. */ - virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_host_params, + virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, absl::optional overprovisioning_factor) PURE; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 21461da3bbdb8..5e5fd6f5d6dfd 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -978,35 +978,37 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_ per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor(); } - tls_.runOnAllThreads( - [info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster, - load_balancer_factory](OptRef cluster_manager) { - ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr; - if (add_or_update_cluster) { - if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) { - ENVOY_LOG(debug, "updating TLS cluster {}", info->name()); - } else { - ENVOY_LOG(debug, "adding TLS cluster {}", info->name()); - } + HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap(); + + tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params), + add_or_update_cluster, load_balancer_factory, map = std::move(host_map)]( + OptRef cluster_manager) { + ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr; + if (add_or_update_cluster) { + if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) { + ENVOY_LOG(debug, "updating TLS cluster {}", info->name()); + } else { + ENVOY_LOG(debug, "adding TLS cluster {}", info->name()); + } - new_cluster = new 
ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info, - load_balancer_factory); - cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster); - } + new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info, + load_balancer_factory); + cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster); + } - for (const auto& per_priority : params.per_priority_update_params_) { - cluster_manager->updateClusterMembership( - info->name(), per_priority.priority_, per_priority.update_hosts_params_, - per_priority.locality_weights_, per_priority.hosts_added_, - per_priority.hosts_removed_, per_priority.overprovisioning_factor_); - } + for (const auto& per_priority : params.per_priority_update_params_) { + cluster_manager->updateClusterMembership( + info->name(), per_priority.priority_, per_priority.update_hosts_params_, + per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_, + per_priority.overprovisioning_factor_, map); + } - if (new_cluster != nullptr) { - for (auto& cb : cluster_manager->update_callbacks_) { - cb->onClusterAddOrUpdate(*new_cluster); - } - } - }); + if (new_cluster != nullptr) { + for (auto& cb : cluster_manager->update_callbacks_) { + cb->onClusterAddOrUpdate(*new_cluster); + } + } + }); } void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) { @@ -1046,11 +1048,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHost const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, - const HostVector& hosts_removed, absl::optional overprovisioning_factor) { + const HostVector& hosts_removed, absl::optional overprovisioning_factor, + HostMapConstSharedPtr cross_priority_host_map) { ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, hosts_added.size(), 
hosts_removed.size()); priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights), - hosts_added, hosts_removed, overprovisioning_factor); + hosts_added, hosts_removed, overprovisioning_factor, + std::move(cross_priority_host_map)); // If an LB is thread aware, create a new worker local LB on membership changes. if (lb_factory_ != nullptr) { ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name); @@ -1224,12 +1228,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts( void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, - const HostVector& hosts_removed, uint64_t overprovisioning_factor) { + const HostVector& hosts_removed, uint64_t overprovisioning_factor, + HostMapConstSharedPtr cross_priority_host_map) { ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); const auto& cluster_entry = thread_local_clusters_[name]; cluster_entry->updateHosts(name, priority, std::move(update_hosts_params), std::move(locality_weights), hosts_added, hosts_removed, - overprovisioning_factor); + overprovisioning_factor, std::move(cross_priority_host_map)); } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index d606e68b7ed12..b7a982aa47c9f 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -423,7 +423,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable overprovisioning_factor); + absl::optional overprovisioning_factor, + HostMapConstSharedPtr cross_priority_host_map); // Drains any connection pools associated with the removed hosts. 
void drainConnPools(const HostVector& hosts_removed); @@ -471,7 +472,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggablestart({cluster_name_}); } void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) { - absl::flat_hash_map updated_hosts; absl::flat_hash_set all_new_hosts; PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb); for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) { @@ -56,6 +55,11 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address()); + // When the configuration contains duplicate hosts, only the first one will be retained. + if (all_new_hosts.count(address->asString()) > 0) { + continue; + } + priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address, locality_lb_endpoint, lb_endpoint, parent_.time_source_); @@ -66,6 +70,11 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h // Track whether we rebuilt any LB structures. bool cluster_rebuilt = false; + // Get the map of all the latest existing hosts, which is used to filter out the existing + // hosts in the process of updating cluster memberships. 
+ HostMapConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap(); + ASSERT(all_hosts != nullptr); + const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT( cluster_load_assignment_.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor); @@ -80,13 +89,13 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h if (priority_state[i].first != nullptr) { cluster_rebuilt |= parent_.updateHostsPerLocality( i, overprovisioning_factor, *priority_state[i].first, parent_.locality_weights_map_[i], - priority_state[i].second, priority_state_manager, updated_hosts, all_new_hosts); + priority_state[i].second, priority_state_manager, *all_hosts, all_new_hosts); } else { // If the new update contains a priority with no hosts, call the update function with an empty // set of hosts. cluster_rebuilt |= parent_.updateHostsPerLocality( i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map, - priority_state_manager, updated_hosts, all_new_hosts); + priority_state_manager, *all_hosts, all_new_hosts); } } @@ -99,11 +108,9 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h } cluster_rebuilt |= parent_.updateHostsPerLocality( i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map, - priority_state_manager, updated_hosts, all_new_hosts); + priority_state_manager, *all_hosts, all_new_hosts); } - parent_.all_hosts_ = std::move(updated_hosts); - if (!cluster_rebuilt) { parent_.info_->stats().update_no_rebuild_.inc(); } @@ -226,18 +233,12 @@ void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) { HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy), host_set->localityWeights(), {}, hosts_to_remove, absl::nullopt); } - - if (host_to_exclude != nullptr) { - ASSERT(all_hosts_.find(host_to_exclude->address()->asString()) != all_hosts_.end()); - all_hosts_.erase(host_to_exclude->address()->asString()); - } 
} bool EdsClusterImpl::updateHostsPerLocality( const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, - PriorityStateManager& priority_state_manager, - absl::flat_hash_map& updated_hosts, + PriorityStateManager& priority_state_manager, const HostMap& all_hosts, const absl::flat_hash_set& all_new_hosts) { const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor); HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts())); @@ -254,9 +255,8 @@ bool EdsClusterImpl::updateHostsPerLocality( // performance implications, since this has the knock on effect that we rebuild the load balancers // and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList // about this. In the future we may need to do better here. - const bool hosts_updated = - updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed, - updated_hosts, all_hosts_, all_new_hosts); + const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, + hosts_removed, all_hosts, all_new_hosts); if (hosts_updated || host_set.overprovisioningFactor() != overprovisioning_factor || locality_weights_map != new_locality_weights_map) { ASSERT(std::all_of(current_hosts_copy->begin(), current_hosts_copy->end(), diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index 45469661df798..f396b5e1785f7 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -50,7 +50,8 @@ class EdsClusterImpl bool updateHostsPerLocality(const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, - PriorityStateManager& priority_state_manager, HostMap& updated_hosts, + PriorityStateManager& priority_state_manager, + const HostMap& 
all_hosts, const absl::flat_hash_set& all_new_hosts); bool validateUpdateSize(int num_resources); @@ -78,7 +79,6 @@ class EdsClusterImpl const LocalInfo::LocalInfo& local_info_; const std::string cluster_name_; std::vector locality_weights_map_; - HostMap all_hosts_; Event::TimerPtr assignment_timeout_; InitializePhase initialize_phase_; }; diff --git a/source/common/upstream/strict_dns_cluster.cc b/source/common/upstream/strict_dns_cluster.cc index d46f3f96edfa9..0d03ea0a00f63 100644 --- a/source/common/upstream/strict_dns_cluster.cc +++ b/source/common/upstream/strict_dns_cluster.cc @@ -117,7 +117,6 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { if (status == Network::DnsResolver::ResolutionStatus::Success) { parent_.info_->stats().update_success_.inc(); - HostMap updated_hosts; HostVector new_hosts; std::chrono::seconds ttl_refresh_rate = std::chrono::seconds::max(); absl::flat_hash_set all_new_hosts; @@ -127,33 +126,44 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { // potentially move port handling into the DNS interface itself, which would work better // for SRV. 
ASSERT(resp.address_ != nullptr); + auto address = Network::Utility::getAddressWithPort(*(resp.address_), port_); + if (all_new_hosts.count(address->asString()) > 0) { + continue; + } + new_hosts.emplace_back(new HostImpl( - parent_.info_, hostname_, - Network::Utility::getAddressWithPort(*(resp.address_), port_), + parent_.info_, hostname_, address, // TODO(zyfjeff): Created through metadata shared pool std::make_shared(lb_endpoint_.metadata()), lb_endpoint_.load_balancing_weight().value(), locality_lb_endpoints_.locality(), lb_endpoint_.endpoint().health_check_config(), locality_lb_endpoints_.priority(), lb_endpoint_.health_status(), parent_.time_source_)); - all_new_hosts.emplace(new_hosts.back()->address()->asString()); + all_new_hosts.emplace(address->asString()); ttl_refresh_rate = min(ttl_refresh_rate, resp.ttl_); } HostVector hosts_added; HostVector hosts_removed; if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, - updated_hosts, all_hosts_, all_new_hosts)) { + all_hosts_, all_new_hosts)) { ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_); ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) { return host->priority() == locality_lb_endpoints_.priority(); })); + + // Update host map for current resolve target. + for (const auto& host : hosts_removed) { + all_hosts_.erase(host->address()->asString()); + } + for (const auto& host : hosts_added) { + all_hosts_.insert({host->address()->asString(), host}); + } + parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoints_.priority()); } else { parent_.info_->stats().update_no_rebuild_.inc(); } - all_hosts_ = std::move(updated_hosts); - // reset failure backoff strategy because there was a success. 
parent_.failure_backoff_strategy_->reset(); diff --git a/source/common/upstream/strict_dns_cluster.h b/source/common/upstream/strict_dns_cluster.h index 6be3a83f436ec..70c3040414524 100644 --- a/source/common/upstream/strict_dns_cluster.h +++ b/source/common/upstream/strict_dns_cluster.h @@ -41,6 +41,13 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl { const uint32_t port_; const Event::TimerPtr resolve_timer_; HostVector hosts_; + + // Host map for current resolve target. When we have multiple resolve targets, multiple targets + // may contain two different hosts with the same address. This has two effects: + // 1) This host map cannot be replaced by the cross-priority global host map in the priority + // set. + // 2) Cross-priority global host map may not be able to search for the expected host based on + // the address. HostMap all_hosts_; }; diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 44097ffe65de4..91a30b4d92dc8 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -551,13 +551,18 @@ PrioritySetImpl::getOrCreateHostSet(uint32_t priority, void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, - absl::optional overprovisioning_factor) { + absl::optional overprovisioning_factor, + HostMapConstSharedPtr cross_priority_host_map) { // Ensure that we have a HostSet for the given priority. 
getOrCreateHostSet(priority, overprovisioning_factor); static_cast(host_sets_[priority].get()) ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added, hosts_removed, overprovisioning_factor); + if (cross_priority_host_map != nullptr) { + const_cross_priority_host_map_ = std::move(cross_priority_host_map); + } + if (!batch_update_) { runUpdateCallbacks(hosts_added, hosts_removed); } @@ -596,6 +601,52 @@ void PrioritySetImpl::BatchUpdateScope::updateHosts( hosts_removed, overprovisioning_factor); } +void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, + LocalityWeightsConstSharedPtr locality_weights, + const HostVector& hosts_added, + const HostVector& hosts_removed, + absl::optional overprovisioning_factor, + HostMapConstSharedPtr cross_priority_host_map) { + ASSERT(cross_priority_host_map == nullptr, + "External cross-priority host map is meaningless to MainPrioritySetImpl"); + updateCrossPriorityHostMap(hosts_added, hosts_removed); + + PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights, + hosts_added, hosts_removed, overprovisioning_factor); +} + +HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const { + // Check if the host set in the main thread PrioritySet has been updated. + if (mutable_cross_priority_host_map_ != nullptr) { + const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_); + ASSERT(mutable_cross_priority_host_map_ == nullptr); + } + return const_cross_priority_host_map_; +} + +void MainPrioritySetImpl::updateCrossPriorityHostMap(const HostVector& hosts_added, + const HostVector& hosts_removed) { + if (hosts_added.empty() && hosts_removed.empty()) { + // No new hosts have been added and no old hosts have been removed. + return; + } + + // Since const_cross_priority_host_map_ may be shared by multiple threads, when the host set + // changes, we cannot directly modify const_cross_priority_host_map_.
+ if (mutable_cross_priority_host_map_ == nullptr) { + // Copy old read only host map to mutable host map. + mutable_cross_priority_host_map_ = std::make_shared(*const_cross_priority_host_map_); + } + + for (const auto& host : hosts_removed) { + mutable_cross_priority_host_map_->erase(host->address()->asString()); + } + + for (const auto& host : hosts_added) { + mutable_cross_priority_host_map_->insert({host->address()->asString(), host}); + } +} + ClusterStats ClusterInfoImpl::generateStats(Stats::Scope& scope, const ClusterStatNames& stat_names) { return ClusterStats(stat_names, scope); @@ -1442,8 +1493,7 @@ void PriorityStateManager::updateClusterPrioritySet( bool BaseDynamicClusterImpl::updateDynamicHostList( const HostVector& new_hosts, HostVector& current_priority_hosts, HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority, - HostMap& updated_hosts, const HostMap& all_hosts, - const absl::flat_hash_set& all_new_hosts) { + const HostMap& all_hosts, const absl::flat_hash_set& all_new_hosts) { uint64_t max_host_weight = 1; // Did hosts change? @@ -1472,10 +1522,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( absl::flat_hash_set new_hosts_for_current_priority(new_hosts.size()); HostVector final_hosts; for (const HostSharedPtr& host : new_hosts) { - if (updated_hosts.count(host->address()->asString())) { - continue; - } - // To match a new host with an existing host means comparing their addresses. 
auto existing_host = all_hosts.find(host->address()->asString()); const bool existing_host_found = existing_host != all_hosts.end(); @@ -1551,7 +1597,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( } final_hosts.push_back(existing_host->second); - updated_hosts[existing_host->second->address()->asString()] = existing_host->second; } else { new_hosts_for_current_priority.emplace(host->address()->asString()); if (host->weight() > max_host_weight) { @@ -1569,7 +1614,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( } } - updated_hosts[host->address()->asString()] = host; final_hosts.push_back(host); hosts_added_to_current_priority.push_back(host); } @@ -1610,8 +1654,8 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) { erase_from = std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(), - [&all_new_hosts, &new_hosts_for_current_priority, &updated_hosts, - &final_hosts, &max_host_weight](const HostSharedPtr& p) { + [&all_new_hosts, &new_hosts_for_current_priority, &final_hosts, + &max_host_weight](const HostSharedPtr& p) { if (all_new_hosts.contains(p->address()->asString()) && !new_hosts_for_current_priority.contains(p->address()->asString())) { // If the address is being completely deleted from this priority, but is @@ -1629,7 +1673,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( } final_hosts.push_back(p); - updated_hosts[p->address()->asString()] = p; p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); return true; } diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 6674082cd8d45..5892039abe88f 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -485,10 +485,15 @@ class PrioritySetImpl : public PrioritySet { void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const 
HostVector& hosts_added, const HostVector& hosts_removed, - absl::optional overprovisioning_factor = absl::nullopt) override; + absl::optional overprovisioning_factor = absl::nullopt, + HostMapConstSharedPtr cross_priority_host_map = nullptr) override; void batchHostUpdate(BatchUpdateCb& callback) override; + HostMapConstSharedPtr crossPriorityHostMap() const override { + return const_cross_priority_host_map_; + } + protected: // Allows subclasses of PrioritySetImpl to create their own type of HostSetImpl. virtual HostSetImplPtr createHostSet(uint32_t priority, @@ -509,6 +514,9 @@ class PrioritySetImpl : public PrioritySet { // avoid any potential lifetime issues. std::vector> host_sets_; + // Read only all host map for fast host searching. This will never be null. + mutable HostMapConstSharedPtr const_cross_priority_host_map_{std::make_shared()}; + private: // This is a matching vector to store the callback handles for host_sets_. It is kept separately // because host_sets_ is directly returned so we avoid translation. @@ -544,6 +552,26 @@ class PrioritySetImpl : public PrioritySet { }; }; +/** + * Specialized PrioritySetImpl designed for the main thread. It will update and maintain the read + * only cross priority host map when the host set changes. 
+ */ +class MainPrioritySetImpl : public PrioritySetImpl, public Logger::Loggable { +public: + // PrioritySet + void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, + LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, + const HostVector& hosts_removed, + absl::optional overprovisioning_factor = absl::nullopt, + HostMapConstSharedPtr cross_priority_host_map = nullptr) override; + HostMapConstSharedPtr crossPriorityHostMap() const override; + +protected: + void updateCrossPriorityHostMap(const HostVector& hosts_added, const HostVector& hosts_removed); + + mutable HostMapSharedPtr mutable_cross_priority_host_map_; +}; + /** * Implementation of ClusterInfo that reads from JSON. */ @@ -880,7 +908,7 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable& all_new_hosts); }; diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 25ae1c8c7c66d..59d6f7ad0d3ca 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -94,19 +94,38 @@ void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) { absl::flat_hash_set all_new_hosts; for (const ClusterSlot& slot : *slots) { - new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_)); - all_new_hosts.emplace(slot.primary()->asString()); + if (all_new_hosts.count(slot.primary()->asString()) == 0) { + new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_)); + all_new_hosts.emplace(slot.primary()->asString()); + } for (auto const& replica : slot.replicas()) { - new_hosts.emplace_back(new RedisHost(info(), "", replica.second, *this, false, time_source_)); - all_new_hosts.emplace(replica.first); + if (all_new_hosts.count(replica.first) == 0) { + new_hosts.emplace_back( + new RedisHost(info(), "", replica.second, *this, false, time_source_)); + 
all_new_hosts.emplace(replica.first); + } } } - Upstream::HostMap updated_hosts; + // Get the map of all the latest existing hosts, which is used to filter out the existing + // hosts in the process of updating cluster memberships. + Upstream::HostMapConstSharedPtr all_hosts = priority_set_.crossPriorityHostMap(); + ASSERT(all_hosts != nullptr); + Upstream::HostVector hosts_added; Upstream::HostVector hosts_removed; const bool host_updated = updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, - updated_hosts, all_hosts_, all_new_hosts); + *all_hosts, all_new_hosts); + + // Create a map containing all the latest hosts to determine whether the slots are updated. + Upstream::HostMap updated_hosts = *all_hosts; + for (const auto& host : hosts_removed) { + updated_hosts.erase(host->address()->asString()); + } + for (const auto& host : hosts_added) { + updated_hosts[host->address()->asString()] = host; + } + const bool slot_updated = lb_factory_ ? lb_factory_->onClusterSlotUpdate(std::move(slots), updated_hosts) : false; @@ -121,8 +140,6 @@ void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) { info_->stats().update_no_rebuild_.inc(); } - all_hosts_ = std::move(updated_hosts); - // TODO(hyang): If there is an initialize callback, fire it now. Note that if the // cluster refers to multiple DNS names, this will return initialized after a single // DNS resolution completes. 
This is not perfect but is easier to code and it is unclear diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 96161c3514b11..ae8553b2effa9 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -276,7 +276,6 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { const ClusterSlotUpdateCallBackSharedPtr lb_factory_; Upstream::HostVector hosts_; - Upstream::HostMap all_hosts_; const std::string auth_username_; const std::string auth_password_; diff --git a/source/extensions/clusters/redis/redis_cluster_lb.cc b/source/extensions/clusters/redis/redis_cluster_lb.cc index 25f38f448dae0..13e5af645efcc 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.cc +++ b/source/extensions/clusters/redis/redis_cluster_lb.cc @@ -18,7 +18,7 @@ bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSl // RedisClusterLoadBalancerFactory bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsPtr&& slots, - Envoy::Upstream::HostMap all_hosts) { + Envoy::Upstream::HostMap& all_hosts) { // The slots is sorted, allowing for a quick comparison to make sure we need to update the slot // array sort based on start and end to enable efficient comparison std::sort( diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h index 4ac9f565e0b58..0edbfdedc93b6 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.h +++ b/source/extensions/clusters/redis/redis_cluster_lb.h @@ -109,7 +109,7 @@ class ClusterSlotUpdateCallBack { * @param all_hosts provides the updated hosts. * @return indicate if the cluster slot is updated or not. 
*/ - virtual bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap all_hosts) PURE; + virtual bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap& all_hosts) PURE; /** * Callback when a host's health status is updated @@ -129,7 +129,7 @@ class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack, RedisClusterLoadBalancerFactory(Random::RandomGenerator& random) : random_(random) {} // ClusterSlotUpdateCallBack - bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap all_hosts) override; + bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap& all_hosts) override; void onHostHealthUpdate() override; diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 7cedf710a1c76..e4e51654429d1 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -3447,6 +3447,80 @@ TEST_F(ClusterManagerImplTest, HttpPoolDataForwardsCallsToConnectionPool) { opt_cp.value().addIdleCallback(drained_cb); } +// Test that the read only cross-priority host map in the main thread is correctly synchronized to +// the worker thread when the cluster's host set is updated. 
+TEST_F(ClusterManagerImplTest, CrossPriorityHostMapSyncTest) { + std::string yaml = R"EOF( + static_resources: + clusters: + - name: cluster_1 + connect_timeout: 0.250s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11002 + common_lb_config: + update_merge_window: 0s + )EOF"; + create(parseBootstrapFromV3Yaml(yaml)); + + Cluster& cluster = cluster_manager_->activeClusters().begin()->second; + EXPECT_EQ(2, cluster.prioritySet().crossPriorityHostMap()->size()); + EXPECT_EQ( + cluster_manager_->getThreadLocalCluster("cluster_1")->prioritySet().crossPriorityHostMap(), + cluster.prioritySet().crossPriorityHostMap()); + + HostVectorSharedPtr hosts( + new HostVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts())); + HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + HostVector hosts_added; + HostVector hosts_removed; + + hosts_removed.push_back((*hosts)[0]); + cluster.prioritySet().updateHosts( + 0, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), hosts_per_locality), + {}, hosts_added, hosts_removed, absl::nullopt); + + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); + + EXPECT_EQ(1, cluster.prioritySet().crossPriorityHostMap()->size()); + EXPECT_EQ( + cluster_manager_->getThreadLocalCluster("cluster_1")->prioritySet().crossPriorityHostMap(), + cluster.prioritySet().crossPriorityHostMap()); + + hosts_added.push_back((*hosts)[0]); + hosts_removed.clear(); + cluster.prioritySet().updateHosts( + 0, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), 
hosts_per_locality), + {}, hosts_added, hosts_removed, absl::nullopt); + EXPECT_EQ(2, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); + + EXPECT_EQ(2, cluster.prioritySet().crossPriorityHostMap()->size()); + EXPECT_EQ( + cluster_manager_->getThreadLocalCluster("cluster_1")->prioritySet().crossPriorityHostMap(), + cluster.prioritySet().crossPriorityHostMap()); +} + class TestUpstreamNetworkFilter : public Network::WriteFilter { public: Network::FilterStatus onWrite(Buffer::Instance&, bool) override { diff --git a/test/common/upstream/upstream_impl_test.cc b/test/common/upstream/upstream_impl_test.cc index 62c291cf0a72b..b20fbdea5a438 100644 --- a/test/common/upstream/upstream_impl_test.cc +++ b/test/common/upstream/upstream_impl_test.cc @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -2265,7 +2266,7 @@ TEST(PrioritySet, Extend) { auto member_update_cb = priority_set.addMemberUpdateCb( [&](const HostVector&, const HostVector&) -> void { ++membership_changes; }); - // The initial priority set starts with priority level 0.. + // The initial priority set starts with priority level 0. 
EXPECT_EQ(1, priority_set.hostSetsPerPriority().size()); EXPECT_EQ(0, priority_set.hostSetsPerPriority()[0]->hosts().size()); EXPECT_EQ(0, priority_set.hostSetsPerPriority()[0]->priority()); @@ -2289,21 +2290,26 @@ TEST(PrioritySet, Extend) { HostVectorSharedPtr hosts( new HostVector({makeTestHost(info, "tcp://127.0.0.1:80", *time_source)})); HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + HostMapConstSharedPtr fake_cross_priority_host_map = std::make_shared(); { HostVector hosts_added{hosts->front()}; HostVector hosts_removed{}; - priority_set.updateHosts(1, - updateHostsParams(hosts, hosts_per_locality, - std::make_shared(*hosts), - hosts_per_locality), - {}, hosts_added, hosts_removed, absl::nullopt); + priority_set.updateHosts( + 1, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), hosts_per_locality), + {}, hosts_added, hosts_removed, absl::nullopt, fake_cross_priority_host_map); } EXPECT_EQ(1, priority_changes); EXPECT_EQ(1, membership_changes); EXPECT_EQ(last_priority, 1); EXPECT_EQ(1, priority_set.hostSetsPerPriority()[1]->hosts().size()); + // Simply verify the set and get the cross-priority host map is working properly in the priority + // set. + EXPECT_EQ(fake_cross_priority_host_map.get(), priority_set.crossPriorityHostMap().get()); + // Test iteration. int i = 0; for (auto& host_set : priority_set.hostSetsPerPriority()) { @@ -2328,6 +2334,73 @@ TEST(PrioritySet, Extend) { EXPECT_EQ(2, membership_changes); } +// Helper class used to test MainPrioritySetImpl. +class TestMainPrioritySetImpl : public MainPrioritySetImpl { +public: + HostMapConstSharedPtr constHostMapForTest() { return const_cross_priority_host_map_; } + HostMapSharedPtr mutableHostMapForTest() { return mutable_cross_priority_host_map_; } +}; + +// Test that the priority set in the main thread can work correctly. 
+TEST(PrioritySet, MainPrioritySetTest) { + TestMainPrioritySetImpl priority_set; + priority_set.getOrCreateHostSet(0); + + std::shared_ptr info{new NiceMock()}; + auto time_source = std::make_unique>(); + HostVectorSharedPtr hosts( + new HostVector({makeTestHost(info, "tcp://127.0.0.1:80", *time_source)})); + HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + + // The host map is initially empty or null. + EXPECT_TRUE(priority_set.constHostMapForTest()->empty()); + EXPECT_EQ(nullptr, priority_set.mutableHostMapForTest().get()); + + { + HostVector hosts_added{hosts->front()}; + HostVector hosts_removed{}; + + priority_set.updateHosts(1, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), + hosts_per_locality), + {}, hosts_added, hosts_removed, absl::nullopt); + } + + // Only mutable host map can be updated directly. Read only host map will not be updated before + // `crossPriorityHostMap` is called. + EXPECT_TRUE(priority_set.constHostMapForTest()->empty()); + EXPECT_FALSE(priority_set.mutableHostMapForTest()->empty()); + + // Mutable host map will be moved to read only host map after `crossPriorityHostMap` is called. + HostMapSharedPtr host_map = priority_set.mutableHostMapForTest(); + EXPECT_EQ(host_map.get(), priority_set.crossPriorityHostMap().get()); + EXPECT_EQ(nullptr, priority_set.mutableHostMapForTest().get()); + + { + HostVector hosts_added{}; + HostVector hosts_removed{hosts->front()}; + + priority_set.updateHosts(1, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), + hosts_per_locality), + {}, hosts_added, hosts_removed, absl::nullopt); + } + + // New mutable host map will be created and all update will be applied to new mutable host map. + // Read only host map will not be updated before `crossPriorityHostMap` is called. 
+ EXPECT_EQ(host_map.get(), priority_set.constHostMapForTest().get()); + EXPECT_TRUE((priority_set.mutableHostMapForTest().get() != nullptr && + priority_set.mutableHostMapForTest().get() != host_map.get())); + + // Again, mutable host map will be moved to read only host map after `crossPriorityHostMap` is + // called. + host_map = priority_set.mutableHostMapForTest(); + EXPECT_EQ(host_map.get(), priority_set.crossPriorityHostMap().get()); + EXPECT_EQ(nullptr, priority_set.mutableHostMapForTest().get()); +} + class ClusterInfoImplTest : public testing::Test { public: ClusterInfoImplTest() : api_(Api::createApiForTest(stats_, random_)) {} diff --git a/test/extensions/clusters/redis/mocks.h b/test/extensions/clusters/redis/mocks.h index bd2409e241460..1f1531f45be6c 100644 --- a/test/extensions/clusters/redis/mocks.h +++ b/test/extensions/clusters/redis/mocks.h @@ -21,7 +21,7 @@ class MockClusterSlotUpdateCallBack : public ClusterSlotUpdateCallBack { MockClusterSlotUpdateCallBack(); ~MockClusterSlotUpdateCallBack() override = default; - MOCK_METHOD(bool, onClusterSlotUpdate, (ClusterSlotsPtr&&, Upstream::HostMap)); + MOCK_METHOD(bool, onClusterSlotUpdate, (ClusterSlotsPtr&&, Upstream::HostMap&)); MOCK_METHOD(void, onHostHealthUpdate, ()); }; diff --git a/test/integration/stats_integration_test.cc b/test/integration/stats_integration_test.cc index 43ad25d1096a1..ffe0dcbc742c3 100644 --- a/test/integration/stats_integration_test.cc +++ b/test/integration/stats_integration_test.cc @@ -268,6 +268,8 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSize) { // 2020/08/11 12202 37061 38500 router: add new retry back-off strategy // 2020/09/11 12973 38993 upstream: predictive preconnect // 2020/10/02 13251 39326 switch to google tcmalloc + // 2021/08/15 17290 40349 add all host map to priority set for fast host + // searching // Note: when adjusting this value: EXPECT_MEMORY_EQ is active only in CI // 'release' builds, where we control the platform and tool-chain. 
So you @@ -288,7 +290,7 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSize) { // https://github.com/envoyproxy/envoy/issues/12209 // EXPECT_MEMORY_EQ(m_per_cluster, 37061); } - EXPECT_MEMORY_LE(m_per_cluster, 40000); // Round up to allow platform variations. + EXPECT_MEMORY_LE(m_per_cluster, 40350); // Round up to allow platform variations. } TEST_P(ClusterMemoryTestRunner, MemoryLargeHostSizeWithStats) { diff --git a/test/mocks/upstream/priority_set.cc b/test/mocks/upstream/priority_set.cc index ac4613ac269bc..de2289ede221d 100644 --- a/test/mocks/upstream/priority_set.cc +++ b/test/mocks/upstream/priority_set.cc @@ -11,6 +11,7 @@ namespace Upstream { using ::testing::_; using ::testing::Invoke; +using ::testing::Return; using ::testing::ReturnRef; MockPrioritySet::MockPrioritySet() { @@ -25,6 +26,7 @@ MockPrioritySet::MockPrioritySet() { .WillByDefault(Invoke([this](PrioritySet::PriorityUpdateCb cb) -> Common::CallbackHandlePtr { return priority_update_cb_helper_.add(cb); })); + ON_CALL(*this, crossPriorityHostMap()).WillByDefault(Return(cross_priority_host_map_)); } MockPrioritySet::~MockPrioritySet() = default; diff --git a/test/mocks/upstream/priority_set.h b/test/mocks/upstream/priority_set.h index 8af92a8e19f86..cda2574854f1f 100644 --- a/test/mocks/upstream/priority_set.h +++ b/test/mocks/upstream/priority_set.h @@ -25,8 +25,10 @@ class MockPrioritySet : public PrioritySet { MOCK_METHOD(void, updateHosts, (uint32_t priority, UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, - const HostVector& hosts_removed, absl::optional overprovisioning_factor)); + const HostVector& hosts_removed, absl::optional overprovisioning_factor, + HostMapConstSharedPtr cross_priority_host_map)); MOCK_METHOD(void, batchHostUpdate, (BatchUpdateCb&)); + MOCK_METHOD(HostMapConstSharedPtr, crossPriorityHostMap, (), (const)); MockHostSet* getMockHostSet(uint32_t priority) { getHostSet(priority); // Ensure the 
host set exists. @@ -38,6 +40,8 @@ class MockPrioritySet : public PrioritySet { Common::CallbackManager member_update_cb_helper_; Common::CallbackManager priority_update_cb_helper_; + + HostMapConstSharedPtr cross_priority_host_map_{std::make_shared()}; }; } // namespace Upstream } // namespace Envoy