Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ envoy_cc_library(
"//include/envoy/runtime:runtime_interface",
"//include/envoy/ssl:context_interface",
"//include/envoy/ssl:context_manager_interface",
"//include/envoy/upstream:types_interface",
],
)

Expand Down
5 changes: 5 additions & 0 deletions include/envoy/upstream/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,10 @@ struct HealthyAvailability : PriorityAvailability {
using PriorityAvailability::PriorityAvailability;
};

// Phantom type indicating that the type is related to healthy hosts.
struct Healthy {};
// Phantom type indicating that the type is related to degraded hosts.
struct Degraded {};

} // namespace Upstream
} // namespace Envoy
25 changes: 16 additions & 9 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "envoy/upstream/locality.h"
#include "envoy/upstream/outlier_detection.h"
#include "envoy/upstream/resource_manager.h"
#include "envoy/upstream/types.h"

#include "absl/types/optional.h"

Expand Down Expand Up @@ -191,10 +192,15 @@ class Host : virtual public HostDescription {
typedef std::shared_ptr<const Host> HostConstSharedPtr;

typedef std::vector<HostSharedPtr> HostVector;
typedef Phantom<HostVector, Healthy> HealthyHostVector;
typedef Phantom<HostVector, Degraded> DegradedHostVector;
typedef std::unordered_map<std::string, Upstream::HostSharedPtr> HostMap;
typedef std::shared_ptr<HostVector> HostVectorSharedPtr;
typedef std::shared_ptr<const HostVector> HostVectorConstSharedPtr;

typedef std::shared_ptr<const HealthyHostVector> HealthyHostVectorConstSharedPtr;
typedef std::shared_ptr<const DegradedHostVector> DegradedHostVectorConstSharedPtr;

typedef std::unique_ptr<HostVector> HostListPtr;
typedef std::unordered_map<envoy::api::v2::core::Locality, uint32_t, LocalityHash, LocalityEqualTo>
LocalityWeightsMap;
Expand All @@ -221,20 +227,21 @@ class HostsPerLocality {
virtual const std::vector<HostVector>& get() const PURE;

/**
* Clone object with a filter predicate.
* @param predicate on Host entries.
* @return HostsPerLocalityConstSharedPtr clone of the HostsPerLocality with only
* hosts according to predicate.
* Clone object with multiple filter predicates. Returns a vector of clones, each with host that
* match the provided predicates.
* @param predicates vector of predicates on Host entries.
* @return vector of HostsPerLocalityConstSharedPtr clones of the HostsPerLocality that match
* hosts according to predicates.
*/
virtual std::shared_ptr<const HostsPerLocality>
filter(std::function<bool(const Host&)> predicate) const PURE;
virtual std::vector<std::shared_ptr<const HostsPerLocality>>
filter(const std::vector<std::function<bool(const Host&)>>& predicates) const PURE;

/**
* Clone object.
* @return HostsPerLocalityConstSharedPtr clone of the HostsPerLocality.
*/
std::shared_ptr<const HostsPerLocality> clone() const {
return filter([](const Host&) { return true; });
return filter({[](const Host&) { return true; }})[0];
}
};

Expand Down Expand Up @@ -366,8 +373,8 @@ class PrioritySet {
*/
struct UpdateHostsParams {
HostVectorConstSharedPtr hosts;
HostVectorConstSharedPtr healthy_hosts;
HostVectorConstSharedPtr degraded_hosts;
HealthyHostVectorConstSharedPtr healthy_hosts;
DegradedHostVectorConstSharedPtr degraded_hosts;
HostsPerLocalityConstSharedPtr hosts_per_locality;
HostsPerLocalityConstSharedPtr healthy_hosts_per_locality;
HostsPerLocalityConstSharedPtr degraded_hosts_per_locality;
Expand Down
6 changes: 4 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,10 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(const Cluster& cluster, ui

// TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts()));
HostVectorConstSharedPtr healthy_hosts_copy(new HostVector(host_set->healthyHosts()));
HostVectorConstSharedPtr degraded_hosts_copy(new HostVector(host_set->degradedHosts()));
HealthyHostVectorConstSharedPtr healthy_hosts_copy(
new HealthyHostVector(host_set->healthyHosts()));
DegradedHostVectorConstSharedPtr degraded_hosts_copy(
new DegradedHostVector(host_set->degradedHosts()));
HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
HostsPerLocalityConstSharedPtr healthy_hosts_per_locality_copy =
host_set->healthyHostsPerLocality().clone();
Expand Down
18 changes: 9 additions & 9 deletions source/common/upstream/subset_lb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -535,19 +535,19 @@ void SubsetLoadBalancer::HostSubsetImpl::update(const HostVector& hosts_added,
}
}

auto healthy_hosts = std::make_shared<HostVector>();
healthy_hosts->reserve(original_host_set_.healthyHosts().size());
auto healthy_hosts = std::make_shared<HealthyHostVector>();
healthy_hosts->get().reserve(original_host_set_.healthyHosts().size());
for (const auto& host : original_host_set_.healthyHosts()) {
if (cached_predicate(*host)) {
healthy_hosts->emplace_back(host);
healthy_hosts->get().emplace_back(host);
}
}

auto degraded_hosts = std::make_shared<HostVector>();
degraded_hosts->reserve(original_host_set_.degradedHosts().size());
auto degraded_hosts = std::make_shared<DegradedHostVector>();
degraded_hosts->get().reserve(original_host_set_.degradedHosts().size());
for (const auto& host : original_host_set_.degradedHosts()) {
if (cached_predicate(*host)) {
degraded_hosts->emplace_back(host);
degraded_hosts->get().emplace_back(host);
}
}

Expand All @@ -561,13 +561,13 @@ void SubsetLoadBalancer::HostSubsetImpl::update(const HostVector& hosts_added,
hosts_per_locality = std::make_shared<HostsPerLocalityImpl>(
*hosts, original_host_set_.hostsPerLocality().hasLocalLocality());
} else {
hosts_per_locality = original_host_set_.hostsPerLocality().filter(cached_predicate);
hosts_per_locality = original_host_set_.hostsPerLocality().filter({cached_predicate})[0];
}

HostsPerLocalityConstSharedPtr healthy_hosts_per_locality =
original_host_set_.healthyHostsPerLocality().filter(cached_predicate);
original_host_set_.healthyHostsPerLocality().filter({cached_predicate})[0];
HostsPerLocalityConstSharedPtr degraded_hosts_per_locality =
original_host_set_.degradedHostsPerLocality().filter(cached_predicate);
original_host_set_.degradedHostsPerLocality().filter({cached_predicate})[0];

// We can use the cached predicate here, since we trust that the hosts in hosts_added were also
// present in the list of all hosts.
Expand Down
96 changes: 61 additions & 35 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,23 +274,39 @@ HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& clu

void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, std::min(128U, new_weight)); }

HostsPerLocalityConstSharedPtr
HostsPerLocalityImpl::filter(std::function<bool(const Host&)> predicate) const {
auto* filtered_clone = new HostsPerLocalityImpl();
HostsPerLocalityConstSharedPtr shared_filtered_clone{filtered_clone};
std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter(
const std::vector<std::function<bool(const Host&)>>& predicates) const {
// We keep two lists: one for being able to mutate the clone and one for returning to the caller.
// Creating them both at the start avoids iterating over the mutable values at the end to convert
// them to a const pointer.
std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones;
std::vector<HostsPerLocalityConstSharedPtr> filtered_clones;

for (size_t i = 0; i < predicates.size(); ++i) {
mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>());
filtered_clones.emplace_back(mutable_clones.back());
mutable_clones.back()->local_ = local_;
}

filtered_clone->local_ = local_;
for (const auto& hosts_locality : hosts_per_locality_) {
HostVector current_locality_hosts;
std::vector<HostVector> current_locality_hosts;
current_locality_hosts.resize(predicates.size());

// Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop.
for (const auto& host : hosts_locality) {
if (predicate(*host)) {
current_locality_hosts.emplace_back(host);
for (size_t i = 0; i < predicates.size(); ++i) {
if (predicates[i](*host)) {
current_locality_hosts[i].emplace_back(host);
}
}
}
filtered_clone->hosts_per_locality_.push_back(std::move(current_locality_hosts));

for (size_t i = 0; i < predicates.size(); ++i) {
mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[0]));
}
}

return shared_filtered_clone;
return filtered_clones;
}

