diff --git a/include/envoy/tcp/conn_pool.h b/include/envoy/tcp/conn_pool.h index 8237af37fea31..c89b25eb91432 100644 --- a/include/envoy/tcp/conn_pool.h +++ b/include/envoy/tcp/conn_pool.h @@ -58,8 +58,7 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks { }; /* - * ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are - * released back to the pool for re-use when their containing ConnectionData is destroyed. + * ConnectionData wraps a ClientConnection allocated to a caller. */ class ConnectionData { public: @@ -77,9 +76,13 @@ class ConnectionData { * @param callback the UpstreamCallbacks to invoke for upstream data */ virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE; -}; -typedef std::unique_ptr<ConnectionData> ConnectionDataPtr; + /** + * Release the connection after use. The connection should be closed first only if it is + * not viable for future use. + */ + virtual void release() PURE; +}; /** * Pool callbacks invoked in the context of a newConnection() call, either synchronously or @@ -99,17 +102,14 @@ class Callbacks { Upstream::HostDescriptionConstSharedPtr host) PURE; /** - * Called when a connection is available to process a request/response. Connections may be - * released back to the pool for re-use by resetting the ConnectionDataPtr. If the connection is - * no longer viable for reuse (e.g. due to some kind of protocol error), the underlying - * ClientConnection should be closed to prevent its reuse. - * + * Called when a connection is available to process a request/response. Recipients of connections + * must release the connection after use. They should only close the underlying ClientConnection + * if it is no longer viable for future requests. * @param conn supplies the connection data to use. * @param host supplies the description of the host that will carry the request. For logical * connection pools the description may be different each time this is called. 
*/ - virtual void onPoolReady(ConnectionDataPtr&& conn, - Upstream::HostDescriptionConstSharedPtr host) PURE; + virtual void onPoolReady(ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) PURE; }; /** diff --git a/source/common/http/websocket/ws_handler_impl.cc b/source/common/http/websocket/ws_handler_impl.cc index 5906c54b5d92c..541f085155dfe 100644 --- a/source/common/http/websocket/ws_handler_impl.cc +++ b/source/common/http/websocket/ws_handler_impl.cc @@ -131,8 +131,7 @@ void WsHandlerImpl::onConnectionSuccess() { // the connection pool. The current approach is a stop gap solution, where // we put the onus on the user to tell us if a route (and corresponding upstream) // is supposed to allow websocket upgrades or not. - Http1::ClientConnectionImpl upstream_http(upstream_conn_data_->connection(), - http_conn_callbacks_); + Http1::ClientConnectionImpl upstream_http(*upstream_connection_, http_conn_callbacks_); Http1::RequestStreamEncoderImpl upstream_request = Http1::RequestStreamEncoderImpl(upstream_http); upstream_request.encodeHeaders(request_headers_, false); ASSERT(state_ == ConnectState::PreConnect); diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index 2ee50a6fcdf9f..d6c60c174c465 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -46,10 +46,8 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) { void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks) { ASSERT(conn.wrapper_ == nullptr); - conn.wrapper_ = std::make_shared<ConnectionWrapper>(conn); - - callbacks.onPoolReady(std::make_unique<ConnectionDataImpl>(conn.wrapper_), - conn.real_host_description_); + conn.wrapper_ = std::make_unique<ConnectionWrapper>(conn); + callbacks.onPoolReady(*conn.wrapper_, conn.real_host_description_); } void ConnPoolImpl::checkForDrained() { @@ -126,8 +124,6 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc(); 
} host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc(); - - conn.wrapper_->release(true); } removed = conn.removeFromList(busy_conns_); @@ -263,29 +259,23 @@ ConnPoolImpl::ConnectionWrapper::ConnectionWrapper(ActiveConn& parent) : parent_ parent_.parent_.host_->stats().rq_active_.inc(); } -Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() { - ASSERT(!released_); - return *parent_.conn_; +ConnPoolImpl::ConnectionWrapper::~ConnectionWrapper() { + parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec(); + parent_.parent_.host_->stats().rq_active_.dec(); } +Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() { return *parent_.conn_; } + void ConnPoolImpl::ConnectionWrapper::addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& cb) { ASSERT(!released_); callbacks_ = &cb; } -void ConnPoolImpl::ConnectionWrapper::release(bool closed) { - // Allow multiple calls: connection close and destruction of ConnectionDataImplPtr will both - // result in this call. 
- if (!released_) { - released_ = true; - callbacks_ = nullptr; - if (!closed) { - parent_.parent_.onConnReleased(parent_); - } - - parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec(); - parent_.parent_.host_->stats().rq_active_.dec(); - } +void ConnPoolImpl::ConnectionWrapper::release() { + ASSERT(!released_); + released_ = true; + callbacks_ = nullptr; + parent_.parent_.onConnReleased(parent_); } ConnPoolImpl::PendingRequest::PendingRequest(ConnPoolImpl& parent, diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h index 6e1846fe43c31..aa06d8b2d8b4d 100644 --- a/source/common/tcp/conn_pool.h +++ b/source/common/tcp/conn_pool.h @@ -34,32 +34,21 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool:: protected: struct ActiveConn; - struct ConnectionWrapper { + struct ConnectionWrapper : public ConnectionPool::ConnectionData { ConnectionWrapper(ActiveConn& parent); + ~ConnectionWrapper(); - Network::ClientConnection& connection(); - void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks); - void release(bool closed); + // ConnectionPool::ConnectionData + Network::ClientConnection& connection() override; + void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override; + void release() override; ActiveConn& parent_; ConnectionPool::UpstreamCallbacks* callbacks_{}; bool released_{false}; }; - typedef std::shared_ptr<ConnectionWrapper> ConnectionWrapperSharedPtr; - - struct ConnectionDataImpl : public ConnectionPool::ConnectionData { - ConnectionDataImpl(ConnectionWrapperSharedPtr wrapper) : wrapper_(wrapper) {} - ~ConnectionDataImpl() { wrapper_->release(false); } - - // ConnectionPool::ConnectionData - Network::ClientConnection& connection() override { return wrapper_->connection(); } - void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override { - wrapper_->addUpstreamCallbacks(callbacks); - }; - - ConnectionWrapperSharedPtr wrapper_; - }; + typedef std::unique_ptr<ConnectionWrapper> 
ConnectionWrapperPtr; struct ConnReadFilter : public Network::ReadFilterBaseImpl { ConnReadFilter(ActiveConn& parent) : parent_(parent) {} @@ -89,7 +78,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: ConnPoolImpl& parent_; Upstream::HostDescriptionConstSharedPtr real_host_description_; - ConnectionWrapperSharedPtr wrapper_; + ConnectionWrapperPtr wrapper_; Network::ClientConnectionPtr conn_; Event::TimerPtr connect_timer_; Stats::TimespanPtr conn_length_; diff --git a/source/common/tcp_proxy/BUILD b/source/common/tcp_proxy/BUILD index 61de5015db426..fcd3000620fb8 100644 --- a/source/common/tcp_proxy/BUILD +++ b/source/common/tcp_proxy/BUILD @@ -24,7 +24,6 @@ envoy_cc_library( "//include/envoy/stats:stats_interface", "//include/envoy/stats:stats_macros", "//include/envoy/stats:timespan", - "//include/envoy/tcp:conn_pool_interface", "//include/envoy/upstream:cluster_manager_interface", "//include/envoy/upstream:upstream_interface", "//source/common/access_log:access_log_lib", diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 906bdeeb696ea..68d4880200be7 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -135,12 +135,8 @@ Filter::~Filter() { access_log->log(nullptr, nullptr, nullptr, getRequestInfo()); } - if (upstream_handle_) { - upstream_handle_->cancel(); - } - - if (upstream_conn_data_) { - upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); + if (upstream_connection_) { + finalizeUpstreamConnectionStats(); } } @@ -148,6 +144,21 @@ TcpProxyStats Config::SharedConfig::generateStats(Stats::Scope& scope) { return {ALL_TCP_PROXY_STATS(POOL_COUNTER(scope), POOL_GAUGE(scope))}; } +namespace { +void finalizeConnectionStats(const Upstream::HostDescription& host, + Stats::Timespan connected_timespan) { + host.cluster().stats().upstream_cx_destroy_.inc(); + host.cluster().stats().upstream_cx_active_.dec(); + 
host.stats().cx_active_.dec(); + host.cluster().resourceManager(Upstream::ResourcePriority::Default).connections().dec(); + connected_timespan.complete(); +} +} // namespace + +void Filter::finalizeUpstreamConnectionStats() { + finalizeConnectionStats(*read_callbacks_->upstreamHost(), *connected_timespan_); +} + void Filter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { initialize(callbacks, true); } @@ -177,15 +188,15 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec } void Filter::readDisableUpstream(bool disable) { - if (upstream_conn_data_ == nullptr || - upstream_conn_data_->connection().state() != Network::Connection::State::Open) { + if (upstream_connection_ == nullptr || + upstream_connection_->state() != Network::Connection::State::Open) { // Because we flush write downstream, we can have a case where upstream has already disconnected // and we are waiting to flush. If we had a watermark event during this time we should no // longer touch the upstream connection. 
return; } - upstream_conn_data_->connection().readDisable(disable); + upstream_connection_->readDisable(disable); if (disable) { read_callbacks_->upstreamHost() ->cluster() @@ -251,12 +262,13 @@ void Filter::UpstreamCallbacks::onBelowWriteBufferLowWatermark() { } } -void Filter::UpstreamCallbacks::onUpstreamData(Buffer::Instance& data, bool end_stream) { +Network::FilterStatus Filter::UpstreamCallbacks::onData(Buffer::Instance& data, bool end_stream) { if (parent_) { parent_->onUpstreamData(data, end_stream); } else { drainer_->onData(data, end_stream); } + return Network::FilterStatus::StopIteration; } void Filter::UpstreamCallbacks::onBytesSent() { @@ -282,7 +294,7 @@ void Filter::UpstreamCallbacks::drain(Drainer& drainer) { } Network::FilterStatus Filter::initializeUpstreamConnection() { - ASSERT(upstream_conn_data_ == nullptr); + ASSERT(upstream_connection_ == nullptr); const std::string& cluster_name = getUpstreamCluster(); @@ -299,9 +311,6 @@ Network::FilterStatus Filter::initializeUpstreamConnection() { } Upstream::ClusterInfoConstSharedPtr cluster = thread_local_cluster->info(); - - // Check this here because the TCP conn pool will queue our request waiting for a connection that - // will never be released. if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) { getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::UpstreamOverflow); cluster->stats().upstream_cx_overflow_.inc(); @@ -316,105 +325,86 @@ Network::FilterStatus Filter::initializeUpstreamConnection() { return Network::FilterStatus::StopIteration; } - Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster( - cluster_name, Upstream::ResourcePriority::Default, this); - if (!conn_pool) { - // Either cluster is unknown or there are no healthy hosts. tcpConnPoolForCluster() increments - // cluster->stats().upstream_cx_none_healthy in the latter case. 
+ Upstream::Host::CreateConnectionData conn_info = + cluster_manager_.tcpConnForCluster(cluster_name, this); + + upstream_connection_ = std::move(conn_info.connection_); + read_callbacks_->upstreamHost(conn_info.host_description_); + if (!upstream_connection_) { + // tcpConnForCluster() increments cluster->stats().upstream_cx_none_healthy. getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::NoHealthyUpstream); onInitFailure(UpstreamFailureReason::NO_HEALTHY_UPSTREAM); return Network::FilterStatus::StopIteration; } - connecting_ = true; connect_attempts_++; - - // Because we never return open connections to the pool, this should either return a handle while - // a connection completes or it invokes onPoolFailure inline. Either way, stop iteration. - upstream_handle_ = conn_pool->newConnection(*this); - return Network::FilterStatus::StopIteration; -} - -void Filter::onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason, - Upstream::HostDescriptionConstSharedPtr host) { - upstream_handle_ = nullptr; - - read_callbacks_->upstreamHost(host); - getRequestInfo().onUpstreamHostSelected(host); - - switch (reason) { - case Tcp::ConnectionPool::PoolFailureReason::Overflow: - case Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure: - upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); - break; - - case Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure: - upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); - break; - - case Tcp::ConnectionPool::PoolFailureReason::Timeout: - onConnectTimeout(); - break; - - default: - NOT_REACHED_GCOVR_EXCL_LINE; - } -} - -void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, - Upstream::HostDescriptionConstSharedPtr host) { - upstream_handle_ = nullptr; - upstream_conn_data_ = std::move(conn_data); - read_callbacks_->upstreamHost(host); - - upstream_conn_data_->addUpstreamCallbacks(*upstream_callbacks_); - - Network::ClientConnection& connection = 
upstream_conn_data_->connection(); - - connection.enableHalfClose(true); - - getRequestInfo().onUpstreamHostSelected(host); - getRequestInfo().setUpstreamLocalAddress(connection.localAddress()); - - // Simulate the event that onPoolReady represents. - upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected); - - read_callbacks_->continueReading(); + cluster->resourceManager(Upstream::ResourcePriority::Default).connections().inc(); + upstream_connection_->addReadFilter(upstream_callbacks_); + upstream_connection_->addConnectionCallbacks(*upstream_callbacks_); + upstream_connection_->enableHalfClose(true); + upstream_connection_->setConnectionStats( + {read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_total_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_buffered_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_, + &read_callbacks_->upstreamHost()->cluster().stats().bind_errors_}); + upstream_connection_->connect(); + upstream_connection_->noDelay(true); + getRequestInfo().onUpstreamHostSelected(conn_info.host_description_); + getRequestInfo().setUpstreamLocalAddress(upstream_connection_->localAddress()); + + ASSERT(connect_timeout_timer_ == nullptr); + connect_timeout_timer_ = read_callbacks_->connection().dispatcher().createTimer( + [this]() -> void { onConnectTimeout(); }); + connect_timeout_timer_->enableTimer(cluster->connectTimeout()); + + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_total_.inc(); + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_active_.inc(); + read_callbacks_->upstreamHost()->stats().cx_total_.inc(); + read_callbacks_->upstreamHost()->stats().cx_active_.inc(); + connect_timespan_.reset(new Stats::Timespan( + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_ms_)); + connected_timespan_.reset(new Stats::Timespan( + 
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_length_ms_)); + + return Network::FilterStatus::Continue; } void Filter::onConnectTimeout() { ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection()); read_callbacks_->upstreamHost()->outlierDetector().putResult(Upstream::Outlier::Result::TIMEOUT); + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_timeout_.inc(); getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::UpstreamConnectionFailure); - // Raise LocalClose, which will trigger a reconnect if needed/configured. - upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); + // This will cause a LocalClose event to be raised, which will trigger a reconnect if + // needed/configured. + upstream_connection_->close(Network::ConnectionCloseType::NoFlush); } Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) { ENVOY_CONN_LOG(trace, "downstream connection received {} bytes, end_stream={}", read_callbacks_->connection(), data.length(), end_stream); getRequestInfo().addBytesReceived(data.length()); - upstream_conn_data_->connection().write(data, end_stream); + upstream_connection_->write(data, end_stream); ASSERT(0 == data.length()); resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive? 
return Network::FilterStatus::StopIteration; } void Filter::onDownstreamEvent(Network::ConnectionEvent event) { - if (upstream_conn_data_) { + if (upstream_connection_) { if (event == Network::ConnectionEvent::RemoteClose) { - upstream_conn_data_->connection().close(Network::ConnectionCloseType::FlushWrite); + upstream_connection_->close(Network::ConnectionCloseType::FlushWrite); - if (upstream_conn_data_ != nullptr && - upstream_conn_data_->connection().state() != Network::Connection::State::Closed) { - config_->drainManager().add(config_->sharedConfig(), std::move(upstream_conn_data_), + if (upstream_connection_ != nullptr && + upstream_connection_->state() != Network::Connection::State::Closed) { + config_->drainManager().add(config_->sharedConfig(), std::move(upstream_connection_), std::move(upstream_callbacks_), std::move(idle_timer_), - read_callbacks_->upstreamHost()); + read_callbacks_->upstreamHost(), + std::move(connected_timespan_)); } } else if (event == Network::ConnectionEvent::LocalClose) { - upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); - upstream_conn_data_.reset(); + upstream_connection_->close(Network::ConnectionCloseType::NoFlush); disableIdleTimer(); } } @@ -430,21 +420,36 @@ void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) { } void Filter::onUpstreamEvent(Network::ConnectionEvent event) { - // Update the connecting flag before processing the event because we may start a new connection - // attempt in initializeUpstreamConnection. - bool connecting = connecting_; - connecting_ = false; + bool connecting = false; + + // The timer must be cleared before, not after, processing the event because + // if initializeUpstreamConnection() is called it will reset the timer, so + // clearing after that call will leave the timer unset. 
+ if (connect_timeout_timer_) { + connecting = true; + connect_timeout_timer_->disableTimer(); + connect_timeout_timer_.reset(); + } if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { - upstream_conn_data_.reset(); + finalizeUpstreamConnectionStats(); + read_callbacks_->connection().dispatcher().deferredDelete(std::move(upstream_connection_)); disableIdleTimer(); + auto& destroy_ctx_stat = + (event == Network::ConnectionEvent::RemoteClose) + ? read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_remote_ + : read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_local_; + destroy_ctx_stat.inc(); + if (connecting) { if (event == Network::ConnectionEvent::RemoteClose) { getRequestInfo().setResponseFlag(RequestInfo::ResponseFlag::UpstreamConnectionFailure); read_callbacks_->upstreamHost()->outlierDetector().putResult( Upstream::Outlier::Result::CONNECT_FAILED); + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_fail_.inc(); + read_callbacks_->upstreamHost()->stats().cx_connect_fail_.inc(); } initializeUpstreamConnection(); @@ -454,6 +459,8 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) { } } } else if (event == Network::ConnectionEvent::Connected) { + connect_timespan_->complete(); + // Re-enable downstream reads now that the upstream connection is established // so we have a place to send downstream data to. 
read_callbacks_->connection().readDisable(false); @@ -473,10 +480,8 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) { }); resetIdleTimer(); read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); }); - upstream_conn_data_->connection().addBytesSentCallback([upstream_callbacks = - upstream_callbacks_](uint64_t) { - upstream_callbacks->onBytesSent(); - }); + upstream_connection_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_]( + uint64_t) { upstream_callbacks->onBytesSent(); }); } } } @@ -518,12 +523,14 @@ UpstreamDrainManager::~UpstreamDrainManager() { } void UpstreamDrainManager::add(const Config::SharedConfigSharedPtr& config, - Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data, + Network::ClientConnectionPtr&& upstream_connection, const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks, Event::TimerPtr&& idle_timer, - const Upstream::HostDescriptionConstSharedPtr& upstream_host) { - DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_conn_data), - std::move(idle_timer), upstream_host)); + const Upstream::HostDescriptionConstSharedPtr& upstream_host, + Stats::TimespanPtr&& connected_timespan) { + DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_connection), + std::move(idle_timer), upstream_host, + std::move(connected_timespan))); callbacks->drain(*drainer); // Use temporary to ensure we get the pointer before we move it out of drainer @@ -540,10 +547,12 @@ void UpstreamDrainManager::remove(Drainer& drainer, Event::Dispatcher& dispatche Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config, const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks, - Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer, - const Upstream::HostDescriptionConstSharedPtr& upstream_host) - : parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)), - timer_(std::move(idle_timer)), upstream_host_(upstream_host), 
config_(config) { + Network::ClientConnectionPtr&& connection, Event::TimerPtr&& idle_timer, + const Upstream::HostDescriptionConstSharedPtr& upstream_host, + Stats::TimespanPtr&& connected_timespan) + : parent_(parent), callbacks_(callbacks), upstream_connection_(std::move(connection)), + timer_(std::move(idle_timer)), connected_timespan_(std::move(connected_timespan)), + upstream_host_(upstream_host), config_(config) { config_->stats().upstream_flush_total_.inc(); config_->stats().upstream_flush_active_.inc(); } @@ -555,7 +564,8 @@ void Drainer::onEvent(Network::ConnectionEvent event) { timer_->disableTimer(); } config_->stats().upstream_flush_active_.dec(); - parent_.remove(*this, upstream_conn_data_->connection().dispatcher()); + finalizeConnectionStats(*upstream_host_, *connected_timespan_); + parent_.remove(*this, upstream_connection_->dispatcher()); } } @@ -582,7 +592,7 @@ void Drainer::onBytesSent() { void Drainer::cancelDrain() { // This sends onEvent(LocalClose). - upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); + upstream_connection_->close(Network::ConnectionCloseType::NoFlush); } } // namespace TcpProxy diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 61a803db85cca..3c9173edc3328 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -14,7 +14,6 @@ #include "envoy/server/filter_config.h" #include "envoy/stats/stats_macros.h" #include "envoy/stats/timespan.h" -#include "envoy/tcp/conn_pool.h" #include "envoy/upstream/cluster_manager.h" #include "envoy/upstream/upstream.h" @@ -139,7 +138,6 @@ typedef std::shared_ptr<Config> ConfigSharedPtr; * be proxied back and forth between the two connections. 
*/ class Filter : public Network::ReadFilter, - Tcp::ConnectionPool::Callbacks, Upstream::LoadBalancerContext, protected Logger::Loggable<Logger::Id::filter> { public: @@ -151,12 +149,6 @@ class Filter : public Network::ReadFilter, Network::FilterStatus onNewConnection() override { return initializeUpstreamConnection(); } void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; - // Tcp::ConnectionPool::Callbacks - void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason, - Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, - Upstream::HostDescriptionConstSharedPtr host) override; - // Upstream::LoadBalancerContext absl::optional<uint64_t> computeHashKey() override { return {}; } const Router::MetadataMatchCriteria* metadataMatchCriteria() override { @@ -174,15 +166,18 @@ class Filter : public Network::ReadFilter, void readDisableUpstream(bool disable); void readDisableDownstream(bool disable); - struct UpstreamCallbacks : public Tcp::ConnectionPool::UpstreamCallbacks { + struct UpstreamCallbacks : public Network::ConnectionCallbacks, + public Network::ReadFilterBaseImpl { UpstreamCallbacks(Filter* parent) : parent_(parent) {} - // Tcp::ConnectionPool::UpstreamCallbacks - void onUpstreamData(Buffer::Instance& data, bool end_stream) override; + // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override; void onAboveWriteBufferHighWatermark() override; void onBelowWriteBufferLowWatermark() override; + // Network::ReadFilter + Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; + void onBytesSent(); void onIdleTimeout(); void drain(Drainer& drainer); @@ -239,6 +234,7 @@ class Filter : public Network::ReadFilter, void onDownstreamEvent(Network::ConnectionEvent event); void onUpstreamData(Buffer::Instance& data, bool end_stream); void onUpstreamEvent(Network::ConnectionEvent event); + void finalizeUpstreamConnectionStats(); void 
onIdleTimeout(); void resetIdleTimer(); void disableIdleTimer(); @@ -246,26 +242,29 @@ class Filter : public Network::ReadFilter, const ConfigSharedPtr config_; Upstream::ClusterManager& cluster_manager_; Network::ReadFilterCallbacks* read_callbacks_{}; - Tcp::ConnectionPool::Cancellable* upstream_handle_{}; - Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_; + Network::ClientConnectionPtr upstream_connection_; DownstreamCallbacks downstream_callbacks_; + Event::TimerPtr connect_timeout_timer_; Event::TimerPtr idle_timer_; + Stats::TimespanPtr connect_timespan_; + Stats::TimespanPtr connected_timespan_; std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a // read filter. RequestInfo::RequestInfoImpl request_info_; uint32_t connect_attempts_{}; - bool connecting_{}; }; -// This class deals with an upstream connection that needs to finish flushing, when the downstream -// connection has been closed. The TcpProxy is destroyed when the downstream connection is closed, -// so handling the upstream connection here allows it to finish draining or timeout. +// This class holds ownership of an upstream connection that needs to finish +// flushing, when the downstream connection has been closed. The TcpProxy is +// destroyed when the downstream connection is closed, so moving the upstream +// connection here allows it to finish draining or timeout. 
class Drainer : public Event::DeferredDeletable { public: Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config, const std::shared_ptr& callbacks, - Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer, - const Upstream::HostDescriptionConstSharedPtr& upstream_host); + Network::ClientConnectionPtr&& connection, Event::TimerPtr&& idle_timer, + const Upstream::HostDescriptionConstSharedPtr& upstream_host, + Stats::TimespanPtr&& connected_timespan); void onEvent(Network::ConnectionEvent event); void onData(Buffer::Instance& data, bool end_stream); @@ -276,8 +275,9 @@ class Drainer : public Event::DeferredDeletable { private: UpstreamDrainManager& parent_; std::shared_ptr callbacks_; - Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_; + Network::ClientConnectionPtr upstream_connection_; Event::TimerPtr timer_; + Stats::TimespanPtr connected_timespan_; Upstream::HostDescriptionConstSharedPtr upstream_host_; Config::SharedConfigSharedPtr config_; }; @@ -288,10 +288,11 @@ class UpstreamDrainManager : public ThreadLocal::ThreadLocalObject { public: ~UpstreamDrainManager(); void add(const Config::SharedConfigSharedPtr& config, - Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data, + Network::ClientConnectionPtr&& upstream_connection, const std::shared_ptr& callbacks, Event::TimerPtr&& idle_timer, - const Upstream::HostDescriptionConstSharedPtr& upstream_host); + const Upstream::HostDescriptionConstSharedPtr& upstream_host, + Stats::TimespanPtr&& connected_timespan); void remove(Drainer& drainer, Event::Dispatcher& dispatcher); private: diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 09d1ac142c2d5..2b4fb77946c35 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -224,7 +224,7 @@ void 
Router::UpstreamRequest::start() { void Router::UpstreamRequest::resetStream() { if (conn_data_ != nullptr) { conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); - conn_data_.reset(); + conn_data_ = nullptr; } } @@ -235,10 +235,10 @@ void Router::UpstreamRequest::onPoolFailure(Tcp::ConnectionPool::PoolFailureReas onResetStream(reason); } -void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, +void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionData& conn_data, Upstream::HostDescriptionConstSharedPtr host) { onUpstreamHostSelected(host); - conn_data_ = std::move(conn_data); + conn_data_ = &conn_data; conn_data_->addUpstreamCallbacks(parent_); conn_pool_handle_ = nullptr; @@ -263,7 +263,10 @@ void Router::UpstreamRequest::onRequestComplete() { request_complete_ = true; } void Router::UpstreamRequest::onResponseComplete() { response_complete_ = true; - conn_data_.reset(); + if (conn_data_ != nullptr) { + conn_data_->release(); + } + conn_data_ = nullptr; } void Router::UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) { diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index d298d38b354cd..117c99ca2635b 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -112,7 +112,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, // Tcp::ConnectionPool::Callbacks void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason, Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, + void onPoolReady(Tcp::ConnectionPool::ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) override; void onRequestComplete(); @@ -127,7 +127,7 @@ class Router : public 
Tcp::ConnectionPool::UpstreamCallbacks, const int32_t seq_id_; Tcp::ConnectionPool::Cancellable* conn_pool_handle_{}; - Tcp::ConnectionPool::ConnectionDataPtr conn_data_; + Tcp::ConnectionPool::ConnectionData* conn_data_{}; Upstream::HostDescriptionConstSharedPtr upstream_host_; TransportPtr transport_; ProtocolType proto_type_{ProtocolType::Auto}; diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 3e7c089b59d16..a036e9edcadda 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -312,8 +312,6 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi ConnectionManagerListenerStats listener_stats_; bool proxy_100_continue_ = false; Http::Http1Settings http1_settings_; - NiceMock upstream_conn_; // for websocket tests - NiceMock conn_pool_; // for websocket tests // TODO(mattklein123): Not all tests have been converted over to better setup. Convert the rest. 
MockStreamEncoder response_encoder_; @@ -1506,7 +1504,8 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketNoThreadLocalCluster) { TEST_F(HttpConnectionManagerImplTest, WebSocketNoConnInPool) { setup(false, ""); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster(_, _, _)).WillOnce(Return(nullptr)); + Upstream::MockHost::MockCreateConnectionData conn_info; + EXPECT_CALL(cluster_manager_, tcpConnForCluster_(_, _)).WillOnce(Return(conn_info)); expectOnUpstreamInitFailure(); EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_active_.value()); @@ -1521,37 +1520,14 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketNoConnInPool) { TEST_F(HttpConnectionManagerImplTest, WebSocketDataAfterConnectFail) { setup(false, ""); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster(_, _, _)).WillOnce(Return(&conn_pool_)); - - StreamDecoder* decoder = nullptr; - NiceMock encoder; - - configureRouteForWebsocket(route_config_provider_.route_config_->route_->route_entry_); - - EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> void { - decoder = &conn_manager_->newStream(encoder); - HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, - {":method", "GET"}, - {":path", "/"}, - {"connection", "Upgrade"}, - {"upgrade", "websocket"}}}; - decoder->decodeHeaders(std::move(headers), true); - data.drain(4); - })); - - Buffer::OwnedImpl fake_input("1234"); - conn_manager_->onData(fake_input, false); + Upstream::MockHost::MockCreateConnectionData conn_info; + EXPECT_CALL(cluster_manager_, tcpConnForCluster_(_, _)).WillOnce(Return(conn_info)); + expectOnUpstreamInitFailure(); EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_active_.value()); EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_total_.value()); EXPECT_EQ(0U, stats_.named_.downstream_cx_http1_active_.value()); - EXPECT_CALL(encoder, encodeHeaders(_, true)) - .WillOnce(Invoke([](const HeaderMap& headers, bool) -> void { - EXPECT_STREQ("504", headers.Status()->value().c_str()); - })); - 
- conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); // This should get dropped, with no ASSERT or crash. @@ -1571,14 +1547,13 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketMetadataMatch) { .WillByDefault(Return( &route_config_provider_.route_config_->route_->route_entry_.metadata_matches_criteria_)); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster(_, _, _)) - .WillOnce(Invoke([&](const std::string&, Upstream::ResourcePriority, - Upstream::LoadBalancerContext* context) - -> Tcp::ConnectionPool::MockInstance* { + EXPECT_CALL(cluster_manager_, tcpConnForCluster_(_, _)) + .WillOnce(Invoke([&](const std::string&, Upstream::LoadBalancerContext* context) + -> Upstream::MockHost::MockCreateConnectionData { EXPECT_EQ( context->metadataMatchCriteria(), &route_config_provider_.route_config_->route_->route_entry_.metadata_matches_criteria_); - return nullptr; + return {}; })); expectOnUpstreamInitFailure(); @@ -1589,8 +1564,21 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketMetadataMatch) { TEST_F(HttpConnectionManagerImplTest, WebSocketConnectTimeoutError) { setup(false, ""); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool_)); + Event::MockTimer* connect_timer = + new NiceMock(&filter_callbacks_.connection_.dispatcher_); + NiceMock* upstream_connection = + new NiceMock(); + Upstream::MockHost::MockCreateConnectionData conn_info; + + conn_info.connection_ = upstream_connection; + conn_info.host_description_.reset(new Upstream::HostImpl( + cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost", + Network::Utility::resolveUrl("tcp://127.0.0.1:80"), + envoy::api::v2::core::Metadata::default_instance(), 1, + envoy::api::v2::core::Locality().default_instance(), + envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + EXPECT_CALL(*connect_timer, enableTimer(_)); + 
EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info)); StreamDecoder* decoder = nullptr; NiceMock encoder; @@ -1608,15 +1596,15 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketConnectTimeoutError) { data.drain(4); })); - Buffer::OwnedImpl fake_input("1234"); - conn_manager_->onData(fake_input, false); - EXPECT_CALL(encoder, encodeHeaders(_, true)) .WillOnce(Invoke([](const HeaderMap& headers, bool) -> void { EXPECT_STREQ("504", headers.Status()->value().c_str()); })); - conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::Timeout); + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, false); + + connect_timer->callback_(); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); conn_manager_.reset(); } @@ -1624,8 +1612,21 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketConnectTimeoutError) { TEST_F(HttpConnectionManagerImplTest, WebSocketConnectionFailure) { setup(false, ""); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool_)); + Event::MockTimer* connect_timer = + new NiceMock(&filter_callbacks_.connection_.dispatcher_); + NiceMock* upstream_connection = + new NiceMock(); + Upstream::MockHost::MockCreateConnectionData conn_info; + + conn_info.connection_ = upstream_connection; + conn_info.host_description_.reset(new Upstream::HostImpl( + cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost", + Network::Utility::resolveUrl("tcp://127.0.0.1:80"), + envoy::api::v2::core::Metadata::default_instance(), 1, + envoy::api::v2::core::Locality().default_instance(), + envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + EXPECT_CALL(*connect_timer, enableTimer(_)); + EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info)); StreamDecoder* decoder = nullptr; NiceMock encoder; @@ -1643,16 +1644,16 @@ TEST_F(HttpConnectionManagerImplTest, 
WebSocketConnectionFailure) { data.drain(4); })); - Buffer::OwnedImpl fake_input("1234"); - conn_manager_->onData(fake_input, false); - EXPECT_CALL(encoder, encodeHeaders(_, true)) .WillOnce(Invoke([](const HeaderMap& headers, bool) -> void { EXPECT_STREQ("504", headers.Status()->value().c_str()); })); - conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, false); + // expectOnUpstreamInitFailure("504"); + upstream_connection->raiseEvent(Network::ConnectionEvent::RemoteClose); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); conn_manager_.reset(); } @@ -1662,6 +1663,9 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) { StreamDecoder* decoder = nullptr; NiceMock encoder; + NiceMock* upstream_connection = + new NiceMock(); + Upstream::MockHost::MockCreateConnectionData conn_info; HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, {":method", "GET"}, {":path", "/"}, @@ -1669,8 +1673,14 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) { {"upgrade", "websocket"}}}; auto raw_header_ptr = headers.get(); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool_)); + conn_info.connection_ = upstream_connection; + conn_info.host_description_.reset(new Upstream::HostImpl( + cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost", + Network::Utility::resolveUrl("tcp://127.0.0.1:80"), + envoy::api::v2::core::Metadata::default_instance(), 1, + envoy::api::v2::core::Locality().default_instance(), + envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info)); configureRouteForWebsocket(route_config_provider_.route_config_->route_->route_entry_); @@ -1687,9 +1697,7 @@ TEST_F(HttpConnectionManagerImplTest, 
WebSocketPrefixAndAutoHostRewrite) { Buffer::OwnedImpl fake_input("1234"); conn_manager_->onData(fake_input, false); - - conn_pool_.host_->hostname_ = "newhost"; - conn_pool_.poolReady(upstream_conn_); + upstream_connection->raiseEvent(Network::ConnectionEvent::Connected); // rewritten authority header when auto_host_rewrite is true EXPECT_STREQ("newhost", raw_header_ptr->Host()->value().c_str()); @@ -1705,8 +1713,21 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) { TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) { setup(false, ""); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool_)); + Event::MockTimer* connect_timer = + new NiceMock(&filter_callbacks_.connection_.dispatcher_); + NiceMock* upstream_connection = + new NiceMock(); + Upstream::MockHost::MockCreateConnectionData conn_info; + + conn_info.connection_ = upstream_connection; + conn_info.host_description_.reset(new Upstream::HostImpl( + cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost", + Network::Utility::resolveUrl("tcp://127.0.0.1:80"), + envoy::api::v2::core::Metadata::default_instance(), 1, + envoy::api::v2::core::Locality().default_instance(), + envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + EXPECT_CALL(*connect_timer, enableTimer(_)); + EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info)); StreamDecoder* decoder = nullptr; NiceMock encoder; @@ -1733,11 +1754,10 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) { conn_manager_->onData(fake_input, false); - EXPECT_CALL(upstream_conn_, write(_, false)); - EXPECT_CALL(upstream_conn_, write(BufferEqual(&early_data), false)); + EXPECT_CALL(*upstream_connection, write(_, false)); + EXPECT_CALL(*upstream_connection, write(BufferEqual(&early_data), false)); EXPECT_CALL(filter_callbacks_.connection_, readDisable(false)); - conn_pool_.poolReady(upstream_conn_); - + 
upstream_connection->raiseEvent(Network::ConnectionEvent::Connected); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); conn_manager_.reset(); } @@ -1745,8 +1765,21 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) { TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyDataConnectionFail) { setup(false, ""); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool_)); + Event::MockTimer* connect_timer = + new NiceMock(&filter_callbacks_.connection_.dispatcher_); + NiceMock* upstream_connection = + new NiceMock(); + Upstream::MockHost::MockCreateConnectionData conn_info; + + conn_info.connection_ = upstream_connection; + conn_info.host_description_.reset(new Upstream::HostImpl( + cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost", + Network::Utility::resolveUrl("tcp://127.0.0.1:80"), + envoy::api::v2::core::Metadata::default_instance(), 1, + envoy::api::v2::core::Locality().default_instance(), + envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + EXPECT_CALL(*connect_timer, enableTimer(_)); + EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info)); StreamDecoder* decoder = nullptr; NiceMock encoder; @@ -1773,7 +1806,7 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyDataConnectionFail) { conn_manager_->onData(fake_input, false); - conn_pool_.poolFailure(Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + upstream_connection->raiseEvent(Network::ConnectionEvent::RemoteClose); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); // This should get dropped, with no crash or ASSERT. 
@@ -1786,8 +1819,21 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyDataConnectionFail) { TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyEndStream) { setup(false, ""); - EXPECT_CALL(cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool_)); + Event::MockTimer* connect_timer = + new NiceMock(&filter_callbacks_.connection_.dispatcher_); + NiceMock* upstream_connection = + new NiceMock(); + Upstream::MockHost::MockCreateConnectionData conn_info; + + conn_info.connection_ = upstream_connection; + conn_info.host_description_.reset(new Upstream::HostImpl( + cluster_manager_.thread_local_cluster_.cluster_.info_, "newhost", + Network::Utility::resolveUrl("tcp://127.0.0.1:80"), + envoy::api::v2::core::Metadata::default_instance(), 1, + envoy::api::v2::core::Locality().default_instance(), + envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + EXPECT_CALL(*connect_timer, enableTimer(_)); + EXPECT_CALL(cluster_manager_, tcpConnForCluster_("fake_cluster", _)).WillOnce(Return(conn_info)); StreamDecoder* decoder = nullptr; NiceMock encoder; @@ -1809,9 +1855,9 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyEndStream) { Buffer::OwnedImpl fake_input("1234"); conn_manager_->onData(fake_input, true); - EXPECT_CALL(upstream_conn_, write(_, false)); - EXPECT_CALL(upstream_conn_, write(_, true)).Times(0); - conn_pool_.poolReady(upstream_conn_); + EXPECT_CALL(*upstream_connection, write(_, false)); + EXPECT_CALL(*upstream_connection, write(_, true)).Times(0); + upstream_connection->raiseEvent(Network::ConnectionEvent::Connected); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); conn_manager_.reset(); } diff --git a/test/common/network/filter_manager_impl_test.cc b/test/common/network/filter_manager_impl_test.cc index 9bcb366c4c42f..c8469a99f9336 100644 --- a/test/common/network/filter_manager_impl_test.cc +++ b/test/common/network/filter_manager_impl_test.cc @@ -153,8 +153,6 @@ 
TEST_F(NetworkFilterManagerTest, RateLimitAndTcpProxy) { InSequence s; NiceMock factory_context; NiceMock connection; - NiceMock upstream_connection; - NiceMock conn_pool; FilterManagerImpl manager(connection, *this); std::string rl_json = R"EOF( @@ -203,15 +201,21 @@ TEST_F(NetworkFilterManagerTest, RateLimitAndTcpProxy) { EXPECT_EQ(manager.initializeReadFilters(), true); - EXPECT_CALL(factory_context.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool)); + NiceMock* upstream_connection = + new NiceMock(); + Upstream::MockHost::MockCreateConnectionData conn_info; + conn_info.connection_ = upstream_connection; + conn_info.host_description_ = Upstream::makeTestHost( + factory_context.cluster_manager_.thread_local_cluster_.cluster_.info_, "tcp://127.0.0.1:80"); + EXPECT_CALL(factory_context.cluster_manager_, tcpConnForCluster_("fake_cluster", _)) + .WillOnce(Return(conn_info)); request_callbacks->complete(RateLimit::LimitStatus::OK); - conn_pool.poolReady(upstream_connection); + upstream_connection->raiseEvent(Network::ConnectionEvent::Connected); Buffer::OwnedImpl buffer("hello"); - EXPECT_CALL(upstream_connection, write(BufferEqual(&buffer), _)); + EXPECT_CALL(*upstream_connection, write(BufferEqual(&buffer), _)); read_buffer_.add("hello"); manager.onRead(); } diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc index a6a080f83910d..90961972724e1 100644 --- a/test/common/tcp/conn_pool_test.cc +++ b/test/common/tcp/conn_pool_test.cc @@ -34,9 +34,9 @@ namespace Tcp { * Mock callbacks used for conn pool testing. 
*/ struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks { - void onPoolReady(ConnectionPool::ConnectionDataPtr&& conn, + void onPoolReady(ConnectionPool::ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) override { - conn_data_ = std::move(conn); + conn_data_ = &conn; host_ = host; pool_ready_.ready(); } @@ -50,7 +50,7 @@ struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks { ReadyWatcher pool_failure_; ReadyWatcher pool_ready_; - ConnectionPool::ConnectionDataPtr conn_data_{}; + ConnectionPool::ConnectionData* conn_data_{}; absl::optional reason_; Upstream::HostDescriptionConstSharedPtr host_; }; @@ -232,7 +232,7 @@ struct ActiveTestConn { void expectNewConn() { EXPECT_CALL(callbacks_.pool_ready_, ready()); } - void releaseConn() { callbacks_.conn_data_.reset(); } + void releaseConn() { callbacks_.conn_data_->release(); } void verifyConn() { EXPECT_EQ(&callbacks_.conn_data_->connection(), @@ -589,7 +589,7 @@ TEST_F(TcpConnPoolImplTest, DisconnectWhilePending) { conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); EXPECT_CALL(conn_pool_, onConnReleasedForTest()); - callbacks2.conn_data_.reset(); + callbacks2.conn_data_->release(); // Disconnect EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); @@ -625,11 +625,11 @@ TEST_F(TcpConnPoolImplTest, MaxConnections) { EXPECT_CALL(conn_pool_, onConnReleasedForTest()); conn_pool_.expectEnableUpstreamReady(); EXPECT_CALL(callbacks2.pool_ready_, ready()); - callbacks.conn_data_.reset(); + callbacks.conn_data_->release(); conn_pool_.expectAndRunUpstreamReady(); EXPECT_CALL(conn_pool_, onConnReleasedForTest()); - callbacks2.conn_data_.reset(); + callbacks2.conn_data_->release(); // Cause the connection to go away. 
EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); @@ -657,7 +657,7 @@ TEST_F(TcpConnPoolImplTest, MaxRequestsPerConnection) { EXPECT_CALL(conn_pool_, onConnReleasedForTest()); EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); - callbacks.conn_data_.reset(); + callbacks.conn_data_->release(); dispatcher_.clearDeferredDeleteList(); EXPECT_EQ(0U, cluster_->stats_.upstream_cx_destroy_with_active_rq_.value()); @@ -743,7 +743,7 @@ TEST_F(TcpConnPoolImplDestructorTest, TestReadyConnectionsAreClosed) { prepareConn(); // Transition connection to ready list - callbacks_->conn_data_.reset(); + callbacks_->conn_data_->release(); EXPECT_CALL(*connection_, close(Network::ConnectionCloseType::NoFlush)); EXPECT_CALL(dispatcher_, clearDeferredDeleteList()); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 73568c61de74e..a8616176943ba 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -18,7 +18,6 @@ #include "test/mocks/network/mocks.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/server/mocks.h" -#include "test/mocks/tcp/mocks.h" #include "test/mocks/upstream/host.h" #include "test/mocks/upstream/mocks.h" #include "test/test_common/printers.h" @@ -26,7 +25,6 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -using testing::Invoke; using testing::MatchesRegex; using testing::NiceMock; using testing::Return; @@ -381,50 +379,53 @@ class TcpProxyTest : public testing::Test { upstream_local_address_ = Network::Utility::resolveUrl("tcp://2.2.2.2:50000"); upstream_remote_address_ = Network::Utility::resolveUrl("tcp://127.0.0.1:80"); if (connections >= 1) { + { + testing::InSequence sequence; + for (uint32_t i = 0; i < connections; i++) { + connect_timers_.push_back( + new NiceMock(&filter_callbacks_.connection_.dispatcher_)); + EXPECT_CALL(*connect_timers_.at(i), enableTimer(_)); + } + } + for (uint32_t i = 0; i < connections; i++) { - upstream_connections_.push_back( 
- std::make_unique>()); - upstream_connection_data_.push_back( - std::make_unique>()); - ON_CALL(*upstream_connection_data_.back(), connection()) - .WillByDefault(ReturnRef(*upstream_connections_.back())); + upstream_connections_.push_back(new NiceMock()); upstream_hosts_.push_back(std::make_shared>()); - conn_pool_handles_.push_back( - std::make_unique>()); + conn_infos_.push_back(Upstream::MockHost::MockCreateConnectionData()); + conn_infos_.at(i).connection_ = upstream_connections_.back(); + conn_infos_.at(i).host_description_ = upstream_hosts_.back(); ON_CALL(*upstream_hosts_.at(i), cluster()) .WillByDefault(ReturnPointee( factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_)); ON_CALL(*upstream_hosts_.at(i), address()).WillByDefault(Return(upstream_remote_address_)); upstream_connections_.at(i)->local_address_ = upstream_local_address_; + EXPECT_CALL(*upstream_connections_.at(i), addReadFilter(_)) + .WillOnce(SaveArg<0>(&upstream_read_filter_)); EXPECT_CALL(*upstream_connections_.at(i), dispatcher()) .WillRepeatedly(ReturnRef(filter_callbacks_.connection_.dispatcher_)); + EXPECT_CALL(*upstream_connections_.at(i), enableHalfClose(true)); } } { testing::InSequence sequence; for (uint32_t i = 0; i < connections; i++) { - EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillOnce(Return(&conn_pool_)) - .RetiresOnSaturation(); - EXPECT_CALL(conn_pool_, newConnection(_)) - .WillOnce(Invoke( - [=](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* { - conn_pool_callbacks_.push_back(&cb); - return conn_pool_handles_.at(i).get(); - })) + EXPECT_CALL(factory_context_.cluster_manager_, tcpConnForCluster_("fake_cluster", _)) + .WillOnce(Return(conn_infos_.at(i))) .RetiresOnSaturation(); } - EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) - .WillRepeatedly(Return(nullptr)); + EXPECT_CALL(factory_context_.cluster_manager_, 
tcpConnForCluster_("fake_cluster", _)) + .WillRepeatedly(Return(Upstream::MockHost::MockCreateConnectionData())); } filter_.reset(new Filter(config_, factory_context_.cluster_manager_)); EXPECT_CALL(filter_callbacks_.connection_, readDisable(true)); EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true)); filter_->initializeReadFilterCallbacks(filter_callbacks_); - EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); + EXPECT_EQ(connections >= 1 ? Network::FilterStatus::Continue + : Network::FilterStatus::StopIteration, + filter_->onNewConnection()); EXPECT_EQ(absl::optional(), filter_->computeHashKey()); EXPECT_EQ(&filter_callbacks_.connection_, filter_->downstreamConnection()); @@ -434,37 +435,19 @@ class TcpProxyTest : public testing::Test { void setup(uint32_t connections) { setup(connections, defaultConfig()); } void raiseEventUpstreamConnected(uint32_t conn_index) { + EXPECT_CALL(*connect_timers_.at(conn_index), disableTimer()); EXPECT_CALL(filter_callbacks_.connection_, readDisable(false)); - EXPECT_CALL(*upstream_connection_data_.at(conn_index), addUpstreamCallbacks(_)) - .WillOnce(Invoke([=](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void { - upstream_callbacks_ = &cb; - - // Simulate TCP conn pool upstream callbacks. This is safe because the TCP proxy never - // releases a connection so all events go to the same UpstreamCallbacks instance. 
- upstream_connections_.at(conn_index)->addConnectionCallbacks(cb); - })); - EXPECT_CALL(*upstream_connections_.at(conn_index), enableHalfClose(true)); - conn_pool_callbacks_.at(conn_index) - ->onPoolReady(std::move(upstream_connection_data_.at(conn_index)), - upstream_hosts_.at(conn_index)); - } - - void raiseEventUpstreamConnectFailed(uint32_t conn_index, - Tcp::ConnectionPool::PoolFailureReason reason) { - conn_pool_callbacks_.at(conn_index)->onPoolFailure(reason, upstream_hosts_.at(conn_index)); + upstream_connections_.at(conn_index)->raiseEvent(Network::ConnectionEvent::Connected); } ConfigSharedPtr config_; NiceMock filter_callbacks_; NiceMock factory_context_; std::vector>> upstream_hosts_{}; - std::vector>> upstream_connections_{}; - std::vector>> - upstream_connection_data_{}; - std::vector conn_pool_callbacks_; - std::vector>> conn_pool_handles_; - NiceMock conn_pool_; - Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks_; + std::vector*> upstream_connections_{}; + std::vector conn_infos_; + Network::ReadFilterSharedPtr upstream_read_filter_; + std::vector*> connect_timers_; std::unique_ptr filter_; StringViewSaver access_log_data_; Network::Address::InstanceConstSharedPtr upstream_local_address_; @@ -478,65 +461,67 @@ TEST_F(TcpProxyTest, HalfCloseProxy) { EXPECT_CALL(filter_callbacks_.connection_, close(_)).Times(0); EXPECT_CALL(*upstream_connections_.at(0), close(_)).Times(0); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), true)); filter_->onData(buffer, true); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), true)); - upstream_callbacks_->onUpstreamData(response, true); + upstream_read_filter_->onData(response, true); EXPECT_CALL(filter_callbacks_.connection_, close(_)); - upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); + 
EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, + deferredDelete_(upstream_connections_.at(0))); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); } // Test that downstream is closed after an upstream LocalClose. TEST_F(TcpProxyTest, UpstreamLocalDisconnect) { setup(1); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); filter_->onData(buffer, false); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_callbacks_->onUpstreamData(response, false); + upstream_read_filter_->onData(response, false); EXPECT_CALL(filter_callbacks_.connection_, close(_)); - upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); } // Test that downstream is closed after an upstream RemoteClose. 
TEST_F(TcpProxyTest, UpstreamRemoteDisconnect) { setup(1); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); filter_->onData(buffer, false); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_callbacks_->onUpstreamData(response, false); + upstream_read_filter_->onData(response, false); EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); - upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); } // Test that reconnect is attempted after a local connect failure TEST_F(TcpProxyTest, ConnectAttemptsUpstreamLocalFail) { envoy::config::filter::network::tcp_proxy::v2::TcpProxy config = defaultConfig(); config.mutable_max_connect_attempts()->set_value(2); - setup(2, config); - raiseEventUpstreamConnectFailed(0, - Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, + deferredDelete_(upstream_connections_.at(0))); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); raiseEventUpstreamConnected(1); EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ @@ -550,8 +535,9 @@ TEST_F(TcpProxyTest, ConnectAttemptsUpstreamRemoteFail) { config.mutable_max_connect_attempts()->set_value(2); setup(2, config); - raiseEventUpstreamConnectFailed(0, - Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, + deferredDelete_(upstream_connections_.at(0))); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); raiseEventUpstreamConnected(1); EXPECT_EQ(0U, 
factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ @@ -565,7 +551,8 @@ TEST_F(TcpProxyTest, ConnectAttemptsUpstreamTimeout) { config.mutable_max_connect_attempts()->set_value(2); setup(2, config); - raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); + EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); + connect_timers_.at(0)->callback_(); raiseEventUpstreamConnected(1); EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ @@ -579,21 +566,38 @@ TEST_F(TcpProxyTest, ConnectAttemptsLimit) { config.mutable_max_connect_attempts()->set_value(3); setup(3, config); - EXPECT_CALL(upstream_hosts_.at(0)->outlier_detector_, - putResult(Upstream::Outlier::Result::TIMEOUT)); - EXPECT_CALL(upstream_hosts_.at(1)->outlier_detector_, - putResult(Upstream::Outlier::Result::CONNECT_FAILED)); - EXPECT_CALL(upstream_hosts_.at(2)->outlier_detector_, - putResult(Upstream::Outlier::Result::CONNECT_FAILED)); - - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); + { + testing::InSequence sequence; + EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, + deferredDelete_(upstream_connections_.at(0))); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, + deferredDelete_(upstream_connections_.at(1))); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, + deferredDelete_(upstream_connections_.at(2))); + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); + } // Try both failure modes - raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); - raiseEventUpstreamConnectFailed(1, - Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); - raiseEventUpstreamConnectFailed(2, - 
Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + connect_timers_.at(0)->callback_(); + upstream_connections_.at(1)->raiseEvent(Network::ConnectionEvent::RemoteClose); + upstream_connections_.at(2)->raiseEvent(Network::ConnectionEvent::RemoteClose); + + EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_connect_timeout") + .value()); + EXPECT_EQ(2U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_connect_fail") + .value()); + EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_connect_attempts_exceeded") + .value()); + EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_overflow") + .value()); + EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_no_successful_host") + .value()); } // Test that the tcp proxy sends the correct notifications to the outlier detector @@ -604,12 +608,11 @@ TEST_F(TcpProxyTest, OutlierDetection) { EXPECT_CALL(upstream_hosts_.at(0)->outlier_detector_, putResult(Upstream::Outlier::Result::TIMEOUT)); - raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); + connect_timers_.at(0)->callback_(); EXPECT_CALL(upstream_hosts_.at(1)->outlier_detector_, putResult(Upstream::Outlier::Result::CONNECT_FAILED)); - raiseEventUpstreamConnectFailed(1, - Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + upstream_connections_.at(1)->raiseEvent(Network::ConnectionEvent::RemoteClose); EXPECT_CALL(upstream_hosts_.at(2)->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); @@ -619,21 +622,21 @@ TEST_F(TcpProxyTest, OutlierDetection) { TEST_F(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) { setup(1); - raiseEventUpstreamConnected(0); - 
Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); filter_->onData(buffer, false); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_callbacks_->onUpstreamData(response, false); + upstream_read_filter_->onData(response, false); EXPECT_CALL(*upstream_connections_.at(0), readDisable(true)); filter_callbacks_.connection_.runHighWatermarkCallbacks(); EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); - upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); filter_callbacks_.connection_.runLowWatermarkCallbacks(); } @@ -641,15 +644,15 @@ TEST_F(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) { TEST_F(TcpProxyTest, DownstreamDisconnectRemote) { setup(1); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); filter_->onData(buffer, false); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl response("world"); EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_callbacks_->onUpstreamData(response, false); + upstream_read_filter_->onData(response, false); EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); @@ -658,15 +661,15 @@ TEST_F(TcpProxyTest, DownstreamDisconnectRemote) { TEST_F(TcpProxyTest, DownstreamDisconnectLocal) { setup(1); - raiseEventUpstreamConnected(0); - Buffer::OwnedImpl buffer("hello"); EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); filter_->onData(buffer, false); + raiseEventUpstreamConnected(0); + Buffer::OwnedImpl response("world"); 
EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); - upstream_callbacks_->onUpstreamData(response, false); + upstream_read_filter_->onData(response, false); EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose); @@ -675,8 +678,16 @@ TEST_F(TcpProxyTest, DownstreamDisconnectLocal) { TEST_F(TcpProxyTest, UpstreamConnectTimeout) { setup(1, accessLogConfig("%RESPONSE_FLAGS%")); + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); + filter_->onData(buffer, false); + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - raiseEventUpstreamConnectFailed(0, Tcp::ConnectionPool::PoolFailureReason::Timeout); + EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); + connect_timers_.at(0)->callback_(); + EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_connect_timeout") + .value()); filter_.reset(); EXPECT_EQ(access_log_data_, "UF"); @@ -733,9 +744,15 @@ TEST_F(TcpProxyTest, DisconnectBeforeData) { TEST_F(TcpProxyTest, UpstreamConnectFailure) { setup(1, accessLogConfig("%RESPONSE_FLAGS%")); + Buffer::OwnedImpl buffer("hello"); + filter_->onData(buffer, false); + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - raiseEventUpstreamConnectFailed(0, - Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + EXPECT_CALL(*connect_timers_.at(0), disableTimer()); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); + EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_connect_fail") + .value()); filter_.reset(); EXPECT_EQ(access_log_data_, "UF"); @@ -753,6 +770,10 @@ TEST_F(TcpProxyTest, 
UpstreamConnectionLimit) { filter_->initializeReadFilterCallbacks(filter_callbacks_); filter_->onNewConnection(); + EXPECT_EQ(1U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_cx_overflow") + .value()); + filter_.reset(); EXPECT_EQ(access_log_data_, "UO"); } @@ -774,7 +795,7 @@ TEST_F(TcpProxyTest, IdleTimeout) { buffer.add("hello2"); EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000))); - upstream_callbacks_->onUpstreamData(buffer, false); + upstream_read_filter_->onData(buffer, false); EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000))); filter_callbacks_.connection_.raiseBytesSentCallbacks(1); @@ -813,13 +834,12 @@ TEST_F(TcpProxyTest, IdleTimerDisabledUpstreamClose) { raiseEventUpstreamConnected(0); EXPECT_CALL(*idle_timer, disableTimer()); - upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); } // Test that access log fields %UPSTREAM_HOST% and %UPSTREAM_CLUSTER% are correctly logged. TEST_F(TcpProxyTest, AccessLogUpstreamHost) { setup(1, accessLogConfig("%UPSTREAM_HOST% %UPSTREAM_CLUSTER%")); - raiseEventUpstreamConnected(0); filter_.reset(); EXPECT_EQ(access_log_data_, "127.0.0.1:80 fake_cluster"); } @@ -827,7 +847,6 @@ TEST_F(TcpProxyTest, AccessLogUpstreamHost) { // Test that access log field %UPSTREAM_LOCAL_ADDRESS% is correctly logged. 
TEST_F(TcpProxyTest, AccessLogUpstreamLocalAddress) { setup(1, accessLogConfig("%UPSTREAM_LOCAL_ADDRESS%")); - raiseEventUpstreamConnected(0); filter_.reset(); EXPECT_EQ(access_log_data_, "2.2.2.2:50000"); } @@ -854,10 +873,10 @@ TEST_F(TcpProxyTest, AccessLogBytesRxTxDuration) { Buffer::OwnedImpl buffer("a"); filter_->onData(buffer, false); Buffer::OwnedImpl response("bb"); - upstream_callbacks_->onUpstreamData(response, false); + upstream_read_filter_->onData(response, false); std::this_thread::sleep_for(std::chrono::milliseconds(1)); - upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose); filter_.reset(); EXPECT_THAT(access_log_data_, @@ -870,8 +889,7 @@ TEST_F(TcpProxyTest, UpstreamFlushNoTimeout) { setup(1); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), - close(Network::ConnectionCloseType::FlushWrite)) + EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -884,7 +902,7 @@ TEST_F(TcpProxyTest, UpstreamFlushNoTimeout) { upstream_connections_.at(0)->raiseBytesSentCallbacks(1); // Simulate flush complete. 
- upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); EXPECT_EQ(1U, config_->stats().upstream_flush_total_.value()); EXPECT_EQ(0U, config_->stats().upstream_flush_active_.value()); } @@ -901,8 +919,7 @@ TEST_F(TcpProxyTest, UpstreamFlushTimeoutConfigured) { EXPECT_CALL(*idle_timer, enableTimer(_)); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), - close(Network::ConnectionCloseType::FlushWrite)) + EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -916,7 +933,7 @@ TEST_F(TcpProxyTest, UpstreamFlushTimeoutConfigured) { // Simulate flush complete. EXPECT_CALL(*idle_timer, disableTimer()); - upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); + upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose); EXPECT_EQ(1U, config_->stats().upstream_flush_total_.value()); EXPECT_EQ(0U, config_->stats().upstream_flush_active_.value()); EXPECT_EQ(0U, config_->stats().idle_timeout_.value()); @@ -933,8 +950,7 @@ TEST_F(TcpProxyTest, UpstreamFlushTimeoutExpired) { EXPECT_CALL(*idle_timer, enableTimer(_)); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), - close(Network::ConnectionCloseType::FlushWrite)) + EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -956,8 +972,7 @@ TEST_F(TcpProxyTest, UpstreamFlushReceiveUpstreamData) { setup(1); raiseEventUpstreamConnected(0); - EXPECT_CALL(*upstream_connections_.at(0), - close(Network::ConnectionCloseType::FlushWrite)) + 
EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::FlushWrite)) .WillOnce(Return()); // Cancel default action of raising LocalClose EXPECT_CALL(*upstream_connections_.at(0), state()) .WillOnce(Return(Network::Connection::State::Closing)); @@ -969,7 +984,7 @@ TEST_F(TcpProxyTest, UpstreamFlushReceiveUpstreamData) { // Send some bytes; no timeout configured so this should be a no-op (not a crash). Buffer::OwnedImpl buffer("a"); EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush)); - upstream_callbacks_->onUpstreamData(buffer, false); + upstream_read_filter_->onData(buffer, false); } class TcpProxyRoutingTest : public testing::Test { @@ -1034,7 +1049,7 @@ TEST_F(TcpProxyRoutingTest, RoutableConnection) { connection_.local_address_ = std::make_shared("1.2.3.4", 9999); // Expect filter to try to open a connection to specified cluster. - EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)); + EXPECT_CALL(factory_context_.cluster_manager_, tcpConnForCluster_("fake_cluster", _)); filter_->onNewConnection(); diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index 0fe593ed2cddd..94afe87c750d1 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -79,6 +79,9 @@ class ThriftRouterTestBase { route_ = new NiceMock(); route_ptr_.reset(route_); + host_ = new NiceMock(); + host_ptr_.reset(host_); + router_.reset(new Router(context_.clusterManager())); EXPECT_EQ(nullptr, router_->downstreamConnection()); @@ -95,8 +98,16 @@ class ThriftRouterTestBase { EXPECT_CALL(*route_, routeEntry()).WillOnce(Return(&route_entry_)); EXPECT_CALL(route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name_)); + EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, newConnection(_)) + .WillOnce( + 
Invoke([&](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* { + conn_pool_callbacks_ = &cb; + return &handle_; + })); + EXPECT_EQ(ThriftFilters::FilterStatus::StopIteration, router_->messageBegin(method_name_, msg_type_, seq_id_)); + EXPECT_NE(nullptr, conn_pool_callbacks_); NiceMock connection; EXPECT_CALL(callbacks_, connection()).WillRepeatedly(Return(&connection)); @@ -109,7 +120,7 @@ class ThriftRouterTestBase { } void connectUpstream() { - EXPECT_CALL(*context_.cluster_manager_.tcp_conn_pool_.connection_data_, addUpstreamCallbacks(_)) + EXPECT_CALL(conn_data_, addUpstreamCallbacks(_)) .WillOnce(Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void { upstream_callbacks_ = &cb; })); @@ -124,8 +135,7 @@ class ThriftRouterTestBase { EXPECT_CALL(*protocol_, writeMessageBegin(_, method_name_, msg_type_, seq_id_)); EXPECT_CALL(callbacks_, continueDecoding()); - - context_.cluster_manager_.tcp_conn_pool_.poolReady(upstream_connection_); + conn_pool_callbacks_->onPoolReady(conn_data_, host_ptr_); EXPECT_NE(nullptr, upstream_callbacks_); } @@ -184,10 +194,10 @@ class ThriftRouterTestBase { void completeRequest() { EXPECT_CALL(*protocol_, writeMessageEnd(_)); EXPECT_CALL(*transport_, encodeFrame(_, _)); - EXPECT_CALL(upstream_connection_, write(_, false)); + EXPECT_CALL(conn_data_.connection_, write(_, false)); if (msg_type_ == MessageType::Oneway) { - EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_))); + EXPECT_CALL(conn_data_, release()); } EXPECT_EQ(ThriftFilters::FilterStatus::Continue, router_->messageEnd()); @@ -203,7 +213,7 @@ class ThriftRouterTestBase { upstream_callbacks_->onUpstreamData(buffer, false); EXPECT_CALL(callbacks_, upstreamData(Ref(buffer))).WillOnce(Return(true)); - EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_))); + EXPECT_CALL(conn_data_, release()); upstream_callbacks_->onUpstreamData(buffer, false); } @@ -226,6 +236,8 @@ class 
ThriftRouterTestBase { NiceMock* host_{}; RouteConstSharedPtr route_ptr_; + Upstream::HostDescriptionConstSharedPtr host_ptr_; + std::unique_ptr router_; std::string cluster_name_{"cluster"}; @@ -234,8 +246,10 @@ class ThriftRouterTestBase { MessageType msg_type_{MessageType::Call}; int32_t seq_id_{1}; + NiceMock handle_; + NiceMock conn_data_; + Tcp::ConnectionPool::Callbacks* conn_pool_callbacks_{}; Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks_{}; - NiceMock upstream_connection_; }; class ThriftRouterTest : public ThriftRouterTestBase, public Test { @@ -276,8 +290,8 @@ TEST_F(ThriftRouterTest, PoolRemoteConnectionFailure) { EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*")); })); - context_.cluster_manager_.tcp_conn_pool_.poolFailure( - Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + conn_pool_callbacks_->onPoolFailure( + Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure, host_ptr_); } TEST_F(ThriftRouterTest, PoolLocalConnectionFailure) { @@ -294,8 +308,8 @@ TEST_F(ThriftRouterTest, PoolLocalConnectionFailure) { EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*")); })); - context_.cluster_manager_.tcp_conn_pool_.poolFailure( - Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure); + conn_pool_callbacks_->onPoolFailure( + Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure, host_ptr_); } TEST_F(ThriftRouterTest, PoolTimeout) { @@ -312,8 +326,7 @@ TEST_F(ThriftRouterTest, PoolTimeout) { EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*")); })); - context_.cluster_manager_.tcp_conn_pool_.poolFailure( - Tcp::ConnectionPool::PoolFailureReason::Timeout); + conn_pool_callbacks_->onPoolFailure(Tcp::ConnectionPool::PoolFailureReason::Timeout, host_ptr_); } 
TEST_F(ThriftRouterTest, PoolOverflowFailure) { @@ -330,8 +343,7 @@ TEST_F(ThriftRouterTest, PoolOverflowFailure) { EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*too many connections.*")); })); - context_.cluster_manager_.tcp_conn_pool_.poolFailure( - Tcp::ConnectionPool::PoolFailureReason::Overflow); + conn_pool_callbacks_->onPoolFailure(Tcp::ConnectionPool::PoolFailureReason::Overflow, host_ptr_); } TEST_F(ThriftRouterTest, NoRoute) { @@ -428,7 +440,7 @@ TEST_F(ThriftRouterTest, TruncatedResponse) { EXPECT_CALL(callbacks_, startUpstreamResponse(TransportType::Framed, ProtocolType::Binary)); EXPECT_CALL(callbacks_, upstreamData(Ref(buffer))).WillOnce(Return(false)); - EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, released(Ref(upstream_connection_))); + EXPECT_CALL(conn_data_, release()); EXPECT_CALL(callbacks_, resetDownstreamConnection()); upstream_callbacks_->onUpstreamData(buffer, true); @@ -450,7 +462,7 @@ TEST_F(ThriftRouterTest, UpstreamDataTriggersReset) { router_->resetUpstreamConnection(); return true; })); - EXPECT_CALL(upstream_connection_, close(Network::ConnectionCloseType::NoFlush)); + EXPECT_CALL(conn_data_.connection_, close(Network::ConnectionCloseType::NoFlush)); upstream_callbacks_->onUpstreamData(buffer, true); destroyRouter(); @@ -466,7 +478,7 @@ TEST_F(ThriftRouterTest, UnexpectedRouterDestroy) { initializeRouter(); startRequest(MessageType::Call); connectUpstream(); - EXPECT_CALL(upstream_connection_, close(Network::ConnectionCloseType::NoFlush)); + EXPECT_CALL(conn_data_.connection_, close(Network::ConnectionCloseType::NoFlush)); destroyRouter(); } diff --git a/test/integration/tcp_conn_pool_integration_test.cc b/test/integration/tcp_conn_pool_integration_test.cc index fbe93d877c9a4..c13d836e8d2e6 100644 --- a/test/integration/tcp_conn_pool_integration_test.cc +++ b/test/integration/tcp_conn_pool_integration_test.cc @@ -52,9 +52,9 @@ class TestFilter : public 
Network::ReadFilter { ASSERT(false); } - void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, + void onPoolReady(Tcp::ConnectionPool::ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr) override { - upstream_ = std::move(conn); + upstream_ = &conn; upstream_->addUpstreamCallbacks(*this); upstream_->connection().write(data_, false); @@ -67,7 +67,7 @@ class TestFilter : public Network::ReadFilter { Network::Connection& downstream = parent_.read_callbacks_->connection(); downstream.write(data, false); - upstream_.reset(); + upstream_->release(); } void onEvent(Network::ConnectionEvent) override {} void onAboveWriteBufferHighWatermark() override {} @@ -75,7 +75,7 @@ class TestFilter : public Network::ReadFilter { TestFilter& parent_; Buffer::OwnedImpl data_; - Tcp::ConnectionPool::ConnectionDataPtr upstream_; + Tcp::ConnectionPool::ConnectionData* upstream_; }; Upstream::ClusterManager& cluster_manager_; diff --git a/test/mocks/tcp/mocks.cc b/test/mocks/tcp/mocks.cc index 1374d415dea7f..757f7556f32b1 100644 --- a/test/mocks/tcp/mocks.cc +++ b/test/mocks/tcp/mocks.cc @@ -18,12 +18,10 @@ MockCancellable::~MockCancellable() {} MockUpstreamCallbacks::MockUpstreamCallbacks() {} MockUpstreamCallbacks::~MockUpstreamCallbacks() {} -MockConnectionData::MockConnectionData() {} -MockConnectionData::~MockConnectionData() { - if (release_callback_) { - release_callback_(); - } +MockConnectionData::MockConnectionData() { + ON_CALL(*this, connection()).WillByDefault(ReturnRef(connection_)); } +MockConnectionData::~MockConnectionData() {} MockInstance::MockInstance() { ON_CALL(*this, newConnection(_)).WillByDefault(Invoke([&](Callbacks& cb) -> Cancellable* { @@ -46,16 +44,12 @@ void MockInstance::poolFailure(PoolFailureReason reason) { cb->onPoolFailure(reason, host_); } -void MockInstance::poolReady(Network::MockClientConnection& conn) { +void MockInstance::poolReady() { Callbacks* cb = callbacks_.front(); callbacks_.pop_front(); handles_.pop_front(); - 
ON_CALL(*connection_data_, connection()).WillByDefault(ReturnRef(conn)); - - connection_data_->release_callback_ = [&]() -> void { released(conn); }; - - cb->onPoolReady(std::move(connection_data_), host_); + cb->onPoolReady(connection_data_, host_); } } // namespace ConnectionPool diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h index 3fb969f088b2d..a9cc52fecbbb9 100644 --- a/test/mocks/tcp/mocks.h +++ b/test/mocks/tcp/mocks.h @@ -44,10 +44,9 @@ class MockConnectionData : public ConnectionData { // Tcp::ConnectionPool::ConnectionData MOCK_METHOD0(connection, Network::ClientConnection&()); MOCK_METHOD1(addUpstreamCallbacks, void(ConnectionPool::UpstreamCallbacks&)); + MOCK_METHOD0(release, void()); - // If set, invoked in ~MockConnectionData, which indicates that the connection pool - // caller has relased a connection. - std::function release_callback_; + NiceMock connection_; }; class MockInstance : public Instance { @@ -62,18 +61,14 @@ class MockInstance : public Instance { MockCancellable* newConnectionImpl(Callbacks& cb); void poolFailure(PoolFailureReason reason); - void poolReady(Network::MockClientConnection& conn); - - // Invoked when connection_data_, having been assigned via poolReady is released. - MOCK_METHOD1(released, void(Network::MockClientConnection&)); + void poolReady(); std::list> handles_; std::list callbacks_; std::shared_ptr> host_{ new NiceMock()}; - std::unique_ptr> connection_data_{ - new NiceMock()}; + NiceMock connection_data_; }; } // namespace ConnectionPool