Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ envoy_cc_library(
deps = [
":load_balancer_interface",
":upstream_interface",
"//include/envoy/http:async_client_interface",
],
)

Expand Down
64 changes: 57 additions & 7 deletions include/envoy/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,54 @@
namespace Envoy {
namespace Upstream {

// HttpPoolData returns information about a given pool as well as a function
// to create streams on that pool.
class HttpPoolData {
public:
using OnNewStreamFn = std::function<void()>;

HttpPoolData(OnNewStreamFn on_new_stream, Http::ConnectionPool::Instance* pool)
: on_new_stream_(on_new_stream), pool_(pool) {}

Envoy::Http::ConnectionPool::Cancellable*
newStream(Http::ResponseDecoder& response_decoder,
Envoy::Http::ConnectionPool::Callbacks& callbacks) {
on_new_stream_();
return pool_->newStream(response_decoder, callbacks);
}

Upstream::HostDescriptionConstSharedPtr host() const { return pool_->host(); }

private:
friend class HttpPoolDataPeer;

OnNewStreamFn on_new_stream_;
Http::ConnectionPool::Instance* pool_;
};

// Tcp pool returns information about a given pool, as well as a function to
// create connections on that pool.
class TcpPoolData {
public:
using OnNewConnectionFn = std::function<void()>;

TcpPoolData(OnNewConnectionFn on_new_connection, Tcp::ConnectionPool::Instance* pool)
: on_new_connection_(on_new_connection), pool_(pool) {}

Envoy::Tcp::ConnectionPool::Cancellable*
newConnection(Envoy::Tcp::ConnectionPool::Callbacks& callbacks) {
on_new_connection_();
return pool_->newConnection(callbacks);
}

Upstream::HostDescriptionConstSharedPtr host() const { return pool_->host(); }

private:
friend class TcpPoolDataPeer;
OnNewConnectionFn on_new_connection_;
Tcp::ConnectionPool::Instance* pool_;
};

/**
* A thread local cluster instance that can be used for direct load balancing and host set
* interactions. In general, an instance of ThreadLocalCluster can only be safely used in the
Expand Down Expand Up @@ -42,10 +90,11 @@ class ThreadLocalCluster {
* @param priority the connection pool priority.
* @param downstream_protocol the downstream protocol (if one exists) to use in protocol
* selection.
* @param context the optional load balancer context.
* @return the connection pool or nullptr if there is no host available in the cluster.
* @param context the optional load balancer context. Must continue to be
* valid until newConnection is called on the pool (if it is to be called).
* @return the connection pool data or nullopt if there is no host available in the cluster.
*/
virtual Http::ConnectionPool::Instance*
virtual absl::optional<HttpPoolData>
httpConnPool(ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context) PURE;

Expand All @@ -55,11 +104,12 @@ class ThreadLocalCluster {
* is used is the one defined on the cluster when it was created.
*
* @param priority the connection pool priority.
* @param context the optional load balancer context.
* @return the connection pool or nullptr if there is no host available in the cluster.
* @param context the optional load balancer context. Must continue to be
* valid until newConnection is called on the pool (if it is to be called).
* @return the connection pool data or nullopt if there is no host available in the cluster.
*/
virtual Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority,
LoadBalancerContext* context) PURE;
virtual absl::optional<TcpPoolData> tcpConnPool(ResourcePriority priority,
LoadBalancerContext* context) PURE;

/**
* Allocate a load balanced TCP connection for a cluster. The created connection is already
Expand Down
10 changes: 5 additions & 5 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
: upstream_callbacks_(upstream_callbacks) {
conn_pool_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
conn_pool_data_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
}

TcpConnPool::~TcpConnPool() {
Expand All @@ -165,7 +165,7 @@ void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
// valid connection handle. If newConnection fails inline it may result in attempting to
// select a new host, and a recursive call to initializeUpstreamConnection. In this case the
// first call to newConnection will return null and the inner call will persist.
Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this);
Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.value().newConnection(*this);
if (handle) {
ASSERT(upstream_handle_ == nullptr);
upstream_handle_ = handle;
Expand Down Expand Up @@ -195,8 +195,8 @@ HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
Http::CodecType type)
: config_(config), type_(type), upstream_callbacks_(upstream_callbacks) {
conn_pool_ = thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, absl::nullopt,
context);
conn_pool_data_ = thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default,
absl::nullopt, context);
}

HttpConnPool::~HttpConnPool() {
Expand All @@ -215,7 +215,7 @@ void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
upstream_ = std::make_unique<Http2Upstream>(upstream_callbacks_, config_);
}
Tcp::ConnectionPool::Cancellable* handle =
conn_pool_->newStream(upstream_->responseDecoder(), *this);
conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this);
if (handle != nullptr) {
upstream_handle_ = handle;
}
Expand Down
9 changes: 5 additions & 4 deletions source/common/tcp_proxy/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/tcp/conn_pool.h"
#include "envoy/tcp/upstream.h"
#include "envoy/upstream/load_balancer.h"
#include "envoy/upstream/thread_local_cluster.h"
#include "envoy/upstream/upstream.h"

#include "common/common/dump_state_utils.h"
Expand All @@ -21,7 +22,7 @@ class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callback
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks);
~TcpConnPool() override;

bool valid() const { return conn_pool_ != nullptr; }
bool valid() const { return conn_pool_data_.has_value(); }

// GenericConnPool
void newStream(GenericConnectionPoolCallbacks& callbacks) override;
Expand All @@ -33,7 +34,7 @@ class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callback
Upstream::HostDescriptionConstSharedPtr host) override;

private:
Tcp::ConnectionPool::Instance* conn_pool_{};
absl::optional<Upstream::TcpPoolData> conn_pool_data_{};
Tcp::ConnectionPool::Cancellable* upstream_handle_{};
GenericConnectionPoolCallbacks* callbacks_{};
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
Expand All @@ -52,7 +53,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba
~HttpConnPool() override;

// HTTP/3 upstreams are not supported at the moment.
bool valid() const { return conn_pool_ != nullptr && type_ <= Http::CodecType::HTTP2; }
bool valid() const { return conn_pool_data_.has_value() && type_ <= Http::CodecType::HTTP2; }

// GenericConnPool
void newStream(GenericConnectionPoolCallbacks& callbacks) override;
Expand Down Expand Up @@ -97,7 +98,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba
Ssl::ConnectionInfoConstSharedPtr ssl_info);
const TunnelingConfig config_;
Http::CodecType type_;
Http::ConnectionPool::Instance* conn_pool_{};
absl::optional<Upstream::HttpPoolData> conn_pool_data_{};
Http::ConnectionPool::Cancellable* upstream_handle_{};
GenericConnectionPoolCallbacks* callbacks_{};
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
Expand Down
56 changes: 30 additions & 26 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,41 +905,45 @@ void ClusterManagerImpl::maybePreconnect(
}
}

Http::ConnectionPool::Instance*
absl::optional<HttpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
ResourcePriority priority, absl::optional<Http::Protocol> protocol,
LoadBalancerContext* context) {
// Select a host and create a connection pool for it if it does not already exist.
auto ret = connPool(priority, protocol, context, false);

// Now see if another host should be preconnected.
// httpConnPool is called immediately before a call for newStream. newStream doesn't
// have the load balancer context needed to make selection decisions so preconnecting must be
// performed here in anticipation of the new stream.
// TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is
// code-enforced.
maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() {
return connPool(priority, protocol, context, true);
});
return ret;
auto pool = connPool(priority, protocol, context, false);
if (pool == nullptr) {
return absl::nullopt;
}

HttpPoolData data(
[this, priority, protocol, context]() -> void {
// Now that a new stream is being established, attempt to preconnect.
maybePreconnect(*this, parent_.cluster_manager_state_,
[this, &priority, &protocol, &context]() {
return connPool(priority, protocol, context, true);
});
},
pool);
return data;
}

Tcp::ConnectionPool::Instance*
absl::optional<TcpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
ResourcePriority priority, LoadBalancerContext* context) {
// Select a host and create a connection pool for it if it does not already exist.
auto ret = tcpConnPool(priority, context, false);

// tcpConnPool is called immediately before a call for newConnection. newConnection
// doesn't have the load balancer context needed to make selection decisions so preconnecting must
// be performed here in anticipation of the new connection.
// TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is
// code-enforced.
// Now see if another host should be preconnected.
maybePreconnect(*this, parent_.cluster_manager_state_,
[this, &priority, &context]() { return tcpConnPool(priority, context, true); });

return ret;
auto pool = tcpConnPool(priority, context, false);
if (pool == nullptr) {
return absl::nullopt;
}

TcpPoolData data(
[this, priority, context]() -> void {
maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() {
return tcpConnPool(priority, context, true);
});
},
pool);
return data;
}

void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster,
Expand Down
10 changes: 5 additions & 5 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
const PrioritySet& prioritySet() override { return priority_set_; }
ClusterInfoConstSharedPtr info() override { return cluster_info_; }
LoadBalancer& loadBalancer() override { return *lb_; }
Http::ConnectionPool::Instance*
httpConnPool(ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context) override;
Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority,
LoadBalancerContext* context) override;
absl::optional<HttpPoolData> httpConnPool(ResourcePriority priority,
absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context) override;
absl::optional<TcpPoolData> tcpConnPool(ResourcePriority priority,
LoadBalancerContext* context) override;
Host::CreateConnectionData tcpConn(LoadBalancerContext* context) override;
Http::AsyncClient& httpAsyncClient() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ FilterStatus Router::onMessageDecoded(MessageMetadataSharedPtr metadata, Context
return FilterStatus::StopIteration;
}

Tcp::ConnectionPool::Instance* conn_pool =
cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
if (!conn_pool) {
auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
if (!conn_pool_data) {
callbacks_->sendLocalReply(
AppException(
ResponseStatus::ServerError,
Expand Down Expand Up @@ -112,8 +111,9 @@ FilterStatus Router::onMessageDecoded(MessageMetadataSharedPtr metadata, Context
upstream_request_buffer_.move(ctx->originMessage(), ctx->messageSize());
}

upstream_request_ = std::make_unique<UpstreamRequest>(
*this, *conn_pool, metadata, callbacks_->serializationType(), callbacks_->protocolType());
upstream_request_ = std::make_unique<UpstreamRequest>(*this, *conn_pool_data, metadata,
callbacks_->serializationType(),
callbacks_->protocolType());
return upstream_request_->start();
}

Expand Down Expand Up @@ -188,11 +188,11 @@ void Router::cleanup() {
}
}

Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::Instance& pool,
Router::UpstreamRequest::UpstreamRequest(Router& parent, Upstream::TcpPoolData& pool_data,
MessageMetadataSharedPtr& metadata,
SerializationType serialization_type,
ProtocolType protocol_type)
: parent_(parent), conn_pool_(pool), metadata_(metadata),
: parent_(parent), conn_pool_data_(pool_data), metadata_(metadata),
protocol_(
NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol(serialization_type)),
request_complete_(false), response_started_(false), response_complete_(false),
Expand All @@ -201,7 +201,7 @@ Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::In
Router::UpstreamRequest::~UpstreamRequest() = default;

FilterStatus Router::UpstreamRequest::start() {
Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this);
Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this);
if (handle) {
// Pause while we wait for a connection.
conn_pool_handle_ = handle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,

private:
struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks {
UpstreamRequest(Router& parent, Tcp::ConnectionPool::Instance& pool,
UpstreamRequest(Router& parent, Upstream::TcpPoolData& pool_data,
MessageMetadataSharedPtr& metadata, SerializationType serialization_type,
ProtocolType protocol_type);
~UpstreamRequest() override;
Expand All @@ -69,7 +69,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
void onResetStream(ConnectionPool::PoolFailureReason reason);

Router& parent_;
Tcp::ConnectionPool::Instance& conn_pool_;
Upstream::TcpPoolData conn_pool_data_;
MessageMetadataSharedPtr metadata_;

Tcp::ConnectionPool::Cancellable* conn_pool_handle_{};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,25 @@ void RouterImpl::onBelowWriteBufferLowWatermark() {
}

void RouterImpl::onEvent(Network::ConnectionEvent event) {
switch (event) {
switch (event)
case Network::ConnectionEvent::RemoteClose: {
ENVOY_LOG(error, "Connection to upstream: {} is closed by remote peer",
upstream_host_->address()->asString());
// Send local reply to downstream
active_message_->onError("Connection to upstream is closed by remote peer");
break;
}
case Network::ConnectionEvent::LocalClose: {
case Network::ConnectionEvent::LocalClose:
ENVOY_LOG(error, "Connection to upstream: {} has been closed",
upstream_host_->address()->asString());
// Send local reply to downstream
active_message_->onError("Connection to upstream has been closed");
break;
}
default:
// Ignore other events for now
ENVOY_LOG(trace, "Ignore event type");
return;
}
active_message_->onReset();
active_message_->onReset();
}

const Envoy::Router::MetadataMatchCriteria* RouterImpl::metadataMatchCriteria() {
Expand Down Expand Up @@ -106,17 +104,16 @@ void RouterImpl::sendRequestToUpstream(ActiveMessage& active_message) {
return;
}

Tcp::ConnectionPool::Instance* conn_pool =
cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
if (!conn_pool) {
auto data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
if (!data) {
ENVOY_LOG(warn, "No host available for cluster {}. Opaque: {}", cluster_name, opaque);
active_message.onError("No host available");
reset();
return;
}

upstream_request_ = std::make_unique<UpstreamRequest>(*this);
Tcp::ConnectionPool::Cancellable* cancellable = conn_pool->newConnection(*upstream_request_);
Tcp::ConnectionPool::Cancellable* cancellable = data.value().newConnection(*upstream_request_);
if (cancellable) {
handle_ = cancellable;
ENVOY_LOG(trace, "No connection is available for now. Create a cancellable handle. Opaque: {}",
Expand Down
Loading