Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions envoy/common/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ class Cancellable {
virtual void cancel(CancelPolicy cancel_policy) PURE;
};

/**
* Controls the behavior when draining a connection pool.
*/
enum class DrainBehavior {
// Starts draining a pool, by gracefully completing all requests and gracefully closing all

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add that it is invalid to create new requests/streams/connections on this pool after this call.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done.

// connections, in preparation for deletion. It is invalid to create new streams or
// connections from this pool after draining a pool with this behavior.
DrainAndDelete,
// Actively drain all existing connection pool connections. This can be used in cases where
// the connection pool is not being destroyed, but the caller wishes to make sure that
// all new streams take place on a new connection. For example, when a health check failure
// occurs.
DrainExistingConnections,
};

/**
* An instance of a generic connection pool.
*/
Expand All @@ -59,20 +74,10 @@ class Instance {
virtual bool isIdle() const PURE;

/**
* Starts draining a pool, by gracefully completing all requests and gracefully closing all
* connections, in preparation for deletion. When the process completes, the function registered
* via `addIdleCallback()` is called. The callback may occur before this call returns if the pool
* can be immediately drained.
*/
virtual void startDrain() PURE;

/**
* Actively drain all existing connection pool connections. This method can be used in cases
* where the connection pool is not being destroyed, but the caller wishes to make sure that
* all new streams take place on a new connection. For example, when a health check failure
* occurs.
* Drains the connections in a pool.
* @param drain_behavior A DrainBehavior that controls the behavior of the draining.
*/
virtual void drainConnections() PURE;
virtual void drainConnections(DrainBehavior drain_behavior) PURE;

/**
* @return Upstream::HostDescriptionConstSharedPtr the host for which connections are pooled.
Expand Down
12 changes: 6 additions & 6 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,6 @@ void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client,

void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); }

void ConnPoolImplBase::startDrainImpl() {
is_draining_ = true;
checkForIdleAndCloseIdleConnsIfDraining();
}

void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
// Create a separate list of elements to close to avoid mutate-while-iterating problems.
std::list<ActiveClient*> to_close;
Expand All @@ -359,7 +354,12 @@ void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
}
}

void ConnPoolImplBase::drainConnectionsImpl() {
void ConnPoolImplBase::drainConnectionsImpl(DrainBehavior drain_behavior) {
if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
is_draining_ = true;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we wanted to add more asserts that no new streams are created if is_draining_ is true? Am I missing that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, good point! I've added an ASSERT in ConnPoolImplBase::newStreamImpl. It did make me wonder if we should rename is_draining_ to is_draining_for_deletion_ to match the name of the enum (and sine it's not set on all drain behaviors, just one of them).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure renaming SGTM (or just store the enum itself somehow)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed. I notice this matches the description in the .h file nicely now:

  // Whether the connection pool is currently in the process of closing                                                                                                                                                     
  // all connections so that it can be gracefully deleted.                                                                                                                                                                  

It's like we planned it or something :)

checkForIdleAndCloseIdleConnsIfDraining();
return;
}
closeIdleConnectionsForDrainingPool();

// closeIdleConnections() closes all connections in ready_clients_ with no active streams,
Expand Down
3 changes: 1 addition & 2 deletions source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
void addIdleCallbackImpl(Instance::IdleCb cb);
// Returns true if the pool is idle.
bool isIdleImpl() const;
void startDrainImpl();
void drainConnectionsImpl();
void drainConnectionsImpl(DrainBehavior drain_behavior);
const Upstream::HostConstSharedPtr& host() const { return host_; }
// Called if this pool is likely to be picked soon, to determine if it's worth preconnecting.
bool maybePreconnectImpl(float global_preconnect_ratio);
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
// ConnectionPool::Instance
void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
bool isIdle() const override { return isIdleImpl(); }
void startDrain() override { startDrainImpl(); }
void drainConnections() override { drainConnectionsImpl(); }
void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
drainConnectionsImpl(drain_behavior);
}
Upstream::HostDescriptionConstSharedPtr host() const override { return host_; }
ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder,
Http::ConnectionPool::Callbacks& callbacks) override;
Expand Down
16 changes: 6 additions & 10 deletions source/common/http/conn_pool_grid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,24 +296,20 @@ void ConnectivityGrid::addIdleCallback(IdleCb cb) {
idle_callbacks_.emplace_back(cb);
}

void ConnectivityGrid::startDrain() {
void ConnectivityGrid::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
if (draining_) {
// A drain callback has already been set, and only needs to happen once.
return;
}

// Note that no new pools can be created from this point on
// as createNextPool fast-fails if `draining_` is true.
draining_ = true;

for (auto& pool : pools_) {
pool->startDrain();
if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
// Note that no new pools can be created from this point on
// as createNextPool fast-fails if `draining_` is true.
draining_ = true;
}
}

void ConnectivityGrid::drainConnections() {
for (auto& pool : pools_) {
pool->drainConnections();
pool->drainConnections(drain_behavior);
}
}

Expand Down
3 changes: 1 addition & 2 deletions source/common/http/conn_pool_grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ class ConnectivityGrid : public ConnectionPool::Instance,
ConnectionPool::Callbacks& callbacks) override;
void addIdleCallback(IdleCb cb) override;
bool isIdle() const override;
void startDrain() override;
void drainConnections() override;
void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override;
Upstream::HostDescriptionConstSharedPtr host() const override;
bool maybePreconnect(float preconnect_ratio) override;
absl::string_view protocolDescription() const override { return "connection grid"; }
Expand Down
8 changes: 5 additions & 3 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,

void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
bool isIdle() const override { return isIdleImpl(); }
void startDrain() override { startDrainImpl(); }
void drainConnections() override {
drainConnectionsImpl();
void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
drainConnectionsImpl(drain_behavior);
if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
return;
}
// Legacy behavior for the TCP connection pool marks all connecting clients
// as draining.
for (auto& connecting_client : connecting_clients_) {
Expand Down
13 changes: 7 additions & 6 deletions source/common/tcp/original_conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ OriginalConnPoolImpl::~OriginalConnPoolImpl() {
dispatcher_.clearDeferredDeleteList();
}

void OriginalConnPoolImpl::drainConnections() {
void OriginalConnPoolImpl::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
is_draining_ = true;
checkForIdleAndCloseIdleConnsIfDraining();
return;
}

ENVOY_LOG(debug, "draining connections");
while (!ready_conns_.empty()) {
ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush);
Expand Down Expand Up @@ -70,11 +76,6 @@ void OriginalConnPoolImpl::closeConnections() {

void OriginalConnPoolImpl::addIdleCallback(IdleCb cb) { idle_callbacks_.push_back(cb); }

void OriginalConnPoolImpl::startDrain() {
is_draining_ = true;
checkForIdleAndCloseIdleConnsIfDraining();
}

void OriginalConnPoolImpl::assignConnection(ActiveConn& conn,
ConnectionPool::Callbacks& callbacks) {
ASSERT(conn.wrapper_ == nullptr);
Expand Down
3 changes: 1 addition & 2 deletions source/common/tcp/original_conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class OriginalConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public Connecti
// ConnectionPool::Instance
void addIdleCallback(IdleCb cb) override;
bool isIdle() const override;
void startDrain() override;
void drainConnections() override;
void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override;
void closeConnections() override;
ConnectionPool::Cancellable* newConnection(ConnectionPool::Callbacks& callbacks) override;
// The old pool does not implement preconnecting.
Expand Down
9 changes: 5 additions & 4 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(
// guarding deletion with `do_not_delete_` in the registered idle callback, and then checking
// afterwards whether it is empty and deleting it if necessary.
container.do_not_delete_ = true;
pools->startDrain();
pools->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete);
container.do_not_delete_ = false;

if (container.pools_->size() == 0) {
Expand All @@ -1217,7 +1217,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainTcpConnPools(

container.draining_ = true;
for (auto pool : pools) {
pool->startDrain();
pool->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete);
}
}

Expand Down Expand Up @@ -1389,7 +1389,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainAllConnPoolsWorker(
const auto container = getHttpConnPoolsContainer(host);
if (container != nullptr) {
container->do_not_delete_ = true;
container->pools_->drainConnections();
container->pools_->drainConnections(
Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);
container->do_not_delete_ = false;

if (container->pools_->size() == 0) {
Expand Down Expand Up @@ -1417,7 +1418,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainAllConnPoolsWorker(
ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
pool->closeConnections();
} else {
pool->drainConnections();
pool->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions source/common/upstream/conn_pool_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,10 @@ template <typename KEY_TYPE, typename POOL_TYPE> class ConnPoolMap {
*/
void addIdleCallback(const IdleCb& cb);

/**
* See `Envoy::ConnectionPool::Instance::startDrain()`.
*/
void startDrain();

/**
* See `Envoy::ConnectionPool::Instance::drainConnections()`.
*/
void drainConnections();
void drainConnections(ConnectionPool::DrainBehavior drain_behavior);

private:
/**
Expand Down
20 changes: 3 additions & 17 deletions source/common/upstream/conn_pool_map_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ void ConnPoolMap<KEY_TYPE, POOL_TYPE>::addIdleCallback(const IdleCb& cb) {
}

template <typename KEY_TYPE, typename POOL_TYPE>
void ConnPoolMap<KEY_TYPE, POOL_TYPE>::startDrain() {
void ConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections(
ConnectionPool::DrainBehavior drain_behavior) {
// Copy the `active_pools_` so that it is safe for the call to result
// in deletion, and avoid iteration through a mutating container.
std::vector<POOL_TYPE*> pools;
Expand All @@ -109,22 +110,7 @@ void ConnPoolMap<KEY_TYPE, POOL_TYPE>::startDrain() {
}

for (auto* pool : pools) {
pool->startDrain();
}
}

template <typename KEY_TYPE, typename POOL_TYPE>
void ConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections() {
// Copy the `active_pools_` so that it is safe for the call to result
// in deletion, and avoid iteration through a mutating container.
std::vector<POOL_TYPE*> pools;
pools.reserve(active_pools_.size());
for (auto& pool_pair : active_pools_) {
pools.push_back(pool_pair.second.get());
}

for (auto* pool : pools) {
pool->drainConnections();
pool->drainConnections(drain_behavior);
}
}

Expand Down
7 changes: 1 addition & 6 deletions source/common/upstream/priority_conn_pool_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,10 @@ template <typename KEY_TYPE, typename POOL_TYPE> class PriorityConnPoolMap {
*/
void addIdleCallback(const IdleCb& cb);

/**
* See `Envoy::ConnectionPool::Instance::startDrain()`.
*/
void startDrain();

/**
* See `Envoy::ConnectionPool::Instance::drainConnections()`.
*/
void drainConnections();
void drainConnections(ConnectionPool::DrainBehavior drain_behavior);

private:
size_t getPriorityIndex(ResourcePriority priority) const;
Expand Down
12 changes: 3 additions & 9 deletions source/common/upstream/priority_conn_pool_map_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,10 @@ void PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::addIdleCallback(const IdleCb& cb)
}