void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
Expand All @@ -310,11 +326,11 @@ void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_para
locality_weights_ = std::move(locality_weights);

rebuildLocalityScheduler(healthy_locality_scheduler_, healthy_locality_entries_,
*healthy_hosts_per_locality_, *healthy_hosts_, hosts_per_locality_,
*healthy_hosts_per_locality_, healthy_hosts_->get(), hosts_per_locality_,
locality_weights_, overprovisioning_factor_);
rebuildLocalityScheduler(degraded_locality_scheduler_, degraded_locality_entries_,
*degraded_hosts_per_locality_, *degraded_hosts_, hosts_per_locality_,
locality_weights_, overprovisioning_factor_);
*degraded_hosts_per_locality_, degraded_hosts_->get(),
hosts_per_locality_, locality_weights_, overprovisioning_factor_);

runUpdateCallbacks(hosts_added, hosts_removed);
}
Expand Down Expand Up @@ -387,25 +403,27 @@ PrioritySet::UpdateHostsParams
HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
HostsPerLocalityConstSharedPtr hosts_per_locality) {
return updateHostsParams(std::move(hosts), std::move(hosts_per_locality),
std::make_shared<const HostVector>(), HostsPerLocalityImpl::empty());
std::make_shared<const HealthyHostVector>(),
HostsPerLocalityImpl::empty());
}

