Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,11 @@ envoy_cc_library(
deps = [
":eds_lib",
":health_checker_lib",
# TODO(mattklein123): Move the clusters to extensions so they can be compiled out.
":logical_dns_cluster_lib",
":original_dst_cluster_lib",
":static_cluster_lib",
":strict_dns_cluster_lib",
":upstream_includes",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/event:timer_interface",
Expand All @@ -381,6 +384,26 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "static_cluster_lib",
srcs = ["static_cluster.cc"],
hdrs = ["static_cluster.h"],
deps = [
":cluster_factory_includes",
":upstream_includes",
],
)

envoy_cc_library(
name = "strict_dns_cluster_lib",
srcs = ["strict_dns_cluster.cc"],
hdrs = ["strict_dns_cluster.h"],
deps = [
":cluster_factory_includes",
":upstream_includes",
],
)

envoy_cc_library(
name = "upstream_includes",
hdrs = ["upstream_impl.h"],
Expand Down
29 changes: 0 additions & 29 deletions source/common/upstream/cluster_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,34 +119,5 @@ ClusterSharedPtr ClusterFactoryImplBase::create(const envoy::api::v2::Cluster& c
return new_cluster;
}

ClusterImplBaseSharedPtr StaticClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
return std::make_unique<StaticClusterImpl>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi());
}

/**
* Static registration for the static cluster factory. @see RegisterFactory.
*/
REGISTER_FACTORY(StaticClusterFactory, ClusterFactory);

ClusterImplBaseSharedPtr StrictDnsClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
auto selected_dns_resolver = selectDnsResolver(cluster, context);

return std::make_unique<StrictDnsClusterImpl>(cluster, context.runtime(), selected_dns_resolver,
socket_factory_context, std::move(stats_scope),
context.addedViaApi());
}

/**
* Static registration for the strict dns cluster factory. @see RegisterFactory.
*/
REGISTER_FACTORY(StrictDnsClusterFactory, ClusterFactory);

} // namespace Upstream
} // namespace Envoy
30 changes: 0 additions & 30 deletions source/common/upstream/cluster_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,36 +144,6 @@ class ClusterFactoryImplBase : public ClusterFactory {
const std::string name_;
};

/**
* Factory for StaticClusterImpl cluster.
*/
class StaticClusterFactory : public ClusterFactoryImplBase {
public:
StaticClusterFactory()
: ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().Static) {}

private:
ClusterImplBaseSharedPtr
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override;
};

/**
* Factory for StrictDnsClusterImpl
*/
class StrictDnsClusterFactory : public ClusterFactoryImplBase {
public:
StrictDnsClusterFactory()
: ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().StrictDns) {}

private:
ClusterImplBaseSharedPtr
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override;
};

/**
* Common base class for custom cluster factory with custom configuration.
* @param ConfigProto is the configuration protobuf.
Expand Down
67 changes: 67 additions & 0 deletions source/common/upstream/static_cluster.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include "common/upstream/static_cluster.h"

namespace Envoy {
namespace Upstream {

StaticClusterImpl::StaticClusterImpl(
const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime,
Server::Configuration::TransportSocketFactoryContext& factory_context,
Stats::ScopePtr&& stats_scope, bool added_via_api)
: ClusterImplBase(cluster, runtime, factory_context, std::move(stats_scope), added_via_api),
priority_state_manager_(
new PriorityStateManager(*this, factory_context.localInfo(), nullptr)) {
// TODO(dio): Use by-reference when cluster.hosts() is removed.
const envoy::api::v2::ClusterLoadAssignment cluster_load_assignment(
cluster.has_load_assignment() ? cluster.load_assignment()
: Config::Utility::translateClusterHosts(cluster.hosts()));

overprovisioning_factor_ = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
cluster_load_assignment.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor);

for (const auto& locality_lb_endpoint : cluster_load_assignment.endpoints()) {
priority_state_manager_->initializePriorityFor(locality_lb_endpoint);
for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
priority_state_manager_->registerHostForPriority(
"", resolveProtoAddress(lb_endpoint.endpoint().address()), locality_lb_endpoint,
lb_endpoint);
}
}
}

void StaticClusterImpl::startPreInit() {
// At this point see if we have a health checker. If so, mark all the hosts unhealthy and
// then fire update callbacks to start the health checking process.
const auto& health_checker_flag =
health_checker_ != nullptr
? absl::optional<Upstream::Host::HealthFlag>(Host::HealthFlag::FAILED_ACTIVE_HC)
: absl::nullopt;

auto& priority_state = priority_state_manager_->priorityState();
for (size_t i = 0; i < priority_state.size(); ++i) {
if (priority_state[i].first == nullptr) {
priority_state[i].first = std::make_unique<HostVector>();
}
priority_state_manager_->updateClusterPrioritySet(
i, std::move(priority_state[i].first), absl::nullopt, absl::nullopt, health_checker_flag,
overprovisioning_factor_);
}
priority_state_manager_.reset();

onPreInitComplete();
}

ClusterImplBaseSharedPtr StaticClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
return std::make_unique<StaticClusterImpl>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi());
}

/**
* Static registration for the static cluster factory. @see RegisterFactory.
*/
REGISTER_FACTORY(StaticClusterFactory, ClusterFactory);

} // namespace Upstream
} // namespace Envoy
46 changes: 46 additions & 0 deletions source/common/upstream/static_cluster.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once

