Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream TCP connection buffer and read buffer limits (#150). #571

Merged
merged 5 commits into from
Mar 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/configuration/cluster_manager/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Cluster
"name": "...",
"type": "...",
"connect_timeout_ms": "...",
"per_connection_buffer_limit_bytes": "...",
"lb_type": "...",
"hosts": [],
"service_name": "...",
Expand Down Expand Up @@ -37,6 +38,10 @@ connect_timeout_ms
*(required, integer)* The timeout for new network connections to hosts in the cluster specified
in milliseconds.

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the cluster's connections read and write buffers.
If unspecified, an implementation defined default is applied (1MiB).

lb_type
*(required, string)* The :ref:`load balancer type <arch_overview_load_balancing_types>` to use
when picking a host in the cluster. Possible options are *round_robin*, *least_request*,
Expand Down Expand Up @@ -172,7 +177,6 @@ outlier_detection
Each of the above configuration values can be overridden via
:ref:`runtime values <config_cluster_manager_cluster_runtime_outlier_detection>`.


.. toctree::
:hidden:

Expand Down
5 changes: 3 additions & 2 deletions docs/configuration/listeners/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Each individual listener configuration has the following format:
"ssl_context": "{...}",
"bind_to_port": "...",
"use_proxy_proto": "...",
"use_original_dst": "..."
"use_original_dst": "...",
"per_connection_buffer_limit_bytes": "..."
}

port
Expand Down Expand Up @@ -54,7 +55,7 @@ use_original_dst

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the listener's new connection read and write buffers.
If unspecified, an implementation defined default is applied (1MB).
If unspecified, an implementation defined default is applied (1MiB).

.. toctree::
:hidden:
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
* processing pipeline.
*/
virtual void setReadBufferLimit(uint32_t limit) PURE;

/**
* Get the value set with setReadBufferLimit.
*/
virtual uint32_t readBufferLimit() const PURE;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ class ClusterInfo {
*/
virtual std::chrono::milliseconds connectTimeout() const PURE;

/**
* @return soft limit on size of the cluster's connections read and write buffers.
*/
virtual uint32_t perConnectionBufferLimitBytes() const PURE;

/**
* @return uint64_t features supported by the cluster. @see Features.
*/
Expand Down
5 changes: 5 additions & 0 deletions source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,11 @@ const std::string Json::Schema::CLUSTER_SCHEMA(R"EOF(
"minimum" : 0,
"exclusiveMinimum" : true
},
"per_connection_buffer_limit_bytes" : {
"type" : "integer",
"minimum" : 0,
"exclusiveMinimum" : true
},
"lb_type" : {
"type" : "string",
"enum" : ["round_robin", "least_request", "random", "ring_hash"]
Expand Down
1 change: 1 addition & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ConnectionImpl : public virtual Connection,
State state() override;
void write(Buffer::Instance& data) override;
void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; }
uint32_t readBufferLimit() const override { return read_buffer_limit_; }

// Network::BufferSource
Buffer::Instance& getReadBuffer() override { return read_buffer_; }
Expand Down
13 changes: 7 additions & 6 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ Host::CreateConnectionData HostImpl::createConnection(Event::Dispatcher& dispatc
Network::ClientConnectionPtr HostImpl::createConnection(Event::Dispatcher& dispatcher,
const ClusterInfo& cluster,
Network::Address::InstancePtr address) {
if (cluster.sslContext()) {
return Network::ClientConnectionPtr{
dispatcher.createSslClientConnection(*cluster.sslContext(), address)};
} else {
return Network::ClientConnectionPtr{dispatcher.createClientConnection(address)};
}
Network::ClientConnectionPtr connection =
cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address)
: dispatcher.createClientConnection(address);
connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes());
return connection;
}