PrioritySet::UpdateHostsParams
HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
HostsPerLocalityConstSharedPtr hosts_per_locality,
HostVectorConstSharedPtr healthy_hosts,
HealthyHostVectorConstSharedPtr healthy_hosts,
HostsPerLocalityConstSharedPtr healthy_hosts_per_locality) {
return updateHostsParams(std::move(hosts), std::move(hosts_per_locality),
std::move(healthy_hosts), std::move(healthy_hosts_per_locality),
std::make_shared<const HostVector>(), HostsPerLocalityImpl::empty());
std::make_shared<const DegradedHostVector>(),
HostsPerLocalityImpl::empty());
}

PrioritySet::UpdateHostsParams
HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
HostsPerLocalityConstSharedPtr hosts_per_locality,
HostVectorConstSharedPtr healthy_hosts,
HealthyHostVectorConstSharedPtr healthy_hosts,
HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
HostVectorConstSharedPtr degraded_hosts,
DegradedHostVectorConstSharedPtr degraded_hosts,
HostsPerLocalityConstSharedPtr degraded_hosts_per_locality) {
return PrioritySet::UpdateHostsParams{std::move(hosts),
std::move(healthy_hosts),
Expand All @@ -418,16 +436,15 @@ HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
PrioritySet::UpdateHostsParams
HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts,
HostsPerLocalityConstSharedPtr hosts_per_locality) {
auto healthy_hosts = ClusterImplBase::createHostList(*hosts, Host::Health::Healthy);
auto degraded_hosts = ClusterImplBase::createHostList(*hosts, Host::Health::Degraded);
auto healthy_hosts_per_locality =
ClusterImplBase::createHostLists(*hosts_per_locality, Host::Health::Healthy);
auto degraded_hosts_per_locality =
ClusterImplBase::createHostLists(*hosts_per_locality, Host::Health::Degraded);
auto healthy_and_degraded_hosts = ClusterImplBase::partitionHostList(*hosts);
auto healthy_and_degraded_hosts_per_locality =
ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality);

return updateHostsParams(std::move(hosts), std::move(hosts_per_locality),
std::move(healthy_hosts), std::move(healthy_hosts_per_locality),
std::move(degraded_hosts), std::move(degraded_hosts_per_locality));
std::move(healthy_and_degraded_hosts.first),
std::move(healthy_and_degraded_hosts_per_locality.first),
std::move(healthy_and_degraded_hosts.second),
std::move(healthy_and_degraded_hosts_per_locality.second));
}

double HostSetImpl::effectiveLocalityWeight(uint32_t index,
Expand Down Expand Up @@ -678,21 +695,30 @@ ClusterImplBase::ClusterImplBase(
});
}

