Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions include/envoy/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks {
};

/*
* ConnectionData wraps a ClientConnection allocated to a caller.
* ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
* released back to the pool for re-use when their containing ConnectionData is destroyed.
*/
class ConnectionData {
public:
Expand All @@ -76,14 +77,10 @@ class ConnectionData {
* @param callback the UpstreamCallbacks to invoke for upstream data
*/
virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;

/**
* Release the connection after use. The connection should be closed first only if it is
* not viable for future use.
*/
virtual void release() PURE;
};

typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;

/**
* Pool callbacks invoked in the context of a newConnection() call, either synchronously or
* asynchronously.
Expand All @@ -102,14 +99,17 @@ class Callbacks {
Upstream::HostDescriptionConstSharedPtr host) PURE;

/**
* Called when a connection is available to process a request/response. Recipients of connections
* must release the connection after use. They should only close the underlying ClientConnection
* if it is no longer viable for future requests.
* Called when a connection is available to process a request/response. Connections may be
* released back to the pool for re-use by resetting the ConnectionDataPtr. If the connection is
* no longer viable for reuse (e.g. due to some kind of protocol error), the underlying
* ClientConnection should be closed to prevent its reuse.
*
* @param conn supplies the connection data to use.
* @param host supplies the description of the host that will carry the request. For logical
* connection pools the description may be different each time this is called.
*/
virtual void onPoolReady(ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) PURE;
virtual void onPoolReady(ConnectionDataPtr&& conn,
Upstream::HostDescriptionConstSharedPtr host) PURE;
};

/**
Expand Down
3 changes: 2 additions & 1 deletion source/common/http/websocket/ws_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ void WsHandlerImpl::onConnectionSuccess() {
// the connection pool. The current approach is a stop gap solution, where
// we put the onus on the user to tell us if a route (and corresponding upstream)
// is supposed to allow websocket upgrades or not.
Http1::ClientConnectionImpl upstream_http(*upstream_connection_, http_conn_callbacks_);
Http1::ClientConnectionImpl upstream_http(upstream_conn_data_->connection(),
http_conn_callbacks_);
Http1::RequestStreamEncoderImpl upstream_request = Http1::RequestStreamEncoderImpl(upstream_http);
upstream_request.encodeHeaders(request_headers_, false);
ASSERT(state_ == ConnectState::PreConnect);
Expand Down
34 changes: 22 additions & 12 deletions source/common/tcp/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {

// Hands an idle pooled connection to a waiting caller.
//
// The wrapper is shared between the pool (via ActiveConn::wrapper_) and the
// ConnectionDataImpl given to the caller, so the pool retains visibility into
// the connection while the caller holds it.
void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks) {
  // A connection being assigned must not already be checked out.
  ASSERT(conn.wrapper_ == nullptr);
  conn.wrapper_ = std::make_shared<ConnectionWrapper>(conn);

  // Ownership of the ConnectionData passes to the callback; when the caller
  // destroys it, ConnectionDataImpl's destructor releases the connection back
  // to the pool.
  callbacks.onPoolReady(std::make_unique<ConnectionDataImpl>(conn.wrapper_),
                        conn.real_host_description_);
}

void ConnPoolImpl::checkForDrained() {
Expand Down Expand Up @@ -124,6 +126,8 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc();
}
host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc();

conn.wrapper_->release(true);
}

removed = conn.removeFromList(busy_conns_);
Expand Down Expand Up @@ -259,23 +263,29 @@ ConnPoolImpl::ConnectionWrapper::ConnectionWrapper(ActiveConn& parent) : parent_
parent_.parent_.host_->stats().rq_active_.inc();
}

ConnPoolImpl::ConnectionWrapper::~ConnectionWrapper() {
parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
parent_.parent_.host_->stats().rq_active_.dec();
// Returns the underlying upstream client connection.
// Must not be called after the wrapper has been released back to the pool.
Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() {
  ASSERT(!released_);
  return *parent_.conn_;
}

Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() { return *parent_.conn_; }

// Registers the caller's callbacks for upstream events on this connection.
// Only valid while the connection is checked out (i.e. not yet released).
void ConnPoolImpl::ConnectionWrapper::addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& cb) {
  ASSERT(!released_);
  callbacks_ = &cb;
}

void ConnPoolImpl::ConnectionWrapper::release() {
ASSERT(!released_);
released_ = true;
callbacks_ = nullptr;
parent_.parent_.onConnReleased(parent_);
// Releases this wrapper's connection, updating request-active stats exactly
// once.
//
// @param closed true when the underlying connection has already been closed
//        (the close path), in which case the connection is not handed back to
//        the pool for reuse; false when the caller is returning a still-open
//        connection (ConnectionDataImpl destruction).
void ConnPoolImpl::ConnectionWrapper::release(bool closed) {
  // Allow multiple calls: connection close and destruction of ConnectionDataImplPtr will both
  // result in this call.
  if (!released_) {
    released_ = true;
    callbacks_ = nullptr;
    if (!closed) {
      // Connection is still viable: hand it back to the pool for reuse.
      parent_.parent_.onConnReleased(parent_);
    }

    // Balance the rq_active gauges incremented in the wrapper constructor.
    parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
    parent_.parent_.host_->stats().rq_active_.dec();
  }
}

ConnPoolImpl::PendingRequest::PendingRequest(ConnPoolImpl& parent,
Expand Down
27 changes: 19 additions & 8 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,32 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
protected:
struct ActiveConn;

struct ConnectionWrapper : public ConnectionPool::ConnectionData {
struct ConnectionWrapper {
ConnectionWrapper(ActiveConn& parent);
~ConnectionWrapper();

// ConnectionPool::ConnectionData
Network::ClientConnection& connection() override;
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override;
void release() override;
Network::ClientConnection& connection();
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
void release(bool closed);

ActiveConn& parent_;
ConnectionPool::UpstreamCallbacks* callbacks_{};
bool released_{false};
};

typedef std::unique_ptr<ConnectionWrapper> ConnectionWrapperPtr;
typedef std::shared_ptr<ConnectionWrapper> ConnectionWrapperSharedPtr;

// RAII handle given to pool callers. Holds a shared reference to the pool's
// ConnectionWrapper; destroying this object releases the connection back to
// the pool (a no-op if the wrapper was already released, e.g. on close).
struct ConnectionDataImpl : public ConnectionPool::ConnectionData {
  ConnectionDataImpl(ConnectionWrapperSharedPtr wrapper) : wrapper_(wrapper) {}
  // release(false): the connection was not closed by us, so it may be re-pooled.
  ~ConnectionDataImpl() { wrapper_->release(false); }

  // ConnectionPool::ConnectionData
  Network::ClientConnection& connection() override { return wrapper_->connection(); }
  void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
    wrapper_->addUpstreamCallbacks(callbacks);
  };

  // Shared with ActiveConn::wrapper_ so the pool and caller coordinate release.
  ConnectionWrapperSharedPtr wrapper_;
};

struct ConnReadFilter : public Network::ReadFilterBaseImpl {
ConnReadFilter(ActiveConn& parent) : parent_(parent) {}
Expand Down Expand Up @@ -78,7 +89,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
ConnectionWrapperPtr wrapper_;
ConnectionWrapperSharedPtr wrapper_;
Network::ClientConnectionPtr conn_;
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
Expand Down
1 change: 1 addition & 0 deletions source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ envoy_cc_library(
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/stats:timespan",
"//include/envoy/tcp:conn_pool_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/access_log:access_log_lib",
Expand Down
Loading