Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions include/envoy/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks {
virtual void onUpstreamData(Buffer::Instance& data, bool end_stream) PURE;
};

/**
* ConnectionState is a base class for connection state maintained across requests. For example, a
* protocol may maintain a connection-specific request sequence number or negotiate options that
* affect the behavior of requests for the duration of the connection. A ConnectionState subclass
* is assigned to the ConnectionData to track this state when the connection is returned to the
* pool so that the state is available when the connection is re-used for a subsequent request.
* The ConnectionState assigned to a connection is automatically destroyed when the connection is
* closed.
*/
class ConnectionState {
public:
virtual ~ConnectionState() {}
};

typedef std::unique_ptr<ConnectionState> ConnectionStatePtr;

/*
* ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
* released back to the pool for re-use when their containing ConnectionData is destroyed.
Expand All @@ -70,13 +86,31 @@ class ConnectionData {
*/
virtual Network::ClientConnection& connection() PURE;

/**
* Sets the ConnectionState for this connection. Any existing ConnectionState is destroyed.
* @param ConnectionStatePtr&& new ConnectionState for this connection.
*/
virtual void setConnectionState(ConnectionStatePtr&& state) PURE;

/**
* @return T* the current ConnectionState or nullptr if no state is set or if the state's type
* is not T.
*/
template <class T> T* connectionStateTyped() { return dynamic_cast<T*>(connectionState()); }

/**
* Sets the ConnectionPool::UpstreamCallbacks for the connection. If no callback is attached,
* data from the upstream will cause the connection to be closed. Callbacks cease when the
* connection is released.
* @param callback the UpstreamCallbacks to invoke for upstream data
*/
virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;

protected:
/**
* @return ConnectionState* pointer to the current ConnectionState or nullptr if not set
*/
virtual ConnectionState* connectionState() PURE;
};

typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;
Expand Down
17 changes: 17 additions & 0 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::

Network::ClientConnection& connection();
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) {
parent_.setConnectionState(std::move(state));
};
ConnectionPool::ConnectionState* connectionState() { return parent_.connectionState(); }

void release(bool closed);

void invalidate() { conn_valid_ = false; }
Expand All @@ -60,6 +65,12 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
wrapper_->addUpstreamCallbacks(callbacks);
};
void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
wrapper_->setConnectionState(std::move(state));
}
ConnectionPool::ConnectionState* connectionState() override {
return wrapper_->connectionState();
}

ConnectionWrapperSharedPtr wrapper_;
};
Expand Down Expand Up @@ -90,10 +101,16 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) {
conn_state_ = std::move(state);
}
ConnectionPool::ConnectionState* connectionState() { return conn_state_.get(); }

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
ConnectionWrapperSharedPtr wrapper_;
Network::ClientConnectionPtr conn_;
ConnectionPool::ConnectionStatePtr conn_state_;
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
uint64_t remaining_requests_;
Expand Down
134 changes: 133 additions & 1 deletion test/common/tcp/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ using testing::_;

