Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions include/envoy/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@ class Cancellable {
};

/**
* Reason that a pool stream could not be obtained.
* Reason that a pool connection could not be obtained.
*/
enum class PoolFailureReason {
// A resource overflowed and policy prevented a new stream from being created.
// A resource overflowed and policy prevented a new connection from being created.
Overflow,
// A connection failure took place and the stream could not be bound.
ConnectionFailure
// A local connection failure took place while creating a new connection.
LocalConnectionFailure,
// A remote connection failure took place while creating a new connection.
RemoteConnectionFailure,
// A timeout occurred while creating a new connection.
Timeout,
};

/*
* UpstreamCallbacks for connection pool upstream connection callbacks.
* UpstreamCallbacks for connection pool upstream connection callbacks and data. Note that
* onEvent(Connected) is never triggered since the event always occurs before a ConnectionPool
* caller is assigned a connection.
*/
class UpstreamCallbacks {
class UpstreamCallbacks : public Network::ConnectionCallbacks {
public:
virtual ~UpstreamCallbacks() {}

Expand Down Expand Up @@ -122,14 +128,14 @@ class Instance : public Event::DeferredDeletable {
/**
* Register a callback that gets called when the connection pool is fully drained. No actual
* draining is done. The owner of the connection pool is responsible for not creating any
* new streams.
* new connections.
*/
virtual void addDrainedCallback(DrainedCb cb) PURE;

/**
* Actively drain all existing connection pool connections. This method can be used in cases
* where the connection pool is not being destroyed, but the caller wishes to make sure that
* all new streams take place on a new connection. For example, when a health check failure
* all new requests take place on a new connection. For example, when a health check failure
* occurs.
*/
virtual void drainConnections() PURE;
Expand Down
45 changes: 41 additions & 4 deletions source/common/tcp/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
ENVOY_CONN_LOG(debug, "client disconnected", *conn.conn_);

if (event == Network::ConnectionEvent::LocalClose) {
host_->cluster().stats().upstream_cx_destroy_local_.inc();
}
if (event == Network::ConnectionEvent::RemoteClose) {
host_->cluster().stats().upstream_cx_destroy_remote_.inc();
}
host_->cluster().stats().upstream_cx_destroy_.inc();

ActiveConnPtr removed;
bool check_for_drained = true;
if (conn.wrapper_ != nullptr) {
Expand Down Expand Up @@ -135,13 +144,21 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
// do with the request.
// NOTE: We move the existing pending requests to a temporary list. This is done so that
// if retry logic submits a new request to the pool, we don't fail it inline.
ConnectionPool::PoolFailureReason reason;
if (conn.timed_out_) {
reason = ConnectionPool::PoolFailureReason::Timeout;
} else if (event == Network::ConnectionEvent::RemoteClose) {
reason = ConnectionPool::PoolFailureReason::RemoteConnectionFailure;
} else {
reason = ConnectionPool::PoolFailureReason::LocalConnectionFailure;
}

std::list<PendingRequestPtr> pending_requests_to_purge(std::move(pending_requests_));
while (!pending_requests_to_purge.empty()) {
PendingRequestPtr request =
pending_requests_to_purge.front()->removeFromList(pending_requests_to_purge);
host_->cluster().stats().upstream_rq_pending_failure_eject_.inc();
request->callbacks_.onPoolFailure(ConnectionPool::PoolFailureReason::ConnectionFailure,
conn.real_host_description_);
request->callbacks_.onPoolFailure(reason, conn.real_host_description_);
}
}

Expand Down Expand Up @@ -277,7 +294,7 @@ ConnPoolImpl::PendingRequest::~PendingRequest() {
ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)
: parent_(parent),
connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })),
remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()) {
remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()), timed_out_(false) {

parent_.conn_connect_ms_.reset(
new Stats::Timespan(parent_.host_->cluster().stats().upstream_cx_connect_ms_));
Expand All @@ -297,7 +314,6 @@ ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)