void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, std::min(100U, new_weight)); }
Expand All @@ -60,6 +59,8 @@ ClusterInfoImpl::ClusterInfoImpl(const Json::Object& config, Runtime::Loader& ru
: runtime_(runtime), name_(config.getString("name")),
max_requests_per_connection_(config.getInteger("max_requests_per_connection", 0)),
connect_timeout_(std::chrono::milliseconds(config.getInteger("connect_timeout_ms"))),
per_connection_buffer_limit_bytes_(
config.getInteger("per_connection_buffer_limit_bytes", 1024 * 1024)),
stats_scope_(stats.createScope(fmt::format("cluster.{}.", name_))),
stats_(generateStats(*stats_scope_)), features_(parseFeatures(config)),
http_codec_options_(Http::Utility::parseCodecOptions(config)),
Expand Down
4 changes: 4 additions & 0 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ class ClusterInfoImpl : public ClusterInfo {

// Upstream::ClusterInfo
std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; }
uint32_t perConnectionBufferLimitBytes() const override {
return per_connection_buffer_limit_bytes_;
}
uint64_t features() const override { return features_; }
uint64_t httpCodecOptions() const override { return http_codec_options_; }
LoadBalancerType lbType() const override { return lb_type_; }
Expand Down Expand Up @@ -189,6 +192,7 @@ class ClusterInfoImpl : public ClusterInfo {
const std::string name_;
const uint64_t max_requests_per_connection_;
const std::chrono::milliseconds connect_timeout_;
const uint32_t per_connection_buffer_limit_bytes_;
Stats::ScopePtr stats_scope_;
mutable ClusterStats stats_;
Ssl::ClientContextPtr ssl_ctx_;
Expand Down
17 changes: 17 additions & 0 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,23 @@ TEST_F(Http1ConnPoolImplTest, VerifyTimingStats) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that buffer limits are set.
*/
TEST_F(Http1ConnPoolImplTest, VerifyBufferLimits) {
NiceMock<Http::MockStreamDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
conn_pool_.expectClientCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*conn_pool_.test_clients_.back().connection_, setReadBufferLimit(8192));
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks);
EXPECT_NE(nullptr, handle);

EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests a request that generates a new connection, completes, and then a second request that uses
* the same connection.
Expand Down
20 changes: 20 additions & 0 deletions test/common/http/http2/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ TEST_F(Http2ConnPoolImplTest, VerifyConnectionTimingStats) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that buffer limits are set.
*/
TEST_F(Http2ConnPoolImplTest, VerifyBufferLimits) {
expectClientCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*test_clients_.back().connection_, setReadBufferLimit(8192));

ActiveTestRequest r1(*this, 0);
EXPECT_CALL(r1.inner_encoder_, encodeHeaders(_, true));
r1.callbacks_.outer_encoder_->encodeHeaders(HeaderMapImpl{}, true);
expectClientConnect(0);
EXPECT_CALL(r1.decoder_, decodeHeaders_(_, true));
r1.inner_decoder_->decodeHeaders(HeaderMapPtr{new HeaderMapImpl{}}, true);

test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose);
EXPECT_CALL(*this, onClientDestroy());
dispatcher_.clearDeferredDeleteList();
}

TEST_F(Http2ConnPoolImplTest, RequestAndResponse) {
InSequence s;

Expand Down
1 change: 1 addition & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ReadBufferLimitTest : public testing::Test {
server_connection = std::move(conn);
server_connection->addReadFilter(read_filter);
EXPECT_EQ("", server_connection->nextProtocol());
EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit());
}));

uint32_t filter_seen = 0;
Expand Down
1 change: 1 addition & 0 deletions test/common/ssl/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class SslReadBufferLimitTest : public testing::Test {
server_connection = std::move(conn);
server_connection->addReadFilter(read_filter);
EXPECT_EQ("", server_connection->nextProtocol());
EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit());
}));

uint32_t filter_seen = 0;
Expand Down
28 changes: 28 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,34 @@ TEST_F(ClusterManagerImplTest, UnknownCluster) {
factory_.tls_.shutdownThread();
}

/**
* Test that buffer limits are set on new TCP connections.
*/
TEST_F(ClusterManagerImplTest, VerifyBufferLimits) {
std::string json = R"EOF(
{
"clusters": [
{
"name": "cluster_1",
"connect_timeout_ms": 250,
"per_connection_buffer_limit_bytes": 8192,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11001"}]
}]
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);
create(*loader);
Network::MockClientConnection* connection = new NiceMock<Network::MockClientConnection>();
EXPECT_CALL(*connection, setReadBufferLimit(8192));
EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_)).WillOnce(Return(connection));
auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1");
EXPECT_EQ(connection, conn_data.connection_.get());
factory_.tls_.shutdownThread();
}

