diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index daec20bc40fc0..9bd90c1909833 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -15,7 +15,8 @@ ConnPoolImplBase::ConnPoolImplBase( const Network::TransportSocketOptionsSharedPtr& transport_socket_options, Upstream::ClusterConnectivityState& state) : state_(state), host_(host), priority_(priority), dispatcher_(dispatcher), - socket_options_(options), transport_socket_options_(transport_socket_options) {} + socket_options_(options), transport_socket_options_(transport_socket_options), + upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {} ConnPoolImplBase::~ConnPoolImplBase() { ASSERT(ready_clients_.empty()); @@ -214,6 +215,10 @@ bool ConnPoolImplBase::maybePrefetch(float global_prefetch_ratio) { return tryCreateNewConnection(global_prefetch_ratio); } +void ConnPoolImplBase::scheduleOnUpstreamReady() { + upstream_ready_cb_->scheduleCallbackCurrentIteration(); +} + void ConnPoolImplBase::onUpstreamReady() { while (!pending_streams_.empty() && !ready_clients_.empty()) { ActiveClientPtr& client = ready_clients_.front(); diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 3cc2c2d089536..084681fc4cd94 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -154,7 +154,7 @@ class ConnPoolImplBase : protected Logger::Loggable { Network::ConnectionEvent event); // See if the drain process has started and/or completed. void checkForDrained(); - void onUpstreamReady(); + void scheduleOnUpstreamReady(); ConnectionPool::Cancellable* newStream(AttachContext& context); // Called if this pool is likely to be picked soon, to determine if it's worth // prefetching a connection. @@ -250,6 +250,9 @@ class ConnPoolImplBase : protected Logger::Loggable { // The number of streams that can be immediately dispatched // if all CONNECTING connections become connected. uint32_t connecting_stream_capacity_{0}; + + void onUpstreamReady(); + Event::SchedulableCallbackPtr upstream_ready_cb_; }; } // namespace ConnectionPool diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index cc69ab976c0e1..61060533cb5c7 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -76,7 +76,7 @@ void ActiveClient::StreamWrapper::onDecodeComplete() { parent_.codec_client_->close(); } else { auto* pool = &parent_.parent(); - pool->dispatcher().post([pool]() -> void { pool->onUpstreamReady(); }); + pool->scheduleOnUpstreamReady(); parent_.stream_wrapper_.reset(); pool->checkForDrained(); diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index 5abc86fc959c3..e905b23377115 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -49,7 +49,7 @@ ActiveTcpClient::~ActiveTcpClient() { void ActiveTcpClient::clearCallbacks() { if (state_ == Envoy::ConnectionPool::ActiveClient::State::BUSY && parent_.hasPendingStreams()) { auto* pool = &parent_; - pool->dispatcher().post([pool]() -> void { pool->onUpstreamReady(); }); + pool->scheduleOnUpstreamReady(); } callbacks_ = nullptr; tcp_connection_data_ = nullptr; diff --git a/test/common/conn_pool/conn_pool_base_test.cc b/test/common/conn_pool/conn_pool_base_test.cc index cc79fbea45722..1ecd11f73f2a3 100644 --- a/test/common/conn_pool/conn_pool_base_test.cc +++ b/test/common/conn_pool/conn_pool_base_test.cc @@ -52,7 +52,8 @@ class TestConnPoolImplBase : public ConnPoolImplBase { class ConnPoolImplBaseTest : public testing::Test { public: ConnPoolImplBaseTest() - : pool_(host_, Upstream::ResourcePriority::Default, dispatcher_, nullptr, nullptr, state_) { + : upstream_ready_cb_(new NiceMock(&dispatcher_)), + pool_(host_, Upstream::ResourcePriority::Default, dispatcher_, nullptr, nullptr, state_) { // Default connections to 1024 because the tests shouldn't be relying on the // connection resource limit for most tests. cluster_->resetResourceManager(1024, 1024, 1024, 1, 1); @@ -80,6 +81,7 @@ class ConnPoolImplBaseTest : public testing::Test { new NiceMock()}; std::shared_ptr cluster_{new NiceMock()}; NiceMock dispatcher_; + NiceMock* upstream_ready_cb_; Upstream::HostSharedPtr host_{ Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:80", dispatcher_.timeSource())}; TestConnPoolImplBase pool_; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index b86e9c426a316..9b6937abe3b4d 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -52,7 +52,8 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt public: ConnPoolImplForTest(Event::MockDispatcher& dispatcher, Upstream::ClusterInfoConstSharedPtr cluster, - Random::RandomGenerator& random_generator) + Random::RandomGenerator& random_generator, + Event::MockSchedulableCallback* upstream_ready_cb) : FixedHttpConnPoolImpl( Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000", dispatcher.timeSource()), Upstream::ResourcePriority::Default, dispatcher, nullptr, nullptr, random_generator, @@ -62,7 +63,8 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt return nullptr; // Not used: createCodecClient overloaded. }, std::vector{Protocol::Http11}), - api_(Api::createApiForTest()), mock_dispatcher_(dispatcher) {} + api_(Api::createApiForTest()), mock_dispatcher_(dispatcher), + mock_upstream_ready_cb_(upstream_ready_cb) {} ~ConnPoolImplForTest() override { EXPECT_EQ(0U, ready_clients_.size()); @@ -118,15 +120,17 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt } void expectEnableUpstreamReady() { - EXPECT_CALL(mock_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb_)); + EXPECT_CALL(*mock_upstream_ready_cb_, scheduleCallbackCurrentIteration()) + .Times(1) + .RetiresOnSaturation(); } - void expectAndRunUpstreamReady() { post_cb_(); } + void expectAndRunUpstreamReady() { mock_upstream_ready_cb_->invokeCallback(); } Upstream::ClusterConnectivityState state_; Api::ApiPtr api_; Event::MockDispatcher& mock_dispatcher_; - Event::PostCb post_cb_; + Event::MockSchedulableCallback* mock_upstream_ready_cb_; std::vector test_clients_; }; @@ -136,7 +140,9 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt class Http1ConnPoolImplTest : public testing::Test { public: Http1ConnPoolImplTest() - : conn_pool_(std::make_unique(dispatcher_, cluster_, random_)) {} + : upstream_ready_cb_(new Event::MockSchedulableCallback(&dispatcher_)), + conn_pool_(std::make_unique(dispatcher_, cluster_, random_, + upstream_ready_cb_)) {} ~Http1ConnPoolImplTest() override { EXPECT_EQ("", TestUtility::nonZeroedGauges(cluster_->stats_store_.gauges())); @@ -145,6 +151,7 @@ class Http1ConnPoolImplTest : public testing::Test { NiceMock random_; NiceMock dispatcher_; std::shared_ptr cluster_{new NiceMock()}; + Event::MockSchedulableCallback* upstream_ready_cb_; std::unique_ptr conn_pool_; NiceMock runtime_; }; @@ -243,14 +250,17 @@ TEST_F(Http1ConnPoolImplTest, DrainConnections) { ActiveTestRequest r2(*this, 1, ActiveTestRequest::Type::CreateConnection); r2.startRequest(); + conn_pool_->expectEnableUpstreamReady(); r1.completeResponse(false); // This will destroy the ready client and set requests remaining to 1 on the busy client. conn_pool_->drainConnections(); EXPECT_CALL(*conn_pool_, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); + conn_pool_->expectAndRunUpstreamReady(); // This will destroy the busy client when the response finishes. + conn_pool_->expectEnableUpstreamReady(); r2.completeResponse(false); EXPECT_CALL(*conn_pool_, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); @@ -267,9 +277,11 @@ TEST_F(Http1ConnPoolImplTest, VerifyTimingStats) { ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection); r1.startRequest(); + conn_pool_->expectEnableUpstreamReady(); r1.completeResponse(false); EXPECT_CALL(*conn_pool_, onClientDestroy()); + conn_pool_->expectAndRunUpstreamReady(); conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); dispatcher_.clearDeferredDeleteList(); } @@ -293,7 +305,10 @@ TEST_F(Http1ConnPoolImplTest, VerifyAlpnFallback) { // Recreate the conn pool so that the host re-evaluates the transport socket match, arriving at // our test transport socket factory. - conn_pool_ = std::make_unique(dispatcher_, cluster_, random_); + // Recreate this to refresh expectation that the callback is scheduled and saved. + new Event::MockSchedulableCallback(&dispatcher_); + conn_pool_ = + std::make_unique(dispatcher_, cluster_, random_, upstream_ready_cb_); NiceMock outer_decoder; ConnPoolCallbacks callbacks; conn_pool_->expectClientCreate(Protocol::Http11); @@ -366,15 +381,18 @@ TEST_F(Http1ConnPoolImplTest, MultipleRequestAndResponse) { // Request 1 should kick off a new connection. ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection); r1.startRequest(); + conn_pool_->expectEnableUpstreamReady(); r1.completeResponse(false); // Request 2 should not. ActiveTestRequest r2(*this, 0, ActiveTestRequest::Type::Immediate); r2.startRequest(); + conn_pool_->expectEnableUpstreamReady(); r2.completeResponse(true); // Cause the connection to go away. EXPECT_CALL(*conn_pool_, onClientDestroy()); + conn_pool_->expectAndRunUpstreamReady(); conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); dispatcher_.clearDeferredDeleteList(); } @@ -942,6 +960,7 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + conn_pool_->expectEnableUpstreamReady(); EXPECT_TRUE( callbacks.outer_encoder_ ->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true) @@ -1005,8 +1024,10 @@ TEST_F(Http1ConnPoolImplTest, DrainCallback) { r2.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::Default); EXPECT_EQ(1U, cluster_->stats_.upstream_rq_total_.value()); + conn_pool_->expectEnableUpstreamReady(); EXPECT_CALL(drained, ready()); r1.startRequest(); + r1.completeResponse(false); EXPECT_CALL(*conn_pool_, onClientDestroy()); @@ -1090,15 +1111,18 @@ TEST_F(Http1ConnPoolImplTest, ActiveRequestHasActiveConnectionsTrue) { EXPECT_TRUE(conn_pool_->hasActiveConnections()); // cleanup + conn_pool_->expectEnableUpstreamReady(); r1.completeResponse(false); conn_pool_->drainConnections(); EXPECT_CALL(*conn_pool_, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); + conn_pool_->expectAndRunUpstreamReady(); } TEST_F(Http1ConnPoolImplTest, ResponseCompletedConnectionReadyNoActiveConnections) { ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection); r1.startRequest(); + conn_pool_->expectEnableUpstreamReady(); r1.completeResponse(false); EXPECT_FALSE(conn_pool_->hasActiveConnections()); @@ -1106,6 +1130,7 @@ TEST_F(Http1ConnPoolImplTest, ResponseCompletedConnectionReadyNoActiveConnection conn_pool_->drainConnections(); EXPECT_CALL(*conn_pool_, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); + conn_pool_->expectAndRunUpstreamReady(); } TEST_F(Http1ConnPoolImplTest, PendingRequestIsConsideredActive) { @@ -1125,6 +1150,87 @@ TEST_F(Http1ConnPoolImplTest, PendingRequestIsConsideredActive) { EXPECT_EQ(1U, cluster_->stats_.upstream_cx_destroy_local_.value()); } +// Schedulable callback that can track it's destruction. +class MockDestructSchedulableCallback : public Event::MockSchedulableCallback { +public: + MockDestructSchedulableCallback(Event::MockDispatcher* dispatcher) + : Event::MockSchedulableCallback(dispatcher) {} + MOCK_METHOD0(Die, void()); + + ~MockDestructSchedulableCallback() override { Die(); } +}; + +class Http1ConnPoolDestructImplTest : public testing::Test { +public: + Http1ConnPoolDestructImplTest() + : upstream_ready_cb_(new MockDestructSchedulableCallback(&dispatcher_)), + conn_pool_(std::make_unique(dispatcher_, cluster_, random_, + upstream_ready_cb_)) {} + + ~Http1ConnPoolDestructImplTest() override { + EXPECT_EQ("", TestUtility::nonZeroedGauges(cluster_->stats_store_.gauges())); + } + + NiceMock random_; + NiceMock dispatcher_; + std::shared_ptr cluster_{new NiceMock()}; + MockDestructSchedulableCallback* upstream_ready_cb_; + std::unique_ptr conn_pool_; + NiceMock runtime_; +}; + +// Regression test for use after free when dispatcher executes onUpstreamReady after connection pool +// is destroyed. +TEST_F(Http1ConnPoolDestructImplTest, CbAfterConnPoolDestroyed) { + InSequence s; + + NiceMock outer_decoder; + ConnPoolCallbacks callbacks; + conn_pool_->expectClientCreate(); + Http::ConnectionPool::Cancellable* handle = conn_pool_->newStream(outer_decoder, callbacks); + EXPECT_NE(nullptr, handle); + + NiceMock request_encoder; + ResponseDecoder* inner_decoder; + EXPECT_CALL(*conn_pool_->test_clients_[0].codec_, newStream(_)) + .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); + EXPECT_CALL(callbacks.pool_ready_, ready()); + EXPECT_CALL(*conn_pool_->test_clients_[0].connect_timer_, disableTimer()); + conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + + EXPECT_TRUE( + callbacks.outer_encoder_ + ->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true) + .ok()); + + conn_pool_->expectEnableUpstreamReady(); + // Schedules the onUpstreamReady callback. + inner_decoder->decodeHeaders( + ResponseHeaderMapPtr{new TestResponseHeaderMapImpl{{":status", "200"}}}, true); + + // Delete the connection pool. + EXPECT_CALL(*conn_pool_, onClientDestroy()); + conn_pool_->destructAllConnections(); + + // Delete connection pool and check that scheduled callback onUpstreamReady was destroyed. + dispatcher_.deferredDelete(std::move(conn_pool_)); + EXPECT_CALL(*upstream_ready_cb_, Die()); + + // When the dispatcher removes the connection pool, another call to clearDeferredDeleteList() + // occurs in ~HttpConnPoolImplBase. Avoid recursion. + bool deferring_delete = false; + ON_CALL(dispatcher_, clearDeferredDeleteList()) + .WillByDefault(Invoke([this, &deferring_delete]() -> void { + if (deferring_delete) { + return; + } + deferring_delete = true; + dispatcher_.to_delete_.clear(); + deferring_delete = false; + })); + dispatcher_.clearDeferredDeleteList(); +} + } // namespace } // namespace Http1 } // namespace Http diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index d15076d5785cd..4787a4d188c26 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -74,6 +74,7 @@ class Http2ConnPoolImplTest : public Event::TestUsingSimulatedTime, public testi Http2ConnPoolImplTest() : api_(Api::createApiForTest(stats_store_)), + upstream_ready_cb_(new NiceMock(&dispatcher_)), pool_(std::make_unique(dispatcher_, random_, host_, Upstream::ResourcePriority::Default, nullptr, nullptr, state_)) { @@ -212,6 +213,7 @@ class Http2ConnPoolImplTest : public Event::TestUsingSimulatedTime, public testi NiceMock dispatcher_; std::shared_ptr cluster_{new NiceMock()}; Upstream::HostSharedPtr host_{Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:80", simTime())}; + NiceMock* upstream_ready_cb_; std::unique_ptr pool_; std::vector test_clients_; NiceMock runtime_; @@ -337,6 +339,7 @@ TEST_F(Http2ConnPoolImplTest, VerifyAlpnFallback) { // Recreate the conn pool so that the host re-evaluates the transport socket match, arriving at // our test transport socket factory. host_ = Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:80", simTime()); + new NiceMock(&dispatcher_); pool_ = std::make_unique( dispatcher_, random_, host_, Upstream::ResourcePriority::Default, nullptr, nullptr, state_); diff --git a/test/common/http/mixed_conn_pool_test.cc b/test/common/http/mixed_conn_pool_test.cc index b826d2e29851e..9e0e00a885502 100644 --- a/test/common/http/mixed_conn_pool_test.cc +++ b/test/common/http/mixed_conn_pool_test.cc @@ -39,7 +39,8 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public HttpCon class MixedConnPoolImplTest : public testing::Test { public: MixedConnPoolImplTest() - : conn_pool_(std::make_unique(dispatcher_, state_, random_, cluster_)) {} + : upstream_ready_cb_(new NiceMock(&dispatcher_)), + conn_pool_(std::make_unique(dispatcher_, state_, random_, cluster_)) {} ~MixedConnPoolImplTest() override { EXPECT_EQ("", TestUtility::nonZeroedGauges(cluster_->stats_store_.gauges())); @@ -48,6 +49,7 @@ class MixedConnPoolImplTest : public testing::Test { Upstream::ClusterConnectivityState state_; NiceMock dispatcher_; std::shared_ptr cluster_{new NiceMock()}; + NiceMock* upstream_ready_cb_; std::unique_ptr conn_pool_; NiceMock runtime_; NiceMock random_; diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc index 08e7f9fae2a60..5fb2510e304a1 100644 --- a/test/common/tcp/conn_pool_test.cc +++ b/test/common/tcp/conn_pool_test.cc @@ -230,11 +230,12 @@ void ConnPoolBase::expectEnableUpstreamReady(bool run) { if (!test_new_connection_pool_) { dynamic_cast(conn_pool_.get())->expectEnableUpstreamReady(run); } else { - Event::PostCb& post_cb = dynamic_cast(conn_pool_.get())->post_cb_; if (run) { - post_cb(); + mock_upstream_ready_cb_->invokeCallback(); } else { - EXPECT_CALL(mock_dispatcher_, post(_)).WillOnce(testing::SaveArg<0>(&post_cb)); + EXPECT_CALL(*mock_upstream_ready_cb_, scheduleCallbackCurrentIteration()) + .Times(1) + .RetiresOnSaturation(); } } } @@ -247,9 +248,7 @@ class TcpConnPoolImplTest : public Event::TestUsingSimulatedTime, public: TcpConnPoolImplTest() : test_new_connection_pool_(GetParam()), - upstream_ready_cb_(test_new_connection_pool_ - ? nullptr - : new NiceMock(&dispatcher_)), + upstream_ready_cb_(new NiceMock(&dispatcher_)), host_(Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:9000", simTime())), conn_pool_(dispatcher_, host_, upstream_ready_cb_, test_new_connection_pool_) {} @@ -275,9 +274,7 @@ class TcpConnPoolImplDestructorTest : public Event::TestUsingSimulatedTime, public: TcpConnPoolImplDestructorTest() : test_new_connection_pool_(GetParam()), - upstream_ready_cb_(test_new_connection_pool_ - ? nullptr - : new NiceMock(&dispatcher_)) { + upstream_ready_cb_(new NiceMock(&dispatcher_)) { host_ = Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:9000", simTime()); if (test_new_connection_pool_) { conn_pool_ = std::make_unique(