Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,20 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
// Note that HttpConnectionManager sanitization will *not* be performed on the
// recreated stream, as it is assumed that sanitization has already been done.
virtual bool recreateStream() PURE;

/**
* Adds socket options to be applied to any connections used for upstream requests. Note that
* unique values for the options will likely lead to many connection pools being created. The
* added options are appended to any previously added.
*
* @param options The options to be added.
*/
virtual void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) PURE;

/**
* @return The socket options to be applied to the upstream request.
*/
virtual Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const PURE;
};

/**
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ class LoadBalancerContext {
* ignored.
*/
virtual uint32_t hostSelectionRetryCount() const PURE;

/**
* Returns the set of socket options which should be applied on upstream connections
*/
virtual Network::Socket::OptionsSharedPtr upstreamSocketOptions() const PURE;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
void setDecoderBufferLimit(uint32_t) override {}
uint32_t decoderBufferLimit() override { return 0; }
bool recreateStream() override { return false; }
void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {}
Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return {}; }

AsyncClient::StreamCallbacks& stream_callbacks_;
const uint64_t stream_id_;
Expand Down
3 changes: 2 additions & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect
stream_id_(connection_manager.random_generator_.random()),
request_response_timespan_(new Stats::Timespan(
connection_manager_.stats_.named_.downstream_rq_time_, connection_manager_.timeSource())),
stream_info_(connection_manager_.codec_->protocol(), connection_manager_.timeSource()) {
stream_info_(connection_manager_.codec_->protocol(), connection_manager_.timeSource()),
upstream_options_(std::make_shared<Network::Socket::Options>()) {
connection_manager_.stats_.named_.downstream_rq_total_.inc();
connection_manager_.stats_.named_.downstream_rq_active_.inc();
if (connection_manager_.codec_->protocol() == Protocol::Http2) {
Expand Down
9 changes: 9 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
uint32_t decoderBufferLimit() override { return parent_.buffer_limit_; }
bool recreateStream() override;

void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) override {
Network::Socket::appendOptions(parent_.upstream_options_, options);
}

Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override {
return parent_.upstream_options_;
}

// Each decoder filter instance checks if the request passed to the filter is gRPC
// so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders()
// called here may change the content type, so we must check it before the call.
Expand Down Expand Up @@ -517,6 +525,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// Whether a filter has indicated that the response should be treated as a headers only
// response.
bool encoding_headers_only_{};
Network::Socket::OptionsSharedPtr upstream_options_;
};

typedef std::unique_ptr<ActiveStream> ActiveStreamPtr;
Expand Down
4 changes: 4 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ class Filter : Logger::Loggable<Logger::Id::router>,
return retry_state_->hostSelectionMaxAttempts();
}

Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override {
return callbacks_->getUpstreamSocketOptions();
}

