diff --git a/docs/root/intro/arch_overview/other_features/ip_transparency.rst b/docs/root/intro/arch_overview/other_features/ip_transparency.rst index 0e9fe81fc82d2..76ed11b5f5928 100644 --- a/docs/root/intro/arch_overview/other_features/ip_transparency.rst +++ b/docs/root/intro/arch_overview/other_features/ip_transparency.rst @@ -56,6 +56,10 @@ conjunction with the :ref:`Original Src Listener Filter `. Finally, Envoy supports generating this header using the :ref:`Proxy Protocol Transport Socket `. +IMPORTANT: There is currently a memory `issue `_ in Envoy where upstream connection pools are +not cleaned up after they are created. This heavily affects the usage of this transport socket as new pools are created for every downstream client +IP and port pair. Removing a cluster will clean up its associated connection pools, which could be used to mitigate this issue in the current state. + Here is an example config for setting up the socket: .. code-block:: yaml diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 0c7d0abb2c2d6..9ca41df84466c 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -49,7 +49,6 @@ Bug Fixes *Changes expected to improve the state of the world and are unlikely to have negative effects* * aws_lambda: if ``payload_passthrough`` is set to ``false``, the downstream response content-type header will now be set from the content-type entry in the JSON response's headers map, if present. -* cluster: delete pools when they're idle to fix unbounded memory use when using PROXY protocol upstream with tcp_proxy. This behavior can be temporarily reverted by setting the ``envoy.reloadable_features.conn_pool_delete_when_idle`` runtime guard to false. * cluster: fixed the :ref:`cluster stats ` histograms by moving the accounting into the router filter. 
This means that we now properly compute the number of bytes sent as well as handling retries which were previously ignored. * hot_restart: fix double counting of ``server.seconds_until_first_ocsp_response_expiring`` and ``server.days_until_first_cert_expiring`` during hot-restart. This stat was only incorrect until the parent process terminated. diff --git a/envoy/common/conn_pool.h b/envoy/common/conn_pool.h index 17cf77eca6824..e05664288b673 100644 --- a/envoy/common/conn_pool.h +++ b/envoy/common/conn_pool.h @@ -44,27 +44,17 @@ class Instance { virtual ~Instance() = default; /** - * Called when a connection pool has no pending streams, busy connections, or ready connections. + * Called when a connection pool has been drained of pending streams, busy connections, and + * ready connections. */ - using IdleCb = std::function; + using DrainedCb = std::function; /** - * Register a callback that gets called when the connection pool is fully idle. + * Register a callback that gets called when the connection pool is fully drained and kicks + * off a drain. The owner of the connection pool is responsible for not creating any + * new streams. */ - virtual void addIdleCallback(IdleCb cb) PURE; - - /** - * Returns true if the pool does not have any connections or pending requests. - */ - virtual bool isIdle() const PURE; - - /** - * Starts draining a pool, by gracefully completing all requests and gracefully closing all - * connections, in preparation for deletion. When the process completes, the function registered - * via `addIdleCallback()` is called. The callback may occur before this call returns if the pool - * can be immediately drained. - */ - virtual void startDrain() PURE; + virtual void addDrainedCallback(DrainedCb cb) PURE; /** * Actively drain all existing connection pool connections. 
This method can be used in cases diff --git a/envoy/event/deferred_deletable.h b/envoy/event/deferred_deletable.h index 32b9398e322cd..c0e3dfee2835a 100644 --- a/envoy/event/deferred_deletable.h +++ b/envoy/event/deferred_deletable.h @@ -13,12 +13,6 @@ namespace Event { class DeferredDeletable { public: virtual ~DeferredDeletable() = default; - - /** - * Called when an object is passed to `deferredDelete`. This signals that the object will soon - * be deleted. - */ - virtual void deleteIsPending() {} }; using DeferredDeletablePtr = std::unique_ptr; diff --git a/envoy/upstream/thread_local_cluster.h b/envoy/upstream/thread_local_cluster.h index 16bba2855b0f3..2efe626228475 100644 --- a/envoy/upstream/thread_local_cluster.h +++ b/envoy/upstream/thread_local_cluster.h @@ -31,7 +31,9 @@ class HttpPoolData { /** * See documentation of Envoy::ConnectionPool::Instance. */ - void addIdleCallback(ConnectionPool::Instance::IdleCb cb) { pool_->addIdleCallback(cb); }; + void addDrainedCallback(ConnectionPool::Instance::DrainedCb cb) { + pool_->addDrainedCallback(cb); + }; Upstream::HostDescriptionConstSharedPtr host() const { return pool_->host(); } diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 060f0c845796b..58807df00d2d3 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -14,35 +14,6 @@ namespace Envoy { namespace Config { -namespace { -class AllMuxesState { -public: - void insert(GrpcMuxImpl* mux) { - absl::WriterMutexLock locker(&lock_); - muxes_.insert(mux); - } - - void erase(GrpcMuxImpl* mux) { - absl::WriterMutexLock locker(&lock_); - muxes_.erase(mux); - } - - void shutdownAll() { - absl::WriterMutexLock locker(&lock_); - for (auto& mux : muxes_) { - mux->shutdown(); - } - } - -private: - absl::flat_hash_set muxes_ ABSL_GUARDED_BY(lock_); - - // TODO(ggreenway): can this lock be removed? Is this code only run on the main thread? 
- absl::Mutex lock_; -}; -using AllMuxes = ThreadSafeSingleton; -} // namespace - GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, @@ -59,13 +30,8 @@ GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, onDynamicContextUpdate(resource_type_url); })) { Config::Utility::checkLocalInfo("ads", local_info); - AllMuxes::get().insert(this); } -GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); } - -void GrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); } - void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) { auto api_state = api_state_.find(resource_type_url); if (api_state == api_state_.end()) { @@ -78,10 +44,6 @@ void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) { void GrpcMuxImpl::start() { grpc_stream_.establishNewStream(); } void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) { - if (shutdown_) { - return; - } - ApiState& api_state = apiStateFor(type_url); auto& request = api_state.request_; request.mutable_resource_names()->Clear(); diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index 57b4946099373..585d028fe2b65 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -39,17 +39,6 @@ class GrpcMuxImpl : public GrpcMux, Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node); - ~GrpcMuxImpl() override; - - // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash - // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably - // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which - // would then cause all `GrpcMuxImpl` to be destructed. 
- // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072. - static void shutdownAll(); - - void shutdown() { shutdown_ = true; } - void start() override; // GrpcMux @@ -190,10 +179,6 @@ class GrpcMuxImpl : public GrpcMux, Event::Dispatcher& dispatcher_; Common::CallbackHandlePtr dynamic_update_callback_handle_; - - // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is - // true because it may contain dangling pointers. - std::atomic shutdown_{false}; }; using GrpcMuxImplPtr = std::unique_ptr; diff --git a/source/common/config/new_grpc_mux_impl.cc b/source/common/config/new_grpc_mux_impl.cc index d0e3db537d0b9..89a829167e273 100644 --- a/source/common/config/new_grpc_mux_impl.cc +++ b/source/common/config/new_grpc_mux_impl.cc @@ -16,35 +16,6 @@ namespace Envoy { namespace Config { -namespace { -class AllMuxesState { -public: - void insert(NewGrpcMuxImpl* mux) { - absl::WriterMutexLock locker(&lock_); - muxes_.insert(mux); - } - - void erase(NewGrpcMuxImpl* mux) { - absl::WriterMutexLock locker(&lock_); - muxes_.erase(mux); - } - - void shutdownAll() { - absl::WriterMutexLock locker(&lock_); - for (auto& mux : muxes_) { - mux->shutdown(); - } - } - -private: - absl::flat_hash_set muxes_ ABSL_GUARDED_BY(lock_); - - // TODO(ggreenway): can this lock be removed? Is this code only run on the main thread? 
- absl::Mutex lock_; -}; -using AllMuxes = ThreadSafeSingleton; -} // namespace - NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, @@ -59,13 +30,7 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); })), - transport_api_version_(transport_api_version), dispatcher_(dispatcher) { - AllMuxes::get().insert(this); -} - -NewGrpcMuxImpl::~NewGrpcMuxImpl() { AllMuxes::get().erase(this); } - -void NewGrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); } + transport_api_version_(transport_api_version), dispatcher_(dispatcher) {} void NewGrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) { auto sub = subscriptions_.find(resource_type_url); @@ -251,10 +216,6 @@ void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use } void NewGrpcMuxImpl::trySendDiscoveryRequests() { - if (shutdown_) { - return; - } - while (true) { // Do any of our subscriptions even want to send a request? absl::optional maybe_request_type = whoWantsToSendDiscoveryRequest(); diff --git a/source/common/config/new_grpc_mux_impl.h b/source/common/config/new_grpc_mux_impl.h index 98ded0dec357b..4c2246fed813b 100644 --- a/source/common/config/new_grpc_mux_impl.h +++ b/source/common/config/new_grpc_mux_impl.h @@ -38,17 +38,6 @@ class NewGrpcMuxImpl const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info); - ~NewGrpcMuxImpl() override; - - // Causes all NewGrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash - // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably - // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which - // would then cause all `NewGrpcMuxImpl` to be destructed. 
- // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072. - static void shutdownAll(); - - void shutdown() { shutdown_ = true; } - GrpcMuxWatchPtr addWatch(const std::string& type_url, const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, @@ -181,10 +170,6 @@ class NewGrpcMuxImpl Common::CallbackHandlePtr dynamic_update_callback_handle_; const envoy::config::core::v3::ApiVersion transport_api_version_; Event::Dispatcher& dispatcher_; - - // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is - // true because it may contain dangling pointers. - std::atomic shutdown_{false}; }; using NewGrpcMuxImplPtr = std::unique_ptr; diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index 626ac8df39ffc..c6276446f4a9d 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -28,13 +28,9 @@ ConnPoolImplBase::ConnPoolImplBase( upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {} ConnPoolImplBase::~ConnPoolImplBase() { - ASSERT(isIdleImpl()); - ASSERT(connecting_stream_capacity_ == 0); -} - -void ConnPoolImplBase::deleteIsPendingImpl() { - deferred_deleting_ = true; - ASSERT(isIdleImpl()); + ASSERT(ready_clients_.empty()); + ASSERT(busy_clients_.empty()); + ASSERT(connecting_clients_.empty()); ASSERT(connecting_stream_capacity_ == 0); } @@ -233,8 +229,6 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien } ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context) { - ASSERT(!deferred_deleting_); - ASSERT(static_cast(connecting_stream_capacity_) == connectingCapacity(connecting_clients_)); // O(n) debug check. 
if (!ready_clients_.empty()) { @@ -282,7 +276,6 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context) } bool ConnPoolImplBase::maybePreconnect(float global_preconnect_ratio) { - ASSERT(!deferred_deleting_); return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection; } @@ -333,11 +326,9 @@ void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client, } } -void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); } - -void ConnPoolImplBase::startDrainImpl() { - is_draining_ = true; - checkForIdleAndCloseIdleConnsIfDraining(); +void ConnPoolImplBase::addDrainedCallbackImpl(Instance::DrainedCb cb) { + drained_callbacks_.push_back(cb); + checkForDrained(); } void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() { @@ -379,19 +370,17 @@ void ConnPoolImplBase::drainConnectionsImpl() { } } -bool ConnPoolImplBase::isIdleImpl() const { - return pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() && - connecting_clients_.empty(); -} - -void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() { - if (is_draining_) { - closeIdleConnectionsForDrainingPool(); +void ConnPoolImplBase::checkForDrained() { + if (drained_callbacks_.empty()) { + return; } - if (isIdleImpl()) { - ENVOY_LOG(debug, "invoking idle callbacks - is_draining_={}", is_draining_); - for (const Instance::IdleCb& cb : idle_callbacks_) { + closeIdleConnectionsForDrainingPool(); + + if (pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() && + connecting_clients_.empty()) { + ENVOY_LOG(debug, "invoking drained callbacks"); + for (const Instance::DrainedCb& cb : drained_callbacks_) { cb(); } } @@ -454,8 +443,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view client.releaseResources(); dispatcher_.deferredDelete(client.removeFromList(owningList(client.state()))); - - 
checkForIdleAndCloseIdleConnsIfDraining(); + if (incomplete_stream) { + checkForDrained(); + } client.setState(ActiveClient::State::CLOSED); @@ -473,7 +463,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view // refer to client after this point. onConnected(client); onUpstreamReady(); - checkForIdleAndCloseIdleConnsIfDraining(); + checkForDrained(); } } @@ -543,7 +533,7 @@ void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream, } host_->cluster().stats().upstream_rq_cancelled_.inc(); - checkForIdleAndCloseIdleConnsIfDraining(); + checkForDrained(); } namespace { diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 8e06e8e68ed2b..f4822b7e77f64 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -144,8 +144,6 @@ class ConnPoolImplBase : protected Logger::Loggable { Upstream::ClusterConnectivityState& state); virtual ~ConnPoolImplBase(); - void deleteIsPendingImpl(); - // A helper function to get the specific context type from the base class context. template T& typedContext(AttachContext& context) { ASSERT(dynamic_cast(&context) != nullptr); @@ -162,8 +160,7 @@ class ConnPoolImplBase : protected Logger::Loggable { int64_t connecting_and_connected_capacity, float preconnect_ratio, bool anticipate_incoming_stream = false); - void addIdleCallbackImpl(Instance::IdleCb cb); - void startDrainImpl(); + void addDrainedCallbackImpl(Instance::DrainedCb cb); void drainConnectionsImpl(); // Closes and destroys all connections. This must be called in the destructor of @@ -195,13 +192,8 @@ class ConnPoolImplBase : protected Logger::Loggable { void onConnectionEvent(ActiveClient& client, absl::string_view failure_reason, Network::ConnectionEvent event); - - // Returns true if the pool is idle. - bool isIdleImpl() const; - - // See if the pool has gone idle. If we're draining, this will also close idle connections. 
- void checkForIdleAndCloseIdleConnsIfDraining(); - + // See if the drain process has started and/or completed. + void checkForDrained(); void scheduleOnUpstreamReady(); ConnectionPool::Cancellable* newStream(AttachContext& context); // Called if this pool is likely to be picked soon, to determine if it's worth preconnecting. @@ -307,7 +299,7 @@ class ConnPoolImplBase : protected Logger::Loggable { const Network::ConnectionSocket::OptionsSharedPtr socket_options_; const Network::TransportSocketOptionsConstSharedPtr transport_socket_options_; - std::list idle_callbacks_; + std::list drained_callbacks_; // When calling purgePendingStreams, this list will be used to hold the streams we are about // to purge. We need this if one cancelled streams cancels a different pending stream @@ -333,13 +325,6 @@ class ConnPoolImplBase : protected Logger::Loggable { // The number of streams currently attached to clients. uint32_t num_active_streams_{0}; - // Whether the connection pool is currently in the process of closing - // all connections so that it can be gracefully deleted. - bool is_draining_{false}; - - // True iff this object is in the deferred delete list. 
- bool deferred_deleting_{false}; - void onUpstreamReady(); Event::SchedulableCallbackPtr upstream_ready_cb_; }; diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index ff1c855977b80..f12399d2df666 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -247,13 +247,10 @@ TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) { void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) { ASSERT(isThreadSafe()); - if (to_delete != nullptr) { - to_delete->deleteIsPending(); - current_to_delete_->emplace_back(std::move(to_delete)); - ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size()); - if (current_to_delete_->size() == 1) { - deferred_delete_cb_->scheduleCallbackCurrentIteration(); - } + current_to_delete_->emplace_back(std::move(to_delete)); + ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size()); + if (current_to_delete_->size() == 1) { + deferred_delete_cb_->scheduleCallbackCurrentIteration(); } } diff --git a/source/common/http/conn_pool_base.cc b/source/common/http/conn_pool_base.cc index 4a67bea9da859..601295b8f9665 100644 --- a/source/common/http/conn_pool_base.cc +++ b/source/common/http/conn_pool_base.cc @@ -141,7 +141,7 @@ void MultiplexedActiveClientBase::onStreamDestroy() { // wait until the connection has been fully drained of streams and then check in the connection // event callback. 
if (!closed_with_active_rq_) { - parent().checkForIdleAndCloseIdleConnsIfDraining(); + parent().checkForDrained(); } } diff --git a/source/common/http/conn_pool_base.h b/source/common/http/conn_pool_base.h index e2f7494c0e38b..0d72454786204 100644 --- a/source/common/http/conn_pool_base.h +++ b/source/common/http/conn_pool_base.h @@ -55,13 +55,8 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase, std::vector protocols); ~HttpConnPoolImplBase() override; - // Event::DeferredDeletable - void deleteIsPending() override { deleteIsPendingImpl(); } - // ConnectionPool::Instance - void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); } - bool isIdle() const override { return isIdleImpl(); } - void startDrain() override { startDrainImpl(); } + void addDrainedCallback(DrainedCb cb) override { addDrainedCallbackImpl(cb); } void drainConnections() override { drainConnectionsImpl(); } Upstream::HostDescriptionConstSharedPtr host() const override { return host_; } ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder, diff --git a/source/common/http/conn_pool_grid.cc b/source/common/http/conn_pool_grid.cc index 80f67419f8dea..28ce8d1c89c30 100644 --- a/source/common/http/conn_pool_grid.cc +++ b/source/common/http/conn_pool_grid.cc @@ -205,7 +205,7 @@ ConnectivityGrid::ConnectivityGrid( } ConnectivityGrid::~ConnectivityGrid() { - // Ignore idle callbacks while the pools are destroyed below. + // Ignore drained callbacks while the pools are destroyed below. destroying_ = true; // Callbacks might have pending streams registered with the pools, so cancel and delete // the callback before deleting the pools. 
@@ -213,40 +213,25 @@ ConnectivityGrid::~ConnectivityGrid() { pools_.clear(); } -void ConnectivityGrid::deleteIsPending() { - deferred_deleting_ = true; - for (const auto& pool : pools_) { - pool->deleteIsPending(); - } -} - absl::optional ConnectivityGrid::createNextPool() { - ASSERT(!deferred_deleting_); // Pools are created by newStream, which should not be called during draining. - ASSERT(!draining_); + ASSERT(drained_callbacks_.empty()); // Right now, only H3 and TCP are supported, so if there are 2 pools we're done. - if (pools_.size() == 2 || draining_) { + if (pools_.size() == 2 || !drained_callbacks_.empty()) { return absl::nullopt; } // HTTP/3 is hard-coded as higher priority, H2 as secondary. - ConnectionPool::InstancePtr pool; if (pools_.empty()) { - pool = Http3::allocateConnPool(dispatcher_, random_generator_, host_, priority_, options_, - transport_socket_options_, state_, time_source_); - } else { - pool = std::make_unique(dispatcher_, random_generator_, host_, priority_, - options_, transport_socket_options_, state_); - } - - setupPool(*pool); - pools_.push_back(std::move(pool)); - - return --pools_.end(); -} - -void ConnectivityGrid::setupPool(ConnectionPool::Instance& pool) { - pool.addIdleCallback([this]() { onIdleReceived(); }); + pools_.push_back(Http3::allocateConnPool(dispatcher_, random_generator_, host_, priority_, + options_, transport_socket_options_, state_, + time_source_)); + return pools_.begin(); + } + pools_.push_back(std::make_unique(dispatcher_, random_generator_, host_, + priority_, options_, + transport_socket_options_, state_)); + return std::next(pools_.begin()); } bool ConnectivityGrid::hasActiveConnections() const { @@ -261,11 +246,6 @@ bool ConnectivityGrid::hasActiveConnections() const { ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks) { - ASSERT(!deferred_deleting_); - - // New streams should not be created during draining. 
- ASSERT(!draining_); - if (pools_.empty()) { createNextPool(); } @@ -287,24 +267,22 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder& return ret; } -void ConnectivityGrid::addIdleCallback(IdleCb cb) { +void ConnectivityGrid::addDrainedCallback(DrainedCb cb) { // Add the callback to the list of callbacks to be called when all drains are // complete. - idle_callbacks_.emplace_back(cb); -} + drained_callbacks_.emplace_back(cb); -void ConnectivityGrid::startDrain() { - if (draining_) { - // A drain callback has already been set, and only needs to happen once. + if (drained_callbacks_.size() != 1) { return; } - // Note that no new pools can be created from this point on - // as createNextPool fast-fails if `draining_` is true. - draining_ = true; - + // If this is the first time a drained callback has been added, track the + // number of pools which need to be drained in order to pass drain-completion + // up to the callers. Note that no new pools can be created from this point on + // as createNextPool fast-fails if drained callbacks are present. + drains_needed_ = pools_.size(); for (auto& pool : pools_) { - pool->startDrain(); + pool->addDrainedCallback([this]() -> void { onDrainReceived(); }); } } @@ -338,25 +316,21 @@ void ConnectivityGrid::markHttp3Broken() { http3_status_tracker_.markHttp3Broken void ConnectivityGrid::markHttp3Confirmed() { http3_status_tracker_.markHttp3Confirmed(); } -bool ConnectivityGrid::isIdle() const { - // This is O(n) but the function is constant and there are no plans for n > 8. - bool idle = true; - for (const auto& pool : pools_) { - idle &= pool->isIdle(); - } - return idle; -} - -void ConnectivityGrid::onIdleReceived() { +void ConnectivityGrid::onDrainReceived() { // Don't do any work under the stack of ~ConnectivityGrid() if (destroying_) { return; } - if (isIdle()) { - for (auto& callback : idle_callbacks_) { - callback(); - } + // If not all the pools have drained, keep waiting. 
+ ASSERT(drains_needed_ != 0); + if (--drains_needed_ != 0) { + return; + } + + // All the pools have drained. Notify drain subscribers. + for (auto& callback : drained_callbacks_) { + callback(); } } diff --git a/source/common/http/conn_pool_grid.h b/source/common/http/conn_pool_grid.h index f9ddd6d266e00..5adf47dd4f7c3 100644 --- a/source/common/http/conn_pool_grid.h +++ b/source/common/http/conn_pool_grid.h @@ -137,16 +137,11 @@ class ConnectivityGrid : public ConnectionPool::Instance, ConnectivityOptions connectivity_options); ~ConnectivityGrid() override; - // Event::DeferredDeletable - void deleteIsPending() override; - // Http::ConnPool::Instance bool hasActiveConnections() const override; ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder, ConnectionPool::Callbacks& callbacks) override; - void addIdleCallback(IdleCb cb) override; - bool isIdle() const override; - void startDrain() override; + void addDrainedCallback(DrainedCb cb) override; void drainConnections() override; Upstream::HostDescriptionConstSharedPtr host() const override; bool maybePreconnect(float preconnect_ratio) override; @@ -170,16 +165,12 @@ class ConnectivityGrid : public ConnectionPool::Instance, // event that HTTP/3 is marked broken again. void markHttp3Confirmed(); -protected: - // Set the required idle callback on the pool. - void setupPool(ConnectionPool::Instance& pool); - private: friend class ConnectivityGridForTest; - // Called by each pool as it idles. The grid is responsible for calling - // idle_callbacks_ once all pools have idled. - void onIdleReceived(); + // Called by each pool as it drains. The grid is responsible for calling + // drained_callbacks_ once all pools have drained. + void onDrainReceived(); // Returns true if HTTP/3 should be attempted because there is an alternate protocol // that specifies HTTP/3 and HTTP/3 is not broken. 
@@ -203,24 +194,20 @@ class ConnectivityGrid : public ConnectionPool::Instance, // TODO(RyanTheOptimist): Make the alternate_protocols_ member non-optional. AlternateProtocolsCacheSharedPtr alternate_protocols_; - // True iff this pool is draining. No new streams or connections should be created - // in this state. - bool draining_{false}; - + // Tracks how many drains are needed before calling drain callbacks. This is + // set to the number of pools when the first drain callbacks are added, and + // decremented as various pools drain. + uint32_t drains_needed_ = 0; // Tracks the callbacks to be called on drain completion. - std::list idle_callbacks_; + std::list drained_callbacks_; // The connection pools to use to create new streams, ordered in the order of // desired use. std::list pools_; - // True iff under the stack of the destructor, to avoid calling drain // callbacks on deletion. bool destroying_{}; - // True iff this pool is being being defer deleted. - bool deferred_deleting_{}; - // Wrapped callbacks are stashed in the wrapped_callbacks_ for ownership. 
std::list wrapped_callbacks_; }; diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 87f33935474cf..c7e9af0970108 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -63,7 +63,7 @@ void ActiveClient::StreamWrapper::onDecodeComplete() { pool->scheduleOnUpstreamReady(); parent_.stream_wrapper_.reset(); - pool->checkForIdleAndCloseIdleConnsIfDraining(); + pool->checkForDrained(); } } diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 7fe14815aed3a..df75f156656a5 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -61,7 +61,6 @@ constexpr const char* runtime_features[] = { "envoy.reloadable_features.allow_response_for_timeout", "envoy.reloadable_features.check_unsupported_typed_per_filter_config", "envoy.reloadable_features.check_ocsp_policy", - "envoy.reloadable_features.conn_pool_delete_when_idle", "envoy.reloadable_features.disable_tls_inspector_injection", "envoy.reloadable_features.dont_add_content_length_for_bodiless_requests", "envoy.reloadable_features.enable_compression_without_content_length_header", diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index 38456bf4a5110..a4f33086956c6 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -42,7 +42,7 @@ ActiveTcpClient::~ActiveTcpClient() { ASSERT(state() == ActiveClient::State::CLOSED); tcp_connection_data_->release(); parent_.onStreamClosed(*this, true); - parent_.checkForIdleAndCloseIdleConnsIfDraining(); + parent_.checkForDrained(); } } @@ -54,7 +54,7 @@ void ActiveTcpClient::clearCallbacks() { callbacks_ = nullptr; tcp_connection_data_ = nullptr; parent_.onStreamClosed(*this, true); - parent_.checkForIdleAndCloseIdleConnsIfDraining(); + parent_.checkForDrained(); } void ActiveTcpClient::onEvent(Network::ConnectionEvent event) { diff --git 
a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h index 398254b498461..e8c2f24c026e4 100644 --- a/source/common/tcp/conn_pool.h +++ b/source/common/tcp/conn_pool.h @@ -145,12 +145,7 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase, transport_socket_options, state) {} ~ConnPoolImpl() override { destructAllConnections(); } - // Event::DeferredDeletable - void deleteIsPending() override { deleteIsPendingImpl(); } - - void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); } - bool isIdle() const override { return isIdleImpl(); } - void startDrain() override { startDrainImpl(); } + void addDrainedCallback(DrainedCb cb) override { addDrainedCallbackImpl(cb); } void drainConnections() override { drainConnectionsImpl(); // Legacy behavior for the TCP connection pool marks all connecting clients diff --git a/source/common/tcp/original_conn_pool.cc b/source/common/tcp/original_conn_pool.cc index cb4bf71b6735e..4f4da573b940d 100644 --- a/source/common/tcp/original_conn_pool.cc +++ b/source/common/tcp/original_conn_pool.cc @@ -38,7 +38,6 @@ OriginalConnPoolImpl::~OriginalConnPoolImpl() { } void OriginalConnPoolImpl::drainConnections() { - ENVOY_LOG(debug, "draining connections"); while (!ready_conns_.empty()) { ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); } @@ -68,11 +67,9 @@ void OriginalConnPoolImpl::closeConnections() { } } -void OriginalConnPoolImpl::addIdleCallback(IdleCb cb) { idle_callbacks_.push_back(cb); } - -void OriginalConnPoolImpl::startDrain() { - is_draining_ = true; - checkForIdleAndCloseIdleConnsIfDraining(); +void OriginalConnPoolImpl::addDrainedCallback(DrainedCb cb) { + drained_callbacks_.push_back(cb); + checkForDrained(); } void OriginalConnPoolImpl::assignConnection(ActiveConn& conn, @@ -84,22 +81,14 @@ void OriginalConnPoolImpl::assignConnection(ActiveConn& conn, conn.real_host_description_); } -bool OriginalConnPoolImpl::isIdle() const { - return pending_requests_.empty() && 
busy_conns_.empty() && pending_conns_.empty() && - ready_conns_.empty(); -} - -void OriginalConnPoolImpl::checkForIdleAndCloseIdleConnsIfDraining() { - if (pending_requests_.empty() && busy_conns_.empty() && pending_conns_.empty() && - (is_draining_ || ready_conns_.empty())) { - if (is_draining_) { - ENVOY_LOG(debug, "in draining state"); - while (!ready_conns_.empty()) { - ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); - } +void OriginalConnPoolImpl::checkForDrained() { + if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_conns_.empty() && + pending_conns_.empty()) { + while (!ready_conns_.empty()) { + ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); } - ENVOY_LOG(debug, "Calling idle callbacks - drained={}", is_draining_); - for (const IdleCb& cb : idle_callbacks_) { + + for (const DrainedCb& cb : drained_callbacks_) { cb(); } } @@ -113,8 +102,6 @@ void OriginalConnPoolImpl::createNewConnection() { ConnectionPool::Cancellable* OriginalConnPoolImpl::newConnection(ConnectionPool::Callbacks& callbacks) { - ASSERT(!deferred_deleting_); - if (!ready_conns_.empty()) { ready_conns_.front()->moveBetweenLists(ready_conns_, busy_conns_); ENVOY_CONN_LOG(debug, "using existing connection", *busy_conns_.front()->conn_); @@ -209,8 +196,8 @@ void OriginalConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::Connecti createNewConnection(); } - if (check_for_drained || !is_draining_) { - checkForIdleAndCloseIdleConnsIfDraining(); + if (check_for_drained) { + checkForDrained(); } } @@ -245,7 +232,7 @@ void OriginalConnPoolImpl::onPendingRequestCancel(PendingRequest& request, pending_conns_.back()->conn_->close(Network::ConnectionCloseType::NoFlush); } - checkForIdleAndCloseIdleConnsIfDraining(); + checkForDrained(); } void OriginalConnPoolImpl::onConnReleased(ActiveConn& conn) { @@ -326,7 +313,7 @@ void OriginalConnPoolImpl::processIdleConnection(ActiveConn& conn, bool new_conn 
upstream_ready_cb_->scheduleCallbackCurrentIteration(); } - checkForIdleAndCloseIdleConnsIfDraining(); + checkForDrained(); } OriginalConnPoolImpl::ConnectionWrapper::ConnectionWrapper(ActiveConn& parent) : parent_(parent) { diff --git a/source/common/tcp/original_conn_pool.h b/source/common/tcp/original_conn_pool.h index d5e79580c5b46..b4c5f4bca5fd5 100644 --- a/source/common/tcp/original_conn_pool.h +++ b/source/common/tcp/original_conn_pool.h @@ -28,13 +28,8 @@ class OriginalConnPoolImpl : Logger::Loggable, public Connecti ~OriginalConnPoolImpl() override; - // Event::DeferredDeletable - void deleteIsPending() override { deferred_deleting_ = true; } - // ConnectionPool::Instance - void addIdleCallback(IdleCb cb) override; - bool isIdle() const override; - void startDrain() override; + void addDrainedCallback(DrainedCb cb) override; void drainConnections() override; void closeConnections() override; ConnectionPool::Cancellable* newConnection(ConnectionPool::Callbacks& callbacks) override; @@ -153,7 +148,7 @@ class OriginalConnPoolImpl : Logger::Loggable, public Connecti virtual void onConnDestroyed(ActiveConn& conn); void onUpstreamReady(); void processIdleConnection(ActiveConn& conn, bool new_connection, bool delay); - void checkForIdleAndCloseIdleConnsIfDraining(); + void checkForDrained(); Event::Dispatcher& dispatcher_; Upstream::HostConstSharedPtr host_; @@ -165,13 +160,10 @@ class OriginalConnPoolImpl : Logger::Loggable, public Connecti std::list ready_conns_; // conns ready for assignment std::list busy_conns_; // conns assigned std::list pending_requests_; - std::list idle_callbacks_; + std::list drained_callbacks_; Stats::TimespanPtr conn_connect_ms_; Event::SchedulableCallbackPtr upstream_ready_cb_; - bool upstream_ready_enabled_{false}; - bool is_draining_{false}; - bool deferred_deleting_{false}; }; } // namespace Tcp diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index a369af7358d16..ce3f18d7d65c2 100644 --- 
a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -711,22 +711,14 @@ void Filter::disableIdleTimer() { UpstreamDrainManager::~UpstreamDrainManager() { // If connections aren't closed before they are destructed an ASSERT fires, // so cancel all pending drains, which causes the connections to be closed. - if (!drainers_.empty()) { - auto& dispatcher = drainers_.begin()->second->dispatcher(); - while (!drainers_.empty()) { - auto begin = drainers_.begin(); - Drainer* key = begin->first; - begin->second->cancelDrain(); - - // cancelDrain() should cause that drainer to be removed from drainers_. - // ASSERT so that we don't end up in an infinite loop. - ASSERT(drainers_.find(key) == drainers_.end()); - } + while (!drainers_.empty()) { + auto begin = drainers_.begin(); + Drainer* key = begin->first; + begin->second->cancelDrain(); - // This destructor is run when shutting down `ThreadLocal`. The destructor of some objects use - // earlier `ThreadLocal` slots (for accessing the runtime snapshot) so they must run before that - // slot is destructed. Clear the list to enforce that ordering. - dispatcher.clearDeferredDeleteList(); + // cancelDrain() should cause that drainer to be removed from drainers_. + // ASSERT so that we don't end up in an infinite loop. 
+ ASSERT(drainers_.find(key) == drainers_.end()); } } @@ -798,7 +790,5 @@ void Drainer::cancelDrain() { upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); } -Event::Dispatcher& Drainer::dispatcher() { return upstream_conn_data_->connection().dispatcher(); } - } // namespace TcpProxy } // namespace Envoy diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 7e22c3273bc62..06c5572a313f2 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -401,7 +401,6 @@ class Drainer : public Event::DeferredDeletable { void onIdleTimeout(); void onBytesSent(); void cancelDrain(); - Event::Dispatcher& dispatcher(); private: UpstreamDrainManager& parent_; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 0b9897b40aa6e..b62c3261076d1 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -1116,9 +1116,6 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImp } } thread_local_clusters_.clear(); - - // Ensure that all pools are completely destructed. 
- thread_local_dispatcher_.clearDeferredDeleteList(); } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(const HostVector& hosts) { @@ -1132,7 +1129,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(const Hos { auto container = host_tcp_conn_pool_map_.find(host); if (container != host_tcp_conn_pool_map_.end()) { - drainTcpConnPools(container->second); + drainTcpConnPools(host, container->second); } } } @@ -1140,37 +1137,80 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(const Hos void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools( HostSharedPtr old_host, ConnPoolsContainer& container) { + container.drains_remaining_ += container.pools_->size(); + // Make a copy to protect against erasure in the callback. std::shared_ptr pools = container.pools_; - container.draining_ = true; + pools->addDrainedCallback([this, old_host]() -> void { + if (destroying_) { + // It is possible for a connection pool to fire drain callbacks during destruction. Instead + // of checking if old_host actually exists in the map, it's clearer and cleaner to keep + // track of destruction as a separate state and check for it here. This also allows us to + // do this check here versus inside every different connection pool implementation. + return; + } + + ConnPoolsContainer* to_clear = getHttpConnPoolsContainer(old_host); + if (to_clear == nullptr) { + // This could happen if we have cleaned out the host before iterating through every connection + // pool. Handle it by just continuing. + return; + } + + ASSERT(to_clear->drains_remaining_ > 0); + to_clear->drains_remaining_--; + if (to_clear->drains_remaining_ == 0 && to_clear->ready_to_drain_) { + clearContainer(old_host, *to_clear); + } + }); // We need to hold off on actually emptying out the container until we have finished processing - // `addIdleCallback`. 
If we do not, then it's possible that the container could be erased in + // `addDrainedCallback`. If we do not, then it's possible that the container could be erased in // the middle of its iteration, which leads to undefined behaviour. We handle that case by - // guarding deletion with `do_not_delete_` in the registered idle callback, and then checking - // afterwards whether it is empty and deleting it if necessary. - container.do_not_delete_ = true; - pools->startDrain(); - container.do_not_delete_ = false; - - if (container.pools_->size() == 0) { - host_http_conn_pool_map_.erase(old_host); + // checking here to see if the drains have completed. + container.ready_to_drain_ = true; + if (container.drains_remaining_ == 0) { + clearContainer(old_host, container); } } +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::clearContainer( + HostSharedPtr old_host, ConnPoolsContainer& container) { + container.pools_->clear(); + host_http_conn_pool_map_.erase(old_host); +} + void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainTcpConnPools( - TcpConnPoolsContainer& container) { + HostSharedPtr old_host, TcpConnPoolsContainer& container) { + container.drains_remaining_ += container.pools_.size(); - // Copy the pools so that it is safe for the completion callback to mutate `container.pools_`. - // `container` may be invalid after all calls to `startDrain()`. - std::vector pools; for (const auto& pair : container.pools_) { - pools.push_back(pair.second.get()); - } + pair.second->addDrainedCallback([this, old_host]() -> void { + if (destroying_) { + // It is possible for a connection pool to fire drain callbacks during destruction. Instead + // of checking if old_host actually exists in the map, it's clearer and cleaner to keep + // track of destruction as a separate state and check for it here. This also allows us to + // do this check here versus inside every different connection pool implementation. 
+ return; + } - container.draining_ = true; - for (auto pool : pools) { - pool->startDrain(); + TcpConnPoolsContainer& container = host_tcp_conn_pool_map_[old_host]; + ASSERT(container.drains_remaining_ > 0); + container.drains_remaining_--; + if (container.drains_remaining_ == 0) { + for (auto& pair : container.pools_) { + thread_local_dispatcher_.deferredDelete(std::move(pair.second)); + } + host_tcp_conn_pool_map_.erase(old_host); + } + }); + + // The above addDrainedCallback() drain completion callback might execute immediately. This can + // then effectively nuke 'container', which means we can't continue to loop on its contents + // (we're done here). + if (host_tcp_conn_pool_map_.count(old_host) == 0) { + break; + } } } @@ -1231,13 +1271,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( { const auto container = getHttpConnPoolsContainer(host); if (container != nullptr) { - container->do_not_delete_ = true; container->pools_->drainConnections(); - container->do_not_delete_ = false; - - if (container->pools_->size() == 0) { - host_http_conn_pool_map_.erase(host); - } } } { @@ -1247,15 +1281,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( // active connections. const auto& container = host_tcp_conn_pool_map_.find(host); if (container != host_tcp_conn_pool_map_.end()) { - // Draining pools or closing connections can cause pool deletion if it becomes - // idle. Copy `pools_` so that we aren't iterating through a container that - // gets mutated by callbacks deleting from it. 
- std::vector pools; for (const auto& pair : container->second.pools_) { - pools.push_back(pair.second.get()); - } - - for (auto* pool : pools) { + const Tcp::ConnectionPool::InstancePtr& pool = pair.second; if (host->cluster().features() & ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) { pool->closeConnections(); @@ -1433,16 +1460,11 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( // function. Otherwise, we'd need to capture a few of these variables by value. ConnPoolsContainer::ConnPools::PoolOptRef pool = container.pools_->getPool(priority, hash_key, [&]() { - auto pool = parent_.parent_.factory_.allocateConnPool( + return parent_.parent_.factory_.allocateConnPool( parent_.thread_local_dispatcher_, host, priority, upstream_protocols, alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr, have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr, parent_.parent_.time_source_, parent_.cluster_manager_state_); - - pool->addIdleCallback( - [this, host, priority, hash_key]() { httpConnPoolIsIdle(host, priority, hash_key); }); - - return pool; }); if (pool.has_value()) { @@ -1452,38 +1474,6 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( } } -void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolIsIdle( - HostConstSharedPtr host, ResourcePriority priority, const std::vector& hash_key) { - if (parent_.destroying_) { - // If the Cluster is being destroyed, this pool will be cleaned up by that - // process. - return; - } - - ConnPoolsContainer* container = parent_.getHttpConnPoolsContainer(host); - if (container == nullptr) { - // This could happen if we have cleaned out the host before iterating through every - // connection pool. Handle it by just continuing. 
- return; - } - - if (container->draining_ || - Runtime::runtimeFeatureEnabled("envoy.reloadable_features.conn_pool_delete_when_idle")) { - - ENVOY_LOG(trace, "Erasing idle pool for host {}", host); - container->pools_->erasePool(priority, hash_key); - - // Guard deletion of the container with `do_not_delete_` to avoid deletion while - // iterating through the container in `container->pools_->startDrain()`. See - // comment in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools`. - if (!container->do_not_delete_ && container->pools_->size() == 0) { - ENVOY_LOG(trace, "Pool container empty for host {}, erasing host entry", host); - parent_.host_http_conn_pool_map_.erase( - host); // NOTE: `container` is erased after this point in the lambda. - } - } -} - Tcp::ConnectionPool::Instance* ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( ResourcePriority priority, LoadBalancerContext* context, bool peek) { @@ -1521,50 +1511,15 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( } TcpConnPoolsContainer& container = parent_.host_tcp_conn_pool_map_[host]; - auto pool_iter = container.pools_.find(hash_key); - if (pool_iter == container.pools_.end()) { - bool inserted; - std::tie(pool_iter, inserted) = container.pools_.emplace( - hash_key, - parent_.parent_.factory_.allocateTcpConnPool( - parent_.thread_local_dispatcher_, host, priority, - !upstream_options->empty() ? upstream_options : nullptr, - have_transport_socket_options ? 
context->upstreamTransportSocketOptions() : nullptr, - parent_.cluster_manager_state_)); - ASSERT(inserted); - pool_iter->second->addIdleCallback( - [this, host, hash_key]() { tcpConnPoolIsIdle(host, hash_key); }); - } - - return pool_iter->second.get(); -} - -void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolIsIdle( - HostConstSharedPtr host, const std::vector& hash_key) { - if (parent_.destroying_) { - // If the Cluster is being destroyed, this pool will be cleaned up by that process. - return; + if (!container.pools_[hash_key]) { + container.pools_[hash_key] = parent_.parent_.factory_.allocateTcpConnPool( + parent_.thread_local_dispatcher_, host, priority, + !upstream_options->empty() ? upstream_options : nullptr, + have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr, + parent_.cluster_manager_state_); } - auto it = parent_.host_tcp_conn_pool_map_.find(host); - if (it != parent_.host_tcp_conn_pool_map_.end()) { - TcpConnPoolsContainer& container = it->second; - - auto erase_iter = container.pools_.find(hash_key); - if (erase_iter != container.pools_.end()) { - if (container.draining_ || - Runtime::runtimeFeatureEnabled("envoy.reloadable_features.conn_pool_delete_when_idle")) { - ENVOY_LOG(trace, "Idle pool, erasing pool for host {}", host); - parent_.thread_local_dispatcher_.deferredDelete(std::move(erase_iter->second)); - container.pools_.erase(erase_iter); - } - } - - if (container.pools_.empty()) { - parent_.host_tcp_conn_pool_map_.erase( - host); // NOTE: `container` is erased after this point in the lambda. 
- } - } + return container.pools_[hash_key].get(); } ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto( diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index a2c6a62e0fdd3..c3d436eb3f9ac 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -359,18 +359,15 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable pools_; - bool draining_{false}; - - // Protect from deletion while iterating through pools_. See comments and usage - // in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools()`. - bool do_not_delete_{false}; + bool ready_to_drain_{false}; + uint64_t drains_remaining_{}; }; struct TcpConnPoolsContainer { using ConnPools = std::map, Tcp::ConnectionPool::InstancePtr>; ConnPools pools_; - bool draining_{false}; + uint64_t drains_remaining_{}; }; // Holds an unowned reference to a connection, and watches for Closed events. If the connection @@ -412,10 +409,6 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable& hash_key); - void tcpConnPoolIsIdle(HostConstSharedPtr host, const std::vector& hash_key); - // Upstream::ThreadLocalCluster const PrioritySet& prioritySet() override { return priority_set_; } ClusterInfoConstSharedPtr info() override { return cluster_info_; } @@ -452,7 +445,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable class ConnPoolMap { public: using PoolFactory = std::function()>; - using IdleCb = typename POOL_TYPE::IdleCb; + using DrainedCb = std::function; using PoolOptRef = absl::optional>; ConnPoolMap(Event::Dispatcher& dispatcher, const HostConstSharedPtr& host, @@ -31,14 +31,7 @@ template class ConnPoolMap { * possible for this to fail if a limit on the number of pools allowed is reached. * @return The pool corresponding to `key`, or `absl::nullopt`. 
*/ - PoolOptRef getPool(const KEY_TYPE& key, const PoolFactory& factory); - - /** - * Erases an existing pool mapped to `key`. - * - * @return true if the entry exists and was removed, false otherwise - */ - bool erasePool(const KEY_TYPE& key); + PoolOptRef getPool(KEY_TYPE key, const PoolFactory& factory); /** * @return the number of pools. @@ -51,20 +44,15 @@ template class ConnPoolMap { void clear(); /** - * Adds an idle callback to all mapped pools. Any future mapped pools with have the callback + * Adds a drain callback to all mapped pools. Any future mapped pools will have the callback * automatically added. Be careful with the callback. If it itself calls into `this`, modifying * the state of `this`, there is a good chance it will cause corruption due to the callback firing * immediately. */ - void addIdleCallback(const IdleCb& cb); - - /** - * See `Envoy::ConnectionPool::Instance::startDrain()`. - */ - void startDrain(); + void addDrainedCallback(const DrainedCb& cb); /** - * See `Envoy::ConnectionPool::Instance::drainConnections()`. + * Instructs each connection pool to drain its connections. 
*/ void drainConnections(); @@ -82,7 +70,7 @@ template class ConnPoolMap { absl::flat_hash_map> active_pools_; Event::Dispatcher& thread_local_dispatcher_; - std::vector cached_callbacks_; + std::vector cached_callbacks_; Common::DebugRecursionChecker recursion_checker_; const HostConstSharedPtr host_; const ResourcePriority priority_; diff --git a/source/common/upstream/conn_pool_map_impl.h b/source/common/upstream/conn_pool_map_impl.h index 18176d0a18ec8..b183f9d438bb7 100644 --- a/source/common/upstream/conn_pool_map_impl.h +++ b/source/common/upstream/conn_pool_map_impl.h @@ -21,7 +21,7 @@ template ConnPoolMap typename ConnPoolMap::PoolOptRef -ConnPoolMap::getPool(const KEY_TYPE& key, const PoolFactory& factory) { +ConnPoolMap::getPool(KEY_TYPE key, const PoolFactory& factory) { Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); // TODO(klarose): Consider how we will change the connection pool's configuration in the future. // The plan is to change the downstream socket options... 
We may want to take those as a parameter @@ -53,28 +53,13 @@ ConnPoolMap::getPool(const KEY_TYPE& key, const PoolFactory auto new_pool = factory(); connPoolResource.inc(); for (const auto& cb : cached_callbacks_) { - new_pool->addIdleCallback(cb); + new_pool->addDrainedCallback(cb); } auto inserted = active_pools_.emplace(key, std::move(new_pool)); return std::ref(*inserted.first->second); } -template -bool ConnPoolMap::erasePool(const KEY_TYPE& key) { - Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); - auto pool_iter = active_pools_.find(key); - - if (pool_iter != active_pools_.end()) { - thread_local_dispatcher_.deferredDelete(std::move(pool_iter->second)); - active_pools_.erase(pool_iter); - host_->cluster().resourceManager(priority_).connectionPools().dec(); - return true; - } else { - return false; - } -} - template size_t ConnPoolMap::size() const { return active_pools_.size(); @@ -89,42 +74,20 @@ template void ConnPoolMap -void ConnPoolMap::addIdleCallback(const IdleCb& cb) { +void ConnPoolMap::addDrainedCallback(const DrainedCb& cb) { Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); for (auto& pool_pair : active_pools_) { - pool_pair.second->addIdleCallback(cb); + pool_pair.second->addDrainedCallback(cb); } cached_callbacks_.emplace_back(std::move(cb)); } -template -void ConnPoolMap::startDrain() { - // Copy the `active_pools_` so that it is safe for the call to result - // in deletion, and avoid iteration through a mutating container. - std::vector pools; - pools.reserve(active_pools_.size()); - for (auto& pool_pair : active_pools_) { - pools.push_back(pool_pair.second.get()); - } - - for (auto* pool : pools) { - pool->startDrain(); - } -} - template void ConnPoolMap::drainConnections() { - // Copy the `active_pools_` so that it is safe for the call to result - // in deletion, and avoid iteration through a mutating container. 
- std::vector pools; - pools.reserve(active_pools_.size()); + Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); for (auto& pool_pair : active_pools_) { - pools.push_back(pool_pair.second.get()); - } - - for (auto* pool : pools) { - pool->drainConnections(); + pool_pair.second->drainConnections(); } } diff --git a/source/common/upstream/priority_conn_pool_map.h b/source/common/upstream/priority_conn_pool_map.h index c43ba46c06ea5..18ce2c52eb959 100644 --- a/source/common/upstream/priority_conn_pool_map.h +++ b/source/common/upstream/priority_conn_pool_map.h @@ -15,7 +15,7 @@ template class PriorityConnPoolMap { public: using ConnPoolMapType = ConnPoolMap; using PoolFactory = typename ConnPoolMapType::PoolFactory; - using IdleCb = typename ConnPoolMapType::IdleCb; + using DrainedCb = typename ConnPoolMapType::DrainedCb; using PoolOptRef = typename ConnPoolMapType::PoolOptRef; PriorityConnPoolMap(Event::Dispatcher& dispatcher, const HostConstSharedPtr& host); @@ -26,12 +26,7 @@ template class PriorityConnPoolMap { * is reached. * @return The pool corresponding to `key`, or `absl::nullopt`. */ - PoolOptRef getPool(ResourcePriority priority, const KEY_TYPE& key, const PoolFactory& factory); - - /** - * Erase a pool for the given priority and `key` if it exists and is idle. - */ - bool erasePool(ResourcePriority priority, const KEY_TYPE& key); + PoolOptRef getPool(ResourcePriority priority, KEY_TYPE key, const PoolFactory& factory); /** * @return the number of pools across all priorities. @@ -49,21 +44,14 @@ template class PriorityConnPoolMap { * the state of `this`, there is a good chance it will cause corruption due to the callback firing * immediately. */ - void addIdleCallback(const IdleCb& cb); + void addDrainedCallback(const DrainedCb& cb); /** - * See `Envoy::ConnectionPool::Instance::startDrain()`. - */ - void startDrain(); - - /** - * See `Envoy::ConnectionPool::Instance::drainConnections()`. 
+ * Instructs each connection pool to drain its connections. */ void drainConnections(); private: - size_t getPriorityIndex(ResourcePriority priority) const; - std::array, NumResourcePriorities> conn_pool_maps_; }; diff --git a/source/common/upstream/priority_conn_pool_map_impl.h b/source/common/upstream/priority_conn_pool_map_impl.h index 66cc9ff4407ea..b1cb6f8c54d12 100644 --- a/source/common/upstream/priority_conn_pool_map_impl.h +++ b/source/common/upstream/priority_conn_pool_map_impl.h @@ -20,15 +20,11 @@ PriorityConnPoolMap::~PriorityConnPoolMap() = default; template typename PriorityConnPoolMap::PoolOptRef -PriorityConnPoolMap::getPool(ResourcePriority priority, const KEY_TYPE& key, +PriorityConnPoolMap::getPool(ResourcePriority priority, KEY_TYPE key, const PoolFactory& factory) { - return conn_pool_maps_[getPriorityIndex(priority)]->getPool(key, factory); -} - -template -bool PriorityConnPoolMap::erasePool(ResourcePriority priority, - const KEY_TYPE& key) { - return conn_pool_maps_[getPriorityIndex(priority)]->erasePool(key); + size_t index = static_cast(priority); + ASSERT(index < conn_pool_maps_.size()); + return conn_pool_maps_[index]->getPool(key, factory); } template @@ -48,16 +44,9 @@ void PriorityConnPoolMap::clear() { } template -void PriorityConnPoolMap::addIdleCallback(const IdleCb& cb) { +void PriorityConnPoolMap::addDrainedCallback(const DrainedCb& cb) { for (auto& pool_map : conn_pool_maps_) { - pool_map->addIdleCallback(cb); - } -} - -template -void PriorityConnPoolMap::startDrain() { - for (auto& pool_map : conn_pool_maps_) { - pool_map->startDrain(); + pool_map->addDrainedCallback(cb); } } @@ -68,12 +57,5 @@ void PriorityConnPoolMap::drainConnections() { } } -template -size_t PriorityConnPoolMap::getPriorityIndex(ResourcePriority priority) const { - size_t index = static_cast(priority); - ASSERT(index < conn_pool_maps_.size()); - return index; -} - } // namespace Upstream } // namespace Envoy diff --git a/source/server/BUILD 
b/source/server/BUILD index a632ff364c4ec..9c893e5444288 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -509,8 +509,6 @@ envoy_cc_library( "//source/common/common:logger_lib", "//source/common/common:mutex_tracer_lib", "//source/common/common:utility_lib", - "//source/common/config:grpc_mux_lib", - "//source/common/config:new_grpc_mux_lib", "//source/common/config:utility_lib", "//source/common/config:xds_resource_lib", "//source/common/grpc:async_client_manager_lib", diff --git a/source/server/server.cc b/source/server/server.cc index 1ec87f8a4ab4e..88235f38d7637 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -29,8 +29,6 @@ #include "source/common/common/enum_to_int.h" #include "source/common/common/mutex_tracer_impl.h" #include "source/common/common/utility.h" -#include "source/common/config/grpc_mux_impl.h" -#include "source/common/config/new_grpc_mux_impl.h" #include "source/common/config/utility.h" #include "source/common/config/version_converter.h" #include "source/common/config/xds_resource.h" @@ -827,10 +825,6 @@ void InstanceImpl::terminate() { // Before the workers start exiting we should disable stat threading. stats_store_.shutdownThreading(); - // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072. - Config::GrpcMuxImpl::shutdownAll(); - Config::NewGrpcMuxImpl::shutdownAll(); - if (overload_manager_) { overload_manager_->stop(); } diff --git a/test/common/conn_pool/conn_pool_base_test.cc b/test/common/conn_pool/conn_pool_base_test.cc index f1f53f69f9fd8..5d8d64d359db3 100644 --- a/test/common/conn_pool/conn_pool_base_test.cc +++ b/test/common/conn_pool/conn_pool_base_test.cc @@ -60,8 +60,7 @@ class ConnPoolImplBaseTest : public testing::Test { // connection resource limit for most tests. 
cluster_->resetResourceManager(1024, 1024, 1024, 1, 1); ON_CALL(pool_, instantiateActiveClient).WillByDefault(Invoke([&]() -> ActiveClientPtr { - auto ret = - std::make_unique>(pool_, stream_limit_, concurrent_streams_); + auto ret = std::make_unique(pool_, stream_limit_, concurrent_streams_); clients_.push_back(ret.get()); ret->real_host_description_ = descr_; return ret; @@ -89,7 +88,7 @@ class ConnPoolImplBaseTest : public testing::Test { Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:80", dispatcher_.timeSource())}; TestConnPoolImplBase pool_; AttachContext context_; - std::vector clients_; + std::vector clients_; }; TEST_F(ConnPoolImplBaseTest, DumpState) { @@ -202,61 +201,5 @@ TEST_F(ConnPoolImplBaseTest, ExplicitPreconnectNotHealthy) { EXPECT_FALSE(pool_.maybePreconnect(1)); } -// Remote close simulates the peer closing the connection. -TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredRemoteClose) { - EXPECT_CALL(dispatcher_, createTimer_(_)).Times(AnyNumber()); - - // Create a new stream using the pool - EXPECT_CALL(pool_, instantiateActiveClient); - pool_.newStream(context_); - ASSERT_EQ(1, clients_.size()); - - // Emulate the new upstream connection establishment - EXPECT_CALL(pool_, onPoolReady); - clients_.back()->onEvent(Network::ConnectionEvent::Connected); - - // The pool now has no requests/streams, but has an open connection, so it is not yet idle. - clients_.back()->active_streams_ = 0; - pool_.onStreamClosed(*clients_.back(), false); - - // Now that the last connection is closed, while there are no requests, the pool becomes idle. 
- testing::MockFunction idle_pool_callback; - EXPECT_CALL(idle_pool_callback, Call()); - pool_.addIdleCallbackImpl(idle_pool_callback.AsStdFunction()); - dispatcher_.clearDeferredDeleteList(); - clients_.back()->onEvent(Network::ConnectionEvent::RemoteClose); - - EXPECT_CALL(idle_pool_callback, Call()); - pool_.startDrainImpl(); -} - -// Local close simulates what would happen for an idle timeout on a connection. -TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredLocalClose) { - EXPECT_CALL(dispatcher_, createTimer_(_)).Times(AnyNumber()); - - // Create a new stream using the pool - EXPECT_CALL(pool_, instantiateActiveClient); - pool_.newStream(context_); - ASSERT_EQ(1, clients_.size()); - - // Emulate the new upstream connection establishment - EXPECT_CALL(pool_, onPoolReady); - clients_.back()->onEvent(Network::ConnectionEvent::Connected); - - // The pool now has no requests/streams, but has an open connection, so it is not yet idle. - clients_.back()->active_streams_ = 0; - pool_.onStreamClosed(*clients_.back(), false); - - // Now that the last connection is closed, while there are no requests, the pool becomes idle. 
- testing::MockFunction idle_pool_callback; - EXPECT_CALL(idle_pool_callback, Call()); - pool_.addIdleCallbackImpl(idle_pool_callback.AsStdFunction()); - dispatcher_.clearDeferredDeleteList(); - clients_.back()->onEvent(Network::ConnectionEvent::LocalClose); - - EXPECT_CALL(idle_pool_callback, Call()); - pool_.startDrainImpl(); -} - } // namespace ConnectionPool } // namespace Envoy diff --git a/test/common/http/conn_pool_grid_test.cc b/test/common/http/conn_pool_grid_test.cc index 406bda423e3b5..945af22b86ab7 100644 --- a/test/common/http/conn_pool_grid_test.cc +++ b/test/common/http/conn_pool_grid_test.cc @@ -41,7 +41,6 @@ class ConnectivityGridForTest : public ConnectivityGrid { return absl::nullopt; } ConnectionPool::MockInstance* instance = new NiceMock(); - setupPool(*instance); pools_.push_back(ConnectionPool::InstancePtr{instance}); ON_CALL(*instance, newStream(_, _)) .WillByDefault( @@ -402,28 +401,32 @@ TEST_F(ConnectivityGridTest, DrainCallbacks) { grid_.createNextPool(); bool drain_received = false; + bool second_drain_received = false; - grid_.addIdleCallback([&]() { drain_received = true; }); - - // The first time a drain is started, both pools should start draining. + ConnectionPool::Instance::DrainedCb pool1_cb; + ConnectionPool::Instance::DrainedCb pool2_cb; + // The first time a drained callback is added, the Grid's callback should be + // added to both pools. { - EXPECT_CALL(*grid_.first(), startDrain()); - EXPECT_CALL(*grid_.second(), startDrain()); - grid_.startDrain(); + EXPECT_CALL(*grid_.first(), addDrainedCallback(_)) + .WillOnce(Invoke(Invoke([&](ConnectionPool::Instance::DrainedCb cb) { pool1_cb = cb; }))); + EXPECT_CALL(*grid_.second(), addDrainedCallback(_)) + .WillOnce(Invoke(Invoke([&](ConnectionPool::Instance::DrainedCb cb) { pool2_cb = cb; }))); + grid_.addDrainedCallback([&drain_received]() -> void { drain_received = true; }); } - // The second time, the pools will not see any change. 
+ // The second time a drained callback is added, the pools will not see any + // change. { - EXPECT_CALL(*grid_.first(), startDrain()).Times(0); - EXPECT_CALL(*grid_.second(), startDrain()).Times(0); - grid_.startDrain(); + EXPECT_CALL(*grid_.first(), addDrainedCallback(_)).Times(0); + EXPECT_CALL(*grid_.second(), addDrainedCallback(_)).Times(0); + grid_.addDrainedCallback([&second_drain_received]() -> void { second_drain_received = true; }); } { // Notify the grid the second pool has been drained. This should not be // passed up to the original callers. EXPECT_FALSE(drain_received); - EXPECT_CALL(*grid_.second(), isIdle()).WillRepeatedly(Return(true)); - grid_.second()->idle_cb_(); + (pool2_cb)(); EXPECT_FALSE(drain_received); } @@ -431,57 +434,27 @@ TEST_F(ConnectivityGridTest, DrainCallbacks) { // Notify the grid that another pool has been drained. Now that all pools are // drained, the original callers should be informed. EXPECT_FALSE(drain_received); - EXPECT_CALL(*grid_.first(), isIdle()).WillRepeatedly(Return(true)); - grid_.first()->idle_cb_(); + (pool1_cb)(); EXPECT_TRUE(drain_received); + EXPECT_TRUE(second_drain_received); } } -// Make sure idle callbacks work as expected. -TEST_F(ConnectivityGridTest, IdleCallbacks) { - // Synthetically create both pools. - grid_.createNextPool(); - grid_.createNextPool(); - - bool idle_received = false; - - grid_.addIdleCallback([&]() { idle_received = true; }); - EXPECT_FALSE(idle_received); - - // Notify the grid the second pool is idle. This should not be - // passed up to the original callers. - EXPECT_CALL(*grid_.second(), isIdle()).WillOnce(Return(true)); - EXPECT_CALL(*grid_.first(), isIdle()).WillOnce(Return(false)); - grid_.second()->idle_cb_(); - EXPECT_FALSE(idle_received); - - // Notify the grid that the first pool is idle, the but second no longer is. 
- EXPECT_CALL(*grid_.first(), isIdle()).WillOnce(Return(true)); - EXPECT_CALL(*grid_.second(), isIdle()).WillOnce(Return(false)); - grid_.first()->idle_cb_(); - EXPECT_FALSE(idle_received); - - // Notify the grid that both are now idle. This should be passed up - // to the original caller. - EXPECT_CALL(*grid_.first(), isIdle()).WillOnce(Return(true)); - EXPECT_CALL(*grid_.second(), isIdle()).WillOnce(Return(true)); - grid_.first()->idle_cb_(); - EXPECT_TRUE(idle_received); -} - // Ensure drain callbacks aren't called during grid teardown. TEST_F(ConnectivityGridTest, NoDrainOnTeardown) { grid_.createNextPool(); bool drain_received = false; + ConnectionPool::Instance::DrainedCb pool1_cb; { - grid_.addIdleCallback([&drain_received]() -> void { drain_received = true; }); - grid_.startDrain(); + EXPECT_CALL(*grid_.first(), addDrainedCallback(_)) + .WillOnce(Invoke(Invoke([&](ConnectionPool::Instance::DrainedCb cb) { pool1_cb = cb; }))); + grid_.addDrainedCallback([&drain_received]() -> void { drain_received = true; }); } grid_.setDestroying(); // Fake being in the destructor. 
- grid_.first()->idle_cb_(); + (pool1_cb)(); EXPECT_FALSE(drain_received); } diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index f603967b14c97..c0493c7307b47 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -31,7 +31,6 @@ #include "gtest/gtest.h" using testing::_; -using testing::AtLeast; using testing::DoAll; using testing::InSequence; using testing::Invoke; @@ -947,17 +946,16 @@ TEST_F(Http1ConnPoolImplTest, DrainCallback) { InSequence s; ReadyWatcher drained; + EXPECT_CALL(drained, ready()); + conn_pool_->addDrainedCallback([&]() -> void { drained.ready(); }); + ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection); ActiveTestRequest r2(*this, 0, ActiveTestRequest::Type::Pending); - - conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); - r2.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::Default); EXPECT_EQ(1U, cluster_->stats_.upstream_rq_total_.value()); conn_pool_->expectEnableUpstreamReady(); - EXPECT_CALL(drained, ready()).Times(AtLeast(1)); + EXPECT_CALL(drained, ready()); r1.startRequest(); r1.completeResponse(false); @@ -977,11 +975,10 @@ TEST_F(Http1ConnPoolImplTest, DrainWhileConnecting) { Http::ConnectionPool::Cancellable* handle = conn_pool_->newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); - conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); + conn_pool_->addDrainedCallback([&]() -> void { drained.ready(); }); EXPECT_CALL(*conn_pool_->test_clients_[0].connection_, close(Network::ConnectionCloseType::NoFlush)); - EXPECT_CALL(drained, ready()).Times(AtLeast(1)); + EXPECT_CALL(drained, ready()); handle->cancel(Envoy::ConnectionPool::CancelPolicy::Default); EXPECT_CALL(*conn_pool_, onClientDestroy()); diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index 
b45fb5f1a56d8..e3784cbaaf718 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -23,7 +23,6 @@ #include "gtest/gtest.h" using testing::_; -using testing::AtLeast; using testing::DoAll; using testing::InSequence; using testing::Invoke; @@ -1090,8 +1089,7 @@ TEST_F(Http2ConnPoolImplTest, DrainDisconnectWithActiveRequest) { ->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true) .ok()); ReadyWatcher drained; - pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->addDrainedCallback([&]() -> void { drained.ready(); }); EXPECT_CALL(dispatcher_, deferredDelete_(_)); EXPECT_CALL(drained, ready()); @@ -1127,8 +1125,7 @@ TEST_F(Http2ConnPoolImplTest, DrainDisconnectDrainingWithActiveRequest) { .ok()); ReadyWatcher drained; - pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->addDrainedCallback([&]() -> void { drained.ready(); }); EXPECT_CALL(dispatcher_, deferredDelete_(_)); EXPECT_CALL(r2.decoder_, decodeHeaders_(_, true)); @@ -1171,8 +1168,7 @@ TEST_F(Http2ConnPoolImplTest, DrainPrimary) { .ok()); ReadyWatcher drained; - pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->addDrainedCallback([&]() -> void { drained.ready(); }); EXPECT_CALL(dispatcher_, deferredDelete_(_)); EXPECT_CALL(r2.decoder_, decodeHeaders_(_, true)); @@ -1182,7 +1178,7 @@ TEST_F(Http2ConnPoolImplTest, DrainPrimary) { dispatcher_.clearDeferredDeleteList(); EXPECT_CALL(dispatcher_, deferredDelete_(_)); - EXPECT_CALL(drained, ready()).Times(AtLeast(1)); + EXPECT_CALL(drained, ready()); EXPECT_CALL(r1.decoder_, decodeHeaders_(_, true)); r1.inner_decoder_->decodeHeaders( ResponseHeaderMapPtr{new TestResponseHeaderMapImpl{{":status", "200"}}}, true); @@ -1227,8 +1223,7 @@ TEST_F(Http2ConnPoolImplTest, DrainPrimaryNoActiveRequest) { ReadyWatcher drained; EXPECT_CALL(drained, ready()); - 
pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->addDrainedCallback([&]() -> void { drained.ready(); }); EXPECT_CALL(*this, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc index 8657f444a4834..c1febd25d1558 100644 --- a/test/common/tcp/conn_pool_test.cc +++ b/test/common/tcp/conn_pool_test.cc @@ -25,7 +25,6 @@ using testing::_; using testing::AnyNumber; -using testing::AtLeast; using testing::Invoke; using testing::InvokeWithoutArgs; using testing::NiceMock; @@ -105,9 +104,7 @@ class ConnPoolBase : public Tcp::ConnectionPool::Instance { Network::TransportSocketOptionsConstSharedPtr transport_socket_options, bool test_new_connection_pool); - void addIdleCallback(IdleCb cb) override { conn_pool_->addIdleCallback(cb); } - bool isIdle() const override { return conn_pool_->isIdle(); } - void startDrain() override { return conn_pool_->startDrain(); } + void addDrainedCallback(DrainedCb cb) override { conn_pool_->addDrainedCallback(cb); } void drainConnections() override { conn_pool_->drainConnections(); } void closeConnections() override { conn_pool_->closeConnections(); } ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override { @@ -157,7 +154,6 @@ class ConnPoolBase : public Tcp::ConnectionPool::Instance { Event::MockDispatcher& mock_dispatcher_; NiceMock* mock_upstream_ready_cb_; std::vector test_conns_; - Upstream::HostSharedPtr host_; Network::ConnectionCallbacks* callbacks_ = nullptr; bool test_new_connection_pool_; Network::ConnectionSocket::OptionsSharedPtr options_; @@ -977,16 +973,16 @@ TEST_P(TcpConnPoolImplTest, ConnectionStateWithConcurrentConnections) { TEST_P(TcpConnPoolImplTest, DrainCallback) { initialize(); ReadyWatcher drained; + EXPECT_CALL(drained, ready()); - conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); + 
conn_pool_->addDrainedCallback([&]() -> void { drained.ready(); }); ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); ActiveTestConn c2(*this, 0, ActiveTestConn::Type::Pending); c2.handle_->cancel(ConnectionPool::CancelPolicy::Default); EXPECT_CALL(*conn_pool_, onConnReleasedForTest()); - EXPECT_CALL(drained, ready()).Times(AtLeast(1)); + EXPECT_CALL(drained, ready()); c1.releaseConn(); EXPECT_CALL(*conn_pool_, onConnDestroyedForTest()); @@ -1006,13 +1002,11 @@ TEST_P(TcpConnPoolImplTest, DrainWhileConnecting) { Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(callbacks); EXPECT_NE(nullptr, handle); - conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); - + conn_pool_->addDrainedCallback([&]() -> void { drained.ready(); }); if (test_new_connection_pool_) { // The shared connection pool removes and closes connecting clients if there are no // pending requests. - EXPECT_CALL(drained, ready()).Times(AtLeast(1)); + EXPECT_CALL(drained, ready()); handle->cancel(ConnectionPool::CancelPolicy::Default); } else { handle->cancel(ConnectionPool::CancelPolicy::Default); @@ -1032,12 +1026,11 @@ TEST_P(TcpConnPoolImplTest, DrainOnClose) { initialize(); ReadyWatcher drained; EXPECT_CALL(drained, ready()); - conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); + conn_pool_->addDrainedCallback([&]() -> void { drained.ready(); }); ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); - EXPECT_CALL(drained, ready()).Times(AtLeast(1)); + EXPECT_CALL(drained, ready()); EXPECT_CALL(c1.callbacks_.callbacks_, onEvent(Network::ConnectionEvent::RemoteClose)) .WillOnce(Invoke([&](Network::ConnectionEvent event) -> void { EXPECT_EQ(Network::ConnectionEvent::RemoteClose, event); @@ -1118,25 +1111,6 @@ TEST_P(TcpConnPoolImplTest, RequestCapacity) { conn_pool_->test_conns_[2].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); } -// Test that 
connections that are closed due to idle timeout causes the idle callback to be fired. -TEST_P(TcpConnPoolImplTest, TestIdleTimeout) { - initialize(); - testing::MockFunction idle_callback; - conn_pool_->addIdleCallback(idle_callback.AsStdFunction()); - - EXPECT_CALL(idle_callback, Call()); - ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); - EXPECT_CALL(*conn_pool_, onConnReleasedForTest()); - c1.releaseConn(); - conn_pool_->test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); - - testing::MockFunction drained_callback; - EXPECT_CALL(idle_callback, Call()); - conn_pool_->startDrain(); - EXPECT_CALL(*conn_pool_, onConnDestroyedForTest()); - dispatcher_.clearDeferredDeleteList(); -} - // Test that maybePreconnect is passed up to the base class implementation. TEST_P(TcpConnPoolImplTest, TestPreconnect) { initialize(); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index e6edf516d24da..fa5f863ae53ff 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -55,8 +55,6 @@ using ::testing::ReturnNew; using ::testing::ReturnRef; using ::testing::SaveArg; -using namespace std::chrono_literals; - envoy::config::bootstrap::v3::Bootstrap parseBootstrapFromV3Yaml(const std::string& yaml, bool avoid_boosting = true) { envoy::config::bootstrap::v3::Bootstrap bootstrap; @@ -1640,14 +1638,12 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { EXPECT_EQ(1UL, cluster_manager_->clusters().active_clusters_.size()); Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance(); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(cp)); - EXPECT_CALL(*cp, addIdleCallback(_)); EXPECT_EQ(cp, HttpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("fake_cluster") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr))); 
Tcp::ConnectionPool::MockInstance* cp2 = new Tcp::ConnectionPool::MockInstance(); EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(cp2)); - EXPECT_CALL(*cp2, addIdleCallback(_)); EXPECT_EQ(cp2, TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("fake_cluster") ->tcpConnPool(ResourcePriority::Default, nullptr))); @@ -1663,9 +1659,11 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { // Now remove the cluster. This should drain the connection pools, but not affect // tcp connections. + Http::ConnectionPool::Instance::DrainedCb drained_cb; + Tcp::ConnectionPool::Instance::DrainedCb drained_cb2; EXPECT_CALL(*callbacks, onClusterRemoval(_)); - EXPECT_CALL(*cp, startDrain()); - EXPECT_CALL(*cp2, startDrain()); + EXPECT_CALL(*cp, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb)); + EXPECT_CALL(*cp2, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb2)); EXPECT_TRUE(cluster_manager_->removeCluster("fake_cluster")); EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("fake_cluster")); EXPECT_EQ(0UL, cluster_manager_->clusters().active_clusters_.size()); @@ -1678,6 +1676,9 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { // Remove an unknown cluster. 
EXPECT_FALSE(cluster_manager_->removeCluster("foo")); + drained_cb(); + drained_cb2(); + checkStats(1 /*added*/, 1 /*modified*/, 1 /*removed*/, 0 /*active*/, 0 /*warming*/); EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); @@ -1777,8 +1778,8 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { Outlier::MockDetector outlier_detector; ON_CALL(*cluster1, outlierDetector()).WillByDefault(Return(&outlier_detector)); - Http::ConnectionPool::MockInstance* cp1 = new NiceMock(); - Http::ConnectionPool::MockInstance* cp2 = new NiceMock(); + Http::ConnectionPool::MockInstance* cp1 = new Http::ConnectionPool::MockInstance(); + Http::ConnectionPool::MockInstance* cp2 = new Http::ConnectionPool::MockInstance(); { InSequence s; @@ -1824,54 +1825,6 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } -// Test that we close all HTTP connection pool connections when there is a host health failure. -// Verify that the pool gets deleted if it is idle, and that a crash does not occur due to -// deleting a container while iterating through it (see `do_not_delete_` in -// `ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure()`). 
-TEST_F(ClusterManagerImplTest, CloseHttpConnectionsAndDeletePoolOnHealthFailure) { - const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", - clustersJson({defaultStaticClusterJson("some_cluster")})); - std::shared_ptr cluster1(new NiceMock()); - cluster1->info_->name_ = "some_cluster"; - HostSharedPtr test_host = makeTestHost(cluster1->info_, "tcp://127.0.0.1:80", time_system_); - cluster1->prioritySet().getMockHostSet(0)->hosts_ = {test_host}; - ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); - - MockHealthChecker health_checker; - ON_CALL(*cluster1, healthChecker()).WillByDefault(Return(&health_checker)); - - Outlier::MockDetector outlier_detector; - ON_CALL(*cluster1, outlierDetector()).WillByDefault(Return(&outlier_detector)); - - Http::ConnectionPool::MockInstance* cp1 = new NiceMock(); - - InSequence s; - - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) - .WillOnce(Return(std::make_pair(cluster1, nullptr))); - EXPECT_CALL(health_checker, addHostCheckCompleteCb(_)); - EXPECT_CALL(outlier_detector, addChangedStateCb(_)); - EXPECT_CALL(*cluster1, initialize(_)) - .WillOnce(Invoke([cluster1](std::function initialize_callback) { - // Test inline init. 
- initialize_callback(); - })); - create(parseBootstrapFromV3Json(json)); - - EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(cp1)); - cluster_manager_->getThreadLocalCluster("some_cluster") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); - - outlier_detector.runCallbacks(test_host); - health_checker.runCallbacks(test_host, HealthTransition::Unchanged); - - EXPECT_CALL(*cp1, drainConnections()).WillOnce(Invoke([&]() { cp1->idle_cb_(); })); - test_host->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK); - outlier_detector.runCallbacks(test_host); - - EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); -} - // Test that we close all TCP connection pool connections when there is a host health failure. TEST_F(ClusterManagerImplTest, CloseTcpConnectionPoolsOnHealthFailure) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", @@ -1888,8 +1841,8 @@ TEST_F(ClusterManagerImplTest, CloseTcpConnectionPoolsOnHealthFailure) { Outlier::MockDetector outlier_detector; ON_CALL(*cluster1, outlierDetector()).WillByDefault(Return(&outlier_detector)); - Tcp::ConnectionPool::MockInstance* cp1 = new NiceMock(); - Tcp::ConnectionPool::MockInstance* cp2 = new NiceMock(); + Tcp::ConnectionPool::MockInstance* cp1 = new Tcp::ConnectionPool::MockInstance(); + Tcp::ConnectionPool::MockInstance* cp2 = new Tcp::ConnectionPool::MockInstance(); { InSequence s; @@ -2119,7 +2072,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) .Times(4) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. 
Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( @@ -2139,9 +2092,14 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { EXPECT_NE(cp1_high, cp2_high); EXPECT_NE(cp1, cp1_high); - EXPECT_CALL(factory_, allocateTcpConnPool_) + Http::ConnectionPool::Instance::DrainedCb drained_cb; + EXPECT_CALL(*cp1, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb)); + Http::ConnectionPool::Instance::DrainedCb drained_cb_high; + EXPECT_CALL(*cp1_high, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb_high)); + + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .Times(4) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. Tcp::ConnectionPool::MockInstance* tcp1 = @@ -2161,19 +2119,24 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { EXPECT_NE(tcp1_high, tcp2_high); EXPECT_NE(tcp1, tcp1_high); + Tcp::ConnectionPool::Instance::DrainedCb tcp_drained_cb; + EXPECT_CALL(*tcp1, addDrainedCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb)); + Tcp::ConnectionPool::Instance::DrainedCb tcp_drained_cb_high; + EXPECT_CALL(*tcp1_high, addDrainedCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb_high)); + // Remove the first host, this should lead to the first cp being drained. 
dns_timer_->invokeCallback(); dns_callback(Network::DnsResolver::ResolutionStatus::Success, TestUtility::makeDnsResponse({"127.0.0.2"})); - cp1->idle_cb_(); - cp1->idle_cb_ = nullptr; - tcp1->idle_cb_(); - tcp1->idle_cb_ = nullptr; - EXPECT_CALL(factory_.tls_.dispatcher_, deferredDelete_(_)).Times(2); - cp1_high->idle_cb_(); - cp1_high->idle_cb_ = nullptr; - tcp1_high->idle_cb_(); - tcp1_high->idle_cb_ = nullptr; + drained_cb(); + drained_cb = nullptr; + tcp_drained_cb(); + tcp_drained_cb = nullptr; + EXPECT_CALL(factory_.tls_.dispatcher_, deferredDelete_(_)).Times(4); + drained_cb_high(); + drained_cb_high = nullptr; + tcp_drained_cb_high(); + tcp_drained_cb_high = nullptr; // Make sure we get back the same connection pool for the 2nd host as we did before the change. Http::ConnectionPool::MockInstance* cp3 = HttpPoolDataPeer::getPool( @@ -2274,7 +2237,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) .Times(4) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. 
Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( @@ -2294,9 +2257,14 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { EXPECT_NE(cp1_high, cp2_high); EXPECT_NE(cp1, cp1_high); - EXPECT_CALL(factory_, allocateTcpConnPool_) + Http::ConnectionPool::Instance::DrainedCb drained_cb; + EXPECT_CALL(*cp1, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb)); + Http::ConnectionPool::Instance::DrainedCb drained_cb_high; + EXPECT_CALL(*cp1_high, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb_high)); + + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .Times(10) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts, and for different SNIs Tcp::ConnectionPool::MockInstance* tcp1 = @@ -2358,22 +2326,33 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { EXPECT_CALL(factory_.tls_.dispatcher_, deferredDelete_(_)).Times(6); + Tcp::ConnectionPool::Instance::DrainedCb tcp_drained_cb; + EXPECT_CALL(*tcp1, addDrainedCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb)); + Tcp::ConnectionPool::Instance::DrainedCb tcp_drained_cb_high; + EXPECT_CALL(*tcp1_high, addDrainedCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb_high)); + + Tcp::ConnectionPool::Instance::DrainedCb tcp_drained_cb_example_com; + EXPECT_CALL(*tcp1_example_com, addDrainedCallback(_)) + .WillOnce(SaveArg<0>(&tcp_drained_cb_example_com)); + Tcp::ConnectionPool::Instance::DrainedCb tcp_drained_cb_ibm_com; + EXPECT_CALL(*tcp1_ibm_com, addDrainedCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb_ibm_com)); + // Remove the first host, this should lead to the first cp being drained. 
dns_timer_->invokeCallback(); dns_callback(Network::DnsResolver::ResolutionStatus::Success, TestUtility::makeDnsResponse({"127.0.0.2"})); - cp1->idle_cb_(); - cp1->idle_cb_ = nullptr; - tcp1->idle_cb_(); - tcp1->idle_cb_ = nullptr; - cp1_high->idle_cb_(); - cp1_high->idle_cb_ = nullptr; - tcp1_high->idle_cb_(); - tcp1_high->idle_cb_ = nullptr; - tcp1_example_com->idle_cb_(); - tcp1_example_com->idle_cb_ = nullptr; - tcp1_ibm_com->idle_cb_(); - tcp1_ibm_com->idle_cb_ = nullptr; + drained_cb(); + drained_cb = nullptr; + tcp_drained_cb(); + tcp_drained_cb = nullptr; + drained_cb_high(); + drained_cb_high = nullptr; + tcp_drained_cb_high(); + tcp_drained_cb_high = nullptr; + tcp_drained_cb_example_com(); + tcp_drained_cb_example_com = nullptr; + tcp_drained_cb_ibm_com(); + tcp_drained_cb_ibm_com = nullptr; // Make sure we get back the same connection pool for the 2nd host as we did before the change. Http::ConnectionPool::MockInstance* cp3 = HttpPoolDataPeer::getPool( @@ -2834,10 +2813,10 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { TestUtility::makeDnsResponse({"127.0.0.2"})); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) - .WillOnce(ReturnNew>()); + .WillOnce(ReturnNew()); - EXPECT_CALL(factory_, allocateTcpConnPool_) - .WillOnce(ReturnNew>()); + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) + .WillOnce(ReturnNew()); Http::ConnectionPool::MockInstance* cp = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") @@ -2848,14 +2827,11 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { ->tcpConnPool(ResourcePriority::Default, nullptr)); // Immediate drain, since this can happen with the HTTP codecs. 
- EXPECT_CALL(*cp, startDrain()).WillOnce(Invoke([&]() { - cp->idle_cb_(); - cp->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp, startDrain()).WillOnce(Invoke([&]() { - tcp->idle_cb_(); - tcp->idle_cb_ = nullptr; - })); + EXPECT_CALL(*cp, addDrainedCallback(_)) + .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); + + EXPECT_CALL(*tcp, addDrainedCallback(_)) + .WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); })); // Remove the first host, this should lead to the cp being drained, without // crash. @@ -2924,25 +2900,24 @@ TEST_F(ClusterManagerImplTest, ConnPoolDestroyWithDraining) { TestUtility::makeDnsResponse({"127.0.0.2"})); MockConnPoolWithDestroy* mock_cp = new MockConnPoolWithDestroy(); - Http::ConnectionPool::Instance::IdleCb drained_cb; EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(mock_cp)); - EXPECT_CALL(*mock_cp, addIdleCallback(_)).WillOnce(SaveArg<0>(&drained_cb)); - EXPECT_CALL(*mock_cp, startDrain()); - MockTcpConnPoolWithDestroy* mock_tcp = new NiceMock(); - Tcp::ConnectionPool::Instance::IdleCb tcp_drained_cb; - EXPECT_CALL(factory_, allocateTcpConnPool_).WillOnce(Return(mock_tcp)); - EXPECT_CALL(*mock_tcp, addIdleCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb)); - EXPECT_CALL(*mock_tcp, startDrain()); + MockTcpConnPoolWithDestroy* mock_tcp = new MockTcpConnPoolWithDestroy(); + EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(mock_tcp)); - HttpPoolDataPeer::getPool( + Http::ConnectionPool::MockInstance* cp = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, 
nullptr)); // Remove the first host, this should lead to the cp being drained. + Http::ConnectionPool::Instance::DrainedCb drained_cb; + EXPECT_CALL(*cp, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb)); + Tcp::ConnectionPool::Instance::DrainedCb tcp_drained_cb; + EXPECT_CALL(*tcp, addDrainedCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb)); dns_timer_->invokeCallback(); dns_callback(Network::DnsResolver::ResolutionStatus::Success, TestUtility::makeDnsResponse({})); @@ -3380,7 +3355,6 @@ TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsPassedToTcpConnPool) { EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return)); EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(to_create)); - EXPECT_CALL(*to_create, addIdleCallback(_)); auto opt_cp = cluster_manager_->getThreadLocalCluster("cluster_1") ->tcpConnPool(ResourcePriority::Default, &context); @@ -3391,8 +3365,7 @@ TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsPassedToConnPool) { createWithLocalClusterUpdate(); NiceMock context; - Http::ConnectionPool::MockInstance* to_create = - new NiceMock(); + Http::ConnectionPool::MockInstance* to_create = new Http::ConnectionPool::MockInstance(); Network::Socket::OptionsSharedPtr options_to_return = Network::SocketOptionFactory::buildIpTransparentOptions(); @@ -3409,10 +3382,8 @@ TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsUsedInConnPoolHash) { NiceMock context1; NiceMock context2; - Http::ConnectionPool::MockInstance* to_create1 = - new NiceMock(); - Http::ConnectionPool::MockInstance* to_create2 = - new NiceMock(); + Http::ConnectionPool::MockInstance* to_create1 = new Http::ConnectionPool::MockInstance(); + Http::ConnectionPool::MockInstance* to_create2 = new Http::ConnectionPool::MockInstance(); Network::Socket::OptionsSharedPtr options1 = Network::SocketOptionFactory::buildIpTransparentOptions(); Network::Socket::OptionsSharedPtr options2 = @@ -3452,8 +3423,7 @@ TEST_F(ClusterManagerImplTest, 
UpstreamSocketOptionsNullIsOkay) { createWithLocalClusterUpdate(); NiceMock context; - Http::ConnectionPool::MockInstance* to_create = - new NiceMock(); + Http::ConnectionPool::MockInstance* to_create = new Http::ConnectionPool::MockInstance(); Network::Socket::OptionsSharedPtr options_to_return = nullptr; EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return)); @@ -3472,7 +3442,6 @@ TEST_F(ClusterManagerImplTest, HttpPoolDataForwardsCallsToConnectionPool) { Network::Socket::OptionsSharedPtr options_to_return = nullptr; EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(pool_mock)); - EXPECT_CALL(*pool_mock, addIdleCallback(_)); auto opt_cp = cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context); @@ -3481,9 +3450,9 @@ TEST_F(ClusterManagerImplTest, HttpPoolDataForwardsCallsToConnectionPool) { EXPECT_CALL(*pool_mock, hasActiveConnections()).WillOnce(Return(true)); opt_cp.value().hasActiveConnections(); - ConnectionPool::Instance::IdleCb drained_cb = []() {}; - EXPECT_CALL(*pool_mock, addIdleCallback(_)); - opt_cp.value().addIdleCallback(drained_cb); + ConnectionPool::Instance::DrainedCb drained_cb = []() {}; + EXPECT_CALL(*pool_mock, addDrainedCallback(_)); + opt_cp.value().addDrainedCallback(drained_cb); } class TestUpstreamNetworkFilter : public Network::WriteFilter { @@ -4395,11 +4364,11 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) .Times(3) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); - EXPECT_CALL(factory_, allocateTcpConnPool_) + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .Times(3) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. 
Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( @@ -4421,22 +4390,17 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { EXPECT_NE(cp1, cp2); EXPECT_NE(tcp1, tcp2); - EXPECT_CALL(*cp2, startDrain()).WillOnce(Invoke([&]() { - cp2->idle_cb_(); - cp2->idle_cb_ = nullptr; - })); - EXPECT_CALL(*cp1, startDrain()).WillOnce(Invoke([&]() { - cp1->idle_cb_(); - cp1->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp1, startDrain()).WillOnce(Invoke([&]() { - tcp1->idle_cb_(); - tcp1->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp2, startDrain()).WillOnce(Invoke([&]() { - tcp2->idle_cb_(); - tcp2->idle_cb_ = nullptr; - })); + EXPECT_CALL(*cp2, addDrainedCallback(_)) + .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); + + EXPECT_CALL(*cp1, addDrainedCallback(_)) + .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); + + EXPECT_CALL(*tcp1, addDrainedCallback(_)) + .WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); })); + + EXPECT_CALL(*tcp2, addDrainedCallback(_)) + .WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); })); HostVector hosts_removed; hosts_removed.push_back(host2); @@ -4459,14 +4423,11 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { HostVector hosts_added; hosts_added.push_back(host3); - EXPECT_CALL(*cp1, startDrain()).WillOnce(Invoke([&]() { - cp1->idle_cb_(); - cp1->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp1, startDrain()).WillOnce(Invoke([&]() { - tcp1->idle_cb_(); - tcp1->idle_cb_ = nullptr; - })); + EXPECT_CALL(*cp1, addDrainedCallback(_)) + .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); + + EXPECT_CALL(*tcp1, addDrainedCallback(_)) + .WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); })); // Adding host3 should drain connection pool for host1. 
cluster.prioritySet().updateHosts( @@ -4510,11 +4471,11 @@ TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) { EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) .Times(1) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); - EXPECT_CALL(factory_, allocateTcpConnPool_) + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .Times(1) - .WillRepeatedly(ReturnNew>()); + .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( @@ -4539,93 +4500,6 @@ TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) { hosts_added, {}, 100); } -TEST_F(ClusterManagerImplTest, ConnPoolsIdleDeleted) { - const std::string yaml = R"EOF( - static_resources: - clusters: - - name: cluster_1 - connect_timeout: 0.25s - lb_policy: ROUND_ROBIN - type: STATIC - )EOF"; - - ReadyWatcher initialized; - EXPECT_CALL(initialized, ready()); - create(parseBootstrapFromV3Yaml(yaml)); - - // Set up for an initialize callback. - cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); - - std::unique_ptr callbacks(new NiceMock()); - ClusterUpdateCallbacksHandlePtr cb = - cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks); - - Cluster& cluster = cluster_manager_->activeClusters().begin()->second; - - // Set up the HostSet. - HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80", time_system_); - - HostVector hosts{host1}; - auto hosts_ptr = std::make_shared(hosts); - - // Sending non-mergeable updates. 
- cluster.prioritySet().updateHosts( - 0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {}, - 100); - - { - auto* cp1 = new NiceMock(); - EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(cp1)); - std::function idle_callback; - EXPECT_CALL(*cp1, addIdleCallback(_)).WillOnce(SaveArg<0>(&idle_callback)); - - EXPECT_EQ(cp1, HttpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, - Http::Protocol::Http11, nullptr))); - // Request the same pool again and verify that it produces the same output - EXPECT_EQ(cp1, HttpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, - Http::Protocol::Http11, nullptr))); - - // Trigger the idle callback so we remove the connection pool - idle_callback(); - - auto* cp2 = new NiceMock(); - EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(cp2)); - EXPECT_CALL(*cp2, addIdleCallback(_)); - - // This time we expect cp2 since cp1 will have been destroyed - EXPECT_EQ(cp2, HttpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, - Http::Protocol::Http11, nullptr))); - } - - { - auto* tcp1 = new NiceMock(); - EXPECT_CALL(factory_, allocateTcpConnPool_).WillOnce(Return(tcp1)); - std::function idle_callback; - EXPECT_CALL(*tcp1, addIdleCallback(_)).WillOnce(SaveArg<0>(&idle_callback)); - EXPECT_EQ(tcp1, - TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr))); - // Request the same pool again and verify that it produces the same output - EXPECT_EQ(tcp1, - TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr))); - - // Trigger the idle callback so we remove the connection pool - idle_callback(); - - auto* tcp2 = new 
NiceMock(); - EXPECT_CALL(factory_, allocateTcpConnPool_).WillOnce(Return(tcp2)); - - // This time we expect tcp2 since tcp1 will have been destroyed - EXPECT_EQ(tcp2, - TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr))); - } -} - TEST_F(ClusterManagerImplTest, InvalidPriorityLocalClusterNameStatic) { std::string yaml = R"EOF( static_resources: @@ -4733,7 +4607,6 @@ TEST_F(ClusterManagerImplTest, ConnectionPoolPerDownstreamConnection) { std::vector conn_pool_vector; for (size_t i = 0; i < 3; ++i) { conn_pool_vector.push_back(new Http::ConnectionPool::MockInstance()); - EXPECT_CALL(*conn_pool_vector.back(), addIdleCallback(_)); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) .WillOnce(Return(conn_pool_vector.back())); EXPECT_CALL(downstream_connection, hashKey) @@ -4845,7 +4718,7 @@ TEST_F(PreconnectTest, PreconnectOn) { ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); http_handle.value().newStream(decoder_, http_callbacks_); - EXPECT_CALL(factory_, allocateTcpConnPool_) + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .Times(2) .WillRepeatedly(ReturnNew>()); auto tcp_handle = cluster_manager_->getThreadLocalCluster("cluster_1") diff --git a/test/common/upstream/conn_pool_map_impl_test.cc b/test/common/upstream/conn_pool_map_impl_test.cc index 5b8cd99f2b78b..6ce14d2becd6a 100644 --- a/test/common/upstream/conn_pool_map_impl_test.cc +++ b/test/common/upstream/conn_pool_map_impl_test.cc @@ -69,10 +69,10 @@ class ConnPoolMapImplTest : public testing::Test { }; } - TestMap::PoolFactory getFactoryExpectIdleCb(Http::ConnectionPool::Instance::IdleCb* cb) { + TestMap::PoolFactory getFactoryExpectDrainedCb(Http::ConnectionPool::Instance::DrainedCb* cb) { return [this, cb]() { auto pool = std::make_unique>(); - EXPECT_CALL(*pool, addIdleCallback(_)).WillOnce(SaveArg<0>(cb)); + EXPECT_CALL(*pool, addDrainedCallback(_)).WillOnce(SaveArg<0>(cb)); 
mock_pools_.push_back(pool.get()); return pool; }; @@ -153,14 +153,13 @@ TEST_F(ConnPoolMapImplTest, CallbacksPassedToPools) { test_map->getPool(1, getBasicFactory()); test_map->getPool(2, getBasicFactory()); - Http::ConnectionPool::Instance::IdleCb cb1; - EXPECT_CALL(*mock_pools_[0], addIdleCallback(_)).WillOnce(SaveArg<0>(&cb1)); - Http::ConnectionPool::Instance::IdleCb cb2; - EXPECT_CALL(*mock_pools_[1], addIdleCallback(_)).WillOnce(SaveArg<0>(&cb2)); + Http::ConnectionPool::Instance::DrainedCb cb1; + EXPECT_CALL(*mock_pools_[0], addDrainedCallback(_)).WillOnce(SaveArg<0>(&cb1)); + Http::ConnectionPool::Instance::DrainedCb cb2; + EXPECT_CALL(*mock_pools_[1], addDrainedCallback(_)).WillOnce(SaveArg<0>(&cb2)); ReadyWatcher watcher; - test_map->addIdleCallback([&watcher]() { watcher.ready(); }); - test_map->startDrain(); + test_map->addDrainedCallback([&watcher] { watcher.ready(); }); EXPECT_CALL(watcher, ready()).Times(2); cb1(); @@ -172,14 +171,13 @@ TEST_F(ConnPoolMapImplTest, CallbacksCachedAndPassedOnCreation) { TestMapPtr test_map = makeTestMap(); ReadyWatcher watcher; - test_map->addIdleCallback([&watcher]() { watcher.ready(); }); - test_map->startDrain(); + test_map->addDrainedCallback([&watcher] { watcher.ready(); }); - Http::ConnectionPool::Instance::IdleCb cb1; - test_map->getPool(1, getFactoryExpectIdleCb(&cb1)); + Http::ConnectionPool::Instance::DrainedCb cb1; + test_map->getPool(1, getFactoryExpectDrainedCb(&cb1)); - Http::ConnectionPool::Instance::IdleCb cb2; - test_map->getPool(2, getFactoryExpectIdleCb(&cb2)); + Http::ConnectionPool::Instance::DrainedCb cb2; + test_map->getPool(2, getFactoryExpectDrainedCb(&cb2)); EXPECT_CALL(watcher, ready()).Times(2); cb1(); @@ -207,7 +205,7 @@ TEST_F(ConnPoolMapImplTest, DrainConnectionsForwarded) { TEST_F(ConnPoolMapImplTest, ClearDefersDelete) { TestMapPtr test_map = makeTestMap(); - Http::ConnectionPool::Instance::IdleCb cb1; + Http::ConnectionPool::Instance::DrainedCb cb1; test_map->getPool(1, 
getBasicFactory()); test_map->getPool(2, getBasicFactory()); test_map->clear(); @@ -392,19 +390,6 @@ TEST_F(ConnPoolMapImplTest, CircuitBreakerUsesProvidedPriorityHigh) { test_map->getPool(2, getBasicFactory()); } -TEST_F(ConnPoolMapImplTest, ErasePool) { - TestMapPtr test_map = makeTestMap(); - auto* pool_ptr = &test_map->getPool(1, getBasicFactory()).value().get(); - EXPECT_EQ(1, test_map->size()); - EXPECT_EQ(pool_ptr, &test_map->getPool(1, getNeverCalledFactory()).value().get()); - EXPECT_EQ(1, test_map->size()); - EXPECT_FALSE(test_map->erasePool(2)); - EXPECT_EQ(1, test_map->size()); - EXPECT_TRUE(test_map->erasePool(1)); - EXPECT_EQ(0, test_map->size()); - EXPECT_NE(pool_ptr, &test_map->getPool(1, getBasicFactory()).value().get()); -} - // The following tests only die in debug builds, so don't run them if this isn't one. #if !defined(NDEBUG) class ConnPoolMapImplDeathTest : public ConnPoolMapImplTest {}; @@ -413,10 +398,10 @@ TEST_F(ConnPoolMapImplDeathTest, ReentryClearTripsAssert) { TestMapPtr test_map = makeTestMap(); test_map->getPool(1, getBasicFactory()); - ON_CALL(*mock_pools_[0], addIdleCallback(_)) - .WillByDefault(Invoke([](Http::ConnectionPool::Instance::IdleCb cb) { cb(); })); + ON_CALL(*mock_pools_[0], addDrainedCallback(_)) + .WillByDefault(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); - EXPECT_DEATH(test_map->addIdleCallback([&test_map]() { test_map->clear(); }), + EXPECT_DEATH(test_map->addDrainedCallback([&test_map] { test_map->clear(); }), ".*Details: A resource should only be entered once"); } @@ -424,11 +409,11 @@ TEST_F(ConnPoolMapImplDeathTest, ReentryGetPoolTripsAssert) { TestMapPtr test_map = makeTestMap(); test_map->getPool(1, getBasicFactory()); - ON_CALL(*mock_pools_[0], addIdleCallback(_)) - .WillByDefault(Invoke([](Http::ConnectionPool::Instance::IdleCb cb) { cb(); })); + ON_CALL(*mock_pools_[0], addDrainedCallback(_)) + .WillByDefault(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); 
EXPECT_DEATH( - test_map->addIdleCallback([&test_map, this]() { test_map->getPool(2, getBasicFactory()); }), + test_map->addDrainedCallback([&test_map, this] { test_map->getPool(2, getBasicFactory()); }), ".*Details: A resource should only be entered once"); } @@ -436,10 +421,10 @@ TEST_F(ConnPoolMapImplDeathTest, ReentryDrainConnectionsTripsAssert) { TestMapPtr test_map = makeTestMap(); test_map->getPool(1, getBasicFactory()); - ON_CALL(*mock_pools_[0], addIdleCallback(_)) - .WillByDefault(Invoke([](Http::ConnectionPool::Instance::IdleCb cb) { cb(); })); + ON_CALL(*mock_pools_[0], addDrainedCallback(_)) + .WillByDefault(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); - EXPECT_DEATH(test_map->addIdleCallback([&test_map]() { test_map->clear(); }), + EXPECT_DEATH(test_map->addDrainedCallback([&test_map] { test_map->drainConnections(); }), ".*Details: A resource should only be entered once"); } @@ -447,10 +432,10 @@ TEST_F(ConnPoolMapImplDeathTest, ReentryAddDrainedCallbackTripsAssert) { TestMapPtr test_map = makeTestMap(); test_map->getPool(1, getBasicFactory()); - ON_CALL(*mock_pools_[0], addIdleCallback(_)) - .WillByDefault(Invoke([](Http::ConnectionPool::Instance::IdleCb cb) { cb(); })); + ON_CALL(*mock_pools_[0], addDrainedCallback(_)) + .WillByDefault(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); - EXPECT_DEATH(test_map->addIdleCallback([&test_map]() { test_map->addIdleCallback([]() {}); }), + EXPECT_DEATH(test_map->addDrainedCallback([&test_map] { test_map->addDrainedCallback([]() {}); }), ".*Details: A resource should only be entered once"); } #endif // !defined(NDEBUG) diff --git a/test/common/upstream/priority_conn_pool_map_impl_test.cc b/test/common/upstream/priority_conn_pool_map_impl_test.cc index a7ade68348547..60252e332c58c 100644 --- a/test/common/upstream/priority_conn_pool_map_impl_test.cc +++ b/test/common/upstream/priority_conn_pool_map_impl_test.cc @@ -36,13 +36,6 @@ class PriorityConnPoolMapImplTest : 
public testing::Test { }; } - TestMap::PoolFactory getNeverCalledFactory() { - return []() { - EXPECT_TRUE(false); - return nullptr; - }; - } - protected: NiceMock dispatcher_; std::vector*> mock_pools_; @@ -111,24 +104,6 @@ TEST_F(PriorityConnPoolMapImplTest, TestClearEmptiesOut) { EXPECT_EQ(test_map->size(), 0); } -TEST_F(PriorityConnPoolMapImplTest, TestErase) { - TestMapPtr test_map = makeTestMap(); - - auto* pool_ptr = &test_map->getPool(ResourcePriority::High, 1, getBasicFactory()).value().get(); - EXPECT_EQ(1, test_map->size()); - EXPECT_EQ(pool_ptr, - &test_map->getPool(ResourcePriority::High, 1, getNeverCalledFactory()).value().get()); - EXPECT_FALSE(test_map->erasePool(ResourcePriority::Default, 1)); - EXPECT_NE(pool_ptr, - &test_map->getPool(ResourcePriority::Default, 1, getBasicFactory()).value().get()); - EXPECT_EQ(2, test_map->size()); - EXPECT_TRUE(test_map->erasePool(ResourcePriority::Default, 1)); - EXPECT_TRUE(test_map->erasePool(ResourcePriority::High, 1)); - EXPECT_EQ(0, test_map->size()); - EXPECT_NE(pool_ptr, - &test_map->getPool(ResourcePriority::High, 1, getBasicFactory()).value().get()); -} - // Show that the drained callback is invoked once for the high priority pool, and once for // the default priority pool. 
TEST_F(PriorityConnPoolMapImplTest, TestAddDrainedCbProxiedThrough) { @@ -137,13 +112,13 @@ TEST_F(PriorityConnPoolMapImplTest, TestAddDrainedCbProxiedThrough) { test_map->getPool(ResourcePriority::High, 0, getBasicFactory()); test_map->getPool(ResourcePriority::Default, 0, getBasicFactory()); - Http::ConnectionPool::Instance::IdleCb cbHigh; - EXPECT_CALL(*mock_pools_[0], addIdleCallback(_)).WillOnce(SaveArg<0>(&cbHigh)); - Http::ConnectionPool::Instance::IdleCb cbDefault; - EXPECT_CALL(*mock_pools_[1], addIdleCallback(_)).WillOnce(SaveArg<0>(&cbDefault)); + Http::ConnectionPool::Instance::DrainedCb cbHigh; + EXPECT_CALL(*mock_pools_[0], addDrainedCallback(_)).WillOnce(SaveArg<0>(&cbHigh)); + Http::ConnectionPool::Instance::DrainedCb cbDefault; + EXPECT_CALL(*mock_pools_[1], addDrainedCallback(_)).WillOnce(SaveArg<0>(&cbDefault)); ReadyWatcher watcher; - test_map->addIdleCallback([&watcher]() { watcher.ready(); }); + test_map->addDrainedCallback([&watcher] { watcher.ready(); }); EXPECT_CALL(watcher, ready()).Times(2); cbHigh(); diff --git a/test/integration/idle_timeout_integration_test.cc b/test/integration/idle_timeout_integration_test.cc index 9ba8b0ff29ebe..9a8e0f75e1972 100644 --- a/test/integration/idle_timeout_integration_test.cc +++ b/test/integration/idle_timeout_integration_test.cc @@ -105,16 +105,6 @@ TEST_P(IdleTimeoutIntegrationTest, TimeoutBasic) { idle_time_out->set_seconds(seconds.count()); ConfigHelper::setProtocolOptions(*bootstrap.mutable_static_resources()->mutable_clusters(0), protocol_options); - - // Set pool limit so that the test can use it's stats to validate that - // the pool is deleted. 
- envoy::config::cluster::v3::CircuitBreakers circuit_breakers; - auto* threshold = circuit_breakers.mutable_thresholds()->Add(); - threshold->mutable_max_connection_pools()->set_value(1); - bootstrap.mutable_static_resources() - ->mutable_clusters(0) - ->mutable_circuit_breakers() - ->MergeFrom(circuit_breakers); }); initialize(); @@ -122,9 +112,6 @@ TEST_P(IdleTimeoutIntegrationTest, TimeoutBasic) { auto response = codec_client_->makeRequestWithBody(default_request_headers_, 1024); waitForNextUpstreamRequest(); - // Validate that the circuit breaker config is setup as we expect. - test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 1); - upstream_request_->encodeHeaders(default_response_headers_, false); upstream_request_->encodeData(512, true); ASSERT_TRUE(response->waitForEndStream()); @@ -137,9 +124,6 @@ TEST_P(IdleTimeoutIntegrationTest, TimeoutBasic) { // Do not send any requests and validate if idle time out kicks in. ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_idle_timeout", 1); - - // Validate that the pool is deleted when it becomes idle. 
- test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0); } // Tests idle timeout behaviour with multiple requests and validates that idle timer kicks in diff --git a/test/mocks/http/conn_pool.cc b/test/mocks/http/conn_pool.cc index 7c1ffa928c0fc..035f566a6b130 100644 --- a/test/mocks/http/conn_pool.cc +++ b/test/mocks/http/conn_pool.cc @@ -1,8 +1,5 @@ #include "test/mocks/http/conn_pool.h" -using testing::_; -using testing::SaveArg; - namespace Envoy { namespace Http { namespace ConnectionPool { @@ -10,7 +7,6 @@ namespace ConnectionPool { MockInstance::MockInstance() : host_{std::make_shared>()} { ON_CALL(*this, host()).WillByDefault(Return(host_)); - ON_CALL(*this, addIdleCallback(_)).WillByDefault(SaveArg<0>(&idle_cb_)); } MockInstance::~MockInstance() = default; diff --git a/test/mocks/http/conn_pool.h b/test/mocks/http/conn_pool.h index c1f55c2f92ef8..2f3d5b9352de2 100644 --- a/test/mocks/http/conn_pool.h +++ b/test/mocks/http/conn_pool.h @@ -29,9 +29,7 @@ class MockInstance : public Instance { // Http::ConnectionPool::Instance MOCK_METHOD(Http::Protocol, protocol, (), (const)); - MOCK_METHOD(void, addIdleCallback, (IdleCb cb)); - MOCK_METHOD(bool, isIdle, (), (const)); - MOCK_METHOD(void, startDrain, ()); + MOCK_METHOD(void, addDrainedCallback, (DrainedCb cb)); MOCK_METHOD(void, drainConnections, ()); MOCK_METHOD(bool, hasActiveConnections, (), (const)); MOCK_METHOD(Cancellable*, newStream, (ResponseDecoder & response_decoder, Callbacks& callbacks)); @@ -40,7 +38,6 @@ class MockInstance : public Instance { MOCK_METHOD(absl::string_view, protocolDescription, (), (const)); std::shared_ptr> host_; - IdleCb idle_cb_; }; } // namespace ConnectionPool diff --git a/test/mocks/tcp/mocks.cc b/test/mocks/tcp/mocks.cc index 9b1a4cff79905..d6828f046a147 100644 --- a/test/mocks/tcp/mocks.cc +++ b/test/mocks/tcp/mocks.cc @@ -7,7 +7,6 @@ using testing::ReturnRef; using testing::_; using testing::Invoke; using testing::ReturnRef; -using 
testing::SaveArg; namespace Envoy { namespace Tcp { @@ -28,7 +27,6 @@ MockInstance::MockInstance() { return newConnectionImpl(cb); })); ON_CALL(*this, host()).WillByDefault(Return(host_)); - ON_CALL(*this, addIdleCallback(_)).WillByDefault(SaveArg<0>(&idle_cb_)); } MockInstance::~MockInstance() = default; diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h index 75e79e7aea932..6b486918cb99c 100644 --- a/test/mocks/tcp/mocks.h +++ b/test/mocks/tcp/mocks.h @@ -59,9 +59,7 @@ class MockInstance : public Instance { ~MockInstance() override; // Tcp::ConnectionPool::Instance - MOCK_METHOD(void, addIdleCallback, (IdleCb cb)); - MOCK_METHOD(bool, isIdle, (), (const)); - MOCK_METHOD(void, startDrain, ()); + MOCK_METHOD(void, addDrainedCallback, (DrainedCb cb)); MOCK_METHOD(void, drainConnections, ()); MOCK_METHOD(void, closeConnections, ()); MOCK_METHOD(Cancellable*, newConnection, (Tcp::ConnectionPool::Callbacks & callbacks)); @@ -77,7 +75,6 @@ class MockInstance : public Instance { std::list> handles_; std::list callbacks_; - IdleCb idle_cb_; std::shared_ptr> host_{ new NiceMock()};