Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions source/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ envoy_cc_library(
deps = [":minimal_logger_lib"],
)

# Header-only recursion-detection helper (debug_recursion_checker.h).
# Depends on :assert_lib for the ASSERT macro the checker fires in debug builds.
envoy_cc_library(
    name = "debug_recursion_checker_lib",
    hdrs = ["debug_recursion_checker.h"],
    deps = [":assert_lib"],
)

envoy_cc_library(
name = "backoff_lib",
srcs = ["backoff_strategy.cc"],
Expand Down
42 changes: 42 additions & 0 deletions source/common/common/debug_recursion_checker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include "common/common/assert.h"

namespace Envoy {
namespace Common {
/**
* A helper class to assert that a call is not recursive.
*/
/**
 * Debug-build guard that flags unintended recursive entry into a resource.
 * Calls to enter()/exit() bracket the protected region; entering again before
 * the matching exit() trips an assertion. Under NDEBUG the flag is never set,
 * so the check can never fire and the class is effectively a no-op.
 */
class DebugRecursionChecker {
public:
  // Marks the resource as entered; asserts if it was already entered.
  void enter() {
    ASSERT(!entered_, "A resource should only be entered once");
#ifndef NDEBUG
    entered_ = true;
#endif // !NDEBUG
  }

  // Marks the resource as exited, permitting a later enter().
  void exit() {
#ifndef NDEBUG
    entered_ = false;
#endif // !NDEBUG
  }

private:
  // Only ever mutated in debug builds; stays false under NDEBUG.
  bool entered_{false};
};

/**
 * RAII companion to DebugRecursionChecker: construction calls enter() and
 * destruction calls exit(), so every early-return or exception path out of the
 * guarded scope still performs the exit.
 */
class AutoDebugRecursionChecker {
public:
  explicit AutoDebugRecursionChecker(DebugRecursionChecker& checker) : checker_(checker) {
    checker.enter();
  }

  ~AutoDebugRecursionChecker() { checker_.exit(); }

  // Copying a guard would run exit() twice against a single enter(), corrupting
  // the checker state. Forbid it, as with other scope guards (std::lock_guard).
  AutoDebugRecursionChecker(const AutoDebugRecursionChecker&) = delete;
  AutoDebugRecursionChecker& operator=(const AutoDebugRecursionChecker&) = delete;

private:
  DebugRecursionChecker& checker_;
};

} // namespace Common
} // namespace Envoy
19 changes: 19 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,31 @@ envoy_cc_library(
"//source/common/protobuf:utility_lib",
"//source/common/router:shadow_writer_lib",
"//source/common/tcp:conn_pool_lib",
"//source/common/upstream:conn_pool_map",
"//source/common/upstream:conn_pool_map_impl_lib",
"//source/common/upstream:upstream_lib",
"@envoy_api//envoy/admin/v2alpha:config_dump_cc",
"@envoy_api//envoy/api/v2/core:base_cc",
],
)

# Declarations for the ConnPoolMap template (conn_pool_map.h); the out-of-line
# method definitions live in the separate :conn_pool_map_impl_lib target.
envoy_cc_library(
    name = "conn_pool_map",
    hdrs = ["conn_pool_map.h"],
    deps = [
        "//include/envoy/event:dispatcher_interface",
        "//source/common/common:debug_recursion_checker_lib",
    ],
)

# Definitions for the ConnPoolMap template (conn_pool_map_impl.h), layered on
# top of :conn_pool_map, which carries the declarations.
envoy_cc_library(
    name = "conn_pool_map_impl_lib",
    hdrs = ["conn_pool_map_impl.h"],
    deps = [
        ":conn_pool_map",
    ],
)

