Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions include/envoy/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks {
};

/*
* ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
* released back to the pool for re-use when their containing ConnectionData is destroyed.
* ConnectionData wraps a ClientConnection allocated to a caller.
*/
class ConnectionData {
public:
Expand All @@ -77,9 +76,13 @@ class ConnectionData {
* @param callback the UpstreamCallbacks to invoke for upstream data
*/
virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;
};

typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;
/**
* Release the connection after use. The connection should be closed first only if it is
* not viable for future use.
*/
virtual void release() PURE;
};

/**
* Pool callbacks invoked in the context of a newConnection() call, either synchronously or
Expand All @@ -99,17 +102,14 @@ class Callbacks {
Upstream::HostDescriptionConstSharedPtr host) PURE;

/**
* Called when a connection is available to process a request/response. Connections may be
* released back to the pool for re-use by resetting the ConnectionDataPtr. If the connection is
* no longer viable for reuse (e.g. due to some kind of protocol error), the underlying
* ClientConnection should be closed to prevent its reuse.
*
* Called when a connection is available to process a request/response. Recipients of connections
* must release the connection after use. They should only close the underlying ClientConnection
* if it is no longer viable for future requests.
* @param conn supplies the connection data to use.
* @param host supplies the description of the host that will carry the request. For logical
* connection pools the description may be different each time this is called.
*/
virtual void onPoolReady(ConnectionDataPtr&& conn,
Upstream::HostDescriptionConstSharedPtr host) PURE;
virtual void onPoolReady(ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) PURE;
};

/**
Expand Down
3 changes: 1 addition & 2 deletions source/common/http/websocket/ws_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ void WsHandlerImpl::onConnectionSuccess() {
// the connection pool. The current approach is a stop gap solution, where
// we put the onus on the user to tell us if a route (and corresponding upstream)
// is supposed to allow websocket upgrades or not.
Http1::ClientConnectionImpl upstream_http(upstream_conn_data_->connection(),
http_conn_callbacks_);
Http1::ClientConnectionImpl upstream_http(*upstream_connection_, http_conn_callbacks_);
Http1::RequestStreamEncoderImpl upstream_request = Http1::RequestStreamEncoderImpl(upstream_http);
upstream_request.encodeHeaders(request_headers_, false);
ASSERT(state_ == ConnectState::PreConnect);
Expand Down
34 changes: 12 additions & 22 deletions source/common/tcp/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {

void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks) {
ASSERT(conn.wrapper_ == nullptr);
conn.wrapper_ = std::make_shared<ConnectionWrapper>(conn);

callbacks.onPoolReady(std::make_unique<ConnectionDataImpl>(conn.wrapper_),
conn.real_host_description_);
conn.wrapper_ = std::make_unique<ConnectionWrapper>(conn);
callbacks.onPoolReady(*conn.wrapper_, conn.real_host_description_);
}

void ConnPoolImpl::checkForDrained() {
Expand Down Expand Up @@ -126,8 +124,6 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc();
}
host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc();

conn.wrapper_->release(true);
}

removed = conn.removeFromList(busy_conns_);
Expand Down Expand Up @@ -263,29 +259,23 @@ ConnPoolImpl::ConnectionWrapper::ConnectionWrapper(ActiveConn& parent) : parent_
parent_.parent_.host_->stats().rq_active_.inc();
}

Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() {
ASSERT(!released_);
return *parent_.conn_;
ConnPoolImpl::ConnectionWrapper::~ConnectionWrapper() {
parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
parent_.parent_.host_->stats().rq_active_.dec();
}

Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() { return *parent_.conn_; }

void ConnPoolImpl::ConnectionWrapper::addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& cb) {
ASSERT(!released_);
callbacks_ = &cb;
}

void ConnPoolImpl::ConnectionWrapper::release(bool closed) {
// Allow multiple calls: connection close and destruction of ConnectionDataImplPtr will both
// result in this call.
if (!released_) {
released_ = true;
callbacks_ = nullptr;
if (!closed) {
parent_.parent_.onConnReleased(parent_);
}

parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
parent_.parent_.host_->stats().rq_active_.dec();
}
void ConnPoolImpl::ConnectionWrapper::release() {
ASSERT(!released_);
released_ = true;
callbacks_ = nullptr;
parent_.parent_.onConnReleased(parent_);
}

ConnPoolImpl::PendingRequest::PendingRequest(ConnPoolImpl& parent,
Expand Down
27 changes: 8 additions & 19 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,21 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
protected:
struct ActiveConn;

struct ConnectionWrapper {
struct ConnectionWrapper : public ConnectionPool::ConnectionData {
ConnectionWrapper(ActiveConn& parent);
~ConnectionWrapper();

Network::ClientConnection& connection();
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
void release(bool closed);
// ConnectionPool::ConnectionData
Network::ClientConnection& connection() override;
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override;
void release() override;

ActiveConn& parent_;
ConnectionPool::UpstreamCallbacks* callbacks_{};
bool released_{false};
};

typedef std::shared_ptr<ConnectionWrapper> ConnectionWrapperSharedPtr;

struct ConnectionDataImpl : public ConnectionPool::ConnectionData {
ConnectionDataImpl(ConnectionWrapperSharedPtr wrapper) : wrapper_(wrapper) {}
~ConnectionDataImpl() { wrapper_->release(false); }

// ConnectionPool::ConnectionData
Network::ClientConnection& connection() override { return wrapper_->connection(); }
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
wrapper_->addUpstreamCallbacks(callbacks);
};

ConnectionWrapperSharedPtr wrapper_;
};
typedef std::unique_ptr<ConnectionWrapper> ConnectionWrapperPtr;

struct ConnReadFilter : public Network::ReadFilterBaseImpl {
ConnReadFilter(ActiveConn& parent) : parent_(parent) {}
Expand Down Expand Up @@ -89,7 +78,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
ConnectionWrapperSharedPtr wrapper_;
ConnectionWrapperPtr wrapper_;
Network::ClientConnectionPtr conn_;
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
Expand Down
1 change: 0 additions & 1 deletion source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ envoy_cc_library(
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/stats:timespan",
"//include/envoy/tcp:conn_pool_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/access_log:access_log_lib",
Expand Down
Loading