Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions include/envoy/common/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ namespace Envoy {
namespace ConnectionPool {

/**
* Controls the behavior of a canceled request.
* Controls the behavior of a canceled stream.
*/
enum class CancelPolicy {
// By default, canceled requests allow a pending connection to complete and become
// available for a future request.
// By default, canceled streams allow a pending connection to complete and become
// available for a future stream.
Default,
// When a request is canceled, closes a pending connection if there will still be sufficient
// connections to serve pending requests. CloseExcess is largely useful for callers that never
// When a stream is canceled, closes a pending connection if there will still be sufficient
// connections to serve pending streams. CloseExcess is largely useful for callers that never
// re-use connections (e.g. by closing rather than releasing connections). Using CloseExcess in
// this situation guarantees that no idle connections will be held open by the conn pool awaiting
// a connection request.
// a connection stream.
CloseExcess,
};

/**
* Handle that allows a pending connection or stream request to be canceled before it is completed.
* Handle that allows a pending connection or stream to be canceled before it is completed.
*/
class Cancellable {
public:
virtual ~Cancellable() = default;

/**
* Cancel the pending connection or stream request.
* Cancel the pending connection or stream.
* @param cancel_policy a CancelPolicy that controls the behavior of this cancellation.
*/
virtual void cancel(CancelPolicy cancel_policy) PURE;
Expand All @@ -44,7 +44,7 @@ class Instance {
virtual ~Instance() = default;

/**
* Called when a connection pool has been drained of pending requests, busy connections, and
* Called when a connection pool has been drained of pending streams, busy connections, and
* ready connections.
*/
using DrainedCb = std::function<void()>;
Expand Down
4 changes: 2 additions & 2 deletions include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ class GenericConnPool {
* when a stream is available or GenericConnectionPoolCallbacks::onPoolFailure
* if stream creation fails.
*
* The caller is responsible for calling cancelAnyPendingRequest() if stream
* The caller is responsible for calling cancelAnyPendingStream() if stream
* creation is no longer desired. newStream may only be called once per
* GenericConnPool.
*
Expand All @@ -1164,7 +1164,7 @@ class GenericConnPool {
/**
* Called to cancel any pending newStream request,
*/
virtual bool cancelAnyPendingRequest() PURE;
virtual bool cancelAnyPendingStream() PURE;
/**
* @return optionally returns the protocol for the connection pool.
*/
Expand Down
60 changes: 30 additions & 30 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ bool ConnPoolImplBase::tryCreateNewConnection() {
ActiveClientPtr client = instantiateActiveClient();
ASSERT(client->state_ == ActiveClient::State::CONNECTING);
ASSERT(std::numeric_limits<uint64_t>::max() - connecting_stream_capacity_ >=
client->effectiveConcurrentRequestLimit());
client->effectiveConcurrentStreamLimit());
ASSERT(client->real_host_description_);
connecting_stream_capacity_ += client->effectiveConcurrentRequestLimit();
connecting_stream_capacity_ += client->effectiveConcurrentStreamLimit();
LinkedList::moveIntoList(std::move(client), owningList(client->state_));
}
return can_create_connection;
}

void ConnPoolImplBase::attachRequestToClient(Envoy::ConnectionPool::ActiveClient& client,
AttachContext& context) {
void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
AttachContext& context) {
ASSERT(client.state_ == Envoy::ConnectionPool::ActiveClient::State::READY);

if (!host_->cluster().resourceManager(priority_).requests().canCreate()) {
Expand All @@ -119,7 +119,7 @@ void ConnPoolImplBase::attachRequestToClient(Envoy::ConnectionPool::ActiveClient
ENVOY_CONN_LOG(debug, "maximum streams per connection, DRAINING", client);
host_->cluster().stats().upstream_cx_max_requests_.inc();
transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::DRAINING);
} else if (client.numActiveRequests() + 1 >= client.concurrent_stream_limit_) {
} else if (client.numActiveStreams() + 1 >= client.concurrent_stream_limit_) {
// As soon as the new stream is created, the client will be maxed out.
transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::BUSY);
}
Expand All @@ -135,20 +135,20 @@ void ConnPoolImplBase::attachRequestToClient(Envoy::ConnectionPool::ActiveClient
}
}

void ConnPoolImplBase::onRequestClosed(Envoy::ConnectionPool::ActiveClient& client,
bool delay_attaching_stream) {
ENVOY_CONN_LOG(debug, "destroying stream: {} remaining", client, client.numActiveRequests());
void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& client,
bool delay_attaching_stream) {
ENVOY_CONN_LOG(debug, "destroying stream: {} remaining", client, client.numActiveStreams());
ASSERT(num_active_streams_ > 0);
num_active_streams_--;
host_->stats().rq_active_.dec();
host_->cluster().stats().upstream_rq_active_.dec();
host_->cluster().resourceManager(priority_).requests().dec();
if (client.state_ == ActiveClient::State::DRAINING && client.numActiveRequests() == 0) {
if (client.state_ == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) {
// Close out the draining client if we no longer have active streams.
client.close();
} else if (client.state_ == ActiveClient::State::BUSY) {
// A stream was just ended, so we should be below the limit now.
ASSERT(client.numActiveRequests() < client.concurrent_stream_limit_);
ASSERT(client.numActiveStreams() < client.concurrent_stream_limit_);

transitionActiveClientState(client, ActiveClient::State::READY);
if (!delay_attaching_stream) {
Expand All @@ -161,16 +161,16 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context)
if (!ready_clients_.empty()) {
ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "using existing connection", client);
attachRequestToClient(client, context);
attachStreamToClient(client, context);
// Even if there's a ready client, we may want to prefetch a new connection
// to handle the next incoming stream.
tryCreateNewConnections();
return nullptr;
}

if (host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
ConnectionPool::Cancellable* pending = newPendingRequest(context);
// This must come after newPendingRequest() because this function uses the
ConnectionPool::Cancellable* pending = newPendingStream(context);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't comment on the line above this, but will the resource manager's requests also be renamed?

// This must come after newPendingStream() because this function uses the
// length of pending_streams_ to determine if a new connection is needed.
tryCreateNewConnections();

Expand All @@ -189,7 +189,7 @@ void ConnPoolImplBase::onUpstreamReady() {
ActiveClientPtr& client = ready_clients_.front();
ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
// Pending streams are pushed onto the front, so pull from the back.
attachRequestToClient(*client, pending_streams_.back()->context());
attachStreamToClient(*client, pending_streams_.back()->context());
pending_streams_.pop_back();
}
}
Expand Down Expand Up @@ -236,7 +236,7 @@ void ConnPoolImplBase::closeIdleConnections() {
std::list<ActiveClient*> to_close;

for (auto& client : ready_clients_) {
if (client->numActiveRequests() == 0) {
if (client->numActiveStreams() == 0) {
to_close.push_back(client.get());
}
}
Expand Down Expand Up @@ -289,8 +289,8 @@ void ConnPoolImplBase::checkForDrained() {
void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
Network::ConnectionEvent event) {
if (client.state_ == ActiveClient::State::CONNECTING) {
ASSERT(connecting_stream_capacity_ >= client.effectiveConcurrentRequestLimit());
connecting_stream_capacity_ -= client.effectiveConcurrentRequestLimit();
ASSERT(connecting_stream_capacity_ >= client.effectiveConcurrentStreamLimit());
connecting_stream_capacity_ -= client.effectiveConcurrentStreamLimit();
}

if (event == Network::ConnectionEvent::RemoteClose ||
Expand All @@ -299,7 +299,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
ENVOY_CONN_LOG(debug, "client disconnected, failure reason: {}", client, failure_reason);

Envoy::Upstream::reportUpstreamCxDestroy(host_, event);
const bool incomplete_stream = client.closingWithIncompleteRequest();
const bool incomplete_stream = client.closingWithIncompleteStream();
if (incomplete_stream) {
Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
}
Expand All @@ -323,7 +323,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
// do with the stream.
// NOTE: We move the existing pending streams to a temporary list. This is done so that
// if retry logic submits a new stream to the pool, we don't fail it inline.
purgePendingRequests(client.real_host_description_, failure_reason, reason);
purgePendingStreams(client.real_host_description_, failure_reason, reason);
// See if we should prefetch another connection based on active connections.
tryCreateNewConnections();
}
Expand Down Expand Up @@ -362,29 +362,29 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
}
}

PendingRequest::PendingRequest(ConnPoolImplBase& parent) : parent_(parent) {
PendingStream::PendingStream(ConnPoolImplBase& parent) : parent_(parent) {
parent_.host()->cluster().stats().upstream_rq_pending_total_.inc();
parent_.host()->cluster().stats().upstream_rq_pending_active_.inc();
parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().inc();
}

PendingRequest::~PendingRequest() {
PendingStream::~PendingStream() {
parent_.host()->cluster().stats().upstream_rq_pending_active_.dec();
parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().dec();
}

void PendingRequest::cancel(Envoy::ConnectionPool::CancelPolicy policy) {
parent_.onPendingRequestCancel(*this, policy);
void PendingStream::cancel(Envoy::ConnectionPool::CancelPolicy policy) {
parent_.onPendingStreamCancel(*this, policy);
}

void ConnPoolImplBase::purgePendingRequests(
void ConnPoolImplBase::purgePendingStreams(
const Upstream::HostDescriptionConstSharedPtr& host_description,
absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason) {
// NOTE: We move the existing pending streams to a temporary list. This is done so that
// if retry logic submits a new stream to the pool, we don't fail it inline.
pending_streams_to_purge_ = std::move(pending_streams_);
while (!pending_streams_to_purge_.empty()) {
PendingRequestPtr stream =
PendingStreamPtr stream =
pending_streams_to_purge_.front()->removeFromList(pending_streams_to_purge_);
host_->cluster().stats().upstream_rq_pending_failure_eject_.inc();
onPoolFailure(host_description, failure_reason, reason, stream->context());
Expand All @@ -393,7 +393,7 @@ void ConnPoolImplBase::purgePendingRequests(

bool ConnPoolImplBase::connectingConnectionIsExcess() const {
ASSERT(connecting_stream_capacity_ >=
connecting_clients_.front()->effectiveConcurrentRequestLimit());
connecting_clients_.front()->effectiveConcurrentStreamLimit());
// If prefetchRatio is one, this simplifies to checking if there would still be sufficient
// connecting stream capacity to serve all pending streams if the most recent client were
// removed from the picture.
Expand All @@ -403,15 +403,15 @@ bool ConnPoolImplBase::connectingConnectionIsExcess() const {
// that even with the most recent client removed.
return (pending_streams_.size() + num_active_streams_) * prefetchRatio() <=
(connecting_stream_capacity_ -
connecting_clients_.front()->effectiveConcurrentRequestLimit() + num_active_streams_);
connecting_clients_.front()->effectiveConcurrentStreamLimit() + num_active_streams_);
}

void ConnPoolImplBase::onPendingRequestCancel(PendingRequest& stream,
Envoy::ConnectionPool::CancelPolicy policy) {
void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
Envoy::ConnectionPool::CancelPolicy policy) {
ENVOY_LOG(debug, "cancelling pending stream");
if (!pending_streams_to_purge_.empty()) {
// If pending_streams_to_purge_ is not empty, it means that we are called from
// with-in a onPoolFailure callback invoked in purgePendingRequests (i.e. purgePendingRequests
// with-in a onPoolFailure callback invoked in purgePendingStreams (i.e. purgePendingStreams
// is down in the call stack). Remove this stream from the list as it is cancelled,
// and there is no need to call its onPoolFailure callback.
stream.removeFromList(pending_streams_to_purge_);
Expand Down
40 changes: 20 additions & 20 deletions source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ActiveClient : public LinkedObject<ActiveClient>,

// Returns the concurrent stream limit, accounting for if the total stream limit
// is less than the concurrent stream limit.
uint64_t effectiveConcurrentRequestLimit() const {
uint64_t effectiveConcurrentStreamLimit() const {
return std::min(remaining_streams_, concurrent_stream_limit_);
}

Expand All @@ -53,9 +53,9 @@ class ActiveClient : public LinkedObject<ActiveClient>,
// Returns the ID of the underlying connection.
virtual uint64_t id() const PURE;
// Returns true if this closed with an incomplete stream, for stats tracking/ purposes.
virtual bool closingWithIncompleteRequest() const PURE;
virtual bool closingWithIncompleteStream() const PURE;
// Returns the number of active streams on this connection.
virtual size_t numActiveRequests() const PURE;
virtual size_t numActiveStreams() const PURE;

enum class State {
CONNECTING, // Connection is not yet established.
Expand All @@ -78,12 +78,12 @@ class ActiveClient : public LinkedObject<ActiveClient>,
bool timed_out_{false};
};

// TODO(alyssawilk) renames for Request classes and functions -> Stream classes and functions.
// PendingRequest is the base class for a connection which has been created but not yet established.
class PendingRequest : public LinkedObject<PendingRequest>, public ConnectionPool::Cancellable {
// PendingStream is the base class tracking streams for which a connection has been created but not
// yet established.
class PendingStream : public LinkedObject<PendingStream>, public ConnectionPool::Cancellable {
public:
PendingRequest(ConnPoolImplBase& parent);
~PendingRequest() override;
PendingStream(ConnPoolImplBase& parent);
~PendingStream() override;

// ConnectionPool::Cancellable
void cancel(Envoy::ConnectionPool::CancelPolicy policy) override;
Expand All @@ -95,7 +95,7 @@ class PendingRequest : public LinkedObject<PendingRequest>, public ConnectionPoo
ConnPoolImplBase& parent_;
};

using PendingRequestPtr = std::unique_ptr<PendingRequest>;
using PendingStreamPtr = std::unique_ptr<PendingStream>;

using ActiveClientPtr = std::unique_ptr<ActiveClient>;

Expand Down Expand Up @@ -129,14 +129,14 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
// Gets a pointer to the list that currently owns this client.
std::list<ActiveClientPtr>& owningList(ActiveClient::State state);

// Removes the PendingRequest from the list of streams. Called when the PendingRequest is
// Removes the PendingStream from the list of streams. Called when the PendingStream is
// cancelled, e.g. when the stream is reset before a connection has been established.
void onPendingRequestCancel(PendingRequest& stream, Envoy::ConnectionPool::CancelPolicy policy);
void onPendingStreamCancel(PendingStream& stream, Envoy::ConnectionPool::CancelPolicy policy);

// Fails all pending streams, calling onPoolFailure on the associated callbacks.
void purgePendingRequests(const Upstream::HostDescriptionConstSharedPtr& host_description,
absl::string_view failure_reason,
ConnectionPool::PoolFailureReason pool_failure_reason);
void purgePendingStreams(const Upstream::HostDescriptionConstSharedPtr& host_description,
absl::string_view failure_reason,
ConnectionPool::PoolFailureReason pool_failure_reason);

// Closes any idle connections.
void closeIdleConnections();
Expand All @@ -150,17 +150,17 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
void onUpstreamReady();
ConnectionPool::Cancellable* newStream(AttachContext& context);

virtual ConnectionPool::Cancellable* newPendingRequest(AttachContext& context) PURE;
virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context) PURE;

void attachRequestToClient(Envoy::ConnectionPool::ActiveClient& client, AttachContext& context);
void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client, AttachContext& context);

virtual void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
absl::string_view failure_reason,
ConnectionPool::PoolFailureReason pool_failure_reason,
AttachContext& context) PURE;
virtual void onPoolReady(ActiveClient& client, AttachContext& context) PURE;
// Called by derived classes any time a stream is completed or destroyed for any reason.
void onRequestClosed(Envoy::ConnectionPool::ActiveClient& client, bool delay_attaching_stream);
void onStreamClosed(Envoy::ConnectionPool::ActiveClient& client, bool delay_attaching_stream);

const Upstream::HostConstSharedPtr& host() const { return host_; }
Event::Dispatcher& dispatcher() { return dispatcher_; }
Expand Down Expand Up @@ -196,11 +196,11 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
const Network::TransportSocketOptionsSharedPtr transport_socket_options_;

std::list<Instance::DrainedCb> drained_callbacks_;
std::list<PendingRequestPtr> pending_streams_;
std::list<PendingStreamPtr> pending_streams_;

// When calling purgePendingRequests, this list will be used to hold the streams we are about
// When calling purgePendingStreams, this list will be used to hold the streams we are about
// to purge. We need this if one cancelled streams cancels a different pending stream
std::list<PendingRequestPtr> pending_streams_to_purge_;
std::list<PendingStreamPtr> pending_streams_to_purge_;

// Clients that are ready to handle additional streams.
// All entries are in state READY.
Expand Down
6 changes: 3 additions & 3 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ bool HttpConnPoolImplBase::hasActiveConnections() const {
}

ConnectionPool::Cancellable*
HttpConnPoolImplBase::newPendingRequest(Envoy::ConnectionPool::AttachContext& context) {
HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& context) {
Http::ResponseDecoder& decoder = *typedContext<HttpAttachContext>(context).decoder_;
Http::ConnectionPool::Callbacks& callbacks = *typedContext<HttpAttachContext>(context).callbacks_;
ENVOY_LOG(debug, "queueing stream due to no available connections");
Envoy::ConnectionPool::PendingRequestPtr pending_stream(
new HttpPendingRequest(*this, decoder, callbacks));
Envoy::ConnectionPool::PendingStreamPtr pending_stream(
new HttpPendingStream(*this, decoder, callbacks));
LinkedList::moveIntoList(std::move(pending_stream), pending_streams_);
return pending_streams_.front().get();
}
Expand Down
Loading