#include "common/upstream/cluster_factory_impl.h"
#include "common/upstream/upstream_impl.h"

namespace Envoy {
namespace Upstream {

/**
* Implementation of Upstream::Cluster for static clusters (clusters that have a fixed number of
* hosts with resolved IP addresses).
*/
class StaticClusterImpl : public ClusterImplBase {
public:
StaticClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime,
Server::Configuration::TransportSocketFactoryContext& factory_context,
Stats::ScopePtr&& stats_scope, bool added_via_api);

// Upstream::Cluster
InitializePhase initializePhase() const override { return InitializePhase::Primary; }

private:
// ClusterImplBase
void startPreInit() override;

PriorityStateManagerPtr priority_state_manager_;
uint32_t overprovisioning_factor_;
};

/**
* Factory for StaticClusterImpl cluster.
*/
class StaticClusterFactory : public ClusterFactoryImplBase {
public:
StaticClusterFactory()
: ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().Static) {}

private:
ClusterImplBaseSharedPtr
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override;
};

} // namespace Upstream
} // namespace Envoy
154 changes: 154 additions & 0 deletions source/common/upstream/strict_dns_cluster.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include "common/upstream/strict_dns_cluster.h"

namespace Envoy {
namespace Upstream {

StrictDnsClusterImpl::StrictDnsClusterImpl(
const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime,
Network::DnsResolverSharedPtr dns_resolver,
Server::Configuration::TransportSocketFactoryContext& factory_context,
Stats::ScopePtr&& stats_scope, bool added_via_api)
: BaseDynamicClusterImpl(cluster, runtime, factory_context, std::move(stats_scope),
added_via_api),
local_info_(factory_context.localInfo()), dns_resolver_(dns_resolver),
dns_refresh_rate_ms_(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))) {
std::list<ResolveTargetPtr> resolve_targets;
const envoy::api::v2::ClusterLoadAssignment load_assignment(
cluster.has_load_assignment() ? cluster.load_assignment()
: Config::Utility::translateClusterHosts(cluster.hosts()));
const auto& locality_lb_endpoints = load_assignment.endpoints();
for (const auto& locality_lb_endpoint : locality_lb_endpoints) {
for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
const auto& socket_address = lb_endpoint.endpoint().address().socket_address();
if (!socket_address.resolver_name().empty()) {
throw EnvoyException("STRICT_DNS clusters must NOT have a custom resolver name set");
}

const std::string& url =
fmt::format("tcp://{}:{}", socket_address.address(), socket_address.port_value());
resolve_targets.emplace_back(new ResolveTarget(*this, factory_context.dispatcher(), url,
locality_lb_endpoint, lb_endpoint));
}
}
resolve_targets_ = std::move(resolve_targets);
dns_lookup_family_ = getDnsLookupFamilyFromCluster(cluster);

overprovisioning_factor_ = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
load_assignment.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor);
}

void StrictDnsClusterImpl::startPreInit() {
for (const ResolveTargetPtr& target : resolve_targets_) {
target->startResolve();
}
}