namespace Envoy {
namespace Tcp {
namespace {

struct TestConnectionState : public ConnectionPool::ConnectionState {
TestConnectionState(int id, std::function<void()> on_destructor)
: id_(id), on_destructor_(on_destructor) {}
~TestConnectionState() { on_destructor_(); }

int id_;
std::function<void()> on_destructor_;
};

} // namespace

/**
* Mock callbacks used for conn pool testing.
Expand Down Expand Up @@ -309,6 +321,9 @@ TEST_F(TcpConnPoolImplTest, VerifyBufferLimits) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that upstream callback fire for assigned connections.
*/
TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
Buffer::OwnedImpl buffer;

Expand Down Expand Up @@ -343,6 +358,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that upstream callback close event fires for assigned connections.
*/
TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
Buffer::OwnedImpl buffer;

Expand All @@ -360,6 +378,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that a connection pool functions without upstream callbacks.
*/
TEST_F(TcpConnPoolImplTest, NoUpstreamCallbacks) {
Buffer::OwnedImpl buffer;

Expand Down Expand Up @@ -400,6 +421,45 @@ TEST_F(TcpConnPoolImplTest, MultipleRequestAndResponse) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests ConnectionState assignment, lookup and destruction.
*/
TEST_F(TcpConnPoolImplTest, ConnectionStateLifecycle) {
InSequence s;

bool state_destroyed = false;

// Request 1 should kick off a new connection.
ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);

auto* state = new TestConnectionState(1, [&]() -> void { state_destroyed = true; });
c1.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(state));

EXPECT_EQ(state, c1.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());

EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c1.releaseConn();

EXPECT_FALSE(state_destroyed);

// Request 2 should not.
ActiveTestConn c2(*this, 0, ActiveTestConn::Type::Immediate);

EXPECT_EQ(state, c2.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());

EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c2.releaseConn();

EXPECT_FALSE(state_destroyed);

// Cause the connection to go away.
EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_TRUE(state_destroyed);
}

/**
* Test when we overflow max pending requests.
*/
Expand Down Expand Up @@ -555,6 +615,9 @@ TEST_F(TcpConnPoolImplTest, DisconnectWhileBound) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test upstream disconnection of one request while another is pending.
*/
TEST_F(TcpConnPoolImplTest, DisconnectWhilePending) {
InSequence s;

Expand Down Expand Up @@ -664,6 +727,9 @@ TEST_F(TcpConnPoolImplTest, MaxRequestsPerConnection) {
EXPECT_EQ(1U, cluster_->stats_.upstream_cx_max_requests_.value());
}

/*
* Test that multiple connections can be assigned at once.
*/
TEST_F(TcpConnPoolImplTest, ConcurrentConnections) {
InSequence s;

Expand Down Expand Up @@ -691,6 +757,61 @@ TEST_F(TcpConnPoolImplTest, ConcurrentConnections) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests ConnectionState lifecycle with multiple concurrent connections.
*/
TEST_F(TcpConnPoolImplTest, ConnectionStateWithConcurrentConnections) {
InSequence s;

int state_destroyed = 0;
auto* s1 = new TestConnectionState(1, [&]() -> void { state_destroyed |= 1; });
auto* s2 = new TestConnectionState(2, [&]() -> void { state_destroyed |= 2; });
auto* s3 = new TestConnectionState(2, [&]() -> void { state_destroyed |= 4; });

cluster_->resource_manager_.reset(
new Upstream::ResourceManagerImpl(runtime_, "fake_key", 2, 1024, 1024, 1));
ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);
c1.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(s1));
ActiveTestConn c2(*this, 1, ActiveTestConn::Type::CreateConnection);
c2.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(s2));
ActiveTestConn c3(*this, 0, ActiveTestConn::Type::Pending);

EXPECT_EQ(0, state_destroyed);

// Finish c1, which gets c3 going.
EXPECT_CALL(conn_pool_, onConnReleasedForTest());
conn_pool_.expectEnableUpstreamReady();
c3.expectNewConn();
c1.releaseConn();

conn_pool_.expectAndRunUpstreamReady();

// c3 now has the state set by c1.
EXPECT_EQ(s1, c3.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());
EXPECT_EQ(s2, c2.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());

// replace c3's state
c3.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(s3));
EXPECT_EQ(1, state_destroyed);

EXPECT_CALL(conn_pool_, onConnReleasedForTest()).Times(2);
c2.releaseConn();
c3.releaseConn();

EXPECT_EQ(1, state_destroyed);

// Disconnect both connections.
EXPECT_CALL(conn_pool_, onConnDestroyedForTest()).Times(2);
conn_pool_.test_conns_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(7, state_destroyed);
}

/**
* Tests that the DrainCallback is invoked when the number of connections goes to zero.
*/
TEST_F(TcpConnPoolImplTest, DrainCallback) {
InSequence s;
ReadyWatcher drained;
Expand All @@ -711,7 +832,9 @@ TEST_F(TcpConnPoolImplTest, DrainCallback) {
dispatcher_.clearDeferredDeleteList();
}

// Test draining a connection pool that has a pending connection.
/**
* Test draining a connection pool that has a pending connection.
*/
TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) {
InSequence s;
ReadyWatcher drained;
Expand All @@ -731,6 +854,9 @@ TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that the DrainCallback is invoked when a connection is closed.
*/
TEST_F(TcpConnPoolImplTest, DrainOnClose) {
ReadyWatcher drained;
EXPECT_CALL(drained, ready());
Expand All @@ -754,6 +880,9 @@ TEST_F(TcpConnPoolImplTest, DrainOnClose) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that busy connections are closed when the connection pool is destroyed.
*/
TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) {
prepareConn();

Expand All @@ -762,6 +891,9 @@ TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) {
conn_pool_.reset();
}

/**
* Test that ready connections are closed when the connection pool is destroyed.
*/
TEST_F(TcpConnPoolImplDestructorTest, TestReadyConnectionsAreClosed) {
prepareConn();

Expand Down
4 changes: 4 additions & 0 deletions test/mocks/tcp/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class MockConnectionData : public ConnectionData {
// Tcp::ConnectionPool::ConnectionData
MOCK_METHOD0(connection, Network::ClientConnection&());
MOCK_METHOD1(addUpstreamCallbacks, void(ConnectionPool::UpstreamCallbacks&));
void setConnectionState(ConnectionStatePtr&& state) override { setConnectionState_(state); }
MOCK_METHOD0(connectionState, ConnectionPool::ConnectionState*());

MOCK_METHOD1(setConnectionState_, void(ConnectionPool::ConnectionStatePtr& state));

// If set, invoked in ~MockConnectionData, which indicates that the connection pool
// caller has relased a connection.
Expand Down