Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/envoy/api/v2/cluster/circuit_breaker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ message CircuitBreakers {
// The maximum number of parallel retries that Envoy will allow to the
// upstream cluster. If not specified, the default is 3.
google.protobuf.UInt32Value max_retries = 5;

// The maximum number of connection pools that Envoy will concurrently support at once.
Comment thread
klarose marked this conversation as resolved.
Outdated
// If not specified, the default is unlimited.
google.protobuf.UInt32Value max_connection_pools = 6;
}

// If multiple :ref:`Thresholds<envoy_api_msg_cluster.CircuitBreakers.Thresholds>`
Expand Down
10 changes: 10 additions & 0 deletions include/envoy/upstream/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class Resource {
*/
virtual void dec() PURE;

/**
* Decrement the resource count by a specific amount.
*/
virtual void decBy(uint64_t amount) PURE;

/**
* @return the current maximum allowed number of this resource.
*/
Expand Down Expand Up @@ -73,6 +78,11 @@ class ResourceManager {
* @return Resource& active retries.
*/
virtual Resource& retries() PURE;

/**
* @return Resource& active connection pools.
*/
virtual Resource& connectionPools() PURE;
};

} // namespace Upstream
Expand Down
3 changes: 2 additions & 1 deletion include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ class PrioritySet {
GAUGE (cx_open) \
GAUGE (rq_pending_open) \
GAUGE (rq_open) \
GAUGE (rq_retry_open)
GAUGE (rq_retry_open) \
GAUGE (cx_pool_open)
// clang-format on

/**
Expand Down
26 changes: 24 additions & 2 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ envoy_cc_library(
"//source/common/protobuf:utility_lib",
"//source/common/router:shadow_writer_lib",
"//source/common/tcp:conn_pool_lib",
"//source/common/upstream:conn_pool_map",
"//source/common/upstream:conn_pool_map_impl_lib",
"//source/common/upstream:priority_conn_pool_map_impl_lib",
"//source/common/upstream:upstream_lib",
"@envoy_api//envoy/admin/v2alpha:config_dump_cc",
"@envoy_api//envoy/api/v2/core:base_cc",
Expand All @@ -73,6 +72,8 @@ envoy_cc_library(
hdrs = ["conn_pool_map.h"],
deps = [
"//include/envoy/event:dispatcher_interface",
"//include/envoy/upstream:resource_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/common:debug_recursion_checker_lib",
],
)
Expand All @@ -85,6 +86,27 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "priority_conn_pool_map",
hdrs = ["priority_conn_pool_map.h"],
deps = [
":conn_pool_map",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/upstream:resource_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/common:debug_recursion_checker_lib",
],
)

envoy_cc_library(
name = "priority_conn_pool_map_impl_lib",
hdrs = ["priority_conn_pool_map_impl.h"],
deps = [
":conn_pool_map_impl_lib",
":priority_conn_pool_map",
],
)

envoy_cc_library(
name = "edf_scheduler_lib",
hdrs = ["edf_scheduler.h"],
Expand Down
26 changes: 14 additions & 12 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
#include "common/router/shadow_writer_impl.h"
#include "common/tcp/conn_pool.h"
#include "common/upstream/cds_api_impl.h"
#include "common/upstream/conn_pool_map_impl.h"
#include "common/upstream/load_balancer_impl.h"
#include "common/upstream/maglev_lb.h"
#include "common/upstream/original_dst_cluster.h"
#include "common/upstream/priority_conn_pool_map_impl.h"
#include "common/upstream/ring_hash_lb.h"
#include "common/upstream/subset_lb.h"

Expand Down Expand Up @@ -1043,7 +1043,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
if (!allocate) {
return nullptr;
}
ConnPoolsContainer container{thread_local_dispatcher_};
ConnPoolsContainer container{thread_local_dispatcher_, host};
container_iter = host_http_conn_pool_map_.emplace(host, std::move(container)).first;
}

Expand Down Expand Up @@ -1153,16 +1153,18 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(

// Note: to simplify this, we assume that the factory is only called in the scope of this
// function. Otherwise, we'd need to capture a few of these variables by value.
ConnPoolsContainer::ConnPools::OptPoolRef pool = container.pools_->getPool(hash_key, [&]() {
return parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, protocol,
have_options ? context->downstreamConnection()->socketOptions() : nullptr);
});
// The Connection Pool tracking is a work in progress. We plan for it to eventually have the
// ability to fail, but until we add upper layer handling for failures, it should not. So, assert
// that we don't accidentally add conditions that could allow it to fail.
ASSERT(pool.has_value(), "Pool allocation should never fail");
return &(pool.value().get());
ConnPoolsContainer::ConnPools::OptPoolRef pool =
Comment thread
klarose marked this conversation as resolved.
container.pools_->getPool(priority, hash_key, [&]() {
return parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, protocol,
have_options ? context->downstreamConnection()->socketOptions() : nullptr);
});

if (pool.has_value()) {
return &(pool.value().get());
} else {
return nullptr;
}
}

Tcp::ConnectionPool::Instance*
Expand Down
8 changes: 4 additions & 4 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

#include "common/config/grpc_mux_impl.h"
#include "common/http/async_client_impl.h"
#include "common/upstream/conn_pool_map.h"
#include "common/upstream/load_stats_reporter.h"
#include "common/upstream/priority_conn_pool_map.h"
#include "common/upstream/upstream_impl.h"

namespace Envoy {
Expand Down Expand Up @@ -236,10 +236,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
*/
struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject {
struct ConnPoolsContainer {
ConnPoolsContainer(Event::Dispatcher& dispatcher)
: pools_{std::make_shared<ConnPools>(dispatcher, absl::nullopt)} {}
ConnPoolsContainer(Event::Dispatcher& dispatcher, const HostConstSharedPtr host)
Comment thread
klarose marked this conversation as resolved.
Outdated
: pools_{std::make_shared<ConnPools>(dispatcher, host)} {}

typedef ConnPoolMap<std::vector<uint8_t>, Http::ConnectionPool::Instance> ConnPools;
typedef PriorityConnPoolMap<std::vector<uint8_t>, Http::ConnectionPool::Instance> ConnPools;

// This is a shared_ptr so we can keep it alive while cleaning up.
std::shared_ptr<ConnPools> pools_;
Expand Down
12 changes: 10 additions & 2 deletions source/common/upstream/conn_pool_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <vector>

#include "envoy/event/dispatcher.h"
#include "envoy/upstream/resource_manager.h"
#include "envoy/upstream/upstream.h"

#include "common/common/debug_recursion_checker.h"

Expand All @@ -21,7 +23,7 @@ template <typename KEY_TYPE, typename POOL_TYPE> class ConnPoolMap {
using DrainedCb = std::function<void()>;
using OptPoolRef = absl::optional<std::reference_wrapper<POOL_TYPE>>;

ConnPoolMap(Event::Dispatcher& dispatcher, absl::optional<uint64_t> max_size);
ConnPoolMap(Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority);
Comment thread
klarose marked this conversation as resolved.
Outdated
~ConnPoolMap();
/**
* Returns an existing pool for `key`, or creates a new one using `factory`. Note that it is
Expand Down Expand Up @@ -60,11 +62,17 @@ template <typename KEY_TYPE, typename POOL_TYPE> class ConnPoolMap {
*/
bool freeOnePool();

/**
* Cleans up the active_pools_ map and updates resource tracking
Comment thread
klarose marked this conversation as resolved.
**/
void clearActivePools();

absl::flat_hash_map<KEY_TYPE, std::unique_ptr<POOL_TYPE>> active_pools_;
Event::Dispatcher& thread_local_dispatcher_;
std::vector<DrainedCb> cached_callbacks_;
Common::DebugRecursionChecker recursion_checker_;
const absl::optional<uint64_t> max_size_;
const HostConstSharedPtr host_;
const ResourcePriority priority_;
};

} // namespace Upstream
Expand Down
25 changes: 17 additions & 8 deletions source/common/upstream/conn_pool_map_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ namespace Upstream {

template <typename KEY_TYPE, typename POOL_TYPE>
ConnPoolMap<KEY_TYPE, POOL_TYPE>::ConnPoolMap(Envoy::Event::Dispatcher& dispatcher,
absl::optional<uint64_t> max_size)
: thread_local_dispatcher_(dispatcher), max_size_(max_size) {}
HostConstSharedPtr host, ResourcePriority priority)
: thread_local_dispatcher_(dispatcher), host_(host), priority_(priority) {}

template <typename KEY_TYPE, typename POOL_TYPE>
ConnPoolMap<KEY_TYPE, POOL_TYPE>::~ConnPoolMap() = default;
template <typename KEY_TYPE, typename POOL_TYPE> ConnPoolMap<KEY_TYPE, POOL_TYPE>::~ConnPoolMap() {
// Explicitly clear things out for resource tracking purposes. Note that we call this rather than
Comment thread
klarose marked this conversation as resolved.
Outdated
// clear because in this case we don't want to do a deferred delete.
clearActivePools();
}

template <typename KEY_TYPE, typename POOL_TYPE>
typename ConnPoolMap<KEY_TYPE, POOL_TYPE>::OptPoolRef
Expand All @@ -26,13 +29,12 @@ ConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(KEY_TYPE key, const PoolFactory& facto
}

// We need a new pool. Check if we have room.
if (max_size_.has_value() && size() >= max_size_.value()) {
if (!host_->cluster().resourceManager(priority_).connectionPools().canCreate()) {
// We're full. Try to free up a pool. If we can't, bail out.
if (!freeOnePool()) {
return absl::nullopt;
}

ASSERT(size() < max_size_.value(), "Freeing a pool should reduce the size to below the max.");
Comment thread
klarose marked this conversation as resolved.
// TODO(klarose): Consider some simple hysteresis here. How can we prevent iterating over all
// pools when we're at the limit every time we want to allocate a new one, even if most of the
// pools are not busy, while balancing that with not unnecessarily freeing all pools? If we
Expand All @@ -42,6 +44,7 @@ ConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(KEY_TYPE key, const PoolFactory& facto

// We have room for a new pool. Allocate one and let it know about any cached callbacks.
auto new_pool = factory();
host_->cluster().resourceManager(priority_).connectionPools().inc();
for (const auto& cb : cached_callbacks_) {
new_pool->addDrainedCallback(cb);
}
Expand All @@ -60,8 +63,7 @@ template <typename KEY_TYPE, typename POOL_TYPE> void ConnPoolMap<KEY_TYPE, POOL
for (auto& pool_pair : active_pools_) {
thread_local_dispatcher_.deferredDelete(std::move(pool_pair.second));
}

active_pools_.clear();
clearActivePools();
}

template <typename KEY_TYPE, typename POOL_TYPE>
Expand Down Expand Up @@ -95,12 +97,19 @@ bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::freeOnePool() {

if (pool_iter != active_pools_.end()) {
// We found one. Free it up, and let the caller know.
thread_local_dispatcher_.deferredDelete(std::move(pool_iter->second));
Comment thread
klarose marked this conversation as resolved.
Outdated
active_pools_.erase(pool_iter);
host_->cluster().resourceManager(priority_).connectionPools().dec();
return true;
}

return false;
}

template <typename KEY_TYPE, typename POOL_TYPE>
void ConnPoolMap<KEY_TYPE, POOL_TYPE>::clearActivePools() {
host_->cluster().resourceManager(priority_).connectionPools().decBy(active_pools_.size());
active_pools_.clear();
}
} // namespace Upstream
} // namespace Envoy
59 changes: 59 additions & 0 deletions source/common/upstream/priority_conn_pool_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once

#include "envoy/event/dispatcher.h"
#include "envoy/upstream/resource_manager.h"
#include "envoy/upstream/upstream.h"

#include "common/upstream/conn_pool_map.h"

namespace Envoy {
namespace Upstream {
/**
* A class mapping keys to connection pools, with some recycling logic built in.
*/
template <typename KEY_TYPE, typename POOL_TYPE> class PriorityConnPoolMap {
public:
using ConnPoolMapType = ConnPoolMap<KEY_TYPE, POOL_TYPE>;
using PoolFactory = typename ConnPoolMapType::PoolFactory;
using DrainedCb = typename ConnPoolMapType::DrainedCb;
using OptPoolRef = typename ConnPoolMapType::OptPoolRef;

PriorityConnPoolMap(Event::Dispatcher& dispatcher, const HostConstSharedPtr host);
~PriorityConnPoolMap();
/**
* Returns an existing pool for the given priority and `key`, or creates a new one using
* `factory`. Note that it is possible for this to fail if a limit on the number of pools allowed
* is reached.
* @return The pool corresponding to `key`, or `absl::nullopt`.
*/
OptPoolRef getPool(ResourcePriority priority, KEY_TYPE key, const PoolFactory& factory);

/**
* @return the number of pools across all priorities.
*/
size_t size() const;

/**
* Destroys all mapped pools.
*/
void clear();

/**
* Adds a drain callback to all mapped pools. Any future mapped pools with have the callback
* automatically added. Be careful with the callback. If it itself calls into `this`, modifying
* the state of `this`, there is a good chance it will cause corruption due to the callback firing
* immediately.
*/
void addDrainedCallback(const DrainedCb& cb);

/**
* Instructs each connection pool to drain its connections.
*/
void drainConnections();

private:
std::array<std::unique_ptr<ConnPoolMapType>, NumResourcePriorities> conn_pool_maps_;
};

} // namespace Upstream
} // namespace Envoy
61 changes: 61 additions & 0 deletions source/common/upstream/priority_conn_pool_map_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include "common/upstream/conn_pool_map_impl.h"
#include "common/upstream/priority_conn_pool_map.h"

namespace Envoy {
namespace Upstream {

template <typename KEY_TYPE, typename POOL_TYPE>
PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::PriorityConnPoolMap(Envoy::Event::Dispatcher& dispatcher,
const HostConstSharedPtr host) {
Comment thread
klarose marked this conversation as resolved.
Outdated
for (size_t pool_map_index = 0; pool_map_index < NumResourcePriorities; ++pool_map_index) {
ResourcePriority priority = static_cast<ResourcePriority>(pool_map_index);
conn_pool_maps_[pool_map_index].reset(new ConnPoolMapType(dispatcher, host, priority));
}
}

template <typename KEY_TYPE, typename POOL_TYPE>
PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::~PriorityConnPoolMap() = default;

template <typename KEY_TYPE, typename POOL_TYPE>
typename PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::OptPoolRef
PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(ResourcePriority priority, KEY_TYPE key,
const PoolFactory& factory) {
size_t index = static_cast<size_t>(priority);
ASSERT(index < conn_pool_maps_.size());
return conn_pool_maps_[index]->getPool(key, factory);
}

template <typename KEY_TYPE, typename POOL_TYPE>
size_t PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::size() const {
size_t size = 0;
for (const auto& pool_map : conn_pool_maps_) {
size += pool_map->size();
}
return size;
}

template <typename KEY_TYPE, typename POOL_TYPE>
void PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::clear() {
for (auto& pool_map : conn_pool_maps_) {
pool_map->clear();
}
}

template <typename KEY_TYPE, typename POOL_TYPE>
void PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::addDrainedCallback(const DrainedCb& cb) {
for (auto& pool_map : conn_pool_maps_) {
pool_map->addDrainedCallback(cb);
}
}

template <typename KEY_TYPE, typename POOL_TYPE>
void PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections() {
for (auto& pool_map : conn_pool_maps_) {
pool_map->drainConnections();
}
}

} // namespace Upstream
} // namespace Envoy
Loading