/**
* Set a computed cookie to be sent with the downstream headers.
* @param key supplies the size of the cookie
Expand Down
36 changes: 23 additions & 13 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@

namespace Envoy {
namespace Upstream {
namespace {

void addOptionsIfNotNull(Network::Socket::OptionsSharedPtr& options,
const Network::Socket::OptionsSharedPtr& to_add) {
if (to_add != nullptr) {
Network::Socket::appendOptions(options, to_add);
}
}

} // namespace

void ClusterManagerInitHelper::addCluster(Cluster& cluster) {
// See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during
Expand Down Expand Up @@ -1133,22 +1143,22 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
return nullptr;
}

// Inherit socket options from downstream connection, if set.
std::vector<uint8_t> hash_key = {uint8_t(protocol)};

// Use downstream connection socket options for computing connection pool hash key, if any.
Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
if (context) {
// Inherit socket options from downstream connection, if set.
if (context->downstreamConnection()) {
addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
}
addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
}

// Use the socket options for computing connection pool hash key, if any.
// This allows socket options to control connection pooling so that connections with
// different options are not pooled together.
bool have_options = false;
if (context && context->downstreamConnection()) {
const Network::ConnectionSocket::OptionsSharedPtr& options =
context->downstreamConnection()->socketOptions();
if (options) {
for (const auto& option : *options) {
have_options = true;
option->hashKey(hash_key);
}
}
for (const auto& option : *upstream_options) {
option->hashKey(hash_key);
}

ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);
Expand All @@ -1159,7 +1169,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
container.pools_->getPool(priority, hash_key, [&]() {
return parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, protocol,
have_options ? context->downstreamConnection()->socketOptions() : nullptr);
!upstream_options->empty() ? upstream_options : nullptr);
});

if (pool.has_value()) {
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ class LoadBalancerContextBase : public LoadBalancerContext {
bool shouldSelectAnotherHost(const Host&) override { return false; }

uint32_t hostSelectionRetryCount() const override { return 1; }

Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override { return {}; }
};

/**
Expand Down
20 changes: 20 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "common/config/metadata.h"
#include "common/config/well_known_names.h"
#include "common/http/context_impl.h"
#include "common/network/socket_option_factory.h"
#include "common/network/utility.h"
#include "common/router/config_impl.h"
#include "common/router/router.h"
Expand Down Expand Up @@ -2974,6 +2975,25 @@ TEST_F(RouterTest, AutoHostRewriteDisabled) {
router_.decodeHeaders(incoming_headers, true);
}

TEST_F(RouterTest, UpstreamSocketOptionsReturnedEmpty) {
EXPECT_CALL(callbacks_, getUpstreamSocketOptions())
.WillOnce(Return(Network::Socket::OptionsSharedPtr()));

auto options = router_.upstreamSocketOptions();

EXPECT_EQ(options.get(), nullptr);
}

TEST_F(RouterTest, UpstreamSocketOptionsReturnedNonEmpty) {
Network::Socket::OptionsSharedPtr to_return =
Network::SocketOptionFactory::buildIpTransparentOptions();
EXPECT_CALL(callbacks_, getUpstreamSocketOptions()).WillOnce(Return(to_return));

auto options = router_.upstreamSocketOptions();

EXPECT_EQ(to_return, options);
}

class WatermarkTest : public RouterTest {
public:
void sendRequest(bool header_only_request = true, bool pool_ready = true) {
Expand Down
94 changes: 83 additions & 11 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "common/config/bootstrap_json.h"
#include "common/config/utility.h"
#include "common/http/context_impl.h"
#include "common/network/socket_option_factory.h"
#include "common/network/socket_option_impl.h"
#include "common/network/transport_socket_options_impl.h"
#include "common/network/utility.h"
Expand Down Expand Up @@ -71,8 +72,8 @@ class TestClusterManagerFactory : public ClusterManagerFactory {

Http::ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher&, HostConstSharedPtr host, ResourcePriority, Http::Protocol,
const Network::ConnectionSocket::OptionsSharedPtr&) override {
return Http::ConnectionPool::InstancePtr{allocateConnPool_(host)};
const Network::ConnectionSocket::OptionsSharedPtr& options) override {
return Http::ConnectionPool::InstancePtr{allocateConnPool_(host, options)};
}

Tcp::ConnectionPool::InstancePtr
Expand Down Expand Up @@ -101,7 +102,9 @@ class TestClusterManagerFactory : public ClusterManagerFactory {

MOCK_METHOD1(clusterManagerFromProto_,
ClusterManager*(const envoy::config::bootstrap::v2::Bootstrap& bootstrap));
MOCK_METHOD1(allocateConnPool_, Http::ConnectionPool::Instance*(HostConstSharedPtr host));
MOCK_METHOD2(allocateConnPool_,
Http::ConnectionPool::Instance*(HostConstSharedPtr host,
Network::ConnectionSocket::OptionsSharedPtr));
MOCK_METHOD1(allocateTcpConnPool_, Tcp::ConnectionPool::Instance*(HostConstSharedPtr host));
MOCK_METHOD4(clusterFromProto_,
ClusterSharedPtr(const envoy::api::v2::Cluster& cluster, ClusterManager& cm,
Expand Down Expand Up @@ -1202,7 +1205,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) {
EXPECT_EQ(cluster2->info_, cluster_manager_->get("fake_cluster")->info());
EXPECT_EQ(1UL, cluster_manager_->clusters().size());
Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance();
EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(cp));
EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(cp));
EXPECT_EQ(cp, cluster_manager_->httpConnPoolForCluster("fake_cluster", ResourcePriority::Default,
Http::Protocol::Http11, nullptr));

Expand Down Expand Up @@ -1357,7 +1360,7 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) {
}));
create(parseBootstrapFromJson(json));

EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(cp1));
EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(cp1));
cluster_manager_->httpConnPoolForCluster("some_cluster", ResourcePriority::Default,
Http::Protocol::Http11, nullptr);

Expand All @@ -1368,7 +1371,7 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) {
test_host->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK);
outlier_detector.runCallbacks(test_host);

EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(cp2));
EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(cp2));
cluster_manager_->httpConnPoolForCluster("some_cluster", ResourcePriority::High,
Http::Protocol::Http11, nullptr);
}
Expand Down Expand Up @@ -1624,7 +1627,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) {
EXPECT_CALL(initialized, ready());
cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });

EXPECT_CALL(factory_, allocateConnPool_(_))
EXPECT_CALL(factory_, allocateConnPool_(_, _))
.Times(4)
.WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());

Expand Down Expand Up @@ -1788,7 +1791,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) {
EXPECT_CALL(initialized, ready());
cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });

EXPECT_CALL(factory_, allocateConnPool_(_))
EXPECT_CALL(factory_, allocateConnPool_(_, _))
.Times(4)
.WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());

Expand Down Expand Up @@ -1978,7 +1981,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) {

dns_callback(TestUtility::makeDnsResponse({"127.0.0.2"}));

EXPECT_CALL(factory_, allocateConnPool_(_))
EXPECT_CALL(factory_, allocateConnPool_(_, _))
.WillOnce(ReturnNew<Http::ConnectionPool::MockInstance>());

EXPECT_CALL(factory_, allocateTcpConnPool_(_))
Expand Down Expand Up @@ -2055,7 +2058,7 @@ TEST_F(ClusterManagerImplTest, ConnPoolDestroyWithDraining) {
dns_callback(TestUtility::makeDnsResponse({"127.0.0.2"}));

MockConnPoolWithDestroy* mock_cp = new MockConnPoolWithDestroy();
EXPECT_CALL(factory_, allocateConnPool_(_)).WillOnce(Return(mock_cp));
EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(mock_cp));

MockTcpConnPoolWithDestroy* mock_tcp = new MockTcpConnPoolWithDestroy();
EXPECT_CALL(factory_, allocateTcpConnPool_(_)).WillOnce(Return(mock_tcp));
Expand Down Expand Up @@ -2120,7 +2123,7 @@ TEST_F(ClusterManagerImplTest, OriginalDstInitialization) {
// Tests that all the HC/weight/metadata changes are delivered in one go, as long as
// there's no hosts changes in between.
// Also tests that if hosts are added/removed between mergeable updates, delivery will
// happen and the scheduled update will be canceled.
// happen and the scheduled update will be cancelled.
TEST_F(ClusterManagerImplTest, MergedUpdates) {
createWithLocalClusterUpdate();

Expand Down Expand Up @@ -2486,6 +2489,75 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesDestroyedOnUpdate) {
EXPECT_EQ(0, factory_.stats_.gauge("cluster_manager.warming_clusters").value());
}

TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsPassedToConnPool) {
createWithLocalClusterUpdate();
NiceMock<MockLoadBalancerContext> context;

Http::ConnectionPool::MockInstance* to_create = new Http::ConnectionPool::MockInstance();
Network::Socket::OptionsSharedPtr options_to_return =
Network::SocketOptionFactory::buildIpTransparentOptions();

EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return));
EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create));

Http::ConnectionPool::Instance* cp = cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context);

EXPECT_NE(nullptr, cp);
}

TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsUsedInConnPoolHash) {
createWithLocalClusterUpdate();
NiceMock<MockLoadBalancerContext> context1;
NiceMock<MockLoadBalancerContext> context2;

Http::ConnectionPool::MockInstance* to_create1 = new Http::ConnectionPool::MockInstance();
Http::ConnectionPool::MockInstance* to_create2 = new Http::ConnectionPool::MockInstance();
Network::Socket::OptionsSharedPtr options1 =
Network::SocketOptionFactory::buildIpTransparentOptions();
Network::Socket::OptionsSharedPtr options2 =
Network::SocketOptionFactory::buildSocketMarkOptions(3);

EXPECT_CALL(context1, upstreamSocketOptions()).WillRepeatedly(Return(options1));
EXPECT_CALL(context2, upstreamSocketOptions()).WillRepeatedly(Return(options2));
EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create1));

Http::ConnectionPool::Instance* cp1 = cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context1);

EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create2));
Http::ConnectionPool::Instance* cp2 = cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context2);

Http::ConnectionPool::Instance* should_be_cp1 = cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context1);
Http::ConnectionPool::Instance* should_be_cp2 = cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context2);

// The different upstream options should lead to different hashKeys, thus different pools.
EXPECT_NE(cp1, cp2);

// Reusing the same options should lead to the same connection pools.
EXPECT_EQ(cp1, should_be_cp1);
EXPECT_EQ(cp2, should_be_cp2);
}

TEST_F(ClusterManagerImplTest, UpstreamSocketOptionsNullIsOkay) {
createWithLocalClusterUpdate();
NiceMock<MockLoadBalancerContext> context;

Http::ConnectionPool::MockInstance* to_create = new Http::ConnectionPool::MockInstance();
Network::Socket::OptionsSharedPtr options_to_return = nullptr;

EXPECT_CALL(context, upstreamSocketOptions()).WillOnce(Return(options_to_return));
EXPECT_CALL(factory_, allocateConnPool_(_, _)).WillOnce(Return(to_create));

Http::ConnectionPool::Instance* cp = cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &context);

EXPECT_NE(nullptr, cp);
}

class ClusterManagerInitHelperTest : public testing::Test {
public:
MOCK_METHOD1(onClusterInit, void(Cluster& cluster));
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
MOCK_METHOD1(setDecoderBufferLimit, void(uint32_t));
MOCK_METHOD0(decoderBufferLimit, uint32_t());
MOCK_METHOD0(recreateStream, bool());
MOCK_METHOD1(addUpstreamSocketOptions, void(const Network::Socket::OptionsSharedPtr& options));
MOCK_CONST_METHOD0(getUpstreamSocketOptions, Network::Socket::OptionsSharedPtr());

// Http::StreamDecoderFilterCallbacks
void sendLocalReply(Code code, absl::string_view body,
Expand Down
10 changes: 9 additions & 1 deletion test/mocks/upstream/load_balancer_context.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
#include "test/mocks/upstream/load_balancer_context.h"

using testing::_;
using testing::ReturnRef;

namespace Envoy {
namespace Upstream {

MockLoadBalancerContext::MockLoadBalancerContext() = default;
MockLoadBalancerContext::MockLoadBalancerContext() {
// By default, set loads which treat everything as healthy in the first priority.
priority_load_.healthy_priority_load_ = HealthyLoad({100});
priority_load_.degraded_priority_load_ = DegradedLoad({0});
ON_CALL(*this, determinePriorityLoad(_, _)).WillByDefault(ReturnRef(priority_load_));
}

MockLoadBalancerContext::~MockLoadBalancerContext() = default;

Expand Down
Loading