TEST_F(ClusterManagerImplTest, ShutdownOrder) {
std::string json = R"EOF(
{
Expand Down
21 changes: 11 additions & 10 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "test/test_common/utility.h"

using testing::_;
using testing::Invoke;
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;
Expand All @@ -23,12 +24,12 @@ class TestHttpHealthCheckerImpl : public HttpHealthCheckerImpl {
public:
using HttpHealthCheckerImpl::HttpHealthCheckerImpl;

Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData&) override {
return createCodecClient_();
Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData& conn_data) override {
return createCodecClient_(conn_data);
};

// HttpHealthCheckerImpl
MOCK_METHOD0(createCodecClient_, Http::CodecClient*());
MOCK_METHOD1(createCodecClient_, Http::CodecClient*(Upstream::Host::CreateConnectionData&));
};

class HttpHealthCheckerImplTest : public testing::Test {
Expand All @@ -41,7 +42,6 @@ class HttpHealthCheckerImplTest : public testing::Test {
Http::MockClientConnection* codec_{};
Stats::IsolatedStoreImpl stats_store_;
Network::MockClientConnection* client_connection_{};
Http::CodecClient* codec_client_{};
NiceMock<Http::MockStreamEncoder> request_encoder_;
Http::StreamDecoder* stream_response_callbacks_{};
};
Expand Down Expand Up @@ -104,14 +104,15 @@ class HttpHealthCheckerImplTest : public testing::Test {
void expectClientCreate(size_t index) {
TestSession& test_session = *test_sessions_[index];

test_session.codec_ = new NiceMock<Http::MockClientConnection>();
auto* codec = test_session.codec_ = new NiceMock<Http::MockClientConnection>();
test_session.client_connection_ = new NiceMock<Network::MockClientConnection>();
auto create_codec_client = [codec](Upstream::Host::CreateConnectionData& conn_data) {
return new CodecClientForTest(std::move(conn_data.connection_), codec, nullptr, nullptr);
};

Network::ClientConnectionPtr connection{test_session.client_connection_};
test_session.codec_client_ =
new CodecClientForTest(std::move(connection), test_session.codec_, nullptr, nullptr);
EXPECT_CALL(*health_checker_, createCodecClient_())
.WillOnce(Return(test_session.codec_client_));
EXPECT_CALL(dispatcher_, createClientConnection_(_))
.WillOnce(Return(test_session.client_connection_));
EXPECT_CALL(*health_checker_, createCodecClient_(_)).WillOnce(Invoke(create_codec_client));
}

void expectStreamCreate(size_t index) {
Expand Down
12 changes: 8 additions & 4 deletions test/common/upstream/logical_dns_cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {
HostPtr logical_host = cluster_->hosts()[0];

EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
logical_host->createConnection(dispatcher_);
logical_host->outlierDetector().putHttpResponseCode(200);

Expand All @@ -133,7 +134,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {

EXPECT_EQ(logical_host, cluster_->hosts()[0]);
EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
Host::CreateConnectionData data = logical_host->createConnection(dispatcher_);
EXPECT_FALSE(data.host_description_->canary());
EXPECT_EQ(&cluster_->hosts()[0]->cluster(), &data.host_description_->cluster());
Expand All @@ -152,7 +154,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {

EXPECT_EQ(logical_host, cluster_->hosts()[0]);
EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
logical_host->createConnection(dispatcher_);

expectResolve();
Expand All @@ -164,7 +167,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {

EXPECT_EQ(logical_host, cluster_->hosts()[0]);
EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
logical_host->createConnection(dispatcher_);

// Make sure we cancel.
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/network/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class MockConnection : public Connection, public MockConnectionBase {
MOCK_METHOD0(state, State());
MOCK_METHOD1(write, void(Buffer::Instance& data));
MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit));
MOCK_CONST_METHOD0(readBufferLimit, uint32_t());
};

/**
Expand Down Expand Up @@ -89,6 +90,7 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase
MOCK_METHOD0(state, State());
MOCK_METHOD1(write, void(Buffer::Instance& data));
MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit));
MOCK_CONST_METHOD0(readBufferLimit, uint32_t());

// Network::ClientConnection
MOCK_METHOD0(connect, void());
Expand Down
1 change: 1 addition & 0 deletions test/mocks/upstream/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MockClusterInfo : public ClusterInfo {

// Upstream::ClusterInfo
MOCK_CONST_METHOD0(connectTimeout, std::chrono::milliseconds());
MOCK_CONST_METHOD0(perConnectionBufferLimitBytes, uint32_t());
MOCK_CONST_METHOD0(features, uint64_t());
MOCK_CONST_METHOD0(httpCodecOptions, uint64_t());
MOCK_CONST_METHOD0(lbType, LoadBalancerType());
Expand Down
35 changes: 35 additions & 0 deletions test/server/configuration_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,41 @@ TEST(ConfigurationImplTest, VerifySubjectAltNameConfig) {
EXPECT_TRUE(config.listeners().back()->sslContext() != nullptr);
}

TEST(ConfigurationImplTest, SetUpstreamClusterPerConnectionBufferLimit) {
std::string json = R"EOF(
{
"listeners" : [],
"cluster_manager": {
"clusters": [
{
"name": "test_cluster",
"type": "static",
"connect_timeout_ms": 1,
"per_connection_buffer_limit_bytes": 8192,
"lb_type": "round_robin",
"hosts": []
}
]
}
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);

NiceMock<Server::MockInstance> server;
MainImpl config(server);
config.initialize(*loader);

ASSERT_EQ(1U, config.clusterManager().clusters().count("test_cluster"));
EXPECT_EQ(8192U, config.clusterManager()
.clusters()
.find("test_cluster")
->second.get()
.info()
->perConnectionBufferLimitBytes());
server.thread_local_.shutdownThread();
}

TEST(ConfigurationImplTest, BadListenerConfig) {
std::string json = R"EOF(
{
Expand Down