parent_.host_->cluster().stats().upstream_cx_total_.inc();
parent_.host_->cluster().stats().upstream_cx_active_.inc();
parent_.host_->cluster().stats().upstream_cx_http1_total_.inc();
parent_.host_->stats().cx_total_.inc();
parent_.host_->stats().cx_active_.inc();
conn_length_.reset(new Stats::Timespan(parent_.host_->cluster().stats().upstream_cx_length_ms_));
Expand Down Expand Up @@ -328,6 +344,7 @@ void ConnPoolImpl::ActiveConn::onConnectTimeout() {
// We just close the connection at this point. This will result in both a timeout and a connect
// failure and will fold into all the normal connect failure logic.
ENVOY_CONN_LOG(debug, "connect timeout", *conn_);
timed_out_ = true;
parent_.host_->cluster().stats().upstream_cx_connect_timeout_.inc();
conn_->close(Network::ConnectionCloseType::NoFlush);
}
Expand All @@ -343,5 +360,25 @@ void ConnPoolImpl::ActiveConn::onUpstreamData(Buffer::Instance& data, bool end_s
}
}

void ConnPoolImpl::ActiveConn::onEvent(Network::ConnectionEvent event) {
if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
wrapper_->callbacks_->onEvent(event);
}

parent_.onConnectionEvent(*this, event);
}

void ConnPoolImpl::ActiveConn::onAboveWriteBufferHighWatermark() {
if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
wrapper_->callbacks_->onAboveWriteBufferHighWatermark();
}
}

void ConnPoolImpl::ActiveConn::onBelowWriteBufferLowWatermark() {
if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
wrapper_->callbacks_->onBelowWriteBufferLowWatermark();
}
}

} // namespace Tcp
} // namespace Envoy
9 changes: 4 additions & 5 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onUpstreamData(Buffer::Instance& data, bool end_stream);

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
parent_.onConnectionEvent(*this, event);
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
void onEvent(Network::ConnectionEvent event) override;
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
Expand All @@ -85,6 +83,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
uint64_t remaining_requests_;
bool timed_out_;
};

typedef std::unique_ptr<ActiveConn> ActiveConnPtr;
Expand Down
68 changes: 65 additions & 3 deletions test/common/tcp/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks {
pool_ready_.ready();
}

void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) override {
reason_ = reason;
host_ = host;
pool_failure_.ready();
}

ReadyWatcher pool_failure_;
ReadyWatcher pool_ready_;
ConnectionPool::ConnectionData* conn_data_;
ConnectionPool::ConnectionData* conn_data_{};
absl::optional<ConnectionPool::PoolFailureReason> reason_;
Upstream::HostDescriptionConstSharedPtr host_;
};

Expand Down Expand Up @@ -322,6 +324,16 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
EXPECT_EQ(Network::FilterStatus::StopIteration,
conn_pool_.test_conns_[0].filter_->onData(buffer, false));

EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
for (auto* cb : conn_pool_.test_conns_[0].connection_->callbacks_) {
cb->onAboveWriteBufferHighWatermark();
}

EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark());
for (auto* cb : conn_pool_.test_conns_[0].connection_->callbacks_) {
cb->onBelowWriteBufferLowWatermark();
}

// Shutdown normally.
EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c1.releaseConn();
Expand All @@ -331,6 +343,23 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
dispatcher_.clearDeferredDeleteList();
}

TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
Buffer::OwnedImpl buffer;

InSequence s;
ConnectionPool::MockUpstreamCallbacks callbacks;

// Create connection, set UpstreamCallbacks
ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);
c1.callbacks_.conn_data_->addUpstreamCallbacks(callbacks);

EXPECT_CALL(callbacks, onEvent(Network::ConnectionEvent::RemoteClose));

EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

TEST_F(TcpConnPoolImplTest, NoUpstreamCallbacks) {
Buffer::OwnedImpl buffer;

Expand Down Expand Up @@ -394,14 +423,16 @@ TEST_F(TcpConnPoolImplTest, MaxPendingRequests) {
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(ConnectionPool::PoolFailureReason::Overflow, callbacks2.reason_);

EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_overflow_.value());
}