template <typename KEY_TYPE, typename POOL_TYPE>
void PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::startDrain() {
void PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections(
ConnectionPool::DrainBehavior drain_behavior) {
for (auto& pool_map : conn_pool_maps_) {
pool_map->startDrain();
}
}

template <typename KEY_TYPE, typename POOL_TYPE>
void PriorityConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections() {
for (auto& pool_map : conn_pool_maps_) {
pool_map->drainConnections();
pool_map->drainConnections(drain_behavior);
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/common/conn_pool/conn_pool_base_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredRemoteClose) {
clients_.back()->onEvent(Network::ConnectionEvent::RemoteClose);

EXPECT_CALL(idle_pool_callback, Call());
pool_.startDrainImpl();
pool_.drainConnectionsImpl(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete);
}

// Local close simulates what would happen for an idle timeout on a connection.
Expand Down Expand Up @@ -255,7 +255,7 @@ TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredLocalClose) {
clients_.back()->onEvent(Network::ConnectionEvent::LocalClose);

EXPECT_CALL(idle_pool_callback, Call());
pool_.startDrainImpl();
pool_.drainConnectionsImpl(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete);
}

} // namespace ConnectionPool
Expand Down
35 changes: 22 additions & 13 deletions test/common/http/conn_pool_grid_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,23 @@ TEST_F(ConnectivityGridTest, TestCancel) {

// Make sure drains get sent to all active pools.
TEST_F(ConnectivityGridTest, Drain) {
grid_.drainConnections();
grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);

// Synthetically create a pool.
grid_.createNextPool();
{
EXPECT_CALL(*grid_.first(), drainConnections());
grid_.drainConnections();
EXPECT_CALL(*grid_.first(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections));
grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);
}

