Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
}
}

ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context) {
ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context) {
ASSERT(!deferred_deleting_);

ASSERT(static_cast<ssize_t>(connecting_stream_capacity_) ==
Expand Down Expand Up @@ -277,7 +277,7 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context)
}
}

bool ConnPoolImplBase::maybePreconnect(float global_preconnect_ratio) {
bool ConnPoolImplBase::maybePreconnectImpl(float global_preconnect_ratio) {
ASSERT(!deferred_deleting_);
return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
}
Expand Down
14 changes: 7 additions & 7 deletions source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,15 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
int64_t connecting_and_connected_capacity, float preconnect_ratio,
bool anticipate_incoming_stream = false);

// Envoy::ConnectionPool::Instance implementation helpers
void addIdleCallbackImpl(Instance::IdleCb cb);
// Returns true if the pool is idle.
bool isIdleImpl() const;
void startDrainImpl();
void drainConnectionsImpl();
const Upstream::HostConstSharedPtr& host() const { return host_; }
// Called if this pool is likely to be picked soon, to determine if it's worth preconnecting.
bool maybePreconnectImpl(float global_preconnect_ratio);

// Closes and destroys all connections. This must be called in the destructor of
// derived classes because the derived ActiveClient will downcast parent_ to a more
Expand Down Expand Up @@ -196,16 +202,11 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
void onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
Network::ConnectionEvent event);

// Returns true if the pool is idle.
bool isIdleImpl() const;

// See if the pool has gone idle. If we're draining, this will also close idle connections.
void checkForIdleAndCloseIdleConnsIfDraining();

void scheduleOnUpstreamReady();
ConnectionPool::Cancellable* newStream(AttachContext& context);
// Called if this pool is likely to be picked soon, to determine if it's worth preconnecting.
bool maybePreconnect(float global_preconnect_ratio);
ConnectionPool::Cancellable* newStreamImpl(AttachContext& context);

virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context) PURE;

Expand All @@ -220,7 +221,6 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
// Called by derived classes any time a stream is completed or destroyed for any reason.
void onStreamClosed(Envoy::ConnectionPool::ActiveClient& client, bool delay_attaching_stream);

const Upstream::HostConstSharedPtr& host() const { return host_; }
Event::Dispatcher& dispatcher() { return dispatcher_; }
Upstream::ResourcePriority priority() const { return priority_; }
const Network::ConnectionSocket::OptionsSharedPtr& socketOptions() { return socket_options_; }
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ ConnectionPool::Cancellable*
HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
Http::ConnectionPool::Callbacks& callbacks) {
HttpAttachContext context({&response_decoder, &callbacks});
return Envoy::ConnectionPool::ConnPoolImplBase::newStream(context);
return newStreamImpl(context);
}

bool HttpConnPoolImplBase::hasActiveConnections() const {
Expand Down
4 changes: 1 addition & 3 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
Upstream::HostDescriptionConstSharedPtr host() const override { return host_; }
ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder,
Http::ConnectionPool::Callbacks& callbacks) override;
bool maybePreconnect(float ratio) override {
return Envoy::ConnectionPool::ConnPoolImplBase::maybePreconnect(ratio);
}
bool maybePreconnect(float ratio) override { return maybePreconnectImpl(ratio); }
bool hasActiveConnections() const override;

// Creates a new PendingStream and enqueues it into the queue.
Expand Down
4 changes: 2 additions & 2 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
}
ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override {
TcpAttachContext context(&callbacks);
return Envoy::ConnectionPool::ConnPoolImplBase::newStream(context);
return newStreamImpl(context);
}
bool maybePreconnect(float preconnect_ratio) override {
return Envoy::ConnectionPool::ConnPoolImplBase::maybePreconnect(preconnect_ratio);
return maybePreconnectImpl(preconnect_ratio);
}

ConnectionPool::Cancellable*
Expand Down
12 changes: 9 additions & 3 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,

using HostInfoMap = absl::flat_hash_map<std::string, HostInfo>;

struct LoadBalancer : public Upstream::LoadBalancer {
class LoadBalancer : public Upstream::LoadBalancer {
public:
LoadBalancer(const Cluster& cluster) : cluster_(cluster) {}

// Upstream::LoadBalancer
Expand All @@ -61,19 +62,23 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,
return nullptr;
}

private:
const Cluster& cluster_;
};

struct LoadBalancerFactory : public Upstream::LoadBalancerFactory {
class LoadBalancerFactory : public Upstream::LoadBalancerFactory {
public:
LoadBalancerFactory(Cluster& cluster) : cluster_(cluster) {}

// Upstream::LoadBalancerFactory
Upstream::LoadBalancerPtr create() override { return std::make_unique<LoadBalancer>(cluster_); }

private:
Cluster& cluster_;
};

struct ThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
class ThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
public:
ThreadAwareLoadBalancer(Cluster& cluster) : cluster_(cluster) {}

// Upstream::ThreadAwareLoadBalancer
Expand All @@ -82,6 +87,7 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,
}
void initialize() override {}

private:
Cluster& cluster_;
};

