diff --git a/include/envoy/upstream/BUILD b/include/envoy/upstream/BUILD index be14e1d398c74..84318a63816fd 100644 --- a/include/envoy/upstream/BUILD +++ b/include/envoy/upstream/BUILD @@ -128,6 +128,7 @@ envoy_cc_library( deps = [ ":load_balancer_interface", ":upstream_interface", + "//include/envoy/http:async_client_interface", ], ) diff --git a/include/envoy/upstream/thread_local_cluster.h b/include/envoy/upstream/thread_local_cluster.h index 6c08c194103b1..16c59ebf7f0ef 100644 --- a/include/envoy/upstream/thread_local_cluster.h +++ b/include/envoy/upstream/thread_local_cluster.h @@ -8,6 +8,54 @@ namespace Envoy { namespace Upstream { +// HttpPoolData returns information about a given pool as well as a function +// to create streams on that pool. +class HttpPoolData { +public: + using OnNewStreamFn = std::function; + + HttpPoolData(OnNewStreamFn on_new_stream, Http::ConnectionPool::Instance* pool) + : on_new_stream_(on_new_stream), pool_(pool) {} + + Envoy::Http::ConnectionPool::Cancellable* + newStream(Http::ResponseDecoder& response_decoder, + Envoy::Http::ConnectionPool::Callbacks& callbacks) { + on_new_stream_(); + return pool_->newStream(response_decoder, callbacks); + } + + Upstream::HostDescriptionConstSharedPtr host() const { return pool_->host(); } + +private: + friend class HttpPoolDataPeer; + + OnNewStreamFn on_new_stream_; + Http::ConnectionPool::Instance* pool_; +}; + +// Tcp pool returns information about a given pool, as well as a function to +// create connections on that pool. +class TcpPoolData { +public: + using OnNewConnectionFn = std::function; + + TcpPoolData(OnNewConnectionFn on_new_connection, Tcp::ConnectionPool::Instance* pool) + : on_new_connection_(on_new_connection), pool_(pool) {} + + Envoy::Tcp::ConnectionPool::Cancellable* + newConnection(Envoy::Tcp::ConnectionPool::Callbacks& callbacks) { + on_new_connection_(); + return pool_->newConnection(callbacks); + } + + Upstream::HostDescriptionConstSharedPtr host() const { return pool_->host(); } + +private: + friend class TcpPoolDataPeer; + OnNewConnectionFn on_new_connection_; + Tcp::ConnectionPool::Instance* pool_; +}; + /** * A thread local cluster instance that can be used for direct load balancing and host set * interactions. In general, an instance of ThreadLocalCluster can only be safely used in the @@ -42,10 +90,11 @@ class ThreadLocalCluster { * @param priority the connection pool priority. * @param downstream_protocol the downstream protocol (if one exists) to use in protocol * selection. - * @param context the optional load balancer context. - * @return the connection pool or nullptr if there is no host available in the cluster. + * @param context the optional load balancer context. Must continue to be + * valid until newConnection is called on the pool (if it is to be called). + * @return the connection pool data or nullopt if there is no host available in the cluster. */ - virtual Http::ConnectionPool::Instance* + virtual absl::optional httpConnPool(ResourcePriority priority, absl::optional downstream_protocol, LoadBalancerContext* context) PURE; @@ -55,11 +104,12 @@ class ThreadLocalCluster { * is used is the one defined on the cluster when it was created. * * @param priority the connection pool priority. - * @param context the optional load balancer context. - * @return the connection pool or nullptr if there is no host available in the cluster. + * @param context the optional load balancer context. Must continue to be + * valid until newConnection is called on the pool (if it is to be called). + * @return the connection pool data or nullopt if there is no host available in the cluster. */ - virtual Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, - LoadBalancerContext* context) PURE; + virtual absl::optional tcpConnPool(ResourcePriority priority, + LoadBalancerContext* context) PURE; /** * Allocate a load balanced TCP connection for a cluster. The created connection is already diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 22b939a30bc47..25665d290e10b 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -150,7 +150,7 @@ TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, Upstream::LoadBalancerContext* context, Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) : upstream_callbacks_(upstream_callbacks) { - conn_pool_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context); + conn_pool_data_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context); } TcpConnPool::~TcpConnPool() { @@ -165,7 +165,7 @@ void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) { // valid connection handle. If newConnection fails inline it may result in attempting to // select a new host, and a recursive call to initializeUpstreamConnection. In this case the // first call to newConnection will return null and the inner call will persist. - Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this); + Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.value().newConnection(*this); if (handle) { ASSERT(upstream_handle_ == nullptr); upstream_handle_ = handle; @@ -195,8 +195,8 @@ HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks, Http::CodecType type) : config_(config), type_(type), upstream_callbacks_(upstream_callbacks) { - conn_pool_ = thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, absl::nullopt, - context); + conn_pool_data_ = thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, + absl::nullopt, context); } HttpConnPool::~HttpConnPool() { @@ -215,7 +215,7 @@ void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) { upstream_ = std::make_unique(upstream_callbacks_, config_); } Tcp::ConnectionPool::Cancellable* handle = - conn_pool_->newStream(upstream_->responseDecoder(), *this); + conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this); if (handle != nullptr) { upstream_handle_ = handle; } diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index 3047441a9a4fb..1f910edd67960 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -5,6 +5,7 @@ #include "envoy/tcp/conn_pool.h" #include "envoy/tcp/upstream.h" #include "envoy/upstream/load_balancer.h" +#include "envoy/upstream/thread_local_cluster.h" #include "envoy/upstream/upstream.h" #include "common/common/dump_state_utils.h" @@ -21,7 +22,7 @@ class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callback Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks); ~TcpConnPool() override; - bool valid() const { return conn_pool_ != nullptr; } + bool valid() const { return conn_pool_data_.has_value(); } // GenericConnPool void newStream(GenericConnectionPoolCallbacks& callbacks) override; @@ -33,7 +34,7 @@ class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callback Upstream::HostDescriptionConstSharedPtr host) override; private: - Tcp::ConnectionPool::Instance* conn_pool_{}; + absl::optional conn_pool_data_{}; Tcp::ConnectionPool::Cancellable* upstream_handle_{}; GenericConnectionPoolCallbacks* callbacks_{}; Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; @@ -52,7 +53,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba ~HttpConnPool() override; // HTTP/3 upstreams are not supported at the moment. - bool valid() const { return conn_pool_ != nullptr && type_ <= Http::CodecType::HTTP2; } + bool valid() const { return conn_pool_data_.has_value() && type_ <= Http::CodecType::HTTP2; } // GenericConnPool void newStream(GenericConnectionPoolCallbacks& callbacks) override; @@ -97,7 +98,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba Ssl::ConnectionInfoConstSharedPtr ssl_info); const TunnelingConfig config_; Http::CodecType type_; - Http::ConnectionPool::Instance* conn_pool_{}; + absl::optional conn_pool_data_{}; Http::ConnectionPool::Cancellable* upstream_handle_{}; GenericConnectionPoolCallbacks* callbacks_{}; Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 05e18092e9332..717c5f2b95e3d 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -905,41 +905,45 @@ void ClusterManagerImpl::maybePreconnect( } } -Http::ConnectionPool::Instance* +absl::optional ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( ResourcePriority priority, absl::optional protocol, LoadBalancerContext* context) { // Select a host and create a connection pool for it if it does not already exist. - auto ret = connPool(priority, protocol, context, false); - - // Now see if another host should be preconnected. - // httpConnPool is called immediately before a call for newStream. newStream doesn't - // have the load balancer context needed to make selection decisions so preconnecting must be - // performed here in anticipation of the new stream. - // TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is - // code-enforced. - maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() { - return connPool(priority, protocol, context, true); - }); - return ret; + auto pool = connPool(priority, protocol, context, false); + if (pool == nullptr) { + return absl::nullopt; + } + + HttpPoolData data( + [this, priority, protocol, context]() -> void { + // Now that a new stream is being established, attempt to preconnect. + maybePreconnect(*this, parent_.cluster_manager_state_, + [this, &priority, &protocol, &context]() { + return connPool(priority, protocol, context, true); + }); + }, + pool); + return data; } -Tcp::ConnectionPool::Instance* +absl::optional ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( ResourcePriority priority, LoadBalancerContext* context) { // Select a host and create a connection pool for it if it does not already exist. - auto ret = tcpConnPool(priority, context, false); - - // tcpConnPool is called immediately before a call for newConnection. newConnection - // doesn't have the load balancer context needed to make selection decisions so preconnecting must - // be performed here in anticipation of the new connection. - // TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is - // code-enforced. - // Now see if another host should be preconnected. - maybePreconnect(*this, parent_.cluster_manager_state_, - [this, &priority, &context]() { return tcpConnPool(priority, context, true); }); - - return ret; + auto pool = tcpConnPool(priority, context, false); + if (pool == nullptr) { + return absl::nullopt; + } + + TcpPoolData data( + [this, priority, context]() -> void { + maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() { + return tcpConnPool(priority, context, true); + }); + }, + pool); + return data; } void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster, diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 5034a261c107b..9192e7cdcd528 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -413,11 +413,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable downstream_protocol, - LoadBalancerContext* context) override; - Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, - LoadBalancerContext* context) override; + absl::optional httpConnPool(ResourcePriority priority, + absl::optional downstream_protocol, + LoadBalancerContext* context) override; + absl::optional tcpConnPool(ResourcePriority priority, + LoadBalancerContext* context) override; Host::CreateConnectionData tcpConn(LoadBalancerContext* context) override; Http::AsyncClient& httpAsyncClient() override; diff --git a/source/extensions/filters/network/dubbo_proxy/router/router_impl.cc b/source/extensions/filters/network/dubbo_proxy/router/router_impl.cc index 17e57fd49989a..6b9bc51861019 100644 --- a/source/extensions/filters/network/dubbo_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/dubbo_proxy/router/router_impl.cc @@ -65,9 +65,8 @@ FilterStatus Router::onMessageDecoded(MessageMetadataSharedPtr metadata, Context return FilterStatus::StopIteration; } - Tcp::ConnectionPool::Instance* conn_pool = - cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); - if (!conn_pool) { + auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); + if (!conn_pool_data) { callbacks_->sendLocalReply( AppException( ResponseStatus::ServerError, @@ -112,8 +111,9 @@ FilterStatus Router::onMessageDecoded(MessageMetadataSharedPtr metadata, Context upstream_request_buffer_.move(ctx->originMessage(), ctx->messageSize()); } - upstream_request_ = std::make_unique( - *this, *conn_pool, metadata, callbacks_->serializationType(), callbacks_->protocolType()); + upstream_request_ = std::make_unique(*this, *conn_pool_data, metadata, + callbacks_->serializationType(), + callbacks_->protocolType()); return upstream_request_->start(); } @@ -188,11 +188,11 @@ void Router::cleanup() { } } -Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::Instance& pool, +Router::UpstreamRequest::UpstreamRequest(Router& parent, Upstream::TcpPoolData& pool_data, MessageMetadataSharedPtr& metadata, SerializationType serialization_type, ProtocolType protocol_type) - : parent_(parent), conn_pool_(pool), metadata_(metadata), + : parent_(parent), conn_pool_data_(pool_data), metadata_(metadata), protocol_( NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol(serialization_type)), request_complete_(false), response_started_(false), response_complete_(false), @@ -201,7 +201,7 @@ Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::In Router::UpstreamRequest::~UpstreamRequest() = default; FilterStatus Router::UpstreamRequest::start() { - Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this); + Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this); if (handle) { // Pause while we wait for a connection. conn_pool_handle_ = handle; diff --git a/source/extensions/filters/network/dubbo_proxy/router/router_impl.h b/source/extensions/filters/network/dubbo_proxy/router/router_impl.h index 84a746d1f8a9a..f7683788b2626 100644 --- a/source/extensions/filters/network/dubbo_proxy/router/router_impl.h +++ b/source/extensions/filters/network/dubbo_proxy/router/router_impl.h @@ -47,7 +47,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, private: struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks { - UpstreamRequest(Router& parent, Tcp::ConnectionPool::Instance& pool, + UpstreamRequest(Router& parent, Upstream::TcpPoolData& pool_data, MessageMetadataSharedPtr& metadata, SerializationType serialization_type, ProtocolType protocol_type); ~UpstreamRequest() override; @@ -69,7 +69,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, void onResetStream(ConnectionPool::PoolFailureReason reason); Router& parent_; - Tcp::ConnectionPool::Instance& conn_pool_; + Upstream::TcpPoolData conn_pool_data_; MessageMetadataSharedPtr metadata_; Tcp::ConnectionPool::Cancellable* conn_pool_handle_{}; diff --git a/source/extensions/filters/network/rocketmq_proxy/router/router_impl.cc b/source/extensions/filters/network/rocketmq_proxy/router/router_impl.cc index 8cfb40a7cdfa5..a7351aeadddfa 100644 --- a/source/extensions/filters/network/rocketmq_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/rocketmq_proxy/router/router_impl.cc @@ -35,27 +35,25 @@ void RouterImpl::onBelowWriteBufferLowWatermark() { } void RouterImpl::onEvent(Network::ConnectionEvent event) { - switch (event) { + switch (event) case Network::ConnectionEvent::RemoteClose: { ENVOY_LOG(error, "Connection to upstream: {} is closed by remote peer", upstream_host_->address()->asString()); // Send local reply to downstream active_message_->onError("Connection to upstream is closed by remote peer"); break; - } - case Network::ConnectionEvent::LocalClose: { + case Network::ConnectionEvent::LocalClose: ENVOY_LOG(error, "Connection to upstream: {} has been closed", upstream_host_->address()->asString()); // Send local reply to downstream active_message_->onError("Connection to upstream has been closed"); break; - } default: // Ignore other events for now ENVOY_LOG(trace, "Ignore event type"); return; } - active_message_->onReset(); + active_message_->onReset(); } const Envoy::Router::MetadataMatchCriteria* RouterImpl::metadataMatchCriteria() { @@ -106,9 +104,8 @@ void RouterImpl::sendRequestToUpstream(ActiveMessage& active_message) { return; } - Tcp::ConnectionPool::Instance* conn_pool = - cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); - if (!conn_pool) { + auto data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); + if (!data) { ENVOY_LOG(warn, "No host available for cluster {}. Opaque: {}", cluster_name, opaque); active_message.onError("No host available"); reset(); @@ -116,7 +113,7 @@ void RouterImpl::sendRequestToUpstream(ActiveMessage& active_message) { } upstream_request_ = std::make_unique(*this); - Tcp::ConnectionPool::Cancellable* cancellable = conn_pool->newConnection(*upstream_request_); + Tcp::ConnectionPool::Cancellable* cancellable = data.value().newConnection(*upstream_request_); if (cancellable) { handle_ = cancellable; ENVOY_LOG(trace, "No connection is available for now. Create a cancellable handle. Opaque: {}", diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 5712ea8dd171e..131ecb11180c2 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -281,9 +281,8 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { passthrough_supported_ = true; } - Tcp::ConnectionPool::Instance* conn_pool = - cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); - if (!conn_pool) { + auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); + if (!conn_pool_data) { stats_.no_healthy_upstream_.inc(); callbacks_->sendLocalReply( AppException(AppExceptionType::InternalError, @@ -303,7 +302,7 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { } upstream_request_ = - std::make_unique(*this, *conn_pool, metadata, transport, protocol); + std::make_unique(*this, *conn_pool_data, metadata, transport, protocol); return upstream_request_->start(); } @@ -432,10 +431,10 @@ void Router::convertMessageBegin(MessageMetadataSharedPtr metadata) { void Router::cleanup() { upstream_request_.reset(); } -Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::Instance& pool, +Router::UpstreamRequest::UpstreamRequest(Router& parent, Upstream::TcpPoolData& pool_data, MessageMetadataSharedPtr& metadata, TransportType transport_type, ProtocolType protocol_type) - : parent_(parent), conn_pool_(pool), metadata_(metadata), + : parent_(parent), conn_pool_data_(pool_data), metadata_(metadata), transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()), protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()), request_complete_(false), response_started_(false), response_complete_(false) {} @@ -447,7 +446,7 @@ Router::UpstreamRequest::~UpstreamRequest() { } FilterStatus Router::UpstreamRequest::start() { - Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this); + Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this); if (handle) { // Pause while we wait for a connection. conn_pool_handle_ = handle; diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index c12a0a34df8d5..077ab117873e7 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -224,7 +224,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, private: struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks { - UpstreamRequest(Router& parent, Tcp::ConnectionPool::Instance& pool, + UpstreamRequest(Router& parent, Upstream::TcpPoolData& pool_data, MessageMetadataSharedPtr& metadata, TransportType transport_type, ProtocolType protocol_type); ~UpstreamRequest() override; @@ -247,7 +247,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, void chargeResponseTiming(); Router& parent_; - Tcp::ConnectionPool::Instance& conn_pool_; + Upstream::TcpPoolData& conn_pool_data_; MessageMetadataSharedPtr metadata_; Tcp::ConnectionPool::Cancellable* conn_pool_handle_{}; diff --git a/source/extensions/upstreams/http/http/upstream_request.cc b/source/extensions/upstreams/http/http/upstream_request.cc index f41b081f8b188..ee411248f853c 100644 --- a/source/extensions/upstreams/http/http/upstream_request.cc +++ b/source/extensions/upstreams/http/http/upstream_request.cc @@ -32,7 +32,7 @@ void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { // might get deleted inline as well. Only write the returned handle out if it is not nullptr to // deal with this case. Envoy::Http::ConnectionPool::Cancellable* handle = - conn_pool_->newStream(callbacks->upstreamToDownstream(), *this); + pool_data_.value().newStream(callbacks->upstreamToDownstream(), *this); if (handle) { conn_pool_stream_handle_ = handle; } diff --git a/source/extensions/upstreams/http/http/upstream_request.h b/source/extensions/upstreams/http/http/upstream_request.h index 94f5a0a0fda6f..5855e38ed8a3f 100644 --- a/source/extensions/upstreams/http/http/upstream_request.h +++ b/source/extensions/upstreams/http/http/upstream_request.h @@ -26,7 +26,7 @@ class HttpConnPool : public Router::GenericConnPool, public Envoy::Http::Connect absl::optional downstream_protocol, Upstream::LoadBalancerContext* ctx) { ASSERT(!is_connect); - conn_pool_ = + pool_data_ = thread_local_cluster.httpConnPool(route_entry.priority(), downstream_protocol, ctx); } ~HttpConnPool() override { @@ -42,13 +42,15 @@ class HttpConnPool : public Router::GenericConnPool, public Envoy::Http::Connect void onPoolReady(Envoy::Http::RequestEncoder& callbacks_encoder, Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, absl::optional protocol) override; - Upstream::HostDescriptionConstSharedPtr host() const override { return conn_pool_->host(); } + Upstream::HostDescriptionConstSharedPtr host() const override { + return pool_data_.value().host(); + } - bool valid() { return conn_pool_ != nullptr; } + bool valid() { return pool_data_.has_value(); } protected: // Points to the actual connection pool to create streams from. - Envoy::Http::ConnectionPool::Instance* conn_pool_{}; + absl::optional pool_data_{}; Envoy::Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{}; Router::GenericConnectionPoolCallbacks* callbacks_{}; }; diff --git a/source/extensions/upstreams/http/tcp/upstream_request.h b/source/extensions/upstreams/http/tcp/upstream_request.h index 3a002f4b7a128..091218e4ef8b4 100644 --- a/source/extensions/upstreams/http/tcp/upstream_request.h +++ b/source/extensions/upstreams/http/tcp/upstream_request.h @@ -26,11 +26,11 @@ class TcpConnPool : public Router::GenericConnPool, public Envoy::Tcp::Connectio const Router::RouteEntry& route_entry, absl::optional, Upstream::LoadBalancerContext* ctx) { ASSERT(is_connect); - conn_pool_ = thread_local_cluster.tcpConnPool(route_entry.priority(), ctx); + conn_pool_data_ = thread_local_cluster.tcpConnPool(route_entry.priority(), ctx); } void newStream(Router::GenericConnectionPoolCallbacks* callbacks) override { callbacks_ = callbacks; - upstream_handle_ = conn_pool_->newConnection(*this); + upstream_handle_ = conn_pool_data_.value().newConnection(*this); } bool cancelAnyPendingStream() override { @@ -41,9 +41,11 @@ class TcpConnPool : public Router::GenericConnPool, public Envoy::Tcp::Connectio } return false; } - Upstream::HostDescriptionConstSharedPtr host() const override { return conn_pool_->host(); } + Upstream::HostDescriptionConstSharedPtr host() const override { + return conn_pool_data_.value().host(); + } - bool valid() { return conn_pool_ != nullptr; } + bool valid() { return conn_pool_data_.has_value(); } // Tcp::ConnectionPool::Callbacks void onPoolFailure(ConnectionPool::PoolFailureReason reason, @@ -56,7 +58,7 @@ class TcpConnPool : public Router::GenericConnPool, public Envoy::Tcp::Connectio Upstream::HostDescriptionConstSharedPtr host) override; private: - Envoy::Tcp::ConnectionPool::Instance* conn_pool_; + absl::optional conn_pool_data_; Envoy::Tcp::ConnectionPool::Cancellable* upstream_handle_{}; Router::GenericConnectionPoolCallbacks* callbacks_{}; }; diff --git a/test/common/grpc/grpc_client_integration_test_harness.h b/test/common/grpc/grpc_client_integration_test_harness.h index 3145e6b89b84a..7bb886864082a 100644 --- a/test/common/grpc/grpc_client_integration_test_harness.h +++ b/test/common/grpc/grpc_client_integration_test_harness.h @@ -305,7 +305,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest { Upstream::ResourcePriority::Default, nullptr, nullptr, state_); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillRepeatedly(Return(http_conn_pool_.get())); + .WillRepeatedly(Return(Upstream::HttpPoolData([]() {}, http_conn_pool_.get()))); http_async_client_ = std::make_unique( cm_.thread_local_cluster_.cluster_.info_, *stats_store_, *dispatcher_, local_info_, cm_, runtime_, random_, std::move(shadow_writer_ptr_), http_context_, router_context_); diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index cbff548d733a8..cf7a3a027a70e 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -311,12 +311,11 @@ TEST_F(AsyncClientImplTest, BasicHashPolicy) { })); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) .WillOnce( - Invoke([&](Upstream::ResourcePriority, auto, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { + Invoke([&](Upstream::ResourcePriority, auto, Upstream::LoadBalancerContext* context) { // this is the hash of :path header value "/" // the hash stability across releases is expected, so test the hash value directly here. EXPECT_EQ(16761507700594825962UL, context->computeHashKey().value()); - return &cm_.thread_local_cluster_.conn_pool_; + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); })); TestRequestHeaderMapImpl copy(message_->headers()); @@ -356,12 +355,11 @@ TEST_F(AsyncClientImplTest, WithoutMetadata) { })); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_EQ(context->metadataMatchCriteria(), nullptr); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_EQ(context->metadataMatchCriteria(), nullptr); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); TestRequestHeaderMapImpl copy(message_->headers()); copy.addCopy("x-envoy-internal", "true"); @@ -400,14 +398,13 @@ TEST_F(AsyncClientImplTest, WithMetadata) { })); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_NE(context->metadataMatchCriteria(), nullptr); - EXPECT_EQ(context->metadataMatchCriteria()->metadataMatchCriteria().at(0)->name(), - "fake_test_key"); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_NE(context->metadataMatchCriteria(), nullptr); + EXPECT_EQ(context->metadataMatchCriteria()->metadataMatchCriteria().at(0)->name(), + "fake_test_key"); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); TestRequestHeaderMapImpl copy(message_->headers()); copy.addCopy("x-envoy-internal", "true"); diff --git a/test/common/network/filter_manager_impl_test.cc b/test/common/network/filter_manager_impl_test.cc index 34c7e407d2030..f5e931a5b955b 100644 --- a/test/common/network/filter_manager_impl_test.cc +++ b/test/common/network/filter_manager_impl_test.cc @@ -416,7 +416,7 @@ stat_prefix: name EXPECT_EQ(manager.initializeReadFilters(), true); EXPECT_CALL(factory_context.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(&conn_pool)); + .WillOnce(Return(Upstream::TcpPoolData([]() {}, &conn_pool))); request_callbacks->complete(Extensions::Filters::Common::RateLimit::LimitStatus::OK, nullptr, nullptr, nullptr, "", nullptr); diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index dfac7ba12c36b..5250462968364 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -343,12 +343,11 @@ TEST_F(RouterTest, HashPolicy) { EXPECT_CALL(callbacks_.route_->route_entry_.hash_policy_, generateHash(_, _, _, _)) .WillOnce(Return(absl::optional(10))); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_EQ(10UL, context->computeHashKey().value()); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_EQ(10UL, context->computeHashKey().value()); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) .WillOnce(Return(&cancellable_)); expectResponseTimerCreate(); @@ -371,12 +370,11 @@ TEST_F(RouterTest, HashPolicyNoHash) { EXPECT_CALL(callbacks_.route_->route_entry_.hash_policy_, generateHash(_, _, _, _)) .WillOnce(Return(absl::optional())); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, &router_)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_FALSE(context->computeHashKey()); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_FALSE(context->computeHashKey()); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) .WillOnce(Return(&cancellable_)); expectResponseTimerCreate(); @@ -415,12 +413,11 @@ TEST_F(RouterTest, AddCookie) { })); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_EQ(10UL, context->computeHashKey().value()); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_EQ(10UL, context->computeHashKey().value()); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); std::string cookie_value; EXPECT_CALL(callbacks_.route_->route_entry_.hash_policy_, generateHash(_, _, _, _)) @@ -468,12 +465,11 @@ TEST_F(RouterTest, AddCookieNoDuplicate) { })); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_EQ(10UL, context->computeHashKey().value()); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_EQ(10UL, context->computeHashKey().value()); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); EXPECT_CALL(callbacks_.route_->route_entry_.hash_policy_, generateHash(_, _, _, _)) .WillOnce(Invoke([&](const Network::Address::Instance*, const Http::HeaderMap&, @@ -520,12 +516,11 @@ TEST_F(RouterTest, AddMultipleCookies) { })); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_EQ(10UL, context->computeHashKey().value()); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_EQ(10UL, context->computeHashKey().value()); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); std::string choco_c, foo_c; EXPECT_CALL(callbacks_.route_->route_entry_.hash_policy_, generateHash(_, _, _, _)) @@ -568,13 +563,12 @@ TEST_F(RouterTest, MetadataMatchCriteria) { ON_CALL(callbacks_.route_->route_entry_, metadataMatchCriteria()) .WillByDefault(Return(&callbacks_.route_->route_entry_.metadata_matches_criteria_)); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_EQ(context->metadataMatchCriteria(), - &callbacks_.route_->route_entry_.metadata_matches_criteria_); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_EQ(context->metadataMatchCriteria(), + &callbacks_.route_->route_entry_.metadata_matches_criteria_); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) .WillOnce(Return(&cancellable_)); expectResponseTimerCreate(); @@ -599,12 +593,11 @@ TEST_F(RouterTest, MetadataMatchCriteriaFromRequestNoRouteEntryMatch) { TEST_F(RouterTest, NoMetadataMatchCriteria) { ON_CALL(callbacks_.route_->route_entry_, metadataMatchCriteria()).WillByDefault(Return(nullptr)); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - EXPECT_EQ(context->metadataMatchCriteria(), nullptr); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + EXPECT_EQ(context->metadataMatchCriteria(), nullptr); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) .WillOnce(Return(&cancellable_)); expectResponseTimerCreate(); @@ -636,7 +629,7 @@ TEST_F(RouterTest, CancelBeforeBoundToPool) { } TEST_F(RouterTest, NoHost) { - EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)).WillOnce(Return(nullptr)); + EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)).WillOnce(Return(absl::nullopt)); Http::TestResponseHeaderMapImpl response_headers{ {":status", "503"}, {"content-length", "19"}, {"content-type", "text/plain"}}; @@ -3143,7 +3136,7 @@ TEST_F(RouterTest, RetryNoneHealthy) { putResult(Upstream::Outlier::Result::LocalOriginConnectFailed, _)); encoder1.stream_.resetStream(Http::StreamResetReason::LocalReset); - EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)).WillOnce(Return(nullptr)); + EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)).WillOnce(Return(absl::nullopt)); Http::TestResponseHeaderMapImpl response_headers{ {":status", "503"}, {"content-length", "19"}, {"content-type", "text/plain"}}; EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), false)); @@ -5775,18 +5768,17 @@ TEST_F(RouterTest, ApplicationProtocols) { StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::FilterChain); EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - Network::TransportSocketOptionsSharedPtr transport_socket_options = - context->upstreamTransportSocketOptions(); - EXPECT_NE(transport_socket_options, nullptr); - EXPECT_FALSE(transport_socket_options->applicationProtocolListOverride().empty()); - EXPECT_EQ(transport_socket_options->applicationProtocolListOverride().size(), 2); - EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[0], "foo"); - EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[1], "bar"); - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + Network::TransportSocketOptionsSharedPtr transport_socket_options = + context->upstreamTransportSocketOptions(); + EXPECT_NE(transport_socket_options, nullptr); + EXPECT_FALSE(transport_socket_options->applicationProtocolListOverride().empty()); + EXPECT_EQ(transport_socket_options->applicationProtocolListOverride().size(), 2); + EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[0], "foo"); + EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[1], "bar"); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) .WillOnce(Return(&cancellable_)); diff --git a/test/common/router/router_test_base.cc b/test/common/router/router_test_base.cc index 77e4fa378d6cd..c3be7f152d369 100644 --- a/test/common/router/router_test_base.cc +++ b/test/common/router/router_test_base.cc @@ -98,31 +98,30 @@ void RouterTestBase::verifyMetadataMatchCriteriaFromRequest(bool route_entry_has } EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) - .WillOnce( - Invoke([&](Upstream::ResourcePriority, absl::optional, - Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { - auto match = context->metadataMatchCriteria()->metadataMatchCriteria(); - EXPECT_EQ(match.size(), 2); - auto it = match.begin(); - - // Note: metadataMatchCriteria() keeps its entries sorted, so the order for checks - // below matters. - - // `stage` was only set by the request, not by the route entry. - EXPECT_EQ((*it)->name(), "stage"); - EXPECT_EQ((*it)->value().value().string_value(), "devel"); - it++; - - // `version` should be what came from the request, overriding the route entry. - EXPECT_EQ((*it)->name(), "version"); - EXPECT_EQ((*it)->value().value().string_value(), "v3.1"); - - // When metadataMatchCriteria() is computed from dynamic metadata, the result should - // be cached. - EXPECT_EQ(context->metadataMatchCriteria(), context->metadataMatchCriteria()); - - return &cm_.thread_local_cluster_.conn_pool_; - })); + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + auto match = context->metadataMatchCriteria()->metadataMatchCriteria(); + EXPECT_EQ(match.size(), 2); + auto it = match.begin(); + + // Note: metadataMatchCriteria() keeps its entries sorted, so the order for checks + // below matters. + + // `stage` was only set by the request, not by the route entry. + EXPECT_EQ((*it)->name(), "stage"); + EXPECT_EQ((*it)->value().value().string_value(), "devel"); + it++; + + // `version` should be what came from the request, overriding the route entry. + EXPECT_EQ((*it)->name(), "version"); + EXPECT_EQ((*it)->value().value().string_value(), "v3.1"); + + // When metadataMatchCriteria() is computed from dynamic metadata, the result should + // be cached. + EXPECT_EQ(context->metadataMatchCriteria(), context->metadataMatchCriteria()); + + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) .WillOnce(Return(&cancellable_)); expectResponseTimerCreate(); diff --git a/test/common/tcp_proxy/config_test.cc b/test/common/tcp_proxy/config_test.cc index 98f29fdbfb867..9d35475973ff6 100644 --- a/test/common/tcp_proxy/config_test.cc +++ b/test/common/tcp_proxy/config_test.cc @@ -847,7 +847,7 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(NonRoutableConnection)) { // Expect filter to try to open a connection to the fallback cluster. EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(nullptr)); + .WillOnce(Return(absl::nullopt)); filter_->onNewConnection(); @@ -870,7 +870,7 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(RoutableConnection)) { // Expect filter to try to open a connection to specified cluster. EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(nullptr)); + .WillOnce(Return(absl::nullopt)); filter_->onNewConnection(); @@ -891,7 +891,7 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UseClusterFromPerConnectionC // Expect filter to try to open a connection to specified cluster. EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(nullptr)); + .WillOnce(Return(absl::nullopt)); filter_->onNewConnection(); } @@ -910,16 +910,14 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UpstreamServerName)) { // Expect filter to try to open a connection to a cluster with the transport socket options with // override-server-name EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce( - Invoke([](Upstream::ResourcePriority, - Upstream::LoadBalancerContext* context) -> Tcp::ConnectionPool::Instance* { - Network::TransportSocketOptionsSharedPtr transport_socket_options = - context->upstreamTransportSocketOptions(); - EXPECT_NE(transport_socket_options, nullptr); - EXPECT_TRUE(transport_socket_options->serverNameOverride().has_value()); - EXPECT_EQ(transport_socket_options->serverNameOverride().value(), "www.example.com"); - return nullptr; - })); + .WillOnce(Invoke([](Upstream::ResourcePriority, Upstream::LoadBalancerContext* context) { + Network::TransportSocketOptionsSharedPtr transport_socket_options = + context->upstreamTransportSocketOptions(); + EXPECT_NE(transport_socket_options, nullptr); + EXPECT_TRUE(transport_socket_options->serverNameOverride().has_value()); + EXPECT_EQ(transport_socket_options->serverNameOverride().value(), "www.example.com"); + return absl::nullopt; + })); // Port 9999 is within the specified destination port range. connection_.stream_info_.downstream_address_provider_->setLocalAddress( @@ -942,18 +940,16 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(ApplicationProtocols)) { // Expect filter to try to open a connection to a cluster with the transport socket options with // override-application-protocol EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce( - Invoke([](Upstream::ResourcePriority, - Upstream::LoadBalancerContext* context) -> Tcp::ConnectionPool::Instance* { - Network::TransportSocketOptionsSharedPtr transport_socket_options = - context->upstreamTransportSocketOptions(); - EXPECT_NE(transport_socket_options, nullptr); - EXPECT_FALSE(transport_socket_options->applicationProtocolListOverride().empty()); - EXPECT_EQ(transport_socket_options->applicationProtocolListOverride().size(), 2); - EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[0], "foo"); - EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[1], "bar"); - return nullptr; - })); + .WillOnce(Invoke([](Upstream::ResourcePriority, Upstream::LoadBalancerContext* context) { + Network::TransportSocketOptionsSharedPtr transport_socket_options = + context->upstreamTransportSocketOptions(); + EXPECT_NE(transport_socket_options, nullptr); + EXPECT_FALSE(transport_socket_options->applicationProtocolListOverride().empty()); + EXPECT_EQ(transport_socket_options->applicationProtocolListOverride().size(), 2); + EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[0], "foo"); + EXPECT_EQ(transport_socket_options->applicationProtocolListOverride()[1], "bar"); + return absl::nullopt; + })); // Port 9999 is within the specified destination port range. connection_.stream_info_.downstream_address_provider_->setLocalAddress( @@ -986,7 +982,7 @@ TEST_F(TcpProxyNonDeprecatedConfigRoutingTest, ClusterNameSet) { // Expect filter to try to open a connection to specified cluster. EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(nullptr)); + .WillOnce(Return(absl::nullopt)); absl::optional cluster_info; EXPECT_CALL(connection_.stream_info_, setUpstreamClusterInfo(_)) .WillOnce( @@ -1036,12 +1032,10 @@ TEST_F(TcpProxyHashingTest, HashWithSourceIp) { setup(); initializeFilter(); EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce( - Invoke([](Upstream::ResourcePriority, - Upstream::LoadBalancerContext* context) -> Tcp::ConnectionPool::Instance* { - EXPECT_TRUE(context->computeHashKey().has_value()); - return nullptr; - })); + .WillOnce(Invoke([](Upstream::ResourcePriority, Upstream::LoadBalancerContext* context) { + EXPECT_TRUE(context->computeHashKey().has_value()); + return absl::nullopt; + })); connection_.stream_info_.downstream_address_provider_->setRemoteAddress( std::make_shared("1.2.3.4", 1111)); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 43447d35a4119..5827e1393f096 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -86,7 +86,7 @@ class TcpProxyTest : public TcpProxyTestBase { testing::InSequence sequence; for (uint32_t i = 0; i < connections; i++) { EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(&conn_pool_)) + .WillOnce(Return(Upstream::TcpPoolData([]() {}, &conn_pool_))) .RetiresOnSaturation(); EXPECT_CALL(conn_pool_, newConnection(_)) .WillOnce(Invoke( @@ -98,7 +98,7 @@ class TcpProxyTest : public TcpProxyTestBase { .RetiresOnSaturation(); } EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillRepeatedly(Return(nullptr)); + .WillRepeatedly(Return(absl::nullopt)); } { @@ -553,7 +553,7 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) { EXPECT_CALL(factory_context_.api_.random_, random()).WillOnce(Return(0)); EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(DoAll(SaveArg<1>(&context), Return(nullptr))); + .WillOnce(DoAll(SaveArg<1>(&context), Return(absl::nullopt))); EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); EXPECT_NE(nullptr, context); @@ -577,7 +577,7 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) { EXPECT_CALL(factory_context_.api_.random_, random()).WillOnce(Return(2)); EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(DoAll(SaveArg<1>(&context), Return(nullptr))); + .WillOnce(DoAll(SaveArg<1>(&context), Return(absl::nullopt))); EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); EXPECT_NE(nullptr, context); @@ -616,7 +616,7 @@ TEST_F(TcpProxyTest, StreamInfoDynamicMetadata) { Upstream::LoadBalancerContext* context; EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(DoAll(SaveArg<1>(&context), Return(nullptr))); + .WillOnce(DoAll(SaveArg<1>(&context), Return(absl::nullopt))); EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); EXPECT_NE(nullptr, context); @@ -670,7 +670,7 @@ TEST_F(TcpProxyTest, StreamInfoDynamicMetadataAndConfigMerged) { Upstream::LoadBalancerContext* context; EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(DoAll(SaveArg<1>(&context), Return(nullptr))); + .WillOnce(DoAll(SaveArg<1>(&context), Return(absl::nullopt))); EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); EXPECT_NE(nullptr, context); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index d15250242ab6b..293446f07502b 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -10,6 +10,7 @@ #include "extensions/transport_sockets/raw_buffer/config.h" #include "test/common/upstream/test_cluster_manager.h" +#include "test/mocks/http/conn_pool.h" #include "test/mocks/upstream/cds_api.h" #include "test/mocks/upstream/cluster_priority_set.h" #include "test/mocks/upstream/cluster_real_priority_set.h" @@ -21,6 +22,23 @@ namespace Envoy { namespace Upstream { + +class HttpPoolDataPeer { +public: + static Http::ConnectionPool::MockInstance* getPool(absl::optional data) { + ASSERT(data.has_value()); + return dynamic_cast(data.value().pool_); + } +}; + +class TcpPoolDataPeer { +public: + static Tcp::ConnectionPool::MockInstance* getPool(absl::optional data) { + ASSERT(data.has_value()); + return dynamic_cast(data.value().pool_); + } +}; + namespace { using ::testing::_; @@ -1604,13 +1622,14 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { EXPECT_EQ(1UL, cluster_manager_->clusters().active_clusters_.size()); Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance(); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(cp)); - EXPECT_EQ(cp, cluster_manager_->getThreadLocalCluster("fake_cluster") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); + EXPECT_EQ(cp, HttpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("fake_cluster") + ->httpConnPool(ResourcePriority::Default, + Http::Protocol::Http11, nullptr))); Tcp::ConnectionPool::MockInstance* cp2 = new Tcp::ConnectionPool::MockInstance(); EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(cp2)); - EXPECT_EQ(cp2, cluster_manager_->getThreadLocalCluster("fake_cluster") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + EXPECT_EQ(cp2, TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("fake_cluster") + ->tcpConnPool(ResourcePriority::Default, nullptr))); Network::MockClientConnection* connection = new Network::MockClientConnection(); ON_CALL(*cluster2->info_, features()) @@ -2039,16 +2058,16 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. - Http::ConnectionPool::MockInstance* cp1 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp2 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp2 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp1_high = dynamic_cast( + Http::ConnectionPool::MockInstance* cp1_high = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::High, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp2_high = dynamic_cast( + Http::ConnectionPool::MockInstance* cp2_high = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::High, Http::Protocol::Http11, nullptr)); @@ -2066,18 +2085,18 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. - Tcp::ConnectionPool::MockInstance* tcp1 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp2 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp1_high = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::High, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp2_high = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::High, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp1 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp2 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp1_high = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::High, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp2_high = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::High, nullptr)); EXPECT_NE(tcp1, tcp2); EXPECT_NE(tcp1_high, tcp2_high); @@ -2103,21 +2122,21 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { tcp_drained_cb_high = nullptr; // Make sure we get back the same connection pool for the 2nd host as we did before the change. - Http::ConnectionPool::MockInstance* cp3 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp3 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp3_high = dynamic_cast( + Http::ConnectionPool::MockInstance* cp3_high = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::High, Http::Protocol::Http11, nullptr)); EXPECT_EQ(cp2, cp3); EXPECT_EQ(cp2_high, cp3_high); - Tcp::ConnectionPool::MockInstance* tcp3 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp3_high = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::High, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp3 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp3_high = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::High, nullptr)); EXPECT_EQ(tcp2, tcp3); EXPECT_EQ(tcp2_high, tcp3_high); @@ -2203,16 +2222,16 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. - Http::ConnectionPool::MockInstance* cp1 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp2 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp2 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp1_high = dynamic_cast( + Http::ConnectionPool::MockInstance* cp1_high = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::High, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp2_high = dynamic_cast( + Http::ConnectionPool::MockInstance* cp2_high = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::High, Http::Protocol::Http11, nullptr)); @@ -2230,36 +2249,32 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts, and for different SNIs - Tcp::ConnectionPool::MockInstance* tcp1 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp2 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp1_high = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::High, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp2_high = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::High, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp1 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp2 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp1_high = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::High, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp2_high = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::High, nullptr)); Tcp::ConnectionPool::MockInstance* tcp1_example_com = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &example_com_context)); + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &example_com_context)); Tcp::ConnectionPool::MockInstance* tcp2_example_com = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &example_com_context)); + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &example_com_context)); Tcp::ConnectionPool::MockInstance* tcp1_ibm_com = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &ibm_com_context)); + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &ibm_com_context)); Tcp::ConnectionPool::MockInstance* tcp2_ibm_com = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &ibm_com_context)); + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &ibm_com_context)); EXPECT_NE(tcp1, tcp2); EXPECT_NE(tcp1_high, tcp2_high); @@ -2322,38 +2337,34 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { tcp_drained_cb_ibm_com = nullptr; // Make sure we get back the same connection pool for the 2nd host as we did before the change. - Http::ConnectionPool::MockInstance* cp3 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp3 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Http::ConnectionPool::MockInstance* cp3_high = dynamic_cast( + Http::ConnectionPool::MockInstance* cp3_high = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::High, Http::Protocol::Http11, nullptr)); EXPECT_EQ(cp2, cp3); EXPECT_EQ(cp2_high, cp3_high); - Tcp::ConnectionPool::MockInstance* tcp3 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp3_high = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::High, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp3 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp3_high = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::High, nullptr)); Tcp::ConnectionPool::MockInstance* tcp3_example_com = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &example_com_context)); - Tcp::ConnectionPool::MockInstance* tcp3_example_com_with_san = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &example_com_context_with_san)); - Tcp::ConnectionPool::MockInstance* tcp3_example_com_with_san2 = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &example_com_context_with_san2)); + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &example_com_context)); + Tcp::ConnectionPool::MockInstance* tcp3_example_com_with_san = TcpPoolDataPeer::getPool( + cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &example_com_context_with_san)); + Tcp::ConnectionPool::MockInstance* tcp3_example_com_with_san2 = TcpPoolDataPeer::getPool( + cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &example_com_context_with_san2)); Tcp::ConnectionPool::MockInstance* tcp3_ibm_com = - dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &ibm_com_context)); + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &ibm_com_context)); EXPECT_EQ(tcp2, tcp3); EXPECT_EQ(tcp2_high, tcp3_high); @@ -2504,13 +2515,13 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .WillOnce(ReturnNew()); - Http::ConnectionPool::MockInstance* cp = dynamic_cast( + Http::ConnectionPool::MockInstance* cp = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); // Immediate drain, since this can happen with the HTTP codecs. EXPECT_CALL(*cp, addDrainedCallback(_)) @@ -2590,13 +2601,13 @@ TEST_F(ClusterManagerImplTest, ConnPoolDestroyWithDraining) { MockTcpConnPoolWithDestroy* mock_tcp = new MockTcpConnPoolWithDestroy(); EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(mock_tcp)); - Http::ConnectionPool::MockInstance* cp = dynamic_cast( + Http::ConnectionPool::MockInstance* cp = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); // Remove the first host, this should lead to the cp being drained. Http::ConnectionPool::Instance::DrainedCb drained_cb; @@ -2641,11 +2652,11 @@ TEST_F(ClusterManagerImplTest, OriginalDstInitialization) { EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); // Test for no hosts returning the correct values before we have hosts. - EXPECT_EQ(nullptr, + EXPECT_EQ(absl::nullopt, cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + EXPECT_EQ(absl::nullopt, cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1")->tcpConn(nullptr).connection_); EXPECT_EQ(3UL, factory_.stats_.counter("cluster.cluster_1.upstream_cx_none_healthy").value()); @@ -3041,9 +3052,9 @@ TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsPassedToTcpConnPool) { EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return)); EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(to_create)); - Tcp::ConnectionPool::Instance* cp = cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, &context); - EXPECT_NE(nullptr, cp); + auto opt_cp = cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, &context); + EXPECT_TRUE(opt_cp.has_value()); } TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsPassedToConnPool) { @@ -3057,11 +3068,9 @@ TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsPassedToConnPool) { EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return)); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(to_create)); - Http::ConnectionPool::Instance* cp = - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context); - - EXPECT_NE(nullptr, cp); + auto opt_cp = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context); + EXPECT_TRUE(opt_cp.has_value()); } TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsUsedInConnPoolHash) { @@ -3080,21 +3089,23 @@ TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsUsedInConnPoolHash) { EXPECT_CALL(context2, upstreamSocketOptions()).WillRepeatedly(Return(options2)); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(to_create1)); - Http::ConnectionPool::Instance* cp1 = + Http::ConnectionPool::Instance* cp1 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context1); + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context1)); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(to_create2)); - Http::ConnectionPool::Instance* cp2 = + Http::ConnectionPool::Instance* cp2 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context2); + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context2)); - Http::ConnectionPool::Instance* should_be_cp1 = + Http::ConnectionPool::Instance* should_be_cp1 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context1); - Http::ConnectionPool::Instance* should_be_cp2 = + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context1)); + EXPECT_NE(nullptr, cp1); + EXPECT_NE(nullptr, cp2); + Http::ConnectionPool::Instance* should_be_cp2 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context2); + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context2)); // The different upstream options should lead to different hashKeys, thus different pools. EXPECT_NE(cp1, cp2); @@ -3114,11 +3125,9 @@ TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsNullIsOkay) { EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return)); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(to_create)); - Http::ConnectionPool::Instance* cp = - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context); - - EXPECT_NE(nullptr, cp); + auto opt_cp = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &context); + EXPECT_TRUE(opt_cp.has_value()); } class TestUpstreamNetworkFilter : public Network::WriteFilter { @@ -4002,11 +4011,11 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); // Verify that we get no hosts when the HostSet is empty. - EXPECT_EQ(nullptr, + EXPECT_EQ(absl::nullopt, cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + EXPECT_EQ(absl::nullopt, cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1")->tcpConn(nullptr).connection_); @@ -4037,21 +4046,21 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. - Http::ConnectionPool::MockInstance* cp1 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); // Create persistent connection for host2. - Http::ConnectionPool::MockInstance* cp2 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp2 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http2, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp1 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp1 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp2 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp2 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); EXPECT_NE(cp1, cp2); EXPECT_NE(tcp1, tcp2); @@ -4077,13 +4086,12 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { hosts_removed, 100); // Recreate connection pool for host1. - cp1 = dynamic_cast( + cp1 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - tcp1 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + tcp1 = TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); HostSharedPtr host3 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82", time_system_); @@ -4145,13 +4153,13 @@ TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) { .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. - Http::ConnectionPool::MockInstance* cp1 = dynamic_cast( + Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); - Tcp::ConnectionPool::MockInstance* tcp1 = dynamic_cast( - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr)); + Tcp::ConnectionPool::MockInstance* tcp1 = + TcpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr)); HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82", time_system_); HostVector hosts_added; @@ -4279,8 +4287,9 @@ TEST_F(ClusterManagerImplTest, ConnectionPoolPerDownstreamConnection) { EXPECT_CALL(downstream_connection, hashKey) .WillOnce(Invoke([i](std::vector& hash_key) { hash_key.push_back(i); })); EXPECT_EQ(conn_pool_vector.back(), - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &lb_context)); + HttpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, + Http::Protocol::Http11, &lb_context))); } // Check that the first entry is still in the pool map @@ -4288,8 +4297,9 @@ TEST_F(ClusterManagerImplTest, ConnectionPoolPerDownstreamConnection) { hash_key.push_back(0); })); EXPECT_EQ(conn_pool_vector.front(), - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, &lb_context)); + HttpPoolDataPeer::getPool(cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, + Http::Protocol::Http11, &lb_context))); } class PreconnectTest : public ClusterManagerImplTest { @@ -4346,6 +4356,9 @@ class PreconnectTest : public ClusterManagerImplTest { HostSharedPtr host2_; HostSharedPtr host3_; HostSharedPtr host4_; + Http::MockResponseDecoder decoder_; + Http::ConnectionPool::MockCallbacks http_callbacks_; + Tcp::ConnectionPool::MockCallbacks tcp_callbacks_; }; TEST_F(PreconnectTest, PreconnectOff) { @@ -4354,33 +4367,39 @@ TEST_F(PreconnectTest, PreconnectOff) { initialize(0); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) .Times(1) - .WillRepeatedly(ReturnNew()); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + .WillRepeatedly(ReturnNew>()); + auto http_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + http_handle.value().newStream(decoder_, http_callbacks_); EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .Times(1) - .WillRepeatedly(ReturnNew()); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr); + .WillRepeatedly(ReturnNew>()); + auto tcp_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr); + ASSERT_TRUE(tcp_handle.has_value()); + tcp_handle.value().newConnection(tcp_callbacks_); } TEST_F(PreconnectTest, PreconnectOn) { - // With preconnect set to 1.1, each request for a connection pool will kick off + // With preconnect set to 1.1, maybePreconnect will kick off // preconnecting, so create the pool for both the current connection and the // anticipated one. initialize(1.1); EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) .Times(2) .WillRepeatedly(ReturnNew>()); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + auto http_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + http_handle.value().newStream(decoder_, http_callbacks_); EXPECT_CALL(factory_, allocateTcpConnPool_(_)) .Times(2) .WillRepeatedly(ReturnNew>()); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr); + auto tcp_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr); + ASSERT_TRUE(tcp_handle.has_value()); + tcp_handle.value().newConnection(tcp_callbacks_); } TEST_F(PreconnectTest, PreconnectHighHttp) { @@ -4397,8 +4416,9 @@ TEST_F(PreconnectTest, PreconnectHighHttp) { })); return ret; })); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + auto http_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + http_handle.value().newStream(decoder_, http_callbacks_); // Expect preconnect to be called 3 times across the four hosts. EXPECT_EQ(3, http_preconnect); } @@ -4417,8 +4437,9 @@ TEST_F(PreconnectTest, PreconnectHighTcp) { })); return ret; })); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->tcpConnPool(ResourcePriority::Default, nullptr); + auto tcp_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr); + tcp_handle.value().newConnection(tcp_callbacks_); // Expect preconnect to be called 3 times across the four hosts. EXPECT_EQ(3, tcp_preconnect); } @@ -4437,8 +4458,9 @@ TEST_F(PreconnectTest, PreconnectCappedAt3) { })); return ret; })); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + auto http_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + http_handle.value().newStream(decoder_, http_callbacks_); // Expect preconnect to be called 3 times across the four hosts. EXPECT_EQ(3, http_preconnect); @@ -4447,8 +4469,9 @@ TEST_F(PreconnectTest, PreconnectCappedAt3) { // do the full 3 as the number of outstanding preconnects is limited by the // number of healthy hosts. http_preconnect = 0; - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + http_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + http_handle.value().newStream(decoder_, http_callbacks_); EXPECT_EQ(2, http_preconnect); } @@ -4467,8 +4490,9 @@ TEST_F(PreconnectTest, PreconnectCappedByMaybePreconnect) { })); return ret; })); - cluster_manager_->getThreadLocalCluster("cluster_1") - ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + auto http_handle = cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + http_handle.value().newStream(decoder_, http_callbacks_); // Expect preconnect to be called once and then preconnecting is stopped. EXPECT_EQ(1, http_preconnect_calls); } diff --git a/test/extensions/filters/network/dubbo_proxy/router_test.cc b/test/extensions/filters/network/dubbo_proxy/router_test.cc index e8945775b3ea9..2cce1608f030a 100644 --- a/test/extensions/filters/network/dubbo_proxy/router_test.cc +++ b/test/extensions/filters/network/dubbo_proxy/router_test.cc @@ -335,7 +335,7 @@ TEST_F(DubboRouterTest, NoHealthyHosts) { EXPECT_CALL(*route_, routeEntry()).WillOnce(Return(&route_entry_)); EXPECT_CALL(route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name_)); EXPECT_CALL(context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(nullptr)); + .WillOnce(Return(absl::nullopt)); EXPECT_CALL(callbacks_, sendLocalReply(_, _)) .WillOnce(Invoke([&](const DubboFilters::DirectResponse& response, bool end_stream) -> void { diff --git a/test/extensions/filters/network/rocketmq_proxy/router_test.cc b/test/extensions/filters/network/rocketmq_proxy/router_test.cc index 4041fb54fe2ac..b258426e9b763 100644 --- a/test/extensions/filters/network/rocketmq_proxy/router_test.cc +++ b/test/extensions/filters/network/rocketmq_proxy/router_test.cc @@ -246,7 +246,7 @@ TEST_F(RocketmqRouterTest, NoHealthyHosts) { EXPECT_THAT(error_message, ContainsRegex(".*No host available*.")); })); EXPECT_CALL(context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(nullptr)); + .WillOnce(Return(absl::nullopt)); EXPECT_CALL(*active_message_, onReset()); startRequest(); diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index d17758c682bd5..738e271abba5a 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -577,7 +577,7 @@ TEST_F(ThriftRouterTest, NoHealthyHosts) { EXPECT_CALL(*route_, routeEntry()).WillOnce(Return(&route_entry_)); EXPECT_CALL(route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name_)); EXPECT_CALL(context_.cluster_manager_.thread_local_cluster_, tcpConnPool(_, _)) - .WillOnce(Return(nullptr)); + .WillOnce(Return(absl::nullopt)); EXPECT_CALL(callbacks_, sendLocalReply(_, _)) .WillOnce(Invoke([&](const DirectResponse& response, bool end_stream) -> void { diff --git a/test/extensions/upstreams/http/tcp/upstream_request_test.cc b/test/extensions/upstreams/http/tcp/upstream_request_test.cc index 555f93a6e7501..676030dc8748a 100644 --- a/test/extensions/upstreams/http/tcp/upstream_request_test.cc +++ b/test/extensions/upstreams/http/tcp/upstream_request_test.cc @@ -39,7 +39,8 @@ class TcpConnPoolTest : public ::testing::Test { NiceMock route_entry; NiceMock cm; cm.initializeThreadLocalClusters({"fake_cluster"}); - EXPECT_CALL(cm.thread_local_cluster_, tcpConnPool(_, _)).WillOnce(Return(&mock_pool_)); + EXPECT_CALL(cm.thread_local_cluster_, tcpConnPool(_, _)) + .WillOnce(Return(Upstream::TcpPoolData([]() {}, &mock_pool_))); conn_pool_ = std::make_unique(cm.thread_local_cluster_, true, route_entry, Envoy::Http::Protocol::Http11, nullptr); } diff --git a/test/extensions/upstreams/tcp/generic/config_test.cc b/test/extensions/upstreams/tcp/generic/config_test.cc index ffc603614491e..f4a69e2c7a5fd 100644 --- a/test/extensions/upstreams/tcp/generic/config_test.cc +++ b/test/extensions/upstreams/tcp/generic/config_test.cc @@ -26,7 +26,7 @@ class TcpConnPoolTest : public ::testing::Test { TEST_F(TcpConnPoolTest, TestNoConnPool) { envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig config; config.set_hostname("host"); - EXPECT_CALL(thread_local_cluster_, httpConnPool(_, _, _)).WillOnce(Return(nullptr)); + EXPECT_CALL(thread_local_cluster_, httpConnPool(_, _, _)).WillOnce(Return(absl::nullopt)); EXPECT_EQ(nullptr, factory_.createGenericConnPool(thread_local_cluster_, config, nullptr, callbacks_)); } diff --git a/test/integration/tcp_conn_pool_integration_test.cc b/test/integration/tcp_conn_pool_integration_test.cc index bf02e97f5ccf0..a04f1f8f4d0e2 100644 --- a/test/integration/tcp_conn_pool_integration_test.cc +++ b/test/integration/tcp_conn_pool_integration_test.cc @@ -24,13 +24,13 @@ class TestFilter : public Network::ReadFilter { Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override { UNREFERENCED_PARAMETER(end_stream); - Tcp::ConnectionPool::Instance* pool = + absl::optional pool_data = cluster_manager_.getThreadLocalCluster("cluster_0") ->tcpConnPool(Upstream::ResourcePriority::Default, nullptr); - ASSERT(pool != nullptr); + ASSERT(pool_data.has_value()); requests_.emplace_back(*this, data); - pool->newConnection(requests_.back()); + pool_data.value().newConnection(requests_.back()); ASSERT(data.length() == 0); return Network::FilterStatus::StopIteration; diff --git a/test/mocks/http/conn_pool.h b/test/mocks/http/conn_pool.h index 97d22e6bd6777..2f3d5b9352de2 100644 --- a/test/mocks/http/conn_pool.h +++ b/test/mocks/http/conn_pool.h @@ -13,6 +13,15 @@ namespace Envoy { namespace Http { namespace ConnectionPool { +class MockCallbacks : public Callbacks { + MOCK_METHOD(void, onPoolFailure, + (PoolFailureReason reason, absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host)); + MOCK_METHOD(void, onPoolReady, + (RequestEncoder & encoder, Upstream::HostDescriptionConstSharedPtr host, + const StreamInfo::StreamInfo& info, absl::optional protocol)); +}; + class MockInstance : public Instance { public: MockInstance(); diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h index b052e712353ac..04f0426eb303a 100644 --- a/test/mocks/tcp/mocks.h +++ b/test/mocks/tcp/mocks.h @@ -15,6 +15,13 @@ namespace Envoy { namespace Tcp { namespace ConnectionPool { +class MockCallbacks : public Callbacks { + MOCK_METHOD(void, onPoolFailure, + (PoolFailureReason reason, Upstream::HostDescriptionConstSharedPtr host)); + MOCK_METHOD(void, onPoolReady, + (ConnectionDataPtr && conn, Upstream::HostDescriptionConstSharedPtr host)); +}; + class MockUpstreamCallbacks : public UpstreamCallbacks { public: MockUpstreamCallbacks(); diff --git a/test/mocks/upstream/thread_local_cluster.cc b/test/mocks/upstream/thread_local_cluster.cc index 9e0d9eedd33e4..5842f04d8807a 100644 --- a/test/mocks/upstream/thread_local_cluster.cc +++ b/test/mocks/upstream/thread_local_cluster.cc @@ -13,8 +13,10 @@ MockThreadLocalCluster::MockThreadLocalCluster() { ON_CALL(*this, prioritySet()).WillByDefault(ReturnRef(cluster_.priority_set_)); ON_CALL(*this, info()).WillByDefault(Return(cluster_.info_)); ON_CALL(*this, loadBalancer()).WillByDefault(ReturnRef(lb_)); - ON_CALL(*this, httpConnPool(_, _, _)).WillByDefault(Return(&conn_pool_)); - ON_CALL(*this, tcpConnPool(_, _)).WillByDefault(Return(&tcp_conn_pool_)); + ON_CALL(*this, httpConnPool(_, _, _)) + .WillByDefault(Return(Upstream::HttpPoolData([]() {}, &conn_pool_))); + ON_CALL(*this, tcpConnPool(_, _)) + .WillByDefault(Return(Upstream::TcpPoolData([]() {}, &tcp_conn_pool_))); ON_CALL(*this, httpAsyncClient()).WillByDefault(ReturnRef(async_client_)); } diff --git a/test/mocks/upstream/thread_local_cluster.h b/test/mocks/upstream/thread_local_cluster.h index d4900d18c4933..7057b07da8766 100644 --- a/test/mocks/upstream/thread_local_cluster.h +++ b/test/mocks/upstream/thread_local_cluster.h @@ -30,10 +30,10 @@ class MockThreadLocalCluster : public ThreadLocalCluster { MOCK_METHOD(const PrioritySet&, prioritySet, ()); MOCK_METHOD(ClusterInfoConstSharedPtr, info, ()); MOCK_METHOD(LoadBalancer&, loadBalancer, ()); - MOCK_METHOD(Http::ConnectionPool::Instance*, httpConnPool, + MOCK_METHOD(absl::optional, httpConnPool, (ResourcePriority priority, absl::optional downstream_protocol, LoadBalancerContext* context)); - MOCK_METHOD(Tcp::ConnectionPool::Instance*, tcpConnPool, + MOCK_METHOD(absl::optional, tcpConnPool, (ResourcePriority priority, LoadBalancerContext* context)); MOCK_METHOD(MockHost::MockCreateConnectionData, tcpConn_, (LoadBalancerContext * context)); MOCK_METHOD(Http::AsyncClient&, httpAsyncClient, ());