grid_.createNextPool();
{
EXPECT_CALL(*grid_.first(), drainConnections());
EXPECT_CALL(*grid_.second(), drainConnections());
grid_.drainConnections();
EXPECT_CALL(*grid_.first(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections));
EXPECT_CALL(*grid_.second(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections));
grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);
}
}

Expand All @@ -411,16 +414,22 @@ TEST_F(ConnectivityGridTest, DrainCallbacks) {

// The first time a drain is started, both pools should start draining.
{
EXPECT_CALL(*grid_.first(), startDrain());
EXPECT_CALL(*grid_.second(), startDrain());
grid_.startDrain();
EXPECT_CALL(*grid_.first(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete));
EXPECT_CALL(*grid_.second(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete));
grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete);
}

// The second time, the pools will not see any change.
{
EXPECT_CALL(*grid_.first(), startDrain()).Times(0);
EXPECT_CALL(*grid_.second(), startDrain()).Times(0);
grid_.startDrain();
EXPECT_CALL(*grid_.first(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete))
.Times(0);
EXPECT_CALL(*grid_.second(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete))
.Times(0);
grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete);
}
{
// Notify the grid the second pool has been drained. This should not be
Expand Down Expand Up @@ -481,7 +490,7 @@ TEST_F(ConnectivityGridTest, NoDrainOnTeardown) {

{
grid_.addIdleCallback([&drain_received]() -> void { drain_received = true; });
grid_.startDrain();
grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete);
}

grid_.setDestroying(); // Fake being in the destructor.
Expand Down
Loading