Expand Down
26 changes: 13 additions & 13 deletions test/common/conn_pool/conn_pool_base_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TEST_F(ConnPoolImplBaseTest, BasicPreconnect) {
// On new stream, create 2 connections.
CHECK_STATE(0 /*active*/, 0 /*pending*/, 0 /*connecting capacity*/);
EXPECT_CALL(pool_, instantiateActiveClient).Times(2);
auto cancelable = pool_.newStream(context_);
auto cancelable = pool_.newStreamImpl(context_);
CHECK_STATE(0 /*active*/, 1 /*pending*/, 2 /*connecting capacity*/);

cancelable->cancel(ConnectionPool::CancelPolicy::CloseExcess);
Expand All @@ -124,13 +124,13 @@ TEST_F(ConnPoolImplBaseTest, PreconnectOnDisconnect) {

// On new stream, create 2 connections.
EXPECT_CALL(pool_, instantiateActiveClient).Times(2);
pool_.newStream(context_);
pool_.newStreamImpl(context_);
CHECK_STATE(0 /*active*/, 1 /*pending*/, 2 /*connecting capacity*/);

// If a connection fails, existing connections are purged. If a retry causes
// a new stream, make sure we create the correct number of connections.
EXPECT_CALL(pool_, onPoolFailure).WillOnce(InvokeWithoutArgs([&]() -> void {
pool_.newStream(context_);
pool_.newStreamImpl(context_);
}));
EXPECT_CALL(pool_, instantiateActiveClient);
clients_[0]->close();
Expand All @@ -149,7 +149,7 @@ TEST_F(ConnPoolImplBaseTest, NoPreconnectIfUnhealthy) {

// On new stream, create 1 connection.
EXPECT_CALL(pool_, instantiateActiveClient);
auto cancelable = pool_.newStream(context_);
auto cancelable = pool_.newStreamImpl(context_);
CHECK_STATE(0 /*active*/, 1 /*pending*/, 1 /*connecting capacity*/);

cancelable->cancel(ConnectionPool::CancelPolicy::CloseExcess);
Expand All @@ -166,7 +166,7 @@ TEST_F(ConnPoolImplBaseTest, NoPreconnectIfDegraded) {

// On new stream, create 1 connection.
EXPECT_CALL(pool_, instantiateActiveClient);
auto cancelable = pool_.newStream(context_);
auto cancelable = pool_.newStreamImpl(context_);

cancelable->cancel(ConnectionPool::CancelPolicy::CloseExcess);
pool_.destructAllConnections();
Expand All @@ -178,17 +178,17 @@ TEST_F(ConnPoolImplBaseTest, ExplicitPreconnect) {
EXPECT_CALL(pool_, instantiateActiveClient).Times(AnyNumber());

// With global preconnect off, we won't preconnect.
EXPECT_FALSE(pool_.maybePreconnect(0));
EXPECT_FALSE(pool_.maybePreconnectImpl(0));
CHECK_STATE(0 /*active*/, 0 /*pending*/, 0 /*connecting capacity*/);
// With preconnect ratio of 1.1, we'll preconnect two connections.
// Currently, no number of subsequent calls to preconnect will increase that.
EXPECT_TRUE(pool_.maybePreconnect(1.1));
EXPECT_TRUE(pool_.maybePreconnect(1.1));
EXPECT_FALSE(pool_.maybePreconnect(1.1));
EXPECT_TRUE(pool_.maybePreconnectImpl(1.1));
EXPECT_TRUE(pool_.maybePreconnectImpl(1.1));
EXPECT_FALSE(pool_.maybePreconnectImpl(1.1));
CHECK_STATE(0 /*active*/, 0 /*pending*/, 2 /*connecting capacity*/);

// With a higher preconnect ratio, more connections may be preconnected.
EXPECT_TRUE(pool_.maybePreconnect(3));
EXPECT_TRUE(pool_.maybePreconnectImpl(3));

pool_.destructAllConnections();
}
Expand All @@ -199,7 +199,7 @@ TEST_F(ConnPoolImplBaseTest, ExplicitPreconnectNotHealthy) {

// Preconnect won't occur if the host is not healthy.
host_->healthFlagSet(Upstream::Host::HealthFlag::DEGRADED_EDS_HEALTH);
EXPECT_FALSE(pool_.maybePreconnect(1));
EXPECT_FALSE(pool_.maybePreconnectImpl(1));
}

// Remote close simulates the peer closing the connection.
Expand All @@ -208,7 +208,7 @@ TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredRemoteClose) {

// Create a new stream using the pool
EXPECT_CALL(pool_, instantiateActiveClient);
pool_.newStream(context_);
pool_.newStreamImpl(context_);
ASSERT_EQ(1, clients_.size());

// Emulate the new upstream connection establishment
Expand Down Expand Up @@ -236,7 +236,7 @@ TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredLocalClose) {

// Create a new stream using the pool
EXPECT_CALL(pool_, instantiateActiveClient);
pool_.newStream(context_);
pool_.newStreamImpl(context_);
ASSERT_EQ(1, clients_.size());

// Emulate the new upstream connection establishment
Expand Down