diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 44fa734bc3be9..4ffa535542006 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -418,6 +418,20 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { // Note that HttpConnectionManager sanitization will *not* be performed on the // recreated stream, as it is assumed that sanitization has already been done. virtual bool recreateStream() PURE; + + /** + * Adds socket options to be applied to any connections used for upstream requests. Note that + * unique values for the options will likely lead to many connection pools being created. The + * added options are appended to any previously added. + * + * @param options The options to be added. + */ + virtual void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) PURE; + + /** + * @return The socket options to be applied to the upstream request. + */ + virtual Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const PURE; }; /** diff --git a/include/envoy/upstream/load_balancer.h b/include/envoy/upstream/load_balancer.h index 6906990074e73..a62d2ac3e309a 100644 --- a/include/envoy/upstream/load_balancer.h +++ b/include/envoy/upstream/load_balancer.h @@ -71,6 +71,11 @@ class LoadBalancerContext { * ignored. */ virtual uint32_t hostSelectionRetryCount() const PURE; + + /** + * Returns the set of socket options which should be applied on upstream connections + */ + virtual Network::Socket::OptionsSharedPtr upstreamSocketOptions() const PURE; }; /** diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index eb319aa9c6bdb..f60883058fea6 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -355,6 +355,8 @@ class AsyncStreamImpl : public AsyncClient::Stream, void setDecoderBufferLimit(uint32_t) override {} uint32_t decoderBufferLimit() override { return 0; } bool recreateStream() override { return false; } + void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {} + Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return {}; } AsyncClient::StreamCallbacks& stream_callbacks_; const uint64_t stream_id_; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index bc30b41f084c4..c9004f00ad9da 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -403,7 +403,8 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect stream_id_(connection_manager.random_generator_.random()), request_response_timespan_(new Stats::Timespan( connection_manager_.stats_.named_.downstream_rq_time_, connection_manager_.timeSource())), - stream_info_(connection_manager_.codec_->protocol(), connection_manager_.timeSource()) { + stream_info_(connection_manager_.codec_->protocol(), connection_manager_.timeSource()), + upstream_options_(std::make_shared()) { connection_manager_.stats_.named_.downstream_rq_total_.inc(); connection_manager_.stats_.named_.downstream_rq_active_.inc(); if (connection_manager_.codec_->protocol() == Protocol::Http2) { diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index a932e920c2cd2..06a79a9b03578 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -242,6 +242,14 @@ class ConnectionManagerImpl : Logger::Loggable, uint32_t decoderBufferLimit() override { return parent_.buffer_limit_; } bool recreateStream() override; + void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) override { + Network::Socket::appendOptions(parent_.upstream_options_, options); + } + + Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { + return parent_.upstream_options_; + } + // Each decoder filter instance checks if the request passed to the filter is gRPC // so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders() // called here may change the content type, so we must check it before the call. @@ -517,6 +525,7 @@ class ConnectionManagerImpl : Logger::Loggable, // Whether a filter has indicated that the response should be treated as a headers only // response. bool encoding_headers_only_{}; + Network::Socket::OptionsSharedPtr upstream_options_; }; typedef std::unique_ptr ActiveStreamPtr; diff --git a/source/common/router/router.h b/source/common/router/router.h index cc39a28140d2a..a2cb522f4b759 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -239,6 +239,10 @@ class Filter : Logger::Loggable, return retry_state_->hostSelectionMaxAttempts(); } + Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override { + return callbacks_->getUpstreamSocketOptions(); + } + /** * Set a computed cookie to be sent with the downstream headers. * @param key supplies the size of the cookie diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index c73bf5b639c92..905a87ae2a5d3 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -40,6 +40,16 @@ namespace Envoy { namespace Upstream { +namespace { + +void addOptionsIfNotNull(Network::Socket::OptionsSharedPtr& options, + const Network::Socket::OptionsSharedPtr& to_add) { + if (to_add != nullptr) { + Network::Socket::appendOptions(options, to_add); + } +} + +} // namespace void ClusterManagerInitHelper::addCluster(Cluster& cluster) { // See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during @@ -1133,22 +1143,22 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( return nullptr; } - // Inherit socket options from downstream connection, if set. std::vector hash_key = {uint8_t(protocol)}; - // Use downstream connection socket options for computing connection pool hash key, if any. + Network::Socket::OptionsSharedPtr upstream_options(std::make_shared()); + if (context) { + // Inherit socket options from downstream connection, if set. + if (context->downstreamConnection()) { + addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions()); + } + addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions()); + } + + // Use the socket options for computing connection pool hash key, if any. // This allows socket options to control connection pooling so that connections with // different options are not pooled together. - bool have_options = false; - if (context && context->downstreamConnection()) { - const Network::ConnectionSocket::OptionsSharedPtr& options = - context->downstreamConnection()->socketOptions(); - if (options) { - for (const auto& option : *options) { - have_options = true; - option->hashKey(hash_key); - } - } + for (const auto& option : *upstream_options) { + option->hashKey(hash_key); } ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true); @@ -1159,7 +1169,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( container.pools_->getPool(priority, hash_key, [&]() { return parent_.parent_.factory_.allocateConnPool( parent_.thread_local_dispatcher_, host, priority, protocol, - have_options ? context->downstreamConnection()->socketOptions() : nullptr); + !upstream_options->empty() ? upstream_options : nullptr); }); if (pool.has_value()) { diff --git a/source/common/upstream/load_balancer_impl.h b/source/common/upstream/load_balancer_impl.h index 7ba7377cfe999..3f0dca76df6a4 100644 --- a/source/common/upstream/load_balancer_impl.h +++ b/source/common/upstream/load_balancer_impl.h @@ -148,6 +148,8 @@ class LoadBalancerContextBase : public LoadBalancerContext { bool shouldSelectAnotherHost(const Host&) override { return false; } uint32_t hostSelectionRetryCount() const override { return 1; } + + Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override { return {}; } }; /** diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index e2bfb3ad30398..8b0c171a9e704 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -7,6 +7,7 @@ #include "common/config/metadata.h" #include "common/config/well_known_names.h" #include "common/http/context_impl.h" +#include "common/network/socket_option_factory.h" #include "common/network/utility.h" #include "common/router/config_impl.h" #include "common/router/router.h" @@ -2974,6 +2975,25 @@ TEST_F(RouterTest, AutoHostRewriteDisabled) { router_.decodeHeaders(incoming_headers, true); } +TEST_F(RouterTest, UpstreamSocketOptionsReturnedEmpty) { + EXPECT_CALL(callbacks_, getUpstreamSocketOptions()) + .WillOnce(Return(Network::Socket::OptionsSharedPtr())); + + auto options = router_.upstreamSocketOptions(); + + EXPECT_EQ(options.get(), nullptr); +} + +TEST_F(RouterTest, UpstreamSocketOptionsReturnedNonEmpty) { + Network::Socket::OptionsSharedPtr to_return = + Network::SocketOptionFactory::buildIpTransparentOptions(); + EXPECT_CALL(callbacks_, getUpstreamSocketOptions()).WillOnce(Return(to_return)); + + auto options = router_.upstreamSocketOptions(); + + EXPECT_EQ(to_return, options); +} + class WatermarkTest : public RouterTest { public: void sendRequest(bool header_only_request = true, bool pool_ready = true) { diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 5647c8d8a6e24..8e54ce7133651 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -9,6 +9,7 @@ #include "common/config/bootstrap_json.h" #include "common/config/utility.h" #include "common/http/context_impl.h" +#include "common/network/socket_option_factory.h" #include "common/network/socket_option_impl.h" #include "common/network/transport_socket_options_impl.h" #include "common/network/utility.h" @@ -71,8 +72,8 @@ class TestClusterManagerFactory : public ClusterManagerFactory { Http::ConnectionPool::InstancePtr allocateConnPool(Event::Dispatcher&, HostConstSharedPtr host, ResourcePriority, Http::Protocol, - const Network::ConnectionSocket::OptionsSharedPtr&) override { - return Http::ConnectionPool::InstancePtr{allocateConnPool_(host)}; + const Network::ConnectionSocket::OptionsSharedPtr& options) override { + return Http::ConnectionPool::InstancePtr{allocateConnPool_(host, options)}; } Tcp::ConnectionPool::InstancePtr @@ -101,7 +102,9 @@ class TestClusterManagerFactory : public ClusterManagerFactory { MOCK_METHOD1(clusterManagerFromProto_, ClusterManager*(const envoy::config::bootstrap::v2::Bootstrap& bootstrap)); - MOCK_METHOD1(allocateConnPool_, Http::ConnectionPool::Instance*(HostConstSharedPtr host)); + MOCK_METHOD2(allocateConnPool_, + Http::ConnectionPool::Instance*(HostConstSharedPtr host, + Network::ConnectionSocket::OptionsSharedPtr)); MOCK_METHOD1(allocateTcpConnPool_, Tcp::ConnectionPool::Instance*(HostConstSharedPtr host)); MOCK_METHOD4(clusterFromProto_, ClusterSharedPtr(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, @@ -1202,7 +1205,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { EXPECT_EQ(cluster2->info_, cluster_manager_->get("fake_cluster")->info()); EXPECT_EQ(1UL, cluster_manager_->clusters().size()); Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance(); - EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(cp)); + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(cp)); EXPECT_EQ(cp, cluster_manager_->httpConnPoolForCluster("fake_cluster", ResourcePriority::Default, Http::Protocol::Http11, nullptr)); @@ -1357,7 +1360,7 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { })); create(parseBootstrapFromJson(json)); - EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(cp1)); + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(cp1)); cluster_manager_->httpConnPoolForCluster("some_cluster", ResourcePriority::Default, Http::Protocol::Http11, nullptr); @@ -1368,7 +1371,7 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { test_host->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK); outlier_detector.runCallbacks(test_host); - EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(cp2)); + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(cp2)); cluster_manager_->httpConnPoolForCluster("some_cluster", ResourcePriority::High, Http::Protocol::Http11, nullptr); } @@ -1624,7 +1627,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { EXPECT_CALL(initialized, ready()); cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); - EXPECT_CALL(factory_, allocateConnPool_(_)) + EXPECT_CALL(factory_, allocateConnPool_(_, _)) .Times(4) .WillRepeatedly(ReturnNew()); @@ -1788,7 +1791,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { EXPECT_CALL(initialized, ready()); cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); - EXPECT_CALL(factory_, allocateConnPool_(_)) + EXPECT_CALL(factory_, allocateConnPool_(_, _)) .Times(4) .WillRepeatedly(ReturnNew()); @@ -1978,7 +1981,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { dns_callback(TestUtility::makeDnsResponse({"127.0.0.2"})); - EXPECT_CALL(factory_, allocateConnPool_(_)) + EXPECT_CALL(factory_, allocateConnPool_(_, _)) .WillOnce(ReturnNew()); EXPECT_CALL(factory_, allocateTcpConnPool_(_)) @@ -2055,7 +2058,7 @@ TEST_F(ClusterManagerImplTest, ConnPoolDestroyWithDraining) { dns_callback(TestUtility::makeDnsResponse({"127.0.0.2"})); MockConnPoolWithDestroy* mock_cp = new MockConnPoolWithDestroy(); - EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(mock_cp)); + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(mock_cp)); MockTcpConnPoolWithDestroy* mock_tcp = new MockTcpConnPoolWithDestroy(); EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(mock_tcp)); @@ -2120,7 +2123,7 @@ TEST_F(ClusterManagerImplTest, OriginalDstInitialization) { // Tests that all the HC/weight/metadata changes are delivered in one go, as long as // there's no hosts changes in between. // Also tests that if hosts are added/removed between mergeable updates, delivery will -// happen and the scheduled update will be canceled. +// happen and the scheduled update will be cancelled. TEST_F(ClusterManagerImplTest, MergedUpdates) { createWithLocalClusterUpdate(); @@ -2486,6 +2489,75 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesDestroyedOnUpdate) { EXPECT_EQ(0, factory_.stats_.gauge("cluster_manager.warming_clusters").value()); } +TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsPassedToConnPool) { + createWithLocalClusterUpdate(); + NiceMock context; + + Http::ConnectionPool::MockInstance* to_create = new Http::ConnectionPool::MockInstance(); + Network::Socket::OptionsSharedPtr options_to_return = + Network::SocketOptionFactory::buildIpTransparentOptions(); + + EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return)); + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create)); + + Http::ConnectionPool::Instance* cp = cluster_manager_->httpConnPoolForCluster( + "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context); + + EXPECT_NE(nullptr, cp); +} + +TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsUsedInConnPoolHash) { + createWithLocalClusterUpdate(); + NiceMock context1; + NiceMock context2; + + Http::ConnectionPool::MockInstance* to_create1 = new Http::ConnectionPool::MockInstance(); + Http::ConnectionPool::MockInstance* to_create2 = new Http::ConnectionPool::MockInstance(); + Network::Socket::OptionsSharedPtr options1 = + Network::SocketOptionFactory::buildIpTransparentOptions(); + Network::Socket::OptionsSharedPtr options2 = + Network::SocketOptionFactory::buildSocketMarkOptions(3); + + EXPECT_CALL(context1, upstreamSocketOptions()).WillRepeatedly(Return(options1)); + EXPECT_CALL(context2, upstreamSocketOptions()).WillRepeatedly(Return(options2)); + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create1)); + + Http::ConnectionPool::Instance* cp1 = cluster_manager_->httpConnPoolForCluster( + "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context1); + + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create2)); + Http::ConnectionPool::Instance* cp2 = cluster_manager_->httpConnPoolForCluster( + "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context2); + + Http::ConnectionPool::Instance* should_be_cp1 = cluster_manager_->httpConnPoolForCluster( + "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context1); + Http::ConnectionPool::Instance* should_be_cp2 = cluster_manager_->httpConnPoolForCluster( + "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context2); + + // The different upstream options should lead to different hashKeys, thus different pools. + EXPECT_NE(cp1, cp2); + + // Reusing the same options should lead to the same connection pools. + EXPECT_EQ(cp1, should_be_cp1); + EXPECT_EQ(cp2, should_be_cp2); +} + +TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsNullIsOkay) { + createWithLocalClusterUpdate(); + NiceMock context; + + Http::ConnectionPool::MockInstance* to_create = new Http::ConnectionPool::MockInstance(); + Network::Socket::OptionsSharedPtr options_to_return = nullptr; + + EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return)); + EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create)); + + Http::ConnectionPool::Instance* cp = cluster_manager_->httpConnPoolForCluster( + "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context); + + EXPECT_NE(nullptr, cp); +} + class ClusterManagerInitHelperTest : public testing::Test { public: MOCK_METHOD1(onClusterInit, void(Cluster& cluster)); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 774672196a95e..6dd46f28b3470 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -148,6 +148,8 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD1(setDecoderBufferLimit, void(uint32_t)); MOCK_METHOD0(decoderBufferLimit, uint32_t()); MOCK_METHOD0(recreateStream, bool()); + MOCK_METHOD1(addUpstreamSocketOptions, void(const Network::Socket::OptionsSharedPtr& options)); + MOCK_CONST_METHOD0(getUpstreamSocketOptions, Network::Socket::OptionsSharedPtr()); // Http::StreamDecoderFilterCallbacks void sendLocalReply(Code code, absl::string_view body, diff --git a/test/mocks/upstream/load_balancer_context.cc b/test/mocks/upstream/load_balancer_context.cc index 5a07100bc6642..5a424b07b8672 100644 --- a/test/mocks/upstream/load_balancer_context.cc +++ b/test/mocks/upstream/load_balancer_context.cc @@ -1,9 +1,17 @@ #include "test/mocks/upstream/load_balancer_context.h" +using testing::_; +using testing::ReturnRef; + namespace Envoy { namespace Upstream { -MockLoadBalancerContext::MockLoadBalancerContext() = default; +MockLoadBalancerContext::MockLoadBalancerContext() { + // By default, set loads which treat everything as healthy in the first priority. + priority_load_.healthy_priority_load_ = HealthyLoad({100}); + priority_load_.degraded_priority_load_ = DegradedLoad({0}); + ON_CALL(*this, determinePriorityLoad(_, _)).WillByDefault(ReturnRef(priority_load_)); +} MockLoadBalancerContext::~MockLoadBalancerContext() = default; diff --git a/test/mocks/upstream/load_balancer_context.h b/test/mocks/upstream/load_balancer_context.h index 578813991fbe1..495da74935a4a 100644 --- a/test/mocks/upstream/load_balancer_context.h +++ b/test/mocks/upstream/load_balancer_context.h @@ -18,6 +18,10 @@ class MockLoadBalancerContext : public LoadBalancerContext { const HealthyAndDegradedLoad&(const PrioritySet&, const HealthyAndDegradedLoad&)); MOCK_METHOD1(shouldSelectAnotherHost, bool(const Host&)); MOCK_CONST_METHOD0(hostSelectionRetryCount, uint32_t()); + MOCK_CONST_METHOD0(upstreamSocketOptions, Network::Socket::OptionsSharedPtr()); + +private: + HealthyAndDegradedLoad priority_load_; }; } // namespace Upstream