/**
* Tests a connection failure before a request is bound which should result in the pending request
* getting purged.
*/
TEST_F(TcpConnPoolImplTest, ConnectFailure) {
TEST_F(TcpConnPoolImplTest, RemoteConnectFailure) {
InSequence s;

// Request 1 should kick off a new connection.
Expand All @@ -417,6 +448,34 @@ TEST_F(TcpConnPoolImplTest, ConnectFailure) {
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(ConnectionPool::PoolFailureReason::RemoteConnectionFailure, callbacks.reason_);

EXPECT_EQ(1U, cluster_->stats_.upstream_cx_connect_fail_.value());
EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_failure_eject_.value());
}

/**
* Tests a connection failure before a request is bound which should result in the pending request
* getting purged.
*/
TEST_F(TcpConnPoolImplTest, LocalConnectFailure) {
InSequence s;

// Request 1 should kick off a new connection.
ConnPoolCallbacks callbacks;
conn_pool_.expectConnCreate();
Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(callbacks);
EXPECT_NE(nullptr, handle);

EXPECT_CALL(callbacks.pool_failure_, ready());
EXPECT_CALL(*conn_pool_.test_conns_[0].connect_timer_, disableTimer());

EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::LocalClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(ConnectionPool::PoolFailureReason::LocalConnectionFailure, callbacks.reason_);

EXPECT_EQ(1U, cluster_->stats_.upstream_cx_connect_fail_.value());
EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_failure_eject_.value());
}
Expand Down Expand Up @@ -446,6 +505,9 @@ TEST_F(TcpConnPoolImplTest, ConnectTimeout) {
EXPECT_CALL(conn_pool_, onConnDestroyedForTest()).Times(2);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(ConnectionPool::PoolFailureReason::Timeout, callbacks1.reason_);
EXPECT_EQ(ConnectionPool::PoolFailureReason::Timeout, callbacks2.reason_);

EXPECT_EQ(2U, cluster_->stats_.upstream_cx_connect_fail_.value());
EXPECT_EQ(2U, cluster_->stats_.upstream_cx_connect_timeout_.value());
}
Expand Down
5 changes: 5 additions & 0 deletions test/integration/tcp_conn_pool_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TestFilter : public Network::ReadFilter {
public:
Request(TestFilter& parent, Buffer::Instance& data) : parent_(parent) { data_.move(data); }

// Tcp::ConnectionPool::Callbacks
void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
Upstream::HostDescriptionConstSharedPtr) override {
ASSERT(false);
Expand All @@ -59,6 +60,7 @@ class TestFilter : public Network::ReadFilter {
upstream_->connection().write(data_, false);
}

// Tcp::ConnectionPool::UpstreamCallbacks
void onUpstreamData(Buffer::Instance& data, bool end_stream) override {
UNREFERENCED_PARAMETER(end_stream);

Expand All @@ -67,6 +69,9 @@ class TestFilter : public Network::ReadFilter {

upstream_->release();
}
void onEvent(Network::ConnectionEvent) override {}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

TestFilter& parent_;
Buffer::OwnedImpl data_;
Expand Down
1 change: 1 addition & 0 deletions test/mocks/tcp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_cc_mock(
deps = [
"//include/envoy/buffer:buffer_interface",
"//include/envoy/tcp:conn_pool_interface",
"//test/mocks/network:network_mocks",
"//test/mocks/upstream:host_mocks",
],
)
37 changes: 36 additions & 1 deletion test/mocks/tcp/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::Invoke;
using testing::ReturnRef;
using testing::_;

namespace Envoy {
namespace Tcp {
namespace ConnectionPool {
Expand All @@ -13,9 +17,40 @@ MockCancellable::~MockCancellable() {}
MockUpstreamCallbacks::MockUpstreamCallbacks() {}
MockUpstreamCallbacks::~MockUpstreamCallbacks() {}

MockInstance::MockInstance() {}
MockConnectionData::MockConnectionData() {
ON_CALL(*this, connection()).WillByDefault(ReturnRef(connection_));
}
MockConnectionData::~MockConnectionData() {}

MockInstance::MockInstance() {
ON_CALL(*this, newConnection(_)).WillByDefault(Invoke([&](Callbacks& cb) -> Cancellable* {
return newConnectionImpl(cb);
}));
}
MockInstance::~MockInstance() {}

MockCancellable* MockInstance::newConnectionImpl(Callbacks& cb) {
handles_.emplace_back();
callbacks_.push_back(&cb);
return &handles_.back();
}

void MockInstance::poolFailure(PoolFailureReason reason) {
Callbacks* cb = callbacks_.front();
callbacks_.pop_front();
handles_.pop_front();

cb->onPoolFailure(reason, host_);
}

void MockInstance::poolReady() {
Callbacks* cb = callbacks_.front();
callbacks_.pop_front();
handles_.pop_front();

cb->onPoolReady(connection_data_, host_);
}

} // namespace ConnectionPool
} // namespace Tcp
} // namespace Envoy
Loading