HostVectorConstSharedPtr ClusterImplBase::createHostList(const HostVector& hosts,
Host::Health health) {
HostVectorSharedPtr healthy_list(new HostVector());
std::pair<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr>
ClusterImplBase::partitionHostList(const HostVector& hosts) {
auto healthy_list = std::make_shared<HealthyHostVector>();
auto degraded_list = std::make_shared<DegradedHostVector>();

for (const auto& host : hosts) {
if (host->health() == health) {
healthy_list->emplace_back(host);
if (host->health() == Host::Health::Healthy) {
healthy_list->get().emplace_back(host);
}
if (host->health() == Host::Health::Degraded) {
degraded_list->get().emplace_back(host);
}
}

return healthy_list;
return {healthy_list, degraded_list};
}

HostsPerLocalityConstSharedPtr ClusterImplBase::createHostLists(const HostsPerLocality& hosts,
Host::Health health) {
return hosts.filter([&health](const Host& host) { return host.health() == health; });
std::pair<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr>
ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) {
auto filtered_clones =
hosts.filter({[](const Host& host) { return host.health() == Host::Health::Healthy; },
[](const Host& host) { return host.health() == Host::Health::Degraded; }});

return {std::move(filtered_clones[0]), std::move(filtered_clones[1])};
}

bool ClusterInfoImpl::maintenanceMode() const {
Expand Down
32 changes: 19 additions & 13 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ class HostsPerLocalityImpl : public HostsPerLocality {

bool hasLocalLocality() const override { return local_; }
const std::vector<HostVector>& get() const override { return hosts_per_locality_; }
HostsPerLocalityConstSharedPtr filter(std::function<bool(const Host&)> predicate) const override;
std::vector<HostsPerLocalityConstSharedPtr>
filter(const std::vector<std::function<bool(const Host&)>>& predicate) const override;

// The const shared pointer for the empty HostsPerLocalityImpl.
static HostsPerLocalityConstSharedPtr empty() {
Expand All @@ -279,8 +280,8 @@ class HostSetImpl : public HostSet {
: priority_(priority), overprovisioning_factor_(overprovisioning_factor.has_value()
? overprovisioning_factor.value()
: kDefaultOverProvisioningFactor),
hosts_(new HostVector()), healthy_hosts_(new HostVector()),
degraded_hosts_(new HostVector()) {}
hosts_(new HostVector()), healthy_hosts_(new HealthyHostVector()),
degraded_hosts_(new DegradedHostVector()) {}

/**
* Install a callback that will be invoked when the host set membership changes.
Expand All @@ -293,8 +294,8 @@ class HostSetImpl : public HostSet {

// Upstream::HostSet
const HostVector& hosts() const override { return *hosts_; }
const HostVector& healthyHosts() const override { return *healthy_hosts_; }
const HostVector& degradedHosts() const override { return *degraded_hosts_; }
const HostVector& healthyHosts() const override { return healthy_hosts_->get(); }
const HostVector& degradedHosts() const override { return degraded_hosts_->get(); }
const HostsPerLocality& hostsPerLocality() const override { return *hosts_per_locality_; }
const HostsPerLocality& healthyHostsPerLocality() const override {
return *healthy_hosts_per_locality_;
Expand All @@ -315,14 +316,14 @@ class HostSetImpl : public HostSet {
static PrioritySet::UpdateHostsParams
updateHostsParams(HostVectorConstSharedPtr hosts,
HostsPerLocalityConstSharedPtr hosts_per_locality,
HostVectorConstSharedPtr healthy_hosts,
HealthyHostVectorConstSharedPtr healthy_hosts,
HostsPerLocalityConstSharedPtr healthy_hosts_per_locality);
static PrioritySet::UpdateHostsParams
updateHostsParams(HostVectorConstSharedPtr hosts,
HostsPerLocalityConstSharedPtr hosts_per_locality,
HostVectorConstSharedPtr healthy_hosts,
HealthyHostVectorConstSharedPtr healthy_hosts,
HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
HostVectorConstSharedPtr degraded_hosts,
DegradedHostVectorConstSharedPtr degraded_hosts,
HostsPerLocalityConstSharedPtr degraded_hosts_per_locality);
static PrioritySet::UpdateHostsParams
partitionHosts(HostVectorConstSharedPtr hosts, HostsPerLocalityConstSharedPtr hosts_per_locality);
Expand All @@ -349,8 +350,8 @@ class HostSetImpl : public HostSet {
uint32_t priority_;
uint32_t overprovisioning_factor_;
HostVectorConstSharedPtr hosts_;
HostVectorConstSharedPtr healthy_hosts_;
HostVectorConstSharedPtr degraded_hosts_;
HealthyHostVectorConstSharedPtr healthy_hosts_;
DegradedHostVectorConstSharedPtr degraded_hosts_;
HostsPerLocalityConstSharedPtr hosts_per_locality_{HostsPerLocalityImpl::empty()};
HostsPerLocalityConstSharedPtr healthy_hosts_per_locality_{HostsPerLocalityImpl::empty()};
HostsPerLocalityConstSharedPtr degraded_hosts_per_locality_{HostsPerLocalityImpl::empty()};
Expand Down Expand Up @@ -636,9 +637,14 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u
const Network::Address::InstanceConstSharedPtr
resolveProtoAddress(const envoy::api::v2::core::Address& address);

static HostVectorConstSharedPtr createHostList(const HostVector& hosts, Host::Health health);
static HostsPerLocalityConstSharedPtr createHostLists(const HostsPerLocality& hosts,
Host::Health);
// Partitions the provided list of hosts into two new lists containing the healthy and degraded
// hosts respectively.
static std::pair<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr>
partitionHostList(const HostVector& hosts);
// Partitions the provided list of hosts per locality into two new lists containing the healthy
// and degraded hosts respectively.
static std::pair<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr>
partitionHostsPerLocality(const HostsPerLocality& hosts);

// Upstream::Cluster
HealthChecker* healthChecker() override { return health_checker_.get(); }
Expand Down
Loading