envoy_cc_library(
name = "edf_scheduler_lib",
hdrs = ["edf_scheduler.h"],
Expand Down
107 changes: 67 additions & 40 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "common/router/shadow_writer_impl.h"
#include "common/tcp/conn_pool.h"
#include "common/upstream/cds_api_impl.h"
#include "common/upstream/conn_pool_map_impl.h"
#include "common/upstream/load_balancer_impl.h"
#include "common/upstream/maglev_lb.h"
#include "common/upstream/original_dst_cluster.h"
Expand Down Expand Up @@ -817,9 +818,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImp
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(const HostVector& hosts) {
for (const HostSharedPtr& host : hosts) {
{
auto container = host_http_conn_pool_map_.find(host);
if (container != host_http_conn_pool_map_.end()) {
drainConnPools(host, container->second);
auto container = getHttpConnPoolsContainer(host);
if (container != nullptr) {
drainConnPools(host, *container);
}
}
{
Expand All @@ -833,38 +834,49 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(const Hos

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(
HostSharedPtr old_host, ConnPoolsContainer& container) {
container.drains_remaining_ += container.pools_.size();

for (const auto& pair : container.pools_) {
pair.second->addDrainedCallback([this, old_host]() -> void {
if (destroying_) {
// It is possible for a connection pool to fire drain callbacks during destruction. Instead
// of checking if old_host actually exists in the map, it's clearer and cleaner to keep
// track of destruction as a separate state and check for it here. This also allows us to
// do this check here versus inside every different connection pool implementation.
return;
}
container.drains_remaining_ += container.pools_->size();

// Make a copy to protect against erasure in the callback.
std::shared_ptr<ConnPoolsContainer::ConnPools> pools = container.pools_;
pools->addDrainedCallback([this, old_host]() -> void {
if (destroying_) {
// It is possible for a connection pool to fire drain callbacks during destruction. Instead
// of checking if old_host actually exists in the map, it's clearer and cleaner to keep
// track of destruction as a separate state and check for it here. This also allows us to
// do this check here versus inside every different connection pool implementation.
return;
}

ConnPoolsContainer& container = host_http_conn_pool_map_[old_host];
ASSERT(container.drains_remaining_ > 0);
container.drains_remaining_--;
if (container.drains_remaining_ == 0) {
for (auto& pair : container.pools_) {
thread_local_dispatcher_.deferredDelete(std::move(pair.second));
}
host_http_conn_pool_map_.erase(old_host);
}
});
ConnPoolsContainer* to_clear = getHttpConnPoolsContainer(old_host);
if (to_clear == nullptr) {
// This could happen if we have cleaned out the host before iterating through every connection
// pool. Handle it by just continuing.
return;
}

// The above addDrainedCallback() drain completion callback might execute immediately. This can
// then effectively nuke 'container', which means we can't continue to loop on its contents
// (we're done here).
if (host_http_conn_pool_map_.count(old_host) == 0) {
break;
ASSERT(to_clear->drains_remaining_ > 0);
to_clear->drains_remaining_--;
if (to_clear->drains_remaining_ == 0 && to_clear->ready_to_drain_) {
clearContainer(old_host, *to_clear);
}
});

// We need to hold off on actually emptying out the container until we have finished processing
// `addDrainedCallback`. If we do not, then it's possible that the container could be erased in
// the middle of its iteration, which leads to undefined behaviour. We handle that case by
// checking here to see if the drains have completed.
container.ready_to_drain_ = true;
if (container.drains_remaining_ == 0) {
clearContainer(old_host, container);
}
}

// Destroys every HTTP connection pool owned by `container`, then drops the
// host's entry from the per-thread pool map. `pools_` is a shared_ptr, so a
// caller holding its own copy keeps the ConnPools object alive across the
// erase below.
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::clearContainer(
    HostSharedPtr old_host, ConnPoolsContainer& container) {
  container.pools_->clear();
  host_http_conn_pool_map_.erase(old_host);
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainTcpConnPools(
HostSharedPtr old_host, TcpConnPoolsContainer& container) {
container.drains_remaining_ += container.pools_.size();
Expand Down Expand Up @@ -946,12 +958,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
// more targeted.
ThreadLocalClusterManagerImpl& config = tls.getTyped<ThreadLocalClusterManagerImpl>();
{
const auto& container = config.host_http_conn_pool_map_.find(host);
if (container != config.host_http_conn_pool_map_.end()) {
for (const auto& pair : container->second.pools_) {
const Http::ConnectionPool::InstancePtr& pool = pair.second;
pool->drainConnections();
}
const auto container = config.getHttpConnPoolsContainer(host);
if (container != nullptr) {
container->pools_->drainConnections();
}
}
{
Expand Down Expand Up @@ -986,6 +995,21 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
}
}

// Looks up the ConnPoolsContainer for `host` in the per-thread map.
// When absent: returns nullptr if `allocate` is false, otherwise creates and
// inserts a fresh container bound to this thread's dispatcher.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ConnPoolsContainer*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
    const HostConstSharedPtr& host, bool allocate) {
  // Fast path: the host already has a container.
  const auto existing = host_http_conn_pool_map_.find(host);
  if (existing != host_http_conn_pool_map_.end()) {
    return &existing->second;
  }

  if (!allocate) {
    return nullptr;
  }

  // Miss with allocation requested: build and insert a new container.
  ConnPoolsContainer new_container{thread_local_dispatcher_};
  const auto inserted =
      host_http_conn_pool_map_.emplace(host, std::move(new_container)).first;
  return &inserted->second;
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
const LoadBalancerFactorySharedPtr& lb_factory)
Expand Down Expand Up @@ -1092,14 +1116,17 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
}
}

ConnPoolsContainer& container = parent_.host_http_conn_pool_map_[host];
if (!container.pools_[hash_key]) {
container.pools_[hash_key] = parent_.parent_.factory_.allocateConnPool(
ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);

// Note: to simplify this, we assume that the factory is only called in the scope of this
// function. Otherwise, we'd need to capture a few of these variables by value.
Http::ConnectionPool::Instance& pool = container.pools_->getPool(hash_key, [&]() {
return parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, protocol,
have_options ? context->downstreamConnection()->socketOptions() : nullptr);
}
});

return container.pools_[hash_key].get();
return &pool;
}

Tcp::ConnectionPool::Instance*
Expand Down
14 changes: 12 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/config/grpc_mux_impl.h"
#include "common/http/async_client_impl.h"
#include "common/upstream/conn_pool_map.h"
#include "common/upstream/load_stats_reporter.h"
#include "common/upstream/upstream_impl.h"

Expand Down Expand Up @@ -231,9 +232,14 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
*/
struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject {
struct ConnPoolsContainer {
typedef std::map<std::vector<uint8_t>, Http::ConnectionPool::InstancePtr> ConnPools;
ConnPoolsContainer(Event::Dispatcher& dispatcher)
: pools_{std::make_shared<ConnPools>(dispatcher)} {}

ConnPools pools_;
typedef ConnPoolMap<std::vector<uint8_t>, Http::ConnectionPool::Instance> ConnPools;

// This is a shared_ptr so we can keep it alive while cleaning up.
std::shared_ptr<ConnPools> pools_;
bool ready_to_drain_{false};
uint64_t drains_remaining_{};
};

Expand Down Expand Up @@ -307,6 +313,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
~ThreadLocalClusterManagerImpl();
void drainConnPools(const HostVector& hosts);
void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container);
void clearContainer(HostSharedPtr old_host, ConnPoolsContainer& container);
void drainTcpConnPools(HostSharedPtr old_host, TcpConnPoolsContainer& container);
void removeTcpConn(const HostConstSharedPtr& host, Network::ClientConnection& connection);
static void updateClusterMembership(const std::string& name, uint32_t priority,
Expand All @@ -316,6 +323,9 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
const HostVector& hosts_removed, ThreadLocal::Slot& tls);
static void onHostHealthFailure(const HostSharedPtr& host, ThreadLocal::Slot& tls);

ConnPoolsContainer* getHttpConnPoolsContainer(const HostConstSharedPtr& host,
bool allocate = false);

ClusterManagerImpl& parent_;
Event::Dispatcher& thread_local_dispatcher_;
std::unordered_map<std::string, ClusterEntryPtr> thread_local_clusters_;
Expand Down
61 changes: 61 additions & 0 deletions source/common/upstream/conn_pool_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include <vector>

#include "envoy/event/dispatcher.h"

#include "common/common/debug_recursion_checker.h"

#include "absl/container/flat_hash_map.h"

namespace Envoy {
namespace Upstream {
/**
* A class mapping keys to connection pools, with some recycling logic built in.
*/
/**
 * A class mapping keys to connection pools, with some recycling logic built in.
 */
template <typename KEY_TYPE, typename POOL_TYPE> class ConnPoolMap {
public:
  using PoolFactory = std::function<std::unique_ptr<POOL_TYPE>()>;
  using DrainedCb = std::function<void()>;

  // explicit: a dispatcher should never silently convert into a pool map.
  explicit ConnPoolMap(Event::Dispatcher& dispatcher);
  ~ConnPoolMap();

  /**
   * Returns an existing pool for `key`, or creates a new one using `factory`.
   * NOTE(review): an earlier draft of this contract mentioned failing when a
   * pool-count limit is reached and returning `absl::nullopt`, but the
   * reference return type cannot signal failure — confirm the intended
   * contract before relying on a limit.
   * @return the pool corresponding to `key`.
   */
  POOL_TYPE& getPool(KEY_TYPE key, const PoolFactory& factory);

  /**
   * @return the number of pools.
   */
  size_t size() const;

  /**
   * Destroys all mapped pools.
   */
  void clear();

  /**
   * Adds a drain callback to all mapped pools. Any future mapped pools will
   * have the callback automatically added. Be careful with the callback. If it
   * itself calls into `this`, modifying the state of `this`, there is a good
   * chance it will cause corruption due to the callback firing immediately.
   */
  void addDrainedCallback(const DrainedCb& cb);

  /**
   * Instructs each connection pool to drain its connections.
   */
  void drainConnections();

private:
  absl::flat_hash_map<KEY_TYPE, std::unique_ptr<POOL_TYPE>> active_pools_;
  Event::Dispatcher& thread_local_dispatcher_;
  // Callbacks retained so they can be replayed onto pools created later.
  std::vector<DrainedCb> cached_callbacks_;
  // Debug-build guard — presumably asserts the map's methods are not entered
  // recursively; confirm against conn_pool_map_impl.h.
  Common::DebugRecursionChecker recursion_checker_;
};

} // namespace Upstream
} // namespace Envoy
Loading