Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions include/envoy/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ class ConnectionState {
typedef std::unique_ptr<ConnectionState> ConnectionStatePtr;

/*
* ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
* released back to the pool for re-use when their containing ConnectionData is destroyed.
* ConnectionData wraps a ClientConnection allocated to a caller.
*/
class ConnectionData {
public:
Expand Down Expand Up @@ -106,15 +105,19 @@ class ConnectionData {
*/
virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;

/**
* Release the connection after use. If the connection is no longer viable for future
* use, it should be closed before it is released.
*/
virtual void release() PURE;

protected:
/**
* @return ConnectionState* pointer to the current ConnectionState or nullptr if not set
*/
virtual ConnectionState* connectionState() PURE;
};

typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;

/**
* Pool callbacks invoked in the context of a newConnection() call, either synchronously or
* asynchronously.
Expand All @@ -133,17 +136,14 @@ class Callbacks {
Upstream::HostDescriptionConstSharedPtr host) PURE;

/**
* Called when a connection is available to process a request/response. Connections may be
* released back to the pool for re-use by resetting the ConnectionDataPtr. If the connection is
* no longer viable for reuse (e.g. due to some kind of protocol error), the underlying
* ClientConnection should be closed to prevent its reuse.
*
* Called when a connection is available to process a request/response. The recipient of a
* connection must release it after use, and should close the underlying ClientConnection
* beforehand only if the connection is no longer viable for future requests.
* @param conn supplies the connection data to use.
* @param host supplies the description of the host that will carry the request. For logical
* connection pools the description may be different each time this is called.
*/
virtual void onPoolReady(ConnectionDataPtr&& conn,
Upstream::HostDescriptionConstSharedPtr host) PURE;
virtual void onPoolReady(ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) PURE;
};

/**
Expand Down
3 changes: 1 addition & 2 deletions source/common/http/websocket/ws_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ void WsHandlerImpl::onConnectionSuccess() {
// the connection pool. The current approach is a stop gap solution, where
// we put the onus on the user to tell us if a route (and corresponding upstream)
// is supposed to allow websocket upgrades or not.
Http1::ClientConnectionImpl upstream_http(upstream_conn_data_->connection(),
http_conn_callbacks_);
Http1::ClientConnectionImpl upstream_http(*upstream_connection_, http_conn_callbacks_);
Http1::RequestStreamEncoderImpl upstream_request = Http1::RequestStreamEncoderImpl(upstream_http);
upstream_request.encodeHeaders(request_headers_, false);
ASSERT(state_ == ConnectState::PreConnect);
Expand Down
53 changes: 14 additions & 39 deletions source/common/tcp/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {

void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks) {
ASSERT(conn.wrapper_ == nullptr);
conn.wrapper_ = std::make_shared<ConnectionWrapper>(conn);

callbacks.onPoolReady(std::make_unique<ConnectionDataImpl>(conn.wrapper_),
conn.real_host_description_);
conn.wrapper_ = std::make_unique<ConnectionWrapper>(conn);
callbacks.onPoolReady(*conn.wrapper_, conn.real_host_description_);
}

void ConnPoolImpl::checkForDrained() {
Expand Down Expand Up @@ -125,8 +123,6 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc();
}
host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc();

conn.wrapper_->release(true);
}

removed = conn.removeFromList(busy_conns_);
Expand Down Expand Up @@ -233,11 +229,7 @@ void ConnPoolImpl::onUpstreamReady() {
}

void ConnPoolImpl::processIdleConnection(ActiveConn& conn, bool delay) {
if (conn.wrapper_) {
conn.wrapper_->invalidate();
conn.wrapper_.reset();
}

conn.wrapper_.reset();
if (pending_requests_.empty() || delay) {
// There is nothing to service or delayed processing is requested, so just move the connection
// into the ready list.
Expand Down Expand Up @@ -266,29 +258,23 @@ ConnPoolImpl::ConnectionWrapper::ConnectionWrapper(ActiveConn& parent) : parent_
parent_.parent_.host_->stats().rq_active_.inc();
}

Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() {
ASSERT(conn_valid_);
return *parent_.conn_;
ConnPoolImpl::ConnectionWrapper::~ConnectionWrapper() {
parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
parent_.parent_.host_->stats().rq_active_.dec();
}

Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() { return *parent_.conn_; }

void ConnPoolImpl::ConnectionWrapper::addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& cb) {
ASSERT(!released_);
callbacks_ = &cb;
}

void ConnPoolImpl::ConnectionWrapper::release(bool closed) {
// Allow multiple calls: connection close and destruction of ConnectionDataImplPtr will both
// result in this call.
if (!released_) {
released_ = true;
callbacks_ = nullptr;
if (!closed) {
parent_.parent_.onConnReleased(parent_);
}

parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
parent_.parent_.host_->stats().rq_active_.dec();
}
void ConnPoolImpl::ConnectionWrapper::release() {
ASSERT(!released_);
released_ = true;
callbacks_ = nullptr;
parent_.parent_.onConnReleased(parent_);
}

ConnPoolImpl::PendingRequest::PendingRequest(ConnPoolImpl& parent,
Expand Down Expand Up @@ -345,10 +331,6 @@ ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)
}

ConnPoolImpl::ActiveConn::~ActiveConn() {
if (wrapper_) {
wrapper_->invalidate();
}

parent_.host_->cluster().stats().upstream_cx_active_.dec();
parent_.host_->stats().cx_active_.dec();
conn_length_->complete();
Expand Down Expand Up @@ -378,18 +360,11 @@ void ConnPoolImpl::ActiveConn::onUpstreamData(Buffer::Instance& data, bool end_s
}

void ConnPoolImpl::ActiveConn::onEvent(Network::ConnectionEvent event) {
ConnectionPool::UpstreamCallbacks* cb = nullptr;
if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
cb = wrapper_->callbacks_;
wrapper_->callbacks_->onEvent(event);
}

// In the event of a close event, we want to update the pool's state before triggering callbacks,
// preventing the case where we attempt to return a closed connection to the ready pool.
parent_.onConnectionEvent(*this, event);

if (cb) {
cb->onEvent(event);
}
}

void ConnPoolImpl::ActiveConn::onAboveWriteBufferHighWatermark() {
Expand Down
43 changes: 12 additions & 31 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,46 +34,27 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
protected:
struct ActiveConn;

struct ConnectionWrapper {
struct ConnectionWrapper : public ConnectionPool::ConnectionData {
ConnectionWrapper(ActiveConn& parent);
~ConnectionWrapper();

Network::ClientConnection& connection();
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) {
// ConnectionPool::ConnectionData
Network::ClientConnection& connection() override;
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override;
void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
parent_.setConnectionState(std::move(state));
};
ConnectionPool::ConnectionState* connectionState() { return parent_.connectionState(); }

void release(bool closed);

void invalidate() { conn_valid_ = false; }
ConnectionPool::ConnectionState* connectionState() override {
return parent_.connectionState();
}
void release() override;

ActiveConn& parent_;
ConnectionPool::UpstreamCallbacks* callbacks_{};
bool released_{false};
bool conn_valid_{true};
};

typedef std::shared_ptr<ConnectionWrapper> ConnectionWrapperSharedPtr;

struct ConnectionDataImpl : public ConnectionPool::ConnectionData {
ConnectionDataImpl(ConnectionWrapperSharedPtr wrapper) : wrapper_(wrapper) {}
~ConnectionDataImpl() { wrapper_->release(false); }

// ConnectionPool::ConnectionData
Network::ClientConnection& connection() override { return wrapper_->connection(); }
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
wrapper_->addUpstreamCallbacks(callbacks);
};
void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
wrapper_->setConnectionState(std::move(state));
}
ConnectionPool::ConnectionState* connectionState() override {
return wrapper_->connectionState();
}

ConnectionWrapperSharedPtr wrapper_;
};
typedef std::unique_ptr<ConnectionWrapper> ConnectionWrapperPtr;

struct ConnReadFilter : public Network::ReadFilterBaseImpl {
ConnReadFilter(ActiveConn& parent) : parent_(parent) {}
Expand Down Expand Up @@ -108,7 +89,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
ConnectionWrapperSharedPtr wrapper_;
ConnectionWrapperPtr wrapper_;
Network::ClientConnectionPtr conn_;
ConnectionPool::ConnectionStatePtr conn_state_;
Event::TimerPtr connect_timer_;
Expand Down
1 change: 0 additions & 1 deletion source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ envoy_cc_library(
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/stats:timespan",
"//include/envoy/tcp:conn_pool_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/access_log:access_log_lib",
Expand Down
Loading