void StrictDnsClusterImpl::updateAllHosts(const HostVector& hosts_added,
const HostVector& hosts_removed,
uint32_t current_priority) {
PriorityStateManager priority_state_manager(*this, local_info_, nullptr);
// At this point we know that we are different so make a new host list and notify.
//
// TODO(dio): The uniqueness of a host address resolved in STRICT_DNS cluster per priority is not
// guaranteed. Need a clear agreement on the behavior here, whether it is allowable to have
// duplicated hosts inside a priority. And if we want to enforce this behavior, it should be done
// inside the priority state manager.
for (const ResolveTargetPtr& target : resolve_targets_) {
priority_state_manager.initializePriorityFor(target->locality_lb_endpoint_);
for (const HostSharedPtr& host : target->hosts_) {
if (target->locality_lb_endpoint_.priority() == current_priority) {
priority_state_manager.registerHostForPriority(host, target->locality_lb_endpoint_);
}
}
}

// TODO(dio): Add assertion in here.
priority_state_manager.updateClusterPrioritySet(
current_priority, std::move(priority_state_manager.priorityState()[current_priority].first),
hosts_added, hosts_removed, absl::nullopt, overprovisioning_factor_);
}

StrictDnsClusterImpl::ResolveTarget::ResolveTarget(
StrictDnsClusterImpl& parent, Event::Dispatcher& dispatcher, const std::string& url,
const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint,
const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint)
: parent_(parent), dns_address_(Network::Utility::hostFromTcpUrl(url)),
port_(Network::Utility::portFromTcpUrl(url)),
resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })),
locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {}

StrictDnsClusterImpl::ResolveTarget::~ResolveTarget() {
if (active_query_) {
active_query_->cancel();
}
}

void StrictDnsClusterImpl::ResolveTarget::startResolve() {
ENVOY_LOG(trace, "starting async DNS resolution for {}", dns_address_);
parent_.info_->stats().update_attempt_.inc();

active_query_ = parent_.dns_resolver_->resolve(
dns_address_, parent_.dns_lookup_family_,
[this](const std::list<Network::Address::InstanceConstSharedPtr>&& address_list) -> void {
active_query_ = nullptr;
ENVOY_LOG(trace, "async DNS resolution complete for {}", dns_address_);
parent_.info_->stats().update_success_.inc();

std::unordered_map<std::string, HostSharedPtr> updated_hosts;
HostVector new_hosts;
for (const Network::Address::InstanceConstSharedPtr& address : address_list) {
// TODO(mattklein123): Currently the DNS interface does not consider port. We need to
// make a new address that has port in it. We need to both support IPv6 as well as
// potentially move port handling into the DNS interface itself, which would work better
// for SRV.
ASSERT(address != nullptr);
new_hosts.emplace_back(new HostImpl(
parent_.info_, dns_address_, Network::Utility::getAddressWithPort(*address, port_),
lb_endpoint_.metadata(), lb_endpoint_.load_balancing_weight().value(),
locality_lb_endpoint_.locality(), lb_endpoint_.endpoint().health_check_config(),
locality_lb_endpoint_.priority(), lb_endpoint_.health_status()));
}

HostVector hosts_added;
HostVector hosts_removed;
if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
updated_hosts, all_hosts_)) {
ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_);
ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
return host->priority() == locality_lb_endpoint_.priority();
}));
parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoint_.priority());
} else {
parent_.info_->stats().update_no_rebuild_.inc();
}

all_hosts_ = std::move(updated_hosts);

// If there is an initialize callback, fire it now. Note that if the cluster refers to
// multiple DNS names, this will return initialized after a single DNS resolution
// completes. This is not perfect but is easier to code and unclear if the extra
// complexity is needed so will start with this.
parent_.onPreInitComplete();
resolve_timer_->enableTimer(parent_.dns_refresh_rate_ms_);
});
}

ClusterImplBaseSharedPtr StrictDnsClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
auto selected_dns_resolver = selectDnsResolver(cluster, context);

return std::make_unique<StrictDnsClusterImpl>(cluster, context.runtime(), selected_dns_resolver,
socket_factory_context, std::move(stats_scope),
context.addedViaApi());
}

/**
* Static registration for the strict dns cluster factory. @see RegisterFactory.
*/
REGISTER_FACTORY(StrictDnsClusterFactory, ClusterFactory);

} // namespace Upstream
} // namespace Envoy
Loading