Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 5 additions & 29 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSha
Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options)
: ConnPoolImplBase(std::move(host), std::move(priority)), dispatcher_(dispatcher),
socket_options_(options),
upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {}
socket_options_(options) {}

ConnPoolImpl::~ConnPoolImpl() {
while (!ready_clients_.empty()) {
Expand Down Expand Up @@ -185,7 +184,7 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv
// whether the client is in the ready list (connected) or the busy list (failed to connect).
if (event == Network::ConnectionEvent::Connected) {
conn_connect_ms_->complete();
processIdleClient(client, false);
processIdleClient(client);
}
}

Expand All @@ -207,30 +206,13 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) {
host_->cluster().stats().upstream_cx_max_requests_.inc();
onDownstreamReset(client);
} else {
// Upstream connection might be closed right after response is complete. Setting delay=true
// here to attach pending requests in next dispatcher loop to handle that case.
// https://github.com/envoyproxy/envoy/issues/2715
processIdleClient(client, true);
processIdleClient(client);
}
}

void ConnPoolImpl::onUpstreamReady() {
upstream_ready_enabled_ = false;
while (!pending_requests_.empty() && !ready_clients_.empty()) {
ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_);
// There is work to do so bind a request to the client and move it to the busy list. Pending
// requests are pushed onto the front, so pull from the back.
attachRequestToClient(client, pending_requests_.back()->decoder_,
pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
client.moveBetweenLists(ready_clients_, busy_clients_);
}
}

void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) {
void ConnPoolImpl::processIdleClient(ActiveClient& client) {
client.stream_wrapper_.reset();
if (pending_requests_.empty() || delay) {
if (pending_requests_.empty()) {
// There is nothing to service, so just move the connection
// into the ready list.
ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_);
Expand All @@ -243,12 +225,6 @@ void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) {
pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
}

if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) {
upstream_ready_enabled_ = true;
upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0));
}

checkForDrained();
}

Expand Down
5 changes: 1 addition & 4 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,14 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase {
void onConnectionEvent(ActiveClient& client, Network::ConnectionEvent event);
void onDownstreamReset(ActiveClient& client);
void onResponseComplete(ActiveClient& client);
void onUpstreamReady();
void processIdleClient(ActiveClient& client, bool delay);
void processIdleClient(ActiveClient& client);

Stats::TimespanPtr conn_connect_ms_;
Event::Dispatcher& dispatcher_;
std::list<ActiveClientPtr> ready_clients_;
std::list<ActiveClientPtr> busy_clients_;
std::list<DrainedCb> drained_callbacks_;
const Network::ConnectionSocket::OptionsSharedPtr socket_options_;
Event::TimerPtr upstream_ready_timer_;
bool upstream_ready_enabled_{false};
};

/**
Expand Down
90 changes: 3 additions & 87 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ namespace {
class ConnPoolImplForTest : public ConnPoolImpl {
public:
ConnPoolImplForTest(Event::MockDispatcher& dispatcher,
Upstream::ClusterInfoConstSharedPtr cluster,
NiceMock<Event::MockTimer>* upstream_ready_timer)
Upstream::ClusterInfoConstSharedPtr cluster)
: ConnPoolImpl(dispatcher, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"),
Upstream::ResourcePriority::Default, nullptr),
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher),
mock_upstream_ready_timer_(upstream_ready_timer) {}
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher) {}

~ConnPoolImplForTest() {
EXPECT_EQ(0U, ready_clients_.size());
Expand Down Expand Up @@ -102,20 +100,8 @@ class ConnPoolImplForTest : public ConnPoolImpl {
ON_CALL(*test_client.codec_, protocol()).WillByDefault(Return(protocol));
}

void expectEnableUpstreamReady() {
EXPECT_FALSE(upstream_ready_enabled_);
EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation();
}

void expectAndRunUpstreamReady() {
EXPECT_TRUE(upstream_ready_enabled_);
mock_upstream_ready_timer_->callback_();
EXPECT_FALSE(upstream_ready_enabled_);
}

Api::ApiPtr api_;
Event::MockDispatcher& mock_dispatcher_;
NiceMock<Event::MockTimer>* mock_upstream_ready_timer_;
std::vector<TestCodecClient> test_clients_;
};

Expand All @@ -124,17 +110,14 @@ class ConnPoolImplForTest : public ConnPoolImpl {
*/
class Http1ConnPoolImplTest : public testing::Test {
public:
Http1ConnPoolImplTest()
: upstream_ready_timer_(new NiceMock<Event::MockTimer>(&dispatcher_)),
conn_pool_(dispatcher_, cluster_, upstream_ready_timer_) {}
Http1ConnPoolImplTest() : conn_pool_(dispatcher_, cluster_) {}

~Http1ConnPoolImplTest() {
EXPECT_TRUE(TestUtility::gaugesZeroed(cluster_->stats_store_.gauges()));
}

NiceMock<Event::MockDispatcher> dispatcher_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
NiceMock<Event::MockTimer>* upstream_ready_timer_;
ConnPoolImplForTest conn_pool_;
NiceMock<Runtime::MockLoader> runtime_;
};
Expand Down Expand Up @@ -470,7 +453,6 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

// Finishing request 1 will immediately bind to request 2.
conn_pool_.expectEnableUpstreamReady();
EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
Expand All @@ -479,7 +461,6 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

conn_pool_.expectAndRunUpstreamReady();
callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
// N.B. clang_tidy insists that we use std::make_unique which can not infer std::initialize_list.
response_headers = std::make_unique<TestHeaderMapImpl>(
Expand All @@ -492,69 +473,6 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test when upstream closes connection without 'connection: close' like
* https://github.com/envoyproxy/envoy/pull/2715
*/
TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {
InSequence s;

// Request 1 should kick off a new connection.
NiceMock<Http::MockStreamDecoder> outer_decoder1;
ConnPoolCallbacks callbacks;
conn_pool_.expectClientCreate();
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder1, callbacks);

EXPECT_NE(nullptr, handle);

// Request 2 should not kick off a new connection.
NiceMock<Http::MockStreamDecoder> outer_decoder2;
ConnPoolCallbacks callbacks2;
handle = conn_pool_.newStream(outer_decoder2, callbacks2);
EXPECT_EQ(1U, cluster_->stats_.upstream_cx_overflow_.value());

EXPECT_NE(nullptr, handle);

// Connect event will bind to request 1.
NiceMock<Http::MockStreamEncoder> request_encoder;
Http::StreamDecoder* inner_decoder;
EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks.pool_ready_, ready());

conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

// Finishing request 1 will schedule binding the connection to request 2.
conn_pool_.expectEnableUpstreamReady();

callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

// Cause the connection to go away.
conn_pool_.expectClientCreate();
EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

conn_pool_.expectAndRunUpstreamReady();

EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
// N.B. clang_tidy insists that we use std::make_unique which can not infer std::initialize_list.
response_headers = std::make_unique<TestHeaderMapImpl>(
std::initializer_list<std::pair<std::string, std::string>>{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

/**
* Test when upstream sends us 'connection: close'
*/
Expand Down Expand Up @@ -702,11 +620,9 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending);

// Finish r1, which gets r3 going.
conn_pool_.expectEnableUpstreamReady();
r3.expectNewStream();

r1.completeResponse(false);
conn_pool_.expectAndRunUpstreamReady();
r3.startRequest();
EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value());

Expand Down