Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,28 @@ float ConnPoolImplBase::perUpstreamPreconnectRatio() const {
}
}

void ConnPoolImplBase::tryCreateNewConnections() {
ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
ConnPoolImplBase::ConnectionResult result;
// Somewhat arbitrarily cap the number of connections preconnected due to new
// incoming connections. The preconnect ratio is capped at 3, so in steady
// state, no more than 3 connections should be preconnected. If hosts go
// unhealthy, and connections are not immediately preconnected, it could be that
// many connections are desired when the host becomes healthy again, but
// overwhelming it with connections is not desirable.
for (int i = 0; i < 3; ++i) {
if (!tryCreateNewConnection()) {
return;
result = tryCreateNewConnection();
if (result != ConnectionResult::CreatedNewConnection) {
break;
}
}
return result;
}

bool ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
ConnPoolImplBase::ConnectionResult
ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
// There are already enough CONNECTING connections for the number of queued streams.
if (!shouldCreateNewConnection(global_preconnect_ratio)) {
return false;
return ConnectionResult::ShouldNotConnect;
}

const bool can_create_connection =
Expand All @@ -135,8 +139,12 @@ bool ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
state_.incrConnectingStreamCapacity(client->effectiveConcurrentStreamLimit());
connecting_stream_capacity_ += client->effectiveConcurrentStreamLimit();
LinkedList::moveIntoList(std::move(client), owningList(client->state_));
return can_create_connection ? ConnectionResult::CreatedNewConnection
: ConnectionResult::CreatedButRateLimited;
} else {
ENVOY_LOG(trace, "not creating a new connection: connection constrained");
return ConnectionResult::NoConnectionRateLimited;
}
return can_create_connection;
}

void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
Expand Down Expand Up @@ -218,10 +226,18 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context)

if (host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
ConnectionPool::Cancellable* pending = newPendingStream(context);
ENVOY_LOG(debug, "trying to create new connection");

auto old_capacity = connecting_stream_capacity_;
// This must come after newPendingStream() because this function uses the
// length of pending_streams_ to determine if a new connection is needed.
tryCreateNewConnections();

const ConnectionResult result = tryCreateNewConnections();
// If there is not enough connecting capacity, the only reason to not
// increase capacity is if the connection limits are exceeded.
ENVOY_BUG(pending_streams_.size() <= connecting_stream_capacity_ ||
connecting_stream_capacity_ > old_capacity ||
result == ConnectionResult::NoConnectionRateLimited,
fmt::format("Failed to create expected connection: {}", *this));
return pending;
} else {
ENVOY_LOG(debug, "max pending streams overflow");
Expand All @@ -233,7 +249,7 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context)
}

bool ConnPoolImplBase::maybePreconnect(float global_preconnect_ratio) {
return tryCreateNewConnection(global_preconnect_ratio);
return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
}

void ConnPoolImplBase::scheduleOnUpstreamReady() {
Expand Down
25 changes: 23 additions & 2 deletions source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/stats/timespan.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/dump_state_utils.h"
#include "common/common/linked_object.h"

#include "absl/strings/string_view.h"
Expand Down Expand Up @@ -192,18 +193,38 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
}
bool hasPendingStreams() const { return !pending_streams_.empty(); }

void dumpState(std::ostream& os, int indent_level = 0) const {
const char* spaces = spacesForLevel(indent_level);
os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size())
<< DUMP_MEMBER(busy_clients_.size()) << DUMP_MEMBER(connecting_clients_.size())
<< DUMP_MEMBER(connecting_stream_capacity_) << DUMP_MEMBER(num_active_streams_);
}

friend std::ostream& operator<<(std::ostream& os, const ConnPoolImplBase& s) {
s.dumpState(os);
return os;
}

protected:
// Creates up to 3 connections, based on the preconnect ratio.
virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {}

enum class ConnectionResult {
CreatedNewConnection,
ShouldNotConnect,
NoConnectionRateLimited,
CreatedButRateLimited,
};

// Creates up to 3 connections, based on the preconnect ratio.
void tryCreateNewConnections();
// Returns the ConnectionResult of the last attempt.
ConnectionResult tryCreateNewConnections();

// Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or
// to avoid starving this pool.
// Demand is determined either by perUpstreamPreconnectRatio() or global_preconnect_ratio
// if this is called by maybePreconnect()
bool tryCreateNewConnection(float global_preconnect_ratio = 0);
ConnectionResult tryCreateNewConnection(float global_preconnect_ratio = 0);

// A helper function which determines if a canceled pending connection should
// be closed as excess or not.
Expand Down
10 changes: 10 additions & 0 deletions test/common/conn_pool/conn_pool_base_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Envoy {
namespace ConnectionPool {

using testing::AnyNumber;
using testing::HasSubstr;
using testing::Invoke;
using testing::InvokeWithoutArgs;
using testing::Return;
Expand Down Expand Up @@ -90,6 +91,15 @@ class ConnPoolImplBaseTest : public testing::Test {
std::vector<ActiveClient*> clients_;
};

TEST_F(ConnPoolImplBaseTest, DumpState) {
std::stringstream out;
pool_.dumpState(out, 0);
std::string state = out.str();
EXPECT_THAT(state, HasSubstr("ready_clients_.size(): 0, busy_clients_.size(): 0, "
"connecting_clients_.size(): 0, connecting_stream_capacity_: 0, "
"num_active_streams_: 0"));
}

TEST_F(ConnPoolImplBaseTest, BasicPreconnect) {
// Create more than one connection per new stream.
ON_CALL(*cluster_, perUpstreamPreconnectRatio).WillByDefault(Return(1.5));
Expand Down