diff --git a/include/envoy/tcp/conn_pool.h b/include/envoy/tcp/conn_pool.h
index c89b25eb91432..8237af37fea31 100644
--- a/include/envoy/tcp/conn_pool.h
+++ b/include/envoy/tcp/conn_pool.h
@@ -58,7 +58,8 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks {
 };
 
 /*
- * ConnectionData wraps a ClientConnection allocated to a caller.
+ * ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
+ * released back to the pool for re-use when their containing ConnectionData is destroyed.
  */
 class ConnectionData {
 public:
@@ -76,14 +77,10 @@ class ConnectionData {
    * @param callback the UpstreamCallbacks to invoke for upstream data
    */
   virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;
-
-  /**
-   * Release the connection after use. The connection should be closed first only if it is
-   * not viable for future use.
-   */
-  virtual void release() PURE;
 };
 
+typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;
+
 /**
  * Pool callbacks invoked in the context of a newConnection() call, either synchronously or
  * asynchronously.
@@ -102,14 +99,17 @@ class Callbacks {
                              Upstream::HostDescriptionConstSharedPtr host) PURE;
 
   /**
-   * Called when a connection is available to process a request/response. Recipients of connections
-   * must release the connection after use. They should only close the underlying ClientConnection
-   * if it is no longer viable for future requests.
+   * Called when a connection is available to process a request/response. Connections may be
+   * released back to the pool for re-use by resetting the ConnectionDataPtr. If the connection is
+   * no longer viable for reuse (e.g. due to some kind of protocol error), the underlying
+   * ClientConnection should be closed to prevent its reuse.
+   *
    * @param conn supplies the connection data to use.
    * @param host supplies the description of the host that will carry the request. For logical
    *        connection pools the description may be different each time this is called.
    */
-  virtual void onPoolReady(ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) PURE;
+  virtual void onPoolReady(ConnectionDataPtr&& conn,
+                           Upstream::HostDescriptionConstSharedPtr host) PURE;
 };
 
 /**
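[Illustrative sketch — not part of the diff.] Under the revised contract a caller implements Tcp::ConnectionPool::Callbacks, takes ownership of the ConnectionDataPtr in onPoolReady(), and releases the connection simply by destroying that pointer; closing first is only needed when the connection must not be reused. The class and member names below are hypothetical:

// Hypothetical caller of the new interface (not part of this change).
class ExampleClient : public Tcp::ConnectionPool::Callbacks {
public:
  void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn,
                   Upstream::HostDescriptionConstSharedPtr) override {
    conn_data_ = std::move(conn); // the connection is now checked out of the pool
  }

  void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
                     Upstream::HostDescriptionConstSharedPtr) override {
    // handle the connect failure
  }

  void onResponseComplete(bool protocol_error) {
    if (protocol_error) {
      // Not viable for reuse: close first so the pool drops it instead of re-pooling it.
      conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
    }
    conn_data_.reset(); // an open connection is returned to the pool here
  }

private:
  Tcp::ConnectionPool::ConnectionDataPtr conn_data_;
};

This mirrors how the thrift router is converted later in this diff: its conn_data_.reset() in onResponseComplete() replaces the old explicit release() call.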
diff --git a/source/common/http/websocket/ws_handler_impl.cc b/source/common/http/websocket/ws_handler_impl.cc
index 541f085155dfe..5906c54b5d92c 100644
--- a/source/common/http/websocket/ws_handler_impl.cc
+++ b/source/common/http/websocket/ws_handler_impl.cc
@@ -131,7 +131,8 @@ void WsHandlerImpl::onConnectionSuccess() {
   // the connection pool. The current approach is a stop gap solution, where
   // we put the onus on the user to tell us if a route (and corresponding upstream)
   // is supposed to allow websocket upgrades or not.
-  Http1::ClientConnectionImpl upstream_http(*upstream_connection_, http_conn_callbacks_);
+  Http1::ClientConnectionImpl upstream_http(upstream_conn_data_->connection(),
+                                            http_conn_callbacks_);
   Http1::RequestStreamEncoderImpl upstream_request =
       Http1::RequestStreamEncoderImpl(upstream_http);
   upstream_request.encodeHeaders(request_headers_, false);
   ASSERT(state_ == ConnectState::PreConnect);
diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc
index d6c60c174c465..9d062f96a09f5 100644
--- a/source/common/tcp/conn_pool.cc
+++ b/source/common/tcp/conn_pool.cc
@@ -46,8 +46,10 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {
 
 void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks) {
   ASSERT(conn.wrapper_ == nullptr);
-  conn.wrapper_ = std::make_unique<ConnectionWrapper>(conn);
-  callbacks.onPoolReady(*conn.wrapper_, conn.real_host_description_);
+  conn.wrapper_ = std::make_shared<ConnectionWrapper>(conn);
+
+  callbacks.onPoolReady(std::make_unique<ConnectionDataImpl>(conn.wrapper_),
+                        conn.real_host_description_);
 }
 
 void ConnPoolImpl::checkForDrained() {
@@ -124,6 +126,8 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
       host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc();
     }
     host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc();
+
+    conn.wrapper_->release(true);
   }
 
   removed = conn.removeFromList(busy_conns_);
@@ -230,7 +234,11 @@ void ConnPoolImpl::onUpstreamReady() {
 }
 
 void ConnPoolImpl::processIdleConnection(ActiveConn& conn, bool delay) {
-  conn.wrapper_.reset();
+  if (conn.wrapper_) {
+    conn.wrapper_->invalidate();
+    conn.wrapper_.reset();
+  }
+
   if (pending_requests_.empty() || delay) {
     // There is nothing to service or delayed processing is requested, so just move the connection
     // into the ready list.
@@ -259,23 +267,29 @@ ConnPoolImpl::ConnectionWrapper::ConnectionWrapper(ActiveConn& parent) : parent_
   parent_.parent_.host_->stats().rq_active_.inc();
 }
 
-ConnPoolImpl::ConnectionWrapper::~ConnectionWrapper() {
-  parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
-  parent_.parent_.host_->stats().rq_active_.dec();
+Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() {
+  ASSERT(conn_valid_);
+  return *parent_.conn_;
 }
 
-Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() { return *parent_.conn_; }
-
 void ConnPoolImpl::ConnectionWrapper::addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& cb) {
   ASSERT(!released_);
   callbacks_ = &cb;
 }
 
-void ConnPoolImpl::ConnectionWrapper::release() {
-  ASSERT(!released_);
-  released_ = true;
-  callbacks_ = nullptr;
-  parent_.parent_.onConnReleased(parent_);
+void ConnPoolImpl::ConnectionWrapper::release(bool closed) {
+  // Allow multiple calls: connection close and destruction of ConnectionDataImplPtr will both
+  // result in this call.
+  if (!released_) {
+    released_ = true;
+    callbacks_ = nullptr;
+    if (!closed) {
+      parent_.parent_.onConnReleased(parent_);
+    }
+
+    parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
+    parent_.parent_.host_->stats().rq_active_.dec();
+  }
 }
 
 ConnPoolImpl::PendingRequest::PendingRequest(ConnPoolImpl& parent,
@@ -332,6 +346,10 @@ ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)
 }
 
 ConnPoolImpl::ActiveConn::~ActiveConn() {
+  if (wrapper_) {
+    wrapper_->invalidate();
+  }
+
   parent_.host_->cluster().stats().upstream_cx_active_.dec();
   parent_.host_->stats().cx_active_.dec();
   conn_length_->complete();
@@ -361,11 +379,18 @@ void ConnPoolImpl::ActiveConn::onUpstreamData(Buffer::Instance& data, bool end_s
 }
 
 void ConnPoolImpl::ActiveConn::onEvent(Network::ConnectionEvent event) {
+  ConnectionPool::UpstreamCallbacks* cb = nullptr;
   if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
-    wrapper_->callbacks_->onEvent(event);
+    cb = wrapper_->callbacks_;
   }
 
+  // In the event of a close event, we want to update the pool's state before triggering callbacks,
+  // preventing the case where we attempt to return a closed connection to the ready pool.
   parent_.onConnectionEvent(*this, event);
+
+  if (cb) {
+    cb->onEvent(event);
+  }
 }
 
 void ConnPoolImpl::ActiveConn::onAboveWriteBufferHighWatermark() {
diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h
index aa06d8b2d8b4d..42f8dc52bd7bd 100644
--- a/source/common/tcp/conn_pool.h
+++ b/source/common/tcp/conn_pool.h
@@ -34,21 +34,35 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
 protected:
   struct ActiveConn;
 
-  struct ConnectionWrapper : public ConnectionPool::ConnectionData {
+  struct ConnectionWrapper {
     ConnectionWrapper(ActiveConn& parent);
-    ~ConnectionWrapper();
 
-    // ConnectionPool::ConnectionData
-    Network::ClientConnection& connection() override;
-    void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override;
-    void release() override;
+    Network::ClientConnection& connection();
+    void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
+    void release(bool closed);
+
+    void invalidate() { conn_valid_ = false; }
 
     ActiveConn& parent_;
     ConnectionPool::UpstreamCallbacks* callbacks_{};
     bool released_{false};
+    bool conn_valid_{true};
   };
 
-  typedef std::unique_ptr<ConnectionWrapper> ConnectionWrapperPtr;
+  typedef std::shared_ptr<ConnectionWrapper> ConnectionWrapperSharedPtr;
+
+  struct ConnectionDataImpl : public ConnectionPool::ConnectionData {
+    ConnectionDataImpl(ConnectionWrapperSharedPtr wrapper) : wrapper_(wrapper) {}
+    ~ConnectionDataImpl() { wrapper_->release(false); }
+
+    // ConnectionPool::ConnectionData
+    Network::ClientConnection& connection() override { return wrapper_->connection(); }
+    void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
+      wrapper_->addUpstreamCallbacks(callbacks);
+    };
+
+    ConnectionWrapperSharedPtr wrapper_;
+  };
 
   struct ConnReadFilter : public Network::ReadFilterBaseImpl {
     ConnReadFilter(ActiveConn& parent) : parent_(parent) {}
@@ -78,7 +92,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
 
     ConnPoolImpl& parent_;
    Upstream::HostDescriptionConstSharedPtr real_host_description_;
-    ConnectionWrapperPtr wrapper_;
+    ConnectionWrapperSharedPtr wrapper_;
     Network::ClientConnectionPtr conn_;
     Event::TimerPtr connect_timer_;
     Stats::TimespanPtr conn_length_;
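[Illustrative sketch — not part of the diff.] The wrapper becomes a shared_ptr because release() now has two entry points: the pool calls release(true) when the connection closes while checked out, and the caller-owned ConnectionDataImpl destructor calls release(false) afterwards. The released_ flag makes whichever call arrives second a no-op, so the rq_active gauges are decremented exactly once and a closed connection is never re-pooled. A self-contained miniature of that pattern, with stand-in names:

// Miniature of the idempotent-release pattern; all names are stand-ins.
#include <cassert>

struct MiniWrapper {
  void release(bool closed) {
    if (!released_) { // guard: repeated calls are harmless
      released_ = true;
      if (!closed) {
        returned_to_pool_ = true; // stands in for onConnReleased()
      }
      ++stat_decrements_; // stands in for the rq_active_ decrements
    }
  }
  bool released_{false};
  bool returned_to_pool_{false};
  int stat_decrements_{0};
};

int main() {
  MiniWrapper w;
  w.release(true);  // pool side: connection closed with an active request
  w.release(false); // caller side: ConnectionData destroyed later; must be a no-op
  assert(w.stat_decrements_ == 1 && !w.returned_to_pool_);
  return 0;
}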
diff --git a/source/common/tcp_proxy/BUILD b/source/common/tcp_proxy/BUILD
index fcd3000620fb8..61de5015db426 100644
--- a/source/common/tcp_proxy/BUILD
+++ b/source/common/tcp_proxy/BUILD
@@ -24,6 +24,7 @@ envoy_cc_library(
         "//include/envoy/stats:stats_interface",
         "//include/envoy/stats:stats_macros",
         "//include/envoy/stats:timespan",
+        "//include/envoy/tcp:conn_pool_interface",
         "//include/envoy/upstream:cluster_manager_interface",
         "//include/envoy/upstream:upstream_interface",
         "//source/common/access_log:access_log_lib",
diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc
index 68d4880200be7..906bdeeb696ea 100644
--- a/source/common/tcp_proxy/tcp_proxy.cc
+++ b/source/common/tcp_proxy/tcp_proxy.cc
@@ -135,8 +135,12 @@ Filter::~Filter() {
     access_log->log(nullptr, nullptr, nullptr, getRequestInfo());
   }
 
-  if (upstream_connection_) {
-    finalizeUpstreamConnectionStats();
+  if (upstream_handle_) {
+    upstream_handle_->cancel();
+  }
+
+  if (upstream_conn_data_) {
+    upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
   }
 }
 
@@ -144,21 +148,6 @@ TcpProxyStats Config::SharedConfig::generateStats(Stats::Scope& scope) {
   return {ALL_TCP_PROXY_STATS(POOL_COUNTER(scope), POOL_GAUGE(scope))};
 }
 
-namespace {
-void finalizeConnectionStats(const Upstream::HostDescription& host,
-                             Stats::Timespan connected_timespan) {
-  host.cluster().stats().upstream_cx_destroy_.inc();
-  host.cluster().stats().upstream_cx_active_.dec();
-  host.stats().cx_active_.dec();
-  host.cluster().resourceManager(Upstream::ResourcePriority::Default).connections().dec();
-  connected_timespan.complete();
-}
-} // namespace
-
-void Filter::finalizeUpstreamConnectionStats() {
-  finalizeConnectionStats(*read_callbacks_->upstreamHost(), *connected_timespan_);
-}
-
 void Filter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
   initialize(callbacks, true);
 }
@@ -188,15 +177,15 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec
 }
 
 void Filter::readDisableUpstream(bool disable) {
-  if (upstream_connection_ == nullptr ||
-      upstream_connection_->state() != Network::Connection::State::Open) {
+  if (upstream_conn_data_ == nullptr ||
+      upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
     // Because we flush write downstream, we can have a case where upstream has already disconnected
     // and we are waiting to flush. If we had a watermark event during this time we should no
     // longer touch the upstream connection.
     return;
   }
 
-  upstream_connection_->readDisable(disable);
+  upstream_conn_data_->connection().readDisable(disable);
   if (disable) {
     read_callbacks_->upstreamHost()
         ->cluster()
@@ -262,13 +251,12 @@ void Filter::UpstreamCallbacks::onBelowWriteBufferLowWatermark() {
   }
 }
 
-Network::FilterStatus Filter::UpstreamCallbacks::onData(Buffer::Instance& data, bool end_stream) {
+void Filter::UpstreamCallbacks::onUpstreamData(Buffer::Instance& data, bool end_stream) {
   if (parent_) {
     parent_->onUpstreamData(data, end_stream);
   } else {
     drainer_->onData(data, end_stream);
   }
-  return Network::FilterStatus::StopIteration;
 }
 
 void Filter::UpstreamCallbacks::onBytesSent() {
@@ -294,7 +282,7 @@ void Filter::UpstreamCallbacks::drain(Drainer& drainer) {
 }
 
 Network::FilterStatus Filter::initializeUpstreamConnection() {
-  ASSERT(upstream_connection_ == nullptr);
+  ASSERT(upstream_conn_data_ == nullptr);
 
   const std::string& cluster_name = getUpstreamCluster();
 
@@ -311,6 +299,9 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
   }
 
   Upstream::ClusterInfoConstSharedPtr cluster = thread_local_cluster->info();
+
+  // Check this here because the TCP conn pool will queue our request waiting for a connection that
+  // will never be released.
   if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) {
     getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::UpstreamOverflow);
     cluster->stats().upstream_cx_overflow_.inc();
@@ -325,86 +316,105 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
     return Network::FilterStatus::StopIteration;
   }
 
-  Upstream::Host::CreateConnectionData conn_info =
-      cluster_manager_.tcpConnForCluster(cluster_name, this);
-
-  upstream_connection_ = std::move(conn_info.connection_);
-  read_callbacks_->upstreamHost(conn_info.host_description_);
-  if (!upstream_connection_) {
-    // tcpConnForCluster() increments cluster->stats().upstream_cx_none_healthy.
+  Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster(
+      cluster_name, Upstream::ResourcePriority::Default, this);
+  if (!conn_pool) {
+    // Either cluster is unknown or there are no healthy hosts. tcpConnPoolForCluster() increments
+    // cluster->stats().upstream_cx_none_healthy in the latter case.
     getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::NoHealthyUpstream);
     onInitFailure(UpstreamFailureReason::NO_HEALTHY_UPSTREAM);
     return Network::FilterStatus::StopIteration;
   }
 
+  connecting_ = true;
   connect_attempts_++;
 
-  cluster->resourceManager(Upstream::ResourcePriority::Default).connections().inc();
-  upstream_connection_->addReadFilter(upstream_callbacks_);
-  upstream_connection_->addConnectionCallbacks(*upstream_callbacks_);
-  upstream_connection_->enableHalfClose(true);
-  upstream_connection_->setConnectionStats(
-      {read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_total_,
-       read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_buffered_,
-       read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_,
-       read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_,
-       &read_callbacks_->upstreamHost()->cluster().stats().bind_errors_});
-  upstream_connection_->connect();
-  upstream_connection_->noDelay(true);
-  getRequestInfo().onUpstreamHostSelected(conn_info.host_description_);
-  getRequestInfo().setUpstreamLocalAddress(upstream_connection_->localAddress());
-
-  ASSERT(connect_timeout_timer_ == nullptr);
-  connect_timeout_timer_ = read_callbacks_->connection().dispatcher().createTimer(
-      [this]() -> void { onConnectTimeout(); });
-  connect_timeout_timer_->enableTimer(cluster->connectTimeout());
-
-  read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_total_.inc();
-  read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_active_.inc();
-  read_callbacks_->upstreamHost()->stats().cx_total_.inc();
-  read_callbacks_->upstreamHost()->stats().cx_active_.inc();
-  connect_timespan_.reset(new Stats::Timespan(
-      read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_ms_));
-  connected_timespan_.reset(new Stats::Timespan(
-      read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_length_ms_));
-
-  return Network::FilterStatus::Continue;
+
+  // Because we never return open connections to the pool, this should either return a handle while
+  // a connection completes or it invokes onPoolFailure inline. Either way, stop iteration.
+  upstream_handle_ = conn_pool->newConnection(*this);
+  return Network::FilterStatus::StopIteration;
+}
+
+void Filter::onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
+                           Upstream::HostDescriptionConstSharedPtr host) {
+  upstream_handle_ = nullptr;
+
+  read_callbacks_->upstreamHost(host);
+  getRequestInfo().onUpstreamHostSelected(host);
+
+  switch (reason) {
+  case Tcp::ConnectionPool::PoolFailureReason::Overflow:
+  case Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure:
+    upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
+    break;
+
+  case Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
+    upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose);
+    break;
+
+  case Tcp::ConnectionPool::PoolFailureReason::Timeout:
+    onConnectTimeout();
+    break;
+
+  default:
+    NOT_REACHED_GCOVR_EXCL_LINE;
+  }
+}
+
+void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
+                         Upstream::HostDescriptionConstSharedPtr host) {
+  upstream_handle_ = nullptr;
+  upstream_conn_data_ = std::move(conn_data);
+  read_callbacks_->upstreamHost(host);
+
+  upstream_conn_data_->addUpstreamCallbacks(*upstream_callbacks_);
+
+  Network::ClientConnection& connection = upstream_conn_data_->connection();
+
+  connection.enableHalfClose(true);
+
+  getRequestInfo().onUpstreamHostSelected(host);
+  getRequestInfo().setUpstreamLocalAddress(connection.localAddress());
+
+  // Simulate the event that onPoolReady represents.
+  upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected);
+
+  read_callbacks_->continueReading();
 }
 
 void Filter::onConnectTimeout() {
   ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection());
   read_callbacks_->upstreamHost()->outlierDetector().putResult(Upstream::Outlier::Result::TIMEOUT);
-  read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_timeout_.inc();
   getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::UpstreamConnectionFailure);
 
-  // This will cause a LocalClose event to be raised, which will trigger a reconnect if
-  // needed/configured.
-  upstream_connection_->close(Network::ConnectionCloseType::NoFlush);
+  // Raise LocalClose, which will trigger a reconnect if needed/configured.
+  upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
 }
 
 Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
   ENVOY_CONN_LOG(trace, "downstream connection received {} bytes, end_stream={}",
                  read_callbacks_->connection(), data.length(), end_stream);
   getRequestInfo().addBytesReceived(data.length());
-  upstream_connection_->write(data, end_stream);
+  upstream_conn_data_->connection().write(data, end_stream);
   ASSERT(0 == data.length());
   resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
   return Network::FilterStatus::StopIteration;
 }
 
 void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
-  if (upstream_connection_) {
+  if (upstream_conn_data_) {
     if (event == Network::ConnectionEvent::RemoteClose) {
-      upstream_connection_->close(Network::ConnectionCloseType::FlushWrite);
+      upstream_conn_data_->connection().close(Network::ConnectionCloseType::FlushWrite);
 
-      if (upstream_connection_ != nullptr &&
-          upstream_connection_->state() != Network::Connection::State::Closed) {
-        config_->drainManager().add(config_->sharedConfig(), std::move(upstream_connection_),
+      if (upstream_conn_data_ != nullptr &&
+          upstream_conn_data_->connection().state() != Network::Connection::State::Closed) {
+        config_->drainManager().add(config_->sharedConfig(), std::move(upstream_conn_data_),
                                     std::move(upstream_callbacks_), std::move(idle_timer_),
-                                    read_callbacks_->upstreamHost(),
-                                    std::move(connected_timespan_));
+                                    read_callbacks_->upstreamHost());
       }
     } else if (event == Network::ConnectionEvent::LocalClose) {
-      upstream_connection_->close(Network::ConnectionCloseType::NoFlush);
+      upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
+      upstream_conn_data_.reset();
       disableIdleTimer();
     }
   }
@@ -420,36 +430,21 @@ void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) {
 }
 
 void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
-  bool connecting = false;
-
-  // The timer must be cleared before, not after, processing the event because
-  // if initializeUpstreamConnection() is called it will reset the timer, so
-  // clearing after that call will leave the timer unset.
-  if (connect_timeout_timer_) {
-    connecting = true;
-    connect_timeout_timer_->disableTimer();
-    connect_timeout_timer_.reset();
-  }
+  // Update the connecting flag before processing the event because we may start a new connection
+  // attempt in initializeUpstreamConnection.
+  bool connecting = connecting_;
+  connecting_ = false;
 
   if (event == Network::ConnectionEvent::RemoteClose ||
       event == Network::ConnectionEvent::LocalClose) {
-    finalizeUpstreamConnectionStats();
-    read_callbacks_->connection().dispatcher().deferredDelete(std::move(upstream_connection_));
+    upstream_conn_data_.reset();
     disableIdleTimer();
 
-    auto& destroy_ctx_stat =
-        (event == Network::ConnectionEvent::RemoteClose)
-            ? read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_remote_
-            : read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_local_;
-    destroy_ctx_stat.inc();
-
     if (connecting) {
      if (event == Network::ConnectionEvent::RemoteClose) {
        getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::UpstreamConnectionFailure);
        read_callbacks_->upstreamHost()->outlierDetector().putResult(
            Upstream::Outlier::Result::CONNECT_FAILED);
-        read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_fail_.inc();
-        read_callbacks_->upstreamHost()->stats().cx_connect_fail_.inc();
      }
 
      initializeUpstreamConnection();
@@ -459,8 +454,6 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
      }
    }
  } else if (event == Network::ConnectionEvent::Connected) {
-    connect_timespan_->complete();
-
    // Re-enable downstream reads now that the upstream connection is established
    // so we have a place to send downstream data to.
     read_callbacks_->connection().readDisable(false);
@@ -480,8 +473,10 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
         });
     resetIdleTimer();
     read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); });
-    upstream_connection_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](
-        uint64_t) { upstream_callbacks->onBytesSent(); });
+    upstream_conn_data_->connection().addBytesSentCallback([upstream_callbacks =
+                                                                upstream_callbacks_](uint64_t) {
+      upstream_callbacks->onBytesSent();
+    });
   }
 }
@@ -523,14 +518,12 @@ UpstreamDrainManager::~UpstreamDrainManager() {
 }
 
 void UpstreamDrainManager::add(const Config::SharedConfigSharedPtr& config,
-                               Network::ClientConnectionPtr&& upstream_connection,
+                               Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
                                const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
                               Event::TimerPtr&& idle_timer,
-                               const Upstream::HostDescriptionConstSharedPtr& upstream_host,
-                               Stats::TimespanPtr&& connected_timespan) {
-  DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_connection),
-                                 std::move(idle_timer), upstream_host,
-                                 std::move(connected_timespan)));
+                               const Upstream::HostDescriptionConstSharedPtr& upstream_host) {
+  DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_conn_data),
+                                 std::move(idle_timer), upstream_host));
   callbacks->drain(*drainer);
 
   // Use temporary to ensure we get the pointer before we move it out of drainer
@@ -547,12 +540,10 @@ void UpstreamDrainManager::remove(Drainer& drainer, Event::Dispatcher& dispatche
 
 Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
                  const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
-                 Network::ClientConnectionPtr&& connection, Event::TimerPtr&& idle_timer,
-                 const Upstream::HostDescriptionConstSharedPtr& upstream_host,
-                 Stats::TimespanPtr&& connected_timespan)
-    : parent_(parent), callbacks_(callbacks), upstream_connection_(std::move(connection)),
-      timer_(std::move(idle_timer)), connected_timespan_(std::move(connected_timespan)),
-      upstream_host_(upstream_host), config_(config) {
+                 Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
+                 const Upstream::HostDescriptionConstSharedPtr& upstream_host)
+    : parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)),
+      timer_(std::move(idle_timer)), upstream_host_(upstream_host), config_(config) {
   config_->stats().upstream_flush_total_.inc();
   config_->stats().upstream_flush_active_.inc();
 }
@@ -564,8 +555,7 @@ void Drainer::onEvent(Network::ConnectionEvent event) {
       timer_->disableTimer();
     }
     config_->stats().upstream_flush_active_.dec();
-    finalizeConnectionStats(*upstream_host_, *connected_timespan_);
-    parent_.remove(*this, upstream_connection_->dispatcher());
+    parent_.remove(*this, upstream_conn_data_->connection().dispatcher());
  }
 }
 
@@ -592,7 +582,7 @@ void Drainer::onBytesSent() {
 
 void Drainer::cancelDrain() {
   // This sends onEvent(LocalClose).
-  upstream_connection_->close(Network::ConnectionCloseType::NoFlush);
+  upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
 }
 
 } // namespace TcpProxy
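[Illustrative sketch — not part of the diff.] With the pool in place, the TCP proxy filter holds at most one of upstream_handle_ (a pending newConnection()) and upstream_conn_data_ (a checked-out connection); Filter::~Filter() above cancels the former and closes the latter, because the proxy never returns open connections for reuse. A hypothetical helper making that invariant explicit:

// Hypothetical teardown helper mirroring Filter::~Filter() in this change.
void cleanupUpstream(Tcp::ConnectionPool::Cancellable*& handle,
                     Tcp::ConnectionPool::ConnectionDataPtr& conn_data) {
  ASSERT(handle == nullptr || conn_data == nullptr); // never both at once
  if (handle != nullptr) {
    handle->cancel(); // abandon the still-pending connection request
    handle = nullptr;
  }
  if (conn_data != nullptr) {
    // Close rather than re-pool: a half-proxied TCP connection is not safe to reuse.
    conn_data->connection().close(Network::ConnectionCloseType::NoFlush);
    conn_data.reset();
  }
}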
diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h
index 3c9173edc3328..61a803db85cca 100644
--- a/source/common/tcp_proxy/tcp_proxy.h
+++ b/source/common/tcp_proxy/tcp_proxy.h
@@ -14,6 +14,7 @@
 #include "envoy/server/filter_config.h"
 #include "envoy/stats/stats_macros.h"
 #include "envoy/stats/timespan.h"
+#include "envoy/tcp/conn_pool.h"
 #include "envoy/upstream/cluster_manager.h"
 #include "envoy/upstream/upstream.h"
 
@@ -138,6 +139,7 @@ typedef std::shared_ptr<Config> ConfigSharedPtr;
  * be proxied back and forth between the two connections.
  */
 class Filter : public Network::ReadFilter,
+               Tcp::ConnectionPool::Callbacks,
                Upstream::LoadBalancerContext,
                protected Logger::Loggable<Logger::Id::filter> {
 public:
@@ -149,6 +151,12 @@ class Filter : public Network::ReadFilter,
   Network::FilterStatus onNewConnection() override { return initializeUpstreamConnection(); }
   void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
 
+  // Tcp::ConnectionPool::Callbacks
+  void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
+                     Upstream::HostDescriptionConstSharedPtr host) override;
+  void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
+                   Upstream::HostDescriptionConstSharedPtr host) override;
+
   // Upstream::LoadBalancerContext
   absl::optional<uint64_t> computeHashKey() override { return {}; }
   const Router::MetadataMatchCriteria* metadataMatchCriteria() override {
@@ -166,18 +174,15 @@ class Filter : public Network::ReadFilter,
   void readDisableUpstream(bool disable);
   void readDisableDownstream(bool disable);
 
-  struct UpstreamCallbacks : public Network::ConnectionCallbacks,
-                             public Network::ReadFilterBaseImpl {
+  struct UpstreamCallbacks : public Tcp::ConnectionPool::UpstreamCallbacks {
     UpstreamCallbacks(Filter* parent) : parent_(parent) {}
 
-    // Network::ConnectionCallbacks
+    // Tcp::ConnectionPool::UpstreamCallbacks
+    void onUpstreamData(Buffer::Instance& data, bool end_stream) override;
     void onEvent(Network::ConnectionEvent event) override;
     void onAboveWriteBufferHighWatermark() override;
     void onBelowWriteBufferLowWatermark() override;
 
-    // Network::ReadFilter
-    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
-
     void onBytesSent();
     void onIdleTimeout();
     void drain(Drainer& drainer);
@@ -234,7 +239,6 @@ class Filter : public Network::ReadFilter,
   void onDownstreamEvent(Network::ConnectionEvent event);
   void onUpstreamData(Buffer::Instance& data, bool end_stream);
   void onUpstreamEvent(Network::ConnectionEvent event);
-  void finalizeUpstreamConnectionStats();
   void onIdleTimeout();
   void resetIdleTimer();
   void disableIdleTimer();
@@ -242,29 +246,26 @@ class Filter : public Network::ReadFilter,
   const ConfigSharedPtr config_;
   Upstream::ClusterManager& cluster_manager_;
   Network::ReadFilterCallbacks* read_callbacks_{};
-  Network::ClientConnectionPtr upstream_connection_;
+  Tcp::ConnectionPool::Cancellable* upstream_handle_{};
+  Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_;
   DownstreamCallbacks downstream_callbacks_;
-  Event::TimerPtr connect_timeout_timer_;
   Event::TimerPtr idle_timer_;
-  Stats::TimespanPtr connect_timespan_;
-  Stats::TimespanPtr connected_timespan_;
   std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
                                                           // read filter.
   RequestInfo::RequestInfoImpl request_info_;
   uint32_t connect_attempts_{};
+  bool connecting_{};
 };
 
-// This class holds ownership of an upstream connection that needs to finish
-// flushing, when the downstream connection has been closed. The TcpProxy is
-// destroyed when the downstream connection is closed, so moving the upstream
-// connection here allows it to finish draining or timeout.
+// This class deals with an upstream connection that needs to finish flushing, when the downstream
+// connection has been closed. The TcpProxy is destroyed when the downstream connection is closed,
+// so handling the upstream connection here allows it to finish draining or timeout.
 class Drainer : public Event::DeferredDeletable {
 public:
   Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
           const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
-          Network::ClientConnectionPtr&& connection, Event::TimerPtr&& idle_timer,
-          const Upstream::HostDescriptionConstSharedPtr& upstream_host,
-          Stats::TimespanPtr&& connected_timespan);
+          Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
+          const Upstream::HostDescriptionConstSharedPtr& upstream_host);
 
   void onEvent(Network::ConnectionEvent event);
   void onData(Buffer::Instance& data, bool end_stream);
@@ -275,9 +276,8 @@ class Drainer : public Event::DeferredDeletable {
 private:
   UpstreamDrainManager& parent_;
   std::shared_ptr<Filter::UpstreamCallbacks> callbacks_;
-  Network::ClientConnectionPtr upstream_connection_;
+  Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_;
   Event::TimerPtr timer_;
-  Stats::TimespanPtr connected_timespan_;
   Upstream::HostDescriptionConstSharedPtr upstream_host_;
   Config::SharedConfigSharedPtr config_;
 };
@@ -288,11 +288,10 @@ class UpstreamDrainManager : public ThreadLocal::ThreadLocalObject {
 public:
   ~UpstreamDrainManager();
   void add(const Config::SharedConfigSharedPtr& config,
-           Network::ClientConnectionPtr&& upstream_connection,
+           Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
           const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
           Event::TimerPtr&& idle_timer,
-           const Upstream::HostDescriptionConstSharedPtr& upstream_host,
-           Stats::TimespanPtr&& connected_timespan);
+           const Upstream::HostDescriptionConstSharedPtr& upstream_host);
   void remove(Drainer& drainer, Event::Dispatcher& dispatcher);
 
 private:
diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc
index 2b4fb77946c35..09d1ac142c2d5 100644
--- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc
+++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc
@@ -224,7 +224,7 @@ void Router::UpstreamRequest::start() {
 void Router::UpstreamRequest::resetStream() {
   if (conn_data_ != nullptr) {
     conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
-    conn_data_ = nullptr;
+    conn_data_.reset();
   }
 }
 
@@ -235,10 +235,10 @@ void Router::UpstreamRequest::onPoolFailure(Tcp::ConnectionPool::PoolFailureReas
   onResetStream(reason);
 }
 
-void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionData& conn_data,
+void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
                                           Upstream::HostDescriptionConstSharedPtr host) {
   onUpstreamHostSelected(host);
-  conn_data_ = &conn_data;
+  conn_data_ = std::move(conn_data);
   conn_data_->addUpstreamCallbacks(parent_);
   conn_pool_handle_ = nullptr;
 
@@ -263,10 +263,7 @@ void Router::UpstreamRequest::onRequestComplete() { request_complete_ = true; }
 
 void Router::UpstreamRequest::onResponseComplete() {
   response_complete_ = true;
 
-  if (conn_data_ != nullptr) {
-    conn_data_->release();
-  }
-  conn_data_ = nullptr;
+  conn_data_.reset();
 }
 
 void Router::UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) {
diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h
index 117c99ca2635b..d298d38b354cd 100644
--- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h
+++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h
@@ -112,7 +112,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
     // Tcp::ConnectionPool::Callbacks
     void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
                        Upstream::HostDescriptionConstSharedPtr host) override;
-    void onPoolReady(Tcp::ConnectionPool::ConnectionData& conn,
+    void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn,
                      Upstream::HostDescriptionConstSharedPtr host) override;
 
     void onRequestComplete();
@@ -127,7 +127,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
     const int32_t seq_id_;
 
     Tcp::ConnectionPool::Cancellable* conn_pool_handle_{};
-    Tcp::ConnectionPool::ConnectionData* conn_data_{};
+    Tcp::ConnectionPool::ConnectionDataPtr conn_data_;
     Upstream::HostDescriptionConstSharedPtr upstream_host_;
     TransportPtr transport_;
     ProtocolType proto_type_{ProtocolType::Auto};
diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc
index a036e9edcadda..3e7c089b59d16 100644
--- a/test/common/http/conn_manager_impl_test.cc
+++ b/test/common/http/conn_manager_impl_test.cc
@@ -312,6 +312,8 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi
   ConnectionManagerListenerStats listener_stats_;
   bool proxy_100_continue_ = false;
   Http::Http1Settings http1_settings_;
+  NiceMock<Network::MockClientConnection> upstream_conn_; // for websocket tests
+  NiceMock<Tcp::ConnectionPool::MockInstance> conn_pool_; // for websocket tests
 
   // TODO(mattklein123): Not all tests have been converted over to better setup. Convert the rest.
   MockStreamEncoder response_encoder_;
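[Illustrative sketch — not part of the diff.] The updated tests below drive results through poolReady()/poolFailure() helpers on Tcp::ConnectionPool::MockInstance (from test/mocks/tcp). The mock's internals are assumed here, but the behavior the tests rely on is roughly the following; MockPoolSketch and wrap() are hypothetical reductions:

// Assumed reduction of the mock-pool helpers used by these tests.
struct MockPoolSketch {
  // newConnection() records the caller's callbacks...
  Tcp::ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& cb) {
    callbacks_ = &cb;
    return nullptr;
  }
  // ...so a test can later inject a failure...
  void poolFailure(Tcp::ConnectionPool::PoolFailureReason reason) {
    callbacks_->onPoolFailure(reason, host_);
  }
  // ...or a success, wrapping the supplied mock connection in a ConnectionData.
  void poolReady(Network::ClientConnection& conn) { callbacks_->onPoolReady(wrap(conn), host_); }

  Tcp::ConnectionPool::ConnectionDataPtr wrap(Network::ClientConnection& conn) {
    struct Data : public Tcp::ConnectionPool::ConnectionData {
      Data(Network::ClientConnection& c) : conn_(c) {}
      Network::ClientConnection& connection() override { return conn_; }
      void addUpstreamCallbacks(Tcp::ConnectionPool::UpstreamCallbacks&) override {}
      Network::ClientConnection& conn_;
    };
    return std::make_unique<Data>(conn);
  }

  Tcp::ConnectionPool::Callbacks* callbacks_{};
  Upstream::HostDescriptionConstSharedPtr host_;
};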
@@ -1504,8 +1506,7 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketNoThreadLocalCluster) {
 TEST_F(HttpConnectionManagerImplTest, WebSocketNoConnInPool) {
   setup(false, "");
 
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_(_, _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster(_, _, _)).WillOnce(Return(nullptr));
 
   expectOnUpstreamInitFailure();
   EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_active_.value());
@@ -1520,14 +1521,37 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketNoConnInPool) {
 TEST_F(HttpConnectionManagerImplTest, WebSocketDataAfterConnectFail) {
   setup(false, "");
 
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_(_, _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster(_, _, _)).WillOnce(Return(&conn_pool_));
+
+  StreamDecoder* decoder = nullptr;
+  NiceMock<MockStreamEncoder> encoder;
+
+  configureRouteForWebsocket(route_config_provider_.route_config_->route_->route_entry_);
+
+  EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> void {
+    decoder = &conn_manager_->newStream(encoder);
+    HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"},
+                                               {":method", "GET"},
+                                               {":path", "/"},
+                                               {"connection", "Upgrade"},
+                                               {"upgrade", "websocket"}}};
+    decoder->decodeHeaders(std::move(headers), true);
+    data.drain(4);
+  }));
+
+  Buffer::OwnedImpl fake_input("1234");
+  conn_manager_->onData(fake_input, false);
 
-  expectOnUpstreamInitFailure();
   EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_active_.value());
   EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_total_.value());
   EXPECT_EQ(0U, stats_.named_.downstream_cx_http1_active_.value());
 
+  EXPECT_CALL(encoder, encodeHeaders(_, true))
+      .WillOnce(Invoke([](const HeaderMap& headers, bool) -> void {
+        EXPECT_STREQ("504", headers.Status()->value().c_str());
+      }));
+
+  conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
+
   filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
 
   // This should get dropped, with no ASSERT or crash.
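[Illustrative sketch — not part of the diff.] Note the ordering these websocket tests now depend on: conn_manager_->onData() must run before a result is injected, so that the upgrade request reaches the pool and newConnection() has registered the callbacks. Condensed from the test above, with the surrounding fixture assumed:

// Condensed failure-injection flow (hypothetical excerpt from such a test).
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false); // upgrade -> tcpConnPoolForCluster() -> newConnection()

EXPECT_CALL(encoder, encodeHeaders(_, true)); // the 504 is written when the failure arrives
conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure);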
@@ -1547,13 +1571,14 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketMetadataMatch) {
       .WillByDefault(Return(
          &route_config_provider_.route_config_->route_->route_entry_.metadata_matches_criteria_));
 
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_(_, _))
-      .WillOnce(Invoke([&](const std::string&, Upstream::LoadBalancerContext* context)
-                           -> Upstream::MockHost::MockCreateConnectionData {
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster(_, _, _))
+      .WillOnce(Invoke([&](const std::string&, Upstream::ResourcePriority,
+                           Upstream::LoadBalancerContext* context)
+                           -> Tcp::ConnectionPool::MockInstance* {
         EXPECT_EQ(
             context->metadataMatchCriteria(),
             &route_config_provider_.route_config_->route_->route_entry_.metadata_matches_criteria_);
-        return {};
+        return nullptr;
       }));
 
   expectOnUpstreamInitFailure();
@@ -1564,21 +1589,8 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketMetadataMatch) {
 TEST_F(HttpConnectionManagerImplTest, WebSocketConnectTimeoutError) {
   setup(false, "");
 
-  Event::MockTimer* connect_timer =
-      new NiceMock<Event::MockTimer>(&filter_callbacks_.connection_.dispatcher_);
-  NiceMock<Network::MockClientConnection>* upstream_connection =
-      new NiceMock<Network::MockClientConnection>();
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-
-  conn_info.connection_ = upstream_connection;
-  conn_info.host_description_.reset(new Upstream::HostImpl(
-      cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost",
-      Network::Utility::resolveUrl("tcp://127.0.0.1:80"),
-      envoy::api::v2::core::Metadata::default_instance(), 1,
-      envoy::api::v2::core::Locality().default_instance(),
-      envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()));
-  EXPECT_CALL(*connect_timer, enableTimer(_));
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+      .WillOnce(Return(&conn_pool_));
 
   StreamDecoder* decoder = nullptr;
   NiceMock<MockStreamEncoder> encoder;
@@ -1596,15 +1608,15 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketConnectTimeoutError) {
     data.drain(4);
   }));
 
+  Buffer::OwnedImpl fake_input("1234");
+  conn_manager_->onData(fake_input, false);
+
   EXPECT_CALL(encoder, encodeHeaders(_, true))
       .WillOnce(Invoke([](const HeaderMap& headers, bool) -> void {
         EXPECT_STREQ("504", headers.Status()->value().c_str());
       }));
+  conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::Timeout);
 
-  Buffer::OwnedImpl fake_input("1234");
-  conn_manager_->onData(fake_input, false);
-
-  connect_timer->callback_();
   filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
   conn_manager_.reset();
 }
@@ -1612,21 +1624,8 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketConnectTimeoutError) {
 TEST_F(HttpConnectionManagerImplTest, WebSocketConnectionFailure) {
   setup(false, "");
 
-  Event::MockTimer* connect_timer =
-      new NiceMock<Event::MockTimer>(&filter_callbacks_.connection_.dispatcher_);
-  NiceMock<Network::MockClientConnection>* upstream_connection =
-      new NiceMock<Network::MockClientConnection>();
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-
-  conn_info.connection_ = upstream_connection;
-  conn_info.host_description_.reset(new Upstream::HostImpl(
-      cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost",
-      Network::Utility::resolveUrl("tcp://127.0.0.1:80"),
-      envoy::api::v2::core::Metadata::default_instance(), 1,
-      envoy::api::v2::core::Locality().default_instance(),
-      envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()));
-  EXPECT_CALL(*connect_timer, enableTimer(_));
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+      .WillOnce(Return(&conn_pool_));
 
   StreamDecoder* decoder = nullptr;
   NiceMock<MockStreamEncoder> encoder;
@@ -1644,16 +1643,16 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketConnectionFailure) {
     data.drain(4);
   }));
 
+  Buffer::OwnedImpl fake_input("1234");
+  conn_manager_->onData(fake_input, false);
+
   EXPECT_CALL(encoder, encodeHeaders(_, true))
       .WillOnce(Invoke([](const HeaderMap& headers, bool) -> void {
         EXPECT_STREQ("504", headers.Status()->value().c_str());
       }));
 
-  Buffer::OwnedImpl fake_input("1234");
-  conn_manager_->onData(fake_input, false);
+  conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
 
-  // expectOnUpstreamInitFailure("504");
-  upstream_connection->raiseEvent(Network::ConnectionEvent::RemoteClose);
   filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
   conn_manager_.reset();
 }
@@ -1663,9 +1662,6 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) {
 
   StreamDecoder* decoder = nullptr;
   NiceMock<MockStreamEncoder> encoder;
-  NiceMock<Network::MockClientConnection>* upstream_connection =
-      new NiceMock<Network::MockClientConnection>();
-  Upstream::MockHost::MockCreateConnectionData conn_info;
   HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"},
                                              {":method", "GET"},
                                              {":path", "/"},
                                              {"connection", "Upgrade"},
                                              {"upgrade", "websocket"}}};
   auto raw_header_ptr = headers.get();
 
-  conn_info.connection_ = upstream_connection;
-  conn_info.host_description_.reset(new Upstream::HostImpl(
-      cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost",
-      Network::Utility::resolveUrl("tcp://127.0.0.1:80"),
-      envoy::api::v2::core::Metadata::default_instance(), 1,
-      envoy::api::v2::core::Locality().default_instance(),
-      envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()));
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+      .WillOnce(Return(&conn_pool_));
 
   configureRouteForWebsocket(route_config_provider_.route_config_->route_->route_entry_);
 
@@ -1697,7 +1687,9 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) {
   Buffer::OwnedImpl fake_input("1234");
   conn_manager_->onData(fake_input, false);
-  upstream_connection->raiseEvent(Network::ConnectionEvent::Connected);
+
+  conn_pool_.host_->hostname_ = "newhost";
+  conn_pool_.poolReady(upstream_conn_);
 
   // rewritten authority header when auto_host_rewrite is true
   EXPECT_STREQ("newhost", raw_header_ptr->Host()->value().c_str());
@@ -1713,21 +1705,8 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) {
 TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) {
   setup(false, "");
 
-  Event::MockTimer* connect_timer =
-      new NiceMock<Event::MockTimer>(&filter_callbacks_.connection_.dispatcher_);
-  NiceMock<Network::MockClientConnection>* upstream_connection =
-      new NiceMock<Network::MockClientConnection>();
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-
-  conn_info.connection_ = upstream_connection;
-  conn_info.host_description_.reset(new Upstream::HostImpl(
-      cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost",
-      Network::Utility::resolveUrl("tcp://127.0.0.1:80"),
-      envoy::api::v2::core::Metadata::default_instance(), 1,
-      envoy::api::v2::core::Locality().default_instance(),
-      envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()));
-  EXPECT_CALL(*connect_timer, enableTimer(_));
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+      .WillOnce(Return(&conn_pool_));
 
   StreamDecoder* decoder = nullptr;
   NiceMock<MockStreamEncoder> encoder;
@@ -1754,10 +1733,11 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) {
 
   conn_manager_->onData(fake_input, false);
 
-  EXPECT_CALL(*upstream_connection, write(_, false));
-  EXPECT_CALL(*upstream_connection, write(BufferEqual(&early_data), false));
+  EXPECT_CALL(upstream_conn_, write(_, false));
+  EXPECT_CALL(upstream_conn_, write(BufferEqual(&early_data), false));
   EXPECT_CALL(filter_callbacks_.connection_, readDisable(false));
-  upstream_connection->raiseEvent(Network::ConnectionEvent::Connected);
+  conn_pool_.poolReady(upstream_conn_);
+
   filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
   conn_manager_.reset();
 }
@@ -1765,21 +1745,8 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) {
 TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyDataConnectionFail) {
   setup(false, "");
 
-  Event::MockTimer* connect_timer =
-      new NiceMock<Event::MockTimer>(&filter_callbacks_.connection_.dispatcher_);
-  NiceMock<Network::MockClientConnection>* upstream_connection =
-      new NiceMock<Network::MockClientConnection>();
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-
-  conn_info.connection_ = upstream_connection;
-  conn_info.host_description_.reset(new Upstream::HostImpl(
-      cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost",
-      Network::Utility::resolveUrl("tcp://127.0.0.1:80"),
-      envoy::api::v2::core::Metadata::default_instance(), 1,
-      envoy::api::v2::core::Locality().default_instance(),
-      envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()));
-  EXPECT_CALL(*connect_timer, enableTimer(_));
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+      .WillOnce(Return(&conn_pool_));
 
   StreamDecoder* decoder = nullptr;
   NiceMock<MockStreamEncoder> encoder;
@@ -1806,7 +1773,7 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyDataConnectionFail) {
 
   conn_manager_->onData(fake_input, false);
 
-  upstream_connection->raiseEvent(Network::ConnectionEvent::RemoteClose);
+  conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
 
   filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
 
   // This should get dropped, with no crash or ASSERT.
@@ -1819,21 +1786,8 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyDataConnectionFail) {
 TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyEndStream) {
   setup(false, "");
 
-  Event::MockTimer* connect_timer =
-      new NiceMock<Event::MockTimer>(&filter_callbacks_.connection_.dispatcher_);
-  NiceMock<Network::MockClientConnection>* upstream_connection =
-      new NiceMock<Network::MockClientConnection>();
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-
-  conn_info.connection_ = upstream_connection;
-  conn_info.host_description_.reset(new Upstream::HostImpl(
-      cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost",
-      Network::Utility::resolveUrl("tcp://127.0.0.1:80"),
-      envoy::api::v2::core::Metadata::default_instance(), 1,
-      envoy::api::v2::core::Locality().default_instance(),
-      envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()));
-  EXPECT_CALL(*connect_timer, enableTimer(_));
-  EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info));
+  EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+      .WillOnce(Return(&conn_pool_));
 
   StreamDecoder* decoder = nullptr;
   NiceMock<MockStreamEncoder> encoder;
@@ -1855,9 +1809,9 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyEndStream) {
   Buffer::OwnedImpl fake_input("1234");
   conn_manager_->onData(fake_input, true);
 
-  EXPECT_CALL(*upstream_connection, write(_, false));
-  EXPECT_CALL(*upstream_connection, write(_, true)).Times(0);
-  upstream_connection->raiseEvent(Network::ConnectionEvent::Connected);
+  EXPECT_CALL(upstream_conn_, write(_, false));
+  EXPECT_CALL(upstream_conn_, write(_, true)).Times(0);
+  conn_pool_.poolReady(upstream_conn_);
   filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
   conn_manager_.reset();
 }
diff --git a/test/common/network/filter_manager_impl_test.cc b/test/common/network/filter_manager_impl_test.cc
index c8469a99f9336..9bcb366c4c42f 100644
--- a/test/common/network/filter_manager_impl_test.cc
+++ b/test/common/network/filter_manager_impl_test.cc
@@ -153,6 +153,8 @@ TEST_F(NetworkFilterManagerTest, RateLimitAndTcpProxy) {
   InSequence s;
   NiceMock<Server::Configuration::MockFactoryContext> factory_context;
   NiceMock<MockConnection> connection;
+  NiceMock<MockClientConnection> upstream_connection;
+  NiceMock<Tcp::ConnectionPool::MockInstance> conn_pool;
   FilterManagerImpl manager(connection, *this);
 
   std::string rl_json = R"EOF(
@@ -201,21 +203,15 @@ TEST_F(NetworkFilterManagerTest, RateLimitAndTcpProxy) {
 
   EXPECT_EQ(manager.initializeReadFilters(), true);
 
-  NiceMock<MockClientConnection>* upstream_connection =
-      new NiceMock<MockClientConnection>();
-  Upstream::MockHost::MockCreateConnectionData conn_info;
-  conn_info.connection_ = upstream_connection;
-  conn_info.host_description_ = Upstream::makeTestHost(
-      factory_context.cluster_manager_.thread_local_cluster_.cluster_.info_, "tcp://127.0.0.1:80");
-  EXPECT_CALL(factory_context.cluster_manager_, tcpConnForCluster_("fake_cluster", _))
-      .WillOnce(Return(conn_info));
+  EXPECT_CALL(factory_context.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+      .WillOnce(Return(&conn_pool));
 
   request_callbacks->complete(RateLimit::LimitStatus::OK);
 
-  upstream_connection->raiseEvent(Network::ConnectionEvent::Connected);
+  conn_pool.poolReady(upstream_connection);
 
   Buffer::OwnedImpl buffer("hello");
-  EXPECT_CALL(*upstream_connection, write(BufferEqual(&buffer), _));
+  EXPECT_CALL(upstream_connection, write(BufferEqual(&buffer), _));
   read_buffer_.add("hello");
   manager.onRead();
 }
diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc
index 90961972724e1..6b275d59c6fe7 100644
--- a/test/common/tcp/conn_pool_test.cc
+++ b/test/common/tcp/conn_pool_test.cc
@@ -34,9 +34,9 @@ namespace Tcp {
 
 /*
 * Mock callbacks used for conn pool testing.
 */
 struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks {
-  void onPoolReady(ConnectionPool::ConnectionData& conn,
+  void onPoolReady(ConnectionPool::ConnectionDataPtr&& conn,
                    Upstream::HostDescriptionConstSharedPtr host) override {
-    conn_data_ = &conn;
+    conn_data_ = std::move(conn);
     host_ = host;
     pool_ready_.ready();
   }
@@ -50,7 +50,7 @@ struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks {
   ReadyWatcher pool_failure_;
   ReadyWatcher pool_ready_;
-  ConnectionPool::ConnectionData* conn_data_{};
+  ConnectionPool::ConnectionDataPtr conn_data_{};
   absl::optional<ConnectionPool::PoolFailureReason> reason_;
   Upstream::HostDescriptionConstSharedPtr host_;
 };
@@ -232,7 +232,7 @@ struct ActiveTestConn {
 
   void expectNewConn() { EXPECT_CALL(callbacks_.pool_ready_, ready()); }
 
-  void releaseConn() { callbacks_.conn_data_->release(); }
+  void releaseConn() { callbacks_.conn_data_.reset(); }
 
   void verifyConn() {
     EXPECT_EQ(&callbacks_.conn_data_->connection(),
@@ -589,7 +589,7 @@ TEST_F(TcpConnPoolImplTest, DisconnectWhilePending) {
   conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);
 
   EXPECT_CALL(conn_pool_, onConnReleasedForTest());
-  callbacks2.conn_data_->release();
+  callbacks2.conn_data_.reset();
 
   // Disconnect
   EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
@@ -625,11 +625,11 @@ TEST_F(TcpConnPoolImplTest, MaxConnections) {
   EXPECT_CALL(conn_pool_, onConnReleasedForTest());
   conn_pool_.expectEnableUpstreamReady();
   EXPECT_CALL(callbacks2.pool_ready_, ready());
-  callbacks.conn_data_->release();
+  callbacks.conn_data_.reset();
 
   conn_pool_.expectAndRunUpstreamReady();
   EXPECT_CALL(conn_pool_, onConnReleasedForTest());
-  callbacks2.conn_data_->release();
+  callbacks2.conn_data_.reset();
 
   // Cause the connection to go away.
   EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
@@ -657,7 +657,7 @@ TEST_F(TcpConnPoolImplTest, MaxRequestsPerConnection) {
 
   EXPECT_CALL(conn_pool_, onConnReleasedForTest());
   EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
-  callbacks.conn_data_->release();
+  callbacks.conn_data_.reset();
   dispatcher_.clearDeferredDeleteList();
 
   EXPECT_EQ(0U, cluster_->stats_.upstream_cx_destroy_with_active_rq_.value());
@@ -731,6 +731,29 @@ TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) {
   dispatcher_.clearDeferredDeleteList();
 }
 
+TEST_F(TcpConnPoolImplTest, DrainOnClose) {
+  ReadyWatcher drained;
+  EXPECT_CALL(drained, ready());
+  conn_pool_.addDrainedCallback([&]() -> void { drained.ready(); });
+
+  InSequence s;
+  ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);
+
+  ConnectionPool::MockUpstreamCallbacks callbacks;
+  c1.callbacks_.conn_data_->addUpstreamCallbacks(callbacks);
+
+  EXPECT_CALL(drained, ready());
+  EXPECT_CALL(callbacks, onEvent(Network::ConnectionEvent::RemoteClose))
+      .WillOnce(Invoke([&](Network::ConnectionEvent event) -> void {
+        EXPECT_EQ(Network::ConnectionEvent::RemoteClose, event);
+        c1.releaseConn();
+      }));
+  conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
+
+  EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
+  dispatcher_.clearDeferredDeleteList();
+}
+
 TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) {
   prepareConn();
 
@@ -743,7 +766,7 @@ TEST_F(TcpConnPoolImplDestructorTest, TestReadyConnectionsAreClosed) {
   prepareConn();
 
   // Transition connection to ready list
-  callbacks_->conn_data_->release();
+  callbacks_->conn_data_.reset();
 
   EXPECT_CALL(*connection_, close(Network::ConnectionCloseType::NoFlush));
   EXPECT_CALL(dispatcher_, clearDeferredDeleteList());
diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc
index a8616176943ba..73568c61de74e 100644
--- a/test/common/tcp_proxy/tcp_proxy_test.cc
+++ b/test/common/tcp_proxy/tcp_proxy_test.cc
@@ -18,6 +18,7 @@
 #include "test/mocks/network/mocks.h"
 #include "test/mocks/runtime/mocks.h"
 #include "test/mocks/server/mocks.h"
+#include "test/mocks/tcp/mocks.h"
 #include "test/mocks/upstream/host.h"
 #include "test/mocks/upstream/mocks.h"
 #include "test/test_common/printers.h"
@@ -25,6 +26,7 @@
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
+using testing::Invoke;
 using testing::MatchesRegex;
 using testing::NiceMock;
 using testing::Return;
@@ -379,53 +381,50 @@ class TcpProxyTest : public testing::Test {
     upstream_local_address_ = Network::Utility::resolveUrl("tcp://2.2.2.2:50000");
     upstream_remote_address_ = Network::Utility::resolveUrl("tcp://127.0.0.1:80");
     if (connections >= 1) {
-      {
-        testing::InSequence sequence;
-        for (uint32_t i = 0; i < connections; i++) {
-          connect_timers_.push_back(
-              new NiceMock<Event::MockTimer>(&filter_callbacks_.connection_.dispatcher_));
-          EXPECT_CALL(*connect_timers_.at(i), enableTimer(_));
-        }
-      }
-
       for (uint32_t i = 0; i < connections; i++) {
-        upstream_connections_.push_back(new NiceMock<Network::MockClientConnection>());
+        upstream_connections_.push_back(
+            std::make_unique<NiceMock<Network::MockClientConnection>>());
+        upstream_connection_data_.push_back(
+            std::make_unique<NiceMock<Tcp::ConnectionPool::MockConnectionData>>());
+        ON_CALL(*upstream_connection_data_.back(), connection())
+            .WillByDefault(ReturnRef(*upstream_connections_.back()));
         upstream_hosts_.push_back(std::make_shared<NiceMock<Upstream::MockHost>>());
-        conn_infos_.push_back(Upstream::MockHost::MockCreateConnectionData());
-        conn_infos_.at(i).connection_ = upstream_connections_.back();
-        conn_infos_.at(i).host_description_ = upstream_hosts_.back();
+        conn_pool_handles_.push_back(
+            std::make_unique<NiceMock<Tcp::ConnectionPool::MockCancellable>>());
 
         ON_CALL(*upstream_hosts_.at(i), cluster())
             .WillByDefault(ReturnPointee(
                 factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_));
         ON_CALL(*upstream_hosts_.at(i), address()).WillByDefault(Return(upstream_remote_address_));
         upstream_connections_.at(i)->local_address_ = upstream_local_address_;
-        EXPECT_CALL(*upstream_connections_.at(i), addReadFilter(_))
-            .WillOnce(SaveArg<0>(&upstream_read_filter_));
         EXPECT_CALL(*upstream_connections_.at(i), dispatcher())
             .WillRepeatedly(ReturnRef(filter_callbacks_.connection_.dispatcher_));
-        EXPECT_CALL(*upstream_connections_.at(i), enableHalfClose(true));
       }
     }
 
     {
       testing::InSequence sequence;
       for (uint32_t i = 0; i < connections; i++) {
-        EXPECT_CALL(factory_context_.cluster_manager_, tcpConnForCluster_("fake_cluster", _))
-            .WillOnce(Return(conn_infos_.at(i)))
+        EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+            .WillOnce(Return(&conn_pool_))
+            .RetiresOnSaturation();
+        EXPECT_CALL(conn_pool_, newConnection(_))
+            .WillOnce(Invoke(
+                [=](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* {
+                  conn_pool_callbacks_.push_back(&cb);
+                  return conn_pool_handles_.at(i).get();
+                }))
            .RetiresOnSaturation();
      }
-      EXPECT_CALL(factory_context_.cluster_manager_, tcpConnForCluster_("fake_cluster", _))
-          .WillRepeatedly(Return(Upstream::MockHost::MockCreateConnectionData()));
+      EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
+          .WillRepeatedly(Return(nullptr));
    }
 
    filter_.reset(new Filter(config_, factory_context_.cluster_manager_));
    EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
    EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true));
    filter_->initializeReadFilterCallbacks(filter_callbacks_);
-    EXPECT_EQ(connections >= 1 ? Network::FilterStatus::Continue
-                               : Network::FilterStatus::StopIteration,
-              filter_->onNewConnection());
+    EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
 
    EXPECT_EQ(absl::optional<uint64_t>(), filter_->computeHashKey());
    EXPECT_EQ(&filter_callbacks_.connection_, filter_->downstreamConnection());
@@ -435,19 +434,37 @@ class TcpProxyTest : public testing::Test {
 
   void setup(uint32_t connections) { setup(connections, defaultConfig()); }
 
   void raiseEventUpstreamConnected(uint32_t conn_index) {
-    EXPECT_CALL(*connect_timers_.at(conn_index), disableTimer());
     EXPECT_CALL(filter_callbacks_.connection_, readDisable(false));
-    upstream_connections_.at(conn_index)->raiseEvent(Network::ConnectionEvent::Connected);
+    EXPECT_CALL(*upstream_connection_data_.at(conn_index), addUpstreamCallbacks(_))
+        .WillOnce(Invoke([=](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void {
+          upstream_callbacks_ = &cb;
+
+          // Simulate TCP conn pool upstream callbacks. This is safe because the TCP proxy never
+          // releases a connection so all events go to the same UpstreamCallbacks instance.
+ upstream_connections_.at(conn_index)->addConnectionCallbacks(cb); + })); + EXPECT_CALL(*upstream_connections_.at(conn_index), enableHalfClose(true)); + conn_pool_callbacks_.at(conn_index) + ->onPoolReady(std::move(upstream_connection_data_.at(conn_index)), + upstream_hosts_.at(conn_index)); + } + + void raiseEventUpstreamConnectFailed(uint32_t conn_index, + Tcp::ConnectionPool::PoolFailureReason reason) { + conn_pool_callbacks_.at(conn_index)->onPoolFailure(reason, upstream_hosts_.at(conn_index)); } ConfigSharedPtr config_; NiceMock filter_callbacks_; NiceMock factory_context_; std::vector>> upstream_hosts_{}; - std::vector*> upstream_connections_{}; - std::vector conn_infos_; - Network::ReadFilterSharedPtr upstream_read_filter_; - std::vector*> connect_timers_; + std::vector>> upstream_connections_{}; + std::vector>> + upstream_connection_data_{}; + std::vector conn_pool_callbacks_; + std::vector>> conn_pool_handles_; + NiceMock conn_pool_; + Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks_; std::unique_ptr filter_; StringViewSaver access_log_data_; Network::Address::InstanceConstSharedPtr upstream_local_address_; @@ -461,67 +478,65 @@ TEST_F(TcpProxyTest, HalfCloseProxy) { EXPECT_CALL(filter_callbacks_.connection_, close(_)).Times(0); EXPECT_CALL(*upstream_connections_.at(0), close(_)).Times(0); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), true)); filter_->onData(buffer, true); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), true)); - upstream_read_filter_->onData(response, true); + upstream_callbacks_->onUpstreamData(response, true); EXPECT_CALL(filter_callbacks_.connection_, close(_)); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, - deferredDelete_(upstream_connections_.at(0))); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); } // Test that downstream is closed after an upstream LocalClose. TEST_F(TcpProxyTest, UpstreamLocalDisconnect) { setup(1); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); filter_->onData(buffer, false); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_read_filter_->onData(response, false); + upstream_callbacks_->onUpstreamData(response, false); EXPECT_CALL(filter_callbacks_.connection_, close(_)); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); } // Test that downstream is closed after an upstream RemoteClose. 
TEST_F(TcpProxyTest, UpstreamRemoteDisconnect) { setup(1); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); filter_->onData(buffer, false); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_read_filter_->onData(response, false); + upstream_callbacks_->onUpstreamData(response, false); EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); } // Test that reconnect is attempted after a local connect failure TEST_F(TcpProxyTest, ConnectAttemptsUpstreamLocalFail) { envoy::config::filter::network::tcp_proxy::v2::TcpProxy config = defaultConfig(); config.mutable_max_connect_attempts()->set_value(2); + setup(2, config); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, - deferredDelete_(upstream_connections_.at(0))); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); + raiseEventUpstreamConnectFailed(0, + Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure); raiseEventUpstreamConnected(1); EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ @@ -535,9 +550,8 @@ TEST_F(TcpProxyTest, ConnectAttemptsUpstreamRemoteFail) { config.mutable_max_connect_attempts()->set_value(2); setup(2, config); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, - deferredDelete_(upstream_connections_.at(0))); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); + raiseEventUpstreamConnectFailed(0, + Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); raiseEventUpstreamConnected(1); EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ @@ -551,8 +565,7 @@ TEST_F(TcpProxyTest, ConnectAttemptsUpstreamTimeout) { config.mutable_max_connect_attempts()->set_value(2); setup(2, config); - EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); - connect_timers_.at(0)->callback_(); + raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); raiseEventUpstreamConnected(1); EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ @@ -566,38 +579,21 @@ TEST_F(TcpProxyTest, ConnectAttemptsLimit) { config.mutable_max_connect_attempts()->set_value(3); setup(3, config); - { - testing::InSequence sequence; - EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, - deferredDelete_(upstream_connections_.at(0))); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, - deferredDelete_(upstream_connections_.at(1))); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, - deferredDelete_(upstream_connections_.at(2))); - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - } + EXPECT_CALL(upstream_hosts_.at(0)->outlier_detector_, + putResult(Upstream::Outlier::Result::TIMEOUT)); + EXPECT_CALL(upstream_hosts_.at(1)->outlier_detector_, + putResult(Upstream::Outlier::Result::CONNECT_FAILED)); + EXPECT_CALL(upstream_hosts_.at(2)->outlier_detector_, + putResult(Upstream::Outlier::Result::CONNECT_FAILED)); - // Try both failure modes - 
connect_timers_.at(0)->callback_(); - upstream_connections_.at(1)->raiseEvent(Network::ConnectionEvent::RemoteClose); - upstream_connections_.at(2)->raiseEvent(Network::ConnectionEvent::RemoteClose); + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_connect_timeout") - .value()); - EXPECT_EQ(2U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_connect_fail") - .value()); - EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_connect_attempts_exceeded") - .value()); - EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_overflow") - .value()); - EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_no_successful_host") - .value()); + // Try both failure modes + raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); + raiseEventUpstreamConnectFailed(1, + Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + raiseEventUpstreamConnectFailed(2, + Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); } // Test that the tcp proxy sends the correct notifications to the outlier detector @@ -608,11 +604,12 @@ TEST_F(TcpProxyTest, OutlierDetection) { EXPECT_CALL(upstream_hosts_.at(0)->outlier_detector_, putResult(Upstream::Outlier::Result::TIMEOUT)); - connect_timers_.at(0)->callback_(); + raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); EXPECT_CALL(upstream_hosts_.at(1)->outlier_detector_, putResult(Upstream::Outlier::Result::CONNECT_FAILED)); - upstream_connections_.at(1)->raiseEvent(Network::ConnectionEvent::RemoteClose); + raiseEventUpstreamConnectFailed(1, + Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); EXPECT_CALL(upstream_hosts_.at(2)->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); @@ -622,21 +619,21 @@ TEST_F(TcpProxyTest, OutlierDetection) { TEST_F(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) { setup(1); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); filter_->onData(buffer, false); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_read_filter_->onData(response, false); + upstream_callbacks_->onUpstreamData(response, false); EXPECT_CALL(*upstream_connections_.at(0), readDisable(true)); filter_callbacks_.connection_.runHighWatermarkCallbacks(); EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); filter_callbacks_.connection_.runLowWatermarkCallbacks(); } @@ -644,15 +641,15 @@ TEST_F(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) { TEST_F(TcpProxyTest, DownstreamDisconnectRemote) { setup(1); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); filter_->onData(buffer, false); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl 
response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_read_filter_->onData(response, false); + upstream_callbacks_->onUpstreamData(response, false); EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); @@ -661,15 +658,15 @@ TEST_F(TcpProxyTest, DownstreamDisconnectRemote) { TEST_F(TcpProxyTest, DownstreamDisconnectLocal) { setup(1); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); filter_->onData(buffer, false); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_read_filter_->onData(response, false); + upstream_callbacks_->onUpstreamData(response, false); EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose); @@ -678,16 +675,8 @@ TEST_F(TcpProxyTest, DownstreamDisconnectLocal) { TEST_F(TcpProxyTest, UpstreamConnectTimeout) { setup(1, accessLogConfig("%RESPONSE_FLAGS%")); - Buffer::OwnedImpl buffer("hello"); - EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); - filter_->onData(buffer, false); - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); - connect_timers_.at(0)->callback_(); - EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_connect_timeout") - .value()); + raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); filter_.reset(); EXPECT_EQ(access_log_data_, "UF"); @@ -744,15 +733,9 @@ TEST_F(TcpProxyTest, DisconnectBeforeData) { TEST_F(TcpProxyTest, UpstreamConnectFailure) { setup(1, accessLogConfig("%RESPONSE_FLAGS%")); - Buffer::OwnedImpl buffer("hello"); - filter_->onData(buffer, false); - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - EXPECT_CALL(*connect_timers_.at(0), disableTimer()); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); - EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_connect_fail") - .value()); + raiseEventUpstreamConnectFailed(0, + Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); filter_.reset(); EXPECT_EQ(access_log_data_, "UF"); @@ -770,10 +753,6 @@ TEST_F(TcpProxyTest, UpstreamConnectionLimit) { filter_->initializeReadFilterCallbacks(filter_callbacks_); filter_->onNewConnection(); - EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("upstream_cx_overflow") - .value()); - filter_.reset(); EXPECT_EQ(access_log_data_, "UO"); } @@ -795,7 +774,7 @@ TEST_F(TcpProxyTest, IdleTimeout) { buffer.add("hello2"); EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000))); - upstream_read_filter_->onData(buffer, false); + upstream_callbacks_->onUpstreamData(buffer, false); EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000))); filter_callbacks_.connection_.raiseBytesSentCallbacks(1); @@ -834,12 +813,13 @@ TEST_F(TcpProxyTest, IdleTimerDisabledUpstreamClose) { 
raiseEventUpstreamConnected(0); EXPECT_CALL(*idle_timer, disableTimer()); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); } // Test that access log fields %UPSTREAM_HOST% and %UPSTREAM_CLUSTER% are correctly logged. TEST_F(TcpProxyTest, AccessLogUpstreamHost) { setup(1, accessLogConfig("%UPSTREAM_HOST% %UPSTREAM_CLUSTER%")); + raiseEventUpstreamConnected(0); filter_.reset(); EXPECT_EQ(access_log_data_, "127.0.0.1:80 fake_cluster"); } @@ -847,6 +827,7 @@ TEST_F(TcpProxyTest, AccessLogUpstreamHost) { // Test that access log field %UPSTREAM_LOCAL_ADDRESS% is correctly logged. TEST_F(TcpProxyTest, AccessLogUpstreamLocalAddress) { setup(1, accessLogConfig("%UPSTREAM_LOCAL_ADDRESS%")); + raiseEventUpstreamConnected(0); filter_.reset(); EXPECT_EQ(access_log_data_, "2.2.2.2:50000"); } @@ -873,10 +854,10 @@ TEST_F(TcpProxyTest, AccessLogBytesRxTxDuration) { Buffer::OwnedImpl buffer("a"); filter_->onData(buffer, false); Buffer::OwnedImpl response("bb"); - upstream_read_filter_->onData(response, false); + upstream_callbacks_->onUpstreamData(response, false); std::this_thread::sleep_for(std::chrono::milliseconds(1)); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); filter_.reset(); EXPECT_THAT(access_log_data_, @@ -889,7 +870,8 @@ TEST_F(TcpProxyTest, UpstreamFlushNoTimeout) { setup(1); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) + EXPECT_CALL(*upstream_connections_.at(0), + close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -902,7 +884,7 @@ TEST_F(TcpProxyTest, UpstreamFlushNoTimeout) { upstream_connections_.at(0)->raiseBytesSentCallbacks(1); // Simulate flush complete. - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); EXPECT_EQ(1U, config_->stats().upstream_flush_total_.value()); EXPECT_EQ(0U, config_->stats().upstream_flush_active_.value()); } @@ -919,7 +901,8 @@ TEST_F(TcpProxyTest, UpstreamFlushTimeoutConfigured) { EXPECT_CALL(*idle_timer, enableTimer(_)); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) + EXPECT_CALL(*upstream_connections_.at(0), + close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -933,7 +916,7 @@ TEST_F(TcpProxyTest, UpstreamFlushTimeoutConfigured) { // Simulate flush complete. 
EXPECT_CALL(*idle_timer, disableTimer()); - upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); + upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); EXPECT_EQ(1U, config_->stats().upstream_flush_total_.value()); EXPECT_EQ(0U, config_->stats().upstream_flush_active_.value()); EXPECT_EQ(0U, config_->stats().idle_timeout_.value()); @@ -950,7 +933,8 @@ TEST_F(TcpProxyTest, UpstreamFlushTimeoutExpired) { EXPECT_CALL(*idle_timer, enableTimer(_)); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) + EXPECT_CALL(*upstream_connections_.at(0), + close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -972,7 +956,8 @@ TEST_F(TcpProxyTest, UpstreamFlushReceiveUpstreamData) { setup(1); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) + EXPECT_CALL(*upstream_connections_.at(0), + close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -984,7 +969,7 @@ TEST_F(TcpProxyTest, UpstreamFlushReceiveUpstreamData) { // Send some bytes; no timeout configured so this should be a no-op (not a crash). Buffer::OwnedImpl buffer("a"); EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); - upstream_read_filter_->onData(buffer, false); + upstream_callbacks_->onUpstreamData(buffer, false); } class TcpProxyRoutingTest : public testing::Test { @@ -1049,7 +1034,7 @@ TEST_F(TcpProxyRoutingTest, RoutableConnection) { connection_.local_address_ = std::make_shared("1.2.3.4", 9999); // Expect filter to try to open a connection to specified cluster. 
- EXPECT_CALL(factory_context_.cluster_manager_, tcpConnForCluster_("fake_cluster", _)); + EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)); filter_->onNewConnection(); diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index 94afe87c750d1..0fe593ed2cddd 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -79,9 +79,6 @@ class ThriftRouterTestBase { route_ = new NiceMock(); route_ptr_.reset(route_); - host_ = new NiceMock(); - host_ptr_.reset(host_); - router_.reset(new Router(context_.clusterManager())); EXPECT_EQ(nullptr, router_->downstreamConnection()); @@ -98,16 +95,8 @@ class ThriftRouterTestBase { EXPECT_CALL(*route_, routeEntry()).WillOnce(Return(&route_entry_)); EXPECT_CALL(route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name_)); - EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, newConnection(_)) - .WillOnce( - Invoke([&](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* { - conn_pool_callbacks_ = &cb; - return &handle_; - })); - EXPECT_EQ(ThriftFilters::FilterStatus::StopIteration, router_->messageBegin(method_name_, msg_type_, seq_id_)); - EXPECT_NE(nullptr, conn_pool_callbacks_); NiceMock connection; EXPECT_CALL(callbacks_, connection()).WillRepeatedly(Return(&connection)); @@ -120,7 +109,7 @@ class ThriftRouterTestBase { } void connectUpstream() { - EXPECT_CALL(conn_data_, addUpstreamCallbacks(_)) + EXPECT_CALL(*context_.cluster_manager_.tcp_conn_pool_.connection_data_, addUpstreamCallbacks(_)) .WillOnce(Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void { upstream_callbacks_ = &cb; })); @@ -135,7 +124,8 @@ class ThriftRouterTestBase { EXPECT_CALL(*protocol_, writeMessageBegin(_, method_name_, msg_type_, seq_id_)); EXPECT_CALL(callbacks_, continueDecoding()); - conn_pool_callbacks_->onPoolReady(conn_data_, host_ptr_); + + context_.cluster_manager_.tcp_conn_pool_.poolReady(upstream_connection_); EXPECT_NE(nullptr, upstream_callbacks_); } @@ -194,10 +184,10 @@ class ThriftRouterTestBase { void completeRequest() { EXPECT_CALL(*protocol_, writeMessageEnd(_)); EXPECT_CALL(*transport_, encodeFrame(_, _)); - EXPECT_CALL(conn_data_.connection_, write(_, false)); + EXPECT_CALL(upstream_connection_, write(_, false)); if (msg_type_ == MessageType::Oneway) { - EXPECT_CALL(conn_data_, release()); + EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_))); } EXPECT_EQ(ThriftFilters::FilterStatus::Continue, router_->messageEnd()); @@ -213,7 +203,7 @@ class ThriftRouterTestBase { upstream_callbacks_->onUpstreamData(buffer, false); EXPECT_CALL(callbacks_, upstreamData(Ref(buffer))).WillOnce(Return(true)); - EXPECT_CALL(conn_data_, release()); + EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_))); upstream_callbacks_->onUpstreamData(buffer, false); } @@ -236,8 +226,6 @@ class ThriftRouterTestBase { NiceMock* host_{}; RouteConstSharedPtr route_ptr_; - Upstream::HostDescriptionConstSharedPtr host_ptr_; - std::unique_ptr router_; std::string cluster_name_{"cluster"}; @@ -246,10 +234,8 @@ class ThriftRouterTestBase { MessageType msg_type_{MessageType::Call}; int32_t seq_id_{1}; - NiceMock handle_; - NiceMock conn_data_; - Tcp::ConnectionPool::Callbacks* conn_pool_callbacks_{}; Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks_{}; + NiceMock 
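[Editor's note: illustrative only. The ConnectAttemptsLimit and OutlierDetection tests above fix the mapping between pool failure reasons and outlier-detection results for the two paths they exercise. A hypothetical helper making that mapping explicit follows; only the cases pinned down by these tests are filled in, and everything else is deliberately left open:]

#include "absl/types/optional.h"

#include "envoy/tcp/conn_pool.h"
#include "envoy/upstream/outlier_detection.h"

namespace Envoy {

// Hypothetical test helper: the outlier result a TcpProxyTest expects for a
// given Tcp::ConnectionPool::PoolFailureReason, per the expectations above.
inline absl::optional<Upstream::Outlier::Result>
expectedOutlierResult(Tcp::ConnectionPool::PoolFailureReason reason) {
  switch (reason) {
  case Tcp::ConnectionPool::PoolFailureReason::Timeout:
    // raiseEventUpstreamConnectFailed(0, Timeout) expects putResult(TIMEOUT).
    return Upstream::Outlier::Result::TIMEOUT;
  case Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
    // RemoteConnectionFailure expects putResult(CONNECT_FAILED).
    return Upstream::Outlier::Result::CONNECT_FAILED;
  default:
    // Local failures and overflow are not asserted against the outlier
    // detector in these tests, so no mapping is claimed here.
    return absl::nullopt;
  }
}

} // namespace Envoy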
diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc
index 94afe87c750d1..0fe593ed2cddd 100644
--- a/test/extensions/filters/network/thrift_proxy/router_test.cc
+++ b/test/extensions/filters/network/thrift_proxy/router_test.cc
@@ -79,9 +79,6 @@ class ThriftRouterTestBase {
     route_ = new NiceMock<MockRoute>();
     route_ptr_.reset(route_);
 
-    host_ = new NiceMock<Upstream::MockHostDescription>();
-    host_ptr_.reset(host_);
-
     router_.reset(new Router(context_.clusterManager()));
 
     EXPECT_EQ(nullptr, router_->downstreamConnection());
@@ -98,16 +95,8 @@ class ThriftRouterTestBase {
     EXPECT_CALL(*route_, routeEntry()).WillOnce(Return(&route_entry_));
     EXPECT_CALL(route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name_));
 
-    EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, newConnection(_))
-        .WillOnce(
-            Invoke([&](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* {
-              conn_pool_callbacks_ = &cb;
-              return &handle_;
-            }));
-
     EXPECT_EQ(ThriftFilters::FilterStatus::StopIteration,
               router_->messageBegin(method_name_, msg_type_, seq_id_));
-    EXPECT_NE(nullptr, conn_pool_callbacks_);
 
     NiceMock<Network::MockConnection> connection;
     EXPECT_CALL(callbacks_, connection()).WillRepeatedly(Return(&connection));
@@ -120,7 +109,7 @@ class ThriftRouterTestBase {
   }
 
   void connectUpstream() {
-    EXPECT_CALL(conn_data_, addUpstreamCallbacks(_))
+    EXPECT_CALL(*context_.cluster_manager_.tcp_conn_pool_.connection_data_, addUpstreamCallbacks(_))
         .WillOnce(Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void {
          upstream_callbacks_ = &cb;
        }));
@@ -135,7 +124,8 @@ class ThriftRouterTestBase {
     EXPECT_CALL(*protocol_, writeMessageBegin(_, method_name_, msg_type_, seq_id_));
     EXPECT_CALL(callbacks_, continueDecoding());
 
-    conn_pool_callbacks_->onPoolReady(conn_data_, host_ptr_);
+
+    context_.cluster_manager_.tcp_conn_pool_.poolReady(upstream_connection_);
 
     EXPECT_NE(nullptr, upstream_callbacks_);
   }
@@ -194,10 +184,10 @@ class ThriftRouterTestBase {
   void completeRequest() {
     EXPECT_CALL(*protocol_, writeMessageEnd(_));
     EXPECT_CALL(*transport_, encodeFrame(_, _));
-    EXPECT_CALL(conn_data_.connection_, write(_, false));
+    EXPECT_CALL(upstream_connection_, write(_, false));
 
     if (msg_type_ == MessageType::Oneway) {
-      EXPECT_CALL(conn_data_, release());
+      EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_)));
     }
 
     EXPECT_EQ(ThriftFilters::FilterStatus::Continue, router_->messageEnd());
@@ -213,7 +203,7 @@ class ThriftRouterTestBase {
     upstream_callbacks_->onUpstreamData(buffer, false);
 
     EXPECT_CALL(callbacks_, upstreamData(Ref(buffer))).WillOnce(Return(true));
-    EXPECT_CALL(conn_data_, release());
+    EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_)));
     upstream_callbacks_->onUpstreamData(buffer, false);
   }
 
@@ -236,8 +226,6 @@ class ThriftRouterTestBase {
   NiceMock<Upstream::MockHostDescription>* host_{};
   RouteConstSharedPtr route_ptr_;
 
-  Upstream::HostDescriptionConstSharedPtr host_ptr_;
-
   std::unique_ptr<Router> router_;
 
   std::string cluster_name_{"cluster"};
@@ -246,10 +234,8 @@ class ThriftRouterTestBase {
   MessageType msg_type_{MessageType::Call};
   int32_t seq_id_{1};
 
-  NiceMock<Tcp::ConnectionPool::MockCancellable> handle_;
-  NiceMock<Tcp::ConnectionPool::MockConnectionData> conn_data_;
-  Tcp::ConnectionPool::Callbacks* conn_pool_callbacks_{};
   Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks_{};
+  NiceMock<Network::MockClientConnection> upstream_connection_;
 };
 
 class ThriftRouterTest : public ThriftRouterTestBase, public Test {
@@ -290,8 +276,8 @@ TEST_F(ThriftRouterTest, PoolRemoteConnectionFailure) {
         EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_);
         EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*"));
       }));
-  conn_pool_callbacks_->onPoolFailure(
-      Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure, host_ptr_);
+  context_.cluster_manager_.tcp_conn_pool_.poolFailure(
+      Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
 }
 
 TEST_F(ThriftRouterTest, PoolLocalConnectionFailure) {
@@ -308,8 +294,8 @@ TEST_F(ThriftRouterTest, PoolLocalConnectionFailure) {
         EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_);
         EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*"));
       }));
-  conn_pool_callbacks_->onPoolFailure(
-      Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure, host_ptr_);
+  context_.cluster_manager_.tcp_conn_pool_.poolFailure(
+      Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure);
 }
 
 TEST_F(ThriftRouterTest, PoolTimeout) {
@@ -326,7 +312,8 @@ TEST_F(ThriftRouterTest, PoolTimeout) {
         EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_);
         EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*"));
       }));
-  conn_pool_callbacks_->onPoolFailure(Tcp::ConnectionPool::PoolFailureReason::Timeout, host_ptr_);
+  context_.cluster_manager_.tcp_conn_pool_.poolFailure(
+      Tcp::ConnectionPool::PoolFailureReason::Timeout);
 }
 
 TEST_F(ThriftRouterTest, PoolOverflowFailure) {
@@ -343,7 +330,8 @@ TEST_F(ThriftRouterTest, PoolOverflowFailure) {
         EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_);
         EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*too many connections.*"));
       }));
-  conn_pool_callbacks_->onPoolFailure(Tcp::ConnectionPool::PoolFailureReason::Overflow, host_ptr_);
+  context_.cluster_manager_.tcp_conn_pool_.poolFailure(
+      Tcp::ConnectionPool::PoolFailureReason::Overflow);
 }
 
 TEST_F(ThriftRouterTest, NoRoute) {
@@ -440,7 +428,7 @@ TEST_F(ThriftRouterTest, TruncatedResponse) {
   EXPECT_CALL(callbacks_, startUpstreamResponse(TransportType::Framed, ProtocolType::Binary));
   EXPECT_CALL(callbacks_, upstreamData(Ref(buffer))).WillOnce(Return(false));
-  EXPECT_CALL(conn_data_, release());
+  EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_)));
   EXPECT_CALL(callbacks_, resetDownstreamConnection());
 
   upstream_callbacks_->onUpstreamData(buffer, true);
@@ -462,7 +450,7 @@ TEST_F(ThriftRouterTest, UpstreamDataTriggersReset) {
         router_->resetUpstreamConnection();
         return true;
       }));
-  EXPECT_CALL(conn_data_.connection_, close(Network::ConnectionCloseType::NoFlush));
+  EXPECT_CALL(upstream_connection_, close(Network::ConnectionCloseType::NoFlush));
 
   upstream_callbacks_->onUpstreamData(buffer, true);
   destroyRouter();
@@ -478,7 +466,7 @@ TEST_F(ThriftRouterTest, UnexpectedRouterDestroy) {
   initializeRouter();
   startRequest(MessageType::Call);
   connectUpstream();
-  EXPECT_CALL(conn_data_.connection_, close(Network::ConnectionCloseType::NoFlush));
+  EXPECT_CALL(upstream_connection_, close(Network::ConnectionCloseType::NoFlush));
   destroyRouter();
 }
diff --git a/test/integration/tcp_conn_pool_integration_test.cc b/test/integration/tcp_conn_pool_integration_test.cc
index c13d836e8d2e6..fbe93d877c9a4 100644
--- a/test/integration/tcp_conn_pool_integration_test.cc
+++ b/test/integration/tcp_conn_pool_integration_test.cc
@@ -52,9 +52,9 @@ class TestFilter : public Network::ReadFilter {
       ASSERT(false);
     }
 
-    void onPoolReady(Tcp::ConnectionPool::ConnectionData& conn,
+    void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn,
                      Upstream::HostDescriptionConstSharedPtr) override {
-      upstream_ = &conn;
+      upstream_ = std::move(conn);
 
       upstream_->addUpstreamCallbacks(*this);
       upstream_->connection().write(data_, false);
@@ -67,7 +67,7 @@ class TestFilter : public Network::ReadFilter {
       Network::Connection& downstream = parent_.read_callbacks_->connection();
      downstream.write(data, false);
 
-      upstream_->release();
+      upstream_.reset();
     }
     void onEvent(Network::ConnectionEvent) override {}
     void onAboveWriteBufferHighWatermark() override {}
@@ -75,7 +75,7 @@ class TestFilter : public Network::ReadFilter {
 
     TestFilter& parent_;
     Buffer::OwnedImpl data_;
-    Tcp::ConnectionPool::ConnectionData* upstream_;
+    Tcp::ConnectionPool::ConnectionDataPtr upstream_;
   };
 
   Upstream::ClusterManager& cluster_manager_;
diff --git a/test/mocks/tcp/mocks.cc b/test/mocks/tcp/mocks.cc
index 757f7556f32b1..1374d415dea7f 100644
--- a/test/mocks/tcp/mocks.cc
+++ b/test/mocks/tcp/mocks.cc
@@ -18,10 +18,12 @@ MockCancellable::~MockCancellable() {}
 MockUpstreamCallbacks::MockUpstreamCallbacks() {}
 MockUpstreamCallbacks::~MockUpstreamCallbacks() {}
 
-MockConnectionData::MockConnectionData() {
-  ON_CALL(*this, connection()).WillByDefault(ReturnRef(connection_));
+MockConnectionData::MockConnectionData() {}
+MockConnectionData::~MockConnectionData() {
+  if (release_callback_) {
+    release_callback_();
+  }
 }
-MockConnectionData::~MockConnectionData() {}
 
 MockInstance::MockInstance() {
   ON_CALL(*this, newConnection(_)).WillByDefault(Invoke([&](Callbacks& cb) -> Cancellable* {
@@ -44,12 +46,16 @@ void MockInstance::poolFailure(PoolFailureReason reason) {
   cb->onPoolFailure(reason, host_);
 }
 
-void MockInstance::poolReady() {
+void MockInstance::poolReady(Network::MockClientConnection& conn) {
   Callbacks* cb = callbacks_.front();
   callbacks_.pop_front();
   handles_.pop_front();
 
-  cb->onPoolReady(connection_data_, host_);
+  ON_CALL(*connection_data_, connection()).WillByDefault(ReturnRef(conn));
+
+  connection_data_->release_callback_ = [&]() -> void { released(conn); };
+
+  cb->onPoolReady(std::move(connection_data_), host_);
 }
 
 } // namespace ConnectionPool
diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h
index a9cc52fecbbb9..3fb969f088b2d 100644
--- a/test/mocks/tcp/mocks.h
+++ b/test/mocks/tcp/mocks.h
@@ -44,9 +44,10 @@ class MockConnectionData : public ConnectionData {
   // Tcp::ConnectionPool::ConnectionData
   MOCK_METHOD0(connection, Network::ClientConnection&());
   MOCK_METHOD1(addUpstreamCallbacks, void(ConnectionPool::UpstreamCallbacks&));
-  MOCK_METHOD0(release, void());
 
-  NiceMock<Network::MockClientConnection> connection_;
+  // If set, invoked in ~MockConnectionData, which indicates that the connection pool
+  // caller has released a connection.
+  std::function<void()> release_callback_;
 };
 
 class MockInstance : public Instance {
@@ -61,14 +62,18 @@ class MockInstance : public Instance {
   MockCancellable* newConnectionImpl(Callbacks& cb);
   void poolFailure(PoolFailureReason reason);
-  void poolReady();
+  void poolReady(Network::MockClientConnection& conn);
+
+  // Invoked when connection_data_, having been assigned via poolReady, is released.
+  MOCK_METHOD1(released, void(Network::MockClientConnection&));
 
   std::list<std::unique_ptr<NiceMock<MockCancellable>>> handles_;
   std::list<Callbacks*> callbacks_;
   std::shared_ptr<NiceMock<Upstream::MockHost>> host_{
       new NiceMock<Upstream::MockHost>()};
-  NiceMock<MockConnectionData> connection_data_;
+  std::unique_ptr<NiceMock<MockConnectionData>> connection_data_{
+      new NiceMock<MockConnectionData>()};
 };
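[Editor's note: a hypothetical, self-contained test showing how the reworked mocks above fit together. MockInstance::poolReady() hands out the owned MockConnectionData wired to a given connection, and the released() hook fires from ~MockConnectionData when the caller drops its handle. All names local to this sketch are illustrative, not part of the patch:]

#include "test/mocks/network/mocks.h"
#include "test/mocks/tcp/mocks.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
namespace {

using testing::NiceMock;
using testing::Ref;

// Minimal Tcp::ConnectionPool::Callbacks that just stashes the handle.
struct SketchReceiver : public Tcp::ConnectionPool::Callbacks {
  void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn,
                   Upstream::HostDescriptionConstSharedPtr) override {
    data_ = std::move(conn);
  }
  void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
                     Upstream::HostDescriptionConstSharedPtr) override {}

  Tcp::ConnectionPool::ConnectionDataPtr data_;
};

TEST(MockConnPoolSketch, DestructionReportsRelease) {
  NiceMock<Tcp::ConnectionPool::MockInstance> pool;
  NiceMock<Network::MockClientConnection> upstream;
  SketchReceiver receiver;

  // MockInstance's default newConnection() action queues the receiver.
  pool.newConnection(receiver);

  // released() must fire when the receiver's ConnectionDataPtr dies.
  EXPECT_CALL(pool, released(Ref(upstream)));

  // Complete the pending request: the receiver now owns the ConnectionData,
  // whose connection() returns `upstream`.
  pool.poolReady(upstream);
  ASSERT_NE(nullptr, receiver.data_);
  EXPECT_EQ(&upstream, &receiver.data_->connection());

  // Dropping the handle runs ~MockConnectionData, which invokes
  // release_callback_ and thus released(upstream).
  receiver.data_.reset();
}

} // namespace
} // namespace Envoy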