From 614b0195a7aca53de41b1c489ad555a0d5b76073 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 16 May 2022 01:50:28 +0000 Subject: [PATCH 01/16] Ensure connection balancer works with multi addresses Signed-off-by: He Jie Xu --- envoy/network/connection_handler.h | 10 +- envoy/network/listener.h | 5 +- source/server/active_tcp_listener.cc | 27 +- source/server/active_tcp_listener.h | 11 +- source/server/admin/admin.h | 4 +- source/server/connection_handler_impl.cc | 29 +- source/server/connection_handler_impl.h | 5 +- source/server/listener_impl.cc | 23 +- source/server/listener_impl.h | 10 +- .../proxy_protocol_regression_test.cc | 4 +- .../common/fuzz/listener_filter_fuzzer.h | 4 +- .../proxy_protocol/proxy_protocol_test.cc | 8 +- test/integration/fake_upstream.h | 4 +- test/mocks/network/mocks.h | 2 +- test/server/active_tcp_listener_test.cc | 126 ++++++- test/server/connection_handler_test.cc | 356 ++++++------------ 16 files changed, 313 insertions(+), 315 deletions(-) diff --git a/envoy/network/connection_handler.h b/envoy/network/connection_handler.h index eb88410aba69e..7c0e11f936e49 100644 --- a/envoy/network/connection_handler.h +++ b/envoy/network/connection_handler.h @@ -13,6 +13,8 @@ #include "source/common/common/interval_value.h" +#include "address.h" + namespace Envoy { namespace Network { @@ -175,14 +177,6 @@ class TcpConnectionHandler : public virtual ConnectionHandler { public: virtual Event::Dispatcher& dispatcher() PURE; - /** - * Obtain the rebalancer of the tcp listener. - * @param listener_tag supplies the tag of the tcp listener that was passed to addListener(). - * @return BalancedConnectionHandlerOptRef the balancer attached to the listener. `nullopt` if - * listener doesn't exist or rebalancer doesn't exist. - */ - virtual BalancedConnectionHandlerOptRef getBalancedHandlerByTag(uint64_t listener_tag) PURE; - /** * Obtain the rebalancer of the tcp listener. * @param address supplies the address of the tcp listener. diff --git a/envoy/network/listener.h b/envoy/network/listener.h index 62dbfd4759cc9..e90086e61237e 100644 --- a/envoy/network/listener.h +++ b/envoy/network/listener.h @@ -20,6 +20,8 @@ #include "source/common/common/interval_value.h" +#include "address.h" + namespace Envoy { namespace Network { @@ -221,10 +223,11 @@ class ListenerConfig { virtual envoy::config::core::v3::TrafficDirection direction() const PURE; /** + * @param address is used for query the address specific connection balancer. * @return the connection balancer for this listener. All listeners have a connection balancer, * though the implementation may be a NOP balancer. */ - virtual ConnectionBalancer& connectionBalancer() PURE; + virtual ConnectionBalancer& connectionBalancer(const Network::Address::Instance& address) PURE; /** * Open connection resources for this listener. diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc index 3581e72ad58ad..89c7882107b03 100644 --- a/source/server/active_tcp_listener.cc +++ b/source/server/active_tcp_listener.cc @@ -14,27 +14,32 @@ namespace Server { ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config, Runtime::Loader& runtime, - Network::SocketSharedPtr&& socket) + Network::SocketSharedPtr&& socket, + Network::Address::InstanceConstSharedPtr& address, + Network::ConnectionBalancer& connection_balancer) : OwnedActiveStreamListenerBase( parent, parent.dispatcher(), parent.dispatcher().createListener(std::move(socket), *this, runtime, config.bindToPort(), config.ignoreGlobalConnLimit()), config), - tcp_conn_handler_(parent) { - config.connectionBalancer().registerHandler(*this); + tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) { + connection_balancer_.registerHandler(*this); } ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, - Network::ListenerConfig& config, Runtime::Loader&) + Network::Address::InstanceConstSharedPtr& address, + Network::ListenerConfig& config, + Network::ConnectionBalancer& connection_balancer, + Runtime::Loader&) : OwnedActiveStreamListenerBase(parent, parent.dispatcher(), std::move(listener), config), - tcp_conn_handler_(parent) { - config.connectionBalancer().registerHandler(*this); + tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) { + connection_balancer_.registerHandler(*this); } ActiveTcpListener::~ActiveTcpListener() { is_deleting_ = true; - config_->connectionBalancer().unregisterHandler(*this); + connection_balancer_.unregisterHandler(*this); // Purge sockets that have not progressed to connections. This should only happen when // a listener filter stops iteration and never resumes. @@ -64,7 +69,6 @@ ActiveTcpListener::~ActiveTcpListener() { void ActiveTcpListener::updateListenerConfig(Network::ListenerConfig& config) { ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag()); - ASSERT(&config_->connectionBalancer() == &config.connectionBalancer()); config_ = &config; } @@ -97,7 +101,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket, bool rebalanced) { if (!rebalanced) { Network::BalancedConnectionHandler& target_handler = - config_->connectionBalancer().pickTargetHandler(*this); + connection_balancer_.pickTargetHandler(*this); if (&target_handler != this) { target_handler.post(std::move(socket)); return; @@ -152,10 +156,9 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) { RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared(); socket_to_rebalance->socket = std::move(socket); - dispatcher().post([socket_to_rebalance, tag = config_->listenerTag(), - &tcp_conn_handler = tcp_conn_handler_, + dispatcher().post([socket_to_rebalance, address = address_, &tcp_conn_handler = tcp_conn_handler_, handoff = config_->handOffRestoredDestinationConnections()]() { - auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag); + auto balanced_handler = tcp_conn_handler.getBalancedHandlerByAddress(*address); if (balanced_handler.has_value()) { balanced_handler->get().onAcceptWorker(std::move(socket_to_rebalance->socket), handoff, true); return; diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h index 6e9fd2c2a541b..f0a0ae49bd4a9 100644 --- a/source/server/active_tcp_listener.h +++ b/source/server/active_tcp_listener.h @@ -27,9 +27,13 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks, public Network::BalancedConnectionHandler { public: ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config, - Runtime::Loader& runtime, Network::SocketSharedPtr&& socket); + Runtime::Loader& runtime, Network::SocketSharedPtr&& socket, + Network::Address::InstanceConstSharedPtr& address, + Network::ConnectionBalancer& connection_balancer); ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, - Network::ListenerConfig& config, Runtime::Loader& runtime); + Network::Address::InstanceConstSharedPtr& address, + Network::ListenerConfig& config, + Network::ConnectionBalancer& connection_balancer, Runtime::Loader& runtime); ~ActiveTcpListener() override; bool listenerConnectionLimitReached() const { @@ -81,6 +85,9 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks, // The number of connections currently active on this listener. This is typically used for // connection balancing across per-handler listeners. std::atomic num_listener_connections_{}; + + Network::ConnectionBalancer& connection_balancer_; + Network::Address::InstanceConstSharedPtr address_; }; using ActiveTcpListenerOptRef = absl::optional>; diff --git a/source/server/admin/admin.h b/source/server/admin/admin.h index 934c9ee45260a..37fad51615c83 100644 --- a/source/server/admin/admin.h +++ b/source/server/admin/admin.h @@ -413,7 +413,9 @@ class AdminImpl : public Admin, envoy::config::core::v3::TrafficDirection direction() const override { return envoy::config::core::v3::UNSPECIFIED; } - Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; } + Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { + return connection_balancer_; + } ResourceLimit& openConnections() override { return open_connections_; } const std::vector& accessLogs() const override { return empty_access_logs_; diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index 93c5ef1357015..f3492bb01b7a7 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -67,8 +67,9 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list local_registry->createActiveInternalListener(*this, config, dispatcher()); // TODO(soulxu): support multiple internal addresses in listener in the future. ASSERT(config.listenSocketFactories().size() == 1); - details->addActiveListener(config, config.listenSocketFactories()[0], listener_reject_fraction_, - disable_listeners_, std::move(internal_listener)); + details->addActiveListener(config, config.listenSocketFactories()[0]->localAddress(), + listener_reject_fraction_, disable_listeners_, + std::move(internal_listener)); } else if (config.listenSocketFactories()[0]->socketType() == Network::Socket::Type::Stream) { if (!support_udp_in_place_filter_chain_update && overridden_listener.has_value()) { if (auto iter = listener_map_by_tag_.find(overridden_listener.value()); @@ -82,20 +83,22 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list IS_ENVOY_BUG("unexpected"); } for (auto& socket_factory : config.listenSocketFactories()) { + auto address = socket_factory->localAddress(); // worker_index_ doesn't have a value on the main thread for the admin server. details->addActiveListener( - config, socket_factory, listener_reject_fraction_, disable_listeners_, + config, address, listener_reject_fraction_, disable_listeners_, std::make_unique( *this, config, runtime, - socket_factory->getListenSocket(worker_index_.has_value() ? *worker_index_ : 0))); + socket_factory->getListenSocket(worker_index_.has_value() ? *worker_index_ : 0), + address, config.connectionBalancer(*address))); } - } else { ASSERT(config.udpListenerConfig().has_value(), "UDP listener factory is not initialized."); ASSERT(worker_index_.has_value()); for (auto& socket_factory : config.listenSocketFactories()) { + auto address = socket_factory->localAddress(); details->addActiveListener( - config, socket_factory, listener_reject_fraction_, disable_listeners_, + config, address, listener_reject_fraction_, disable_listeners_, config.udpListenerConfig()->listenerFactory().createActiveUdpListener( runtime, *worker_index_, *this, config.listenSocketFactories()[0]->getListenSocket(*worker_index_), dispatcher_, @@ -318,20 +321,6 @@ ConnectionHandlerImpl::findActiveListenerByTag(uint64_t listener_tag) { return absl::nullopt; } -Network::BalancedConnectionHandlerOptRef -ConnectionHandlerImpl::getBalancedHandlerByTag(uint64_t listener_tag) { - auto active_listener = findActiveListenerByTag(listener_tag); - if (active_listener.has_value()) { - // TODO(soulxu): return first listener here, this will be changed - // when ConnectionBalancer supports the multiple addresses. - ASSERT(absl::holds_alternative>( - active_listener->get().per_address_details_list_[0]->typed_listener_) && - active_listener->get().per_address_details_list_[0]->listener_->listener() != nullptr); - return active_listener->get().per_address_details_list_[0]->tcpListener().value().get(); - } - return absl::nullopt; -} - Network::BalancedConnectionHandlerOptRef ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) { // Only Ip address can be restored to original address and redirect. diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index 95ed03f04e928..0f020f58ec9a1 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -58,7 +58,6 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, // Network::TcpConnectionHandler Event::Dispatcher& dispatcher() override { return dispatcher_; } - Network::BalancedConnectionHandlerOptRef getBalancedHandlerByTag(uint64_t listener_tag) override; Network::BalancedConnectionHandlerOptRef getBalancedHandlerByAddress(const Network::Address::Instance& address) override; @@ -107,13 +106,13 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, */ template void addActiveListener(Network::ListenerConfig& config, - const Network::ListenSocketFactoryPtr& socket_factory, + const Network::Address::InstanceConstSharedPtr& address, UnitFloat& listener_reject_fraction, bool disable_listeners, ActiveListener&& listener) { auto per_address_details = std::make_shared(); per_address_details->typed_listener_ = *listener; per_address_details->listener_ = std::move(listener); - per_address_details->address_ = socket_factory->localAddress(); + per_address_details->address_ = address; if (disable_listeners) { per_address_details->listener_->pauseListening(); } diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index 225416cbc0b76..3d5d8f397809f 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -430,7 +430,7 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin, PROTOBUF_GET_MS_OR_DEFAULT(config, listener_filters_timeout, 15000)), continue_on_listener_filters_timeout_(config.continue_on_listener_filters_timeout()), udp_listener_config_(origin.udp_listener_config_), - connection_balancer_(origin.connection_balancer_), + connection_balancers_(origin.connection_balancers_), listener_factory_context_(std::make_shared( origin.listener_factory_context_->listener_factory_context_base_, this, *this)), filter_chain_manager_(address_, origin.listener_factory_context_->parentFactoryContext(), @@ -674,9 +674,9 @@ void ListenerImpl::buildFilterChains() { filter_chain_manager_); } -void ListenerImpl::buildSocketOptions() { - // TCP specific setup. - if (connection_balancer_ == nullptr) { +void ListenerImpl::buildConnectionBalancer(const Network::Address::Instance& address) { + auto iter = connection_balancers_.find(address.asString()); + if (iter == connection_balancers_.end() && socket_type_ == Network::Socket::Type::Stream) { #ifdef WIN32 // On Windows we use the exact connection balancer to dispatch connections // from worker 1 to all workers. This is a perf hit but it is the only way @@ -688,19 +688,24 @@ void ListenerImpl::buildSocketOptions() { "Envoy is running on Windows." "ExactBalance is used to load balance connections between workers on Windows.", config_.name()); - connection_balancer_ = std::make_shared(); + connection_balancers_.emplace(address.asString(), + std::make_shared()); #else // Not in place listener update. if (config_.has_connection_balance_config()) { // Currently exact balance is the only supported type and there are no options. ASSERT(config_.connection_balance_config().has_exact_balance()); - connection_balancer_ = std::make_shared(); + connection_balancers_.emplace(address.asString(), + std::make_shared()); } else { - connection_balancer_ = std::make_shared(); + connection_balancers_.emplace(address.asString(), + std::make_shared()); } #endif } +} +void ListenerImpl::buildSocketOptions() { if (config_.has_tcp_fast_open_queue_length()) { addListenSocketOptions(Network::SocketOptionFactory::buildTcpFastOpenOptions( config_.tcp_fast_open_queue_length().value())); @@ -881,8 +886,8 @@ ListenerImpl::~ListenerImpl() { Init::Manager& ListenerImpl::initManager() { return *dynamic_init_manager_; } -void ListenerImpl::setSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory) { - ASSERT(socket_factories_.empty()); +void ListenerImpl::addSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory) { + buildConnectionBalancer(*socket_factory->localAddress()); socket_factories_.emplace_back(std::move(socket_factory)); } diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index 898be592b17e0..59693b4037b9b 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -346,7 +346,12 @@ class ListenerImpl final : public Network::ListenerConfig, return internal_listener_config_ != nullptr ? *internal_listener_config_ : Network::InternalListenerConfigOptRef(); } - Network::ConnectionBalancer& connectionBalancer() override { return *connection_balancer_; } + Network::ConnectionBalancer& + connectionBalancer(const Network::Address::Instance& address) override { + auto balancer = connection_balancers_.find(address.asString()); + ASSERT(balancer != connection_balancers_.end()); + return *balancer->second; + } ResourceLimit& openConnections() override { return *open_connections_; } const std::vector& accessLogs() const override { return access_logs_; @@ -422,6 +427,7 @@ class ListenerImpl final : public Network::ListenerConfig, void createListenerFilterFactories(Network::Socket::Type socket_type); void validateFilterChains(Network::Socket::Type socket_type); void buildFilterChains(); + void buildConnectionBalancer(const Network::Address::Instance& address); void buildSocketOptions(); void buildOriginalDstListenerFilter(); void buildProxyProtocolListenerFilter(); @@ -464,7 +470,7 @@ class ListenerImpl final : public Network::ListenerConfig, const bool continue_on_listener_filters_timeout_; std::shared_ptr udp_listener_config_; std::unique_ptr internal_listener_config_; - Network::ConnectionBalancerSharedPtr connection_balancer_; + absl::flat_hash_map connection_balancers_; std::shared_ptr listener_factory_context_; FilterChainManagerImpl filter_chain_manager_; const bool reuse_port_; diff --git a/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc b/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc index 090f76a0fa3f1..e44ddab6a6801 100644 --- a/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc +++ b/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc @@ -88,7 +88,9 @@ class ProxyProtocolRegressionTest : public testing::TestWithParam& accessLogs() const override { return empty_access_logs_; } diff --git a/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h b/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h index 49ff988d2af67..ae87c10b385d7 100644 --- a/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h +++ b/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h @@ -70,7 +70,9 @@ class ListenerFilterWithDataFuzzer : public Network::ListenerConfig, envoy::config::core::v3::TrafficDirection direction() const override { return envoy::config::core::v3::UNSPECIFIED; } - Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; } + Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { + return connection_balancer_; + } const std::vector& accessLogs() const override { return empty_access_logs_; } diff --git a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc index 033e44c865878..0aba05e58a98e 100644 --- a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc +++ b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc @@ -105,7 +105,9 @@ class ProxyProtocolTest : public testing::TestWithParam& accessLogs() const override { return empty_access_logs_; } @@ -1806,7 +1808,9 @@ class WildcardProxyProtocolTest : public testing::TestWithParam& accessLogs() const override { return empty_access_logs_; } diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index fe808a2b5ec70..bd25aa00608cf 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -813,7 +813,9 @@ class FakeUpstream : Logger::Loggable, Network::InternalListenerConfigOptRef internalListenerConfig() override { return Network::InternalListenerConfigOptRef(); } - Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; } + Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { + return connection_balancer_; + } envoy::config::core::v3::TrafficDirection direction() const override { return envoy::config::core::v3::UNSPECIFIED; } diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 479f4885db22f..ed047f79da852 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -438,7 +438,7 @@ class MockListenerConfig : public ListenerConfig { MOCK_METHOD(const std::string&, name, (), (const)); MOCK_METHOD(Network::UdpListenerConfigOptRef, udpListenerConfig, ()); MOCK_METHOD(InternalListenerConfigOptRef, internalListenerConfig, ()); - MOCK_METHOD(ConnectionBalancer&, connectionBalancer, ()); + MOCK_METHOD(ConnectionBalancer&, connectionBalancer, (const Network::Address::Instance&)); MOCK_METHOD(ResourceLimit&, openConnections, ()); MOCK_METHOD(uint32_t, tcpBacklogSize, (), (const)); MOCK_METHOD(Init::Manager&, initManager, ()); diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc index a053999643188..9a42c5ed63369 100644 --- a/test/server/active_tcp_listener_test.cc +++ b/test/server/active_tcp_listener_test.cc @@ -35,8 +35,6 @@ class MockTcpConnectionHandler : public Network::TcpConnectionHandler, public Network::MockConnectionHandler { public: MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); - MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByTag, - (uint64_t listener_tag)); MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByAddress, (const Network::Address::Instance& address)); }; @@ -51,7 +49,6 @@ class ActiveTcpListenerTest : public testing::Test, protected Logger::Loggable>(); EXPECT_CALL(*generic_listener_, onDestroy()); - generic_active_listener_ = std::make_unique( - conn_handler_, std::move(generic_listener_), listener_config_, runtime_); + Network::Address::InstanceConstSharedPtr address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + generic_active_listener_ = + std::make_unique(conn_handler_, std::move(generic_listener_), address, + listener_config_, balancer_, runtime_); generic_active_listener_->incNumConnections(); generic_accepted_socket_ = std::make_unique>(); EXPECT_CALL(*generic_accepted_socket_, ioHandle()).WillRepeatedly(ReturnRef(io_handle_)); @@ -90,8 +90,11 @@ class ActiveTcpListenerTest : public testing::Test, protected Logger::Loggable>(); EXPECT_CALL(*generic_listener_, onDestroy()); - generic_active_listener_ = std::make_unique( - conn_handler_, std::move(generic_listener_), listener_config_, runtime_); + Network::Address::InstanceConstSharedPtr address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + generic_active_listener_ = + std::make_unique(conn_handler_, std::move(generic_listener_), address, + listener_config_, balancer_, runtime_); generic_active_listener_->incNumConnections(); generic_accepted_socket_ = std::make_unique>(); EXPECT_CALL(*generic_accepted_socket_, ioHandle()).WillRepeatedly(ReturnRef(io_handle_)); @@ -220,8 +223,11 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterWithInspectDataMultipleFilters) { auto listener = std::make_unique>(); EXPECT_CALL(*listener, onDestroy()); - auto active_listener = std::make_unique(conn_handler_, std::move(listener), - listener_config_, runtime_); + Network::NopConnectionBalancerImpl balancer; + Network::Address::InstanceConstSharedPtr address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + auto active_listener = std::make_unique( + conn_handler_, std::move(listener), address, listener_config_, balancer, runtime_); auto accepted_socket = std::make_unique>(); EXPECT_CALL(*accepted_socket, ioHandle()).WillRepeatedly(ReturnRef(io_handle_)); @@ -309,8 +315,11 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterWithInspectDataMultipleFilters2) { auto listener = std::make_unique>(); EXPECT_CALL(*listener, onDestroy()); - auto active_listener = std::make_unique(conn_handler_, std::move(listener), - listener_config_, runtime_); + Network::NopConnectionBalancerImpl balancer; + Network::Address::InstanceConstSharedPtr address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + auto active_listener = std::make_unique( + conn_handler_, std::move(listener), address, listener_config_, balancer, runtime_); auto accepted_socket = std::make_unique>(); EXPECT_CALL(*accepted_socket, ioHandle()).WillRepeatedly(ReturnRef(io_handle_)); @@ -477,7 +486,6 @@ TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) { Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(normal_address)); - EXPECT_CALL(listener_config1, connectionBalancer()).WillRepeatedly(ReturnRef(balancer1)); EXPECT_CALL(listener_config1, listenerScope).Times(testing::AnyNumber()); EXPECT_CALL(listener_config1, listenerFiltersTimeout()); EXPECT_CALL(listener_config1, continueOnListenerFiltersTimeout()); @@ -488,8 +496,9 @@ TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) { auto mock_listener_will_be_moved1 = std::make_unique(); auto& listener1 = *mock_listener_will_be_moved1; - auto active_listener1 = std::make_unique( - conn_handler_, std::move(mock_listener_will_be_moved1), listener_config1, runtime_); + auto active_listener1 = + std::make_unique(conn_handler_, std::move(mock_listener_will_be_moved1), + normal_address, listener_config1, balancer1, runtime_); NiceMock listener_config2; Network::MockConnectionBalancer balancer2; @@ -500,7 +509,6 @@ TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) { new Network::Address::Ipv4Instance("127.0.0.2", 20002)); EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(alt_address)); EXPECT_CALL(listener_config2, listenerFiltersTimeout()); - EXPECT_CALL(listener_config2, connectionBalancer()).WillRepeatedly(ReturnRef(balancer2)); EXPECT_CALL(listener_config2, listenerScope).Times(testing::AnyNumber()); EXPECT_CALL(listener_config2, handOffRestoredDestinationConnections()) .WillRepeatedly(Return(false)); @@ -509,8 +517,9 @@ TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) { EXPECT_CALL(listener_config2, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); auto mock_listener_will_be_moved2 = std::make_unique(); auto& listener2 = *mock_listener_will_be_moved2; - auto active_listener2 = std::make_shared( - conn_handler_, std::move(mock_listener_will_be_moved2), listener_config2, runtime_); + auto active_listener2 = + std::make_shared(conn_handler_, std::move(mock_listener_will_be_moved2), + alt_address, listener_config2, balancer2, runtime_); auto* test_filter = new NiceMock(); EXPECT_CALL(*test_filter, destroy_()); @@ -582,6 +591,89 @@ TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) { EXPECT_CALL(listener2, onDestroy()); active_listener2.reset(); } + +TEST_F(ActiveTcpListenerTest, Rebalance) { + NiceMock listener_config1; + NiceMock balancer1; + EXPECT_CALL(balancer1, registerHandler(_)).Times(2); + EXPECT_CALL(balancer1, unregisterHandler(_)).Times(2); + + Network::Address::InstanceConstSharedPtr normal_address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(normal_address)); + EXPECT_CALL(listener_config1, listenerScope).Times(testing::AnyNumber()); + EXPECT_CALL(listener_config1, listenerFiltersTimeout()); + EXPECT_CALL(listener_config1, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config1, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config1, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + EXPECT_CALL(listener_config1, handOffRestoredDestinationConnections()) + .WillRepeatedly(Return(true)); + + auto mock_listener_will_be_moved1 = std::make_unique(); + auto& listener1 = *mock_listener_will_be_moved1; + auto active_listener1 = + std::make_unique(conn_handler_, std::move(mock_listener_will_be_moved1), + normal_address, listener_config1, balancer1, runtime_); + + NiceMock listener_config2; + + EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(normal_address)); + EXPECT_CALL(listener_config2, listenerFiltersTimeout()); + EXPECT_CALL(listener_config2, listenerScope).Times(testing::AnyNumber()); + EXPECT_CALL(listener_config2, handOffRestoredDestinationConnections()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(listener_config2, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config2, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config2, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + auto mock_listener_will_be_moved2 = std::make_unique(); + auto& listener2 = *mock_listener_will_be_moved2; + auto active_listener2 = + std::make_shared(conn_handler_, std::move(mock_listener_will_be_moved2), + normal_address, listener_config2, balancer1, runtime_); + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + + // active_listener1 re-balance. Set the balance target to the the active_listener2. + EXPECT_CALL(balancer1, pickTargetHandler(_)) + .WillOnce(testing::DoAll(testing::WithArg<0>(Invoke([&active_listener2](auto&) { + active_listener2->incNumConnections(); + })), + ReturnRef(*active_listener2))); + + EXPECT_CALL(conn_handler_, getBalancedHandlerByAddress) + .WillOnce( + Invoke([&normal_address, &active_listener2](const Network::Address::Instance& address) { + EXPECT_EQ(address, *normal_address); + return Network::BalancedConnectionHandlerOptRef(*active_listener2); + })); + auto filter_factory_callback = std::make_shared>(); + auto transport_socket_factory = Network::Test::createRawBufferSocketFactory(); + filter_chain_ = std::make_shared>(); + + EXPECT_CALL(conn_handler_, incNumConnections()); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get())); + EXPECT_CALL(*filter_chain_, transportSocketFactory) + .WillOnce(testing::ReturnRef(*transport_socket_factory)); + EXPECT_CALL(*filter_chain_, networkFilterFactories).WillOnce(ReturnRef(*filter_factory_callback)); + EXPECT_CALL(listener_config2, filterChainFactory()) + .WillRepeatedly(ReturnRef(filter_chain_factory_)); + + auto* connection = new NiceMock(); + EXPECT_CALL(dispatcher_, createServerConnection_()).WillOnce(Return(connection)); + EXPECT_CALL(filter_chain_factory_, createNetworkFilterChain(_, _)).WillOnce(Return(true)); + active_listener1->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + + // Verify per-listener connection stats. + EXPECT_EQ(1UL, conn_handler_.numConnections()); + + EXPECT_CALL(conn_handler_, decNumConnections()); + connection->close(Network::ConnectionCloseType::NoFlush); + + EXPECT_CALL(listener1, onDestroy()); + active_listener1.reset(); + EXPECT_CALL(listener2, onDestroy()); + active_listener2.reset(); +} + } // namespace } // namespace Server } // namespace Envoy diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 8d936825c8f80..63929c661a9d6 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -149,7 +149,9 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable& accessLogs() const override { return access_logs_; } @@ -225,6 +227,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable connection_balancer = nullptr, Network::BalancedConnectionHandler** balanced_connection_handler = nullptr, Network::Socket::Type socket_type = Network::Socket::Type::Stream, @@ -245,6 +248,11 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggablesocketFactory(), socketType()).WillOnce(Return(socket_type)); + if (address == nullptr) { + address = local_address_; + } + EXPECT_CALL(listeners_.back()->socketFactory(), localAddress()) + .WillRepeatedly(ReturnRef(address)); EXPECT_CALL(listeners_.back()->socketFactory(), getListenSocket(_)) .WillOnce(Return(listeners_.back()->socket_)); if (socket_type == Network::Socket::Type::Stream) { @@ -291,6 +299,8 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggablesocketFactory(0), socketType()) .WillOnce(Return(Network::Socket::Type::Stream)); for (std::vector::size_type i = 0; i < mock_listeners.size(); i++) { + EXPECT_CALL(listeners_.back()->socketFactory(i), localAddress()) + .WillRepeatedly(ReturnRef(addresses[i])); EXPECT_CALL(listeners_.back()->socketFactory(i), getListenSocket(_)) .WillOnce(Return(listeners_.back()->socket_)); @@ -305,8 +315,6 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggablesocketFactory(i), localAddress()) - .WillRepeatedly(ReturnRef(addresses[i])); if (disable_listener) { EXPECT_CALL(*static_cast(mock_listeners[i]), disable()); } @@ -373,12 +381,14 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable(); Network::BalancedConnectionHandler* current_handler; TestListener* test_listener = - addListener(1, true, false, "test_listener", listener, &listener_callbacks, + addListener(1, true, false, "test_listener", listener, &listener_callbacks, nullptr, connection_balancer, ¤t_handler); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); // Fake a balancer posting a connection to us. @@ -422,28 +430,25 @@ TEST_F(ConnectionHandlerTest, RemoveListenerDuringRebalance) { post_cb(); #endif } +*/ TEST_F(ConnectionHandlerTest, ListenerConnectionLimitEnforced) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, false, false, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = addListener(1, false, false, "test_listener1", listener1, + &listener_callbacks1, normal_address); // Only allow a single connection on this listener. test_listener1->setMaxConnections(1); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto listener2 = new NiceMock(); Network::TcpListenerCallbacks* listener_callbacks2; - TestListener* test_listener2 = - addListener(2, false, false, "test_listener2", listener2, &listener_callbacks2); Network::Address::InstanceConstSharedPtr alt_address( new Network::Address::Ipv4Instance("127.0.0.2", 20002)); - EXPECT_CALL(test_listener2->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(alt_address)); + TestListener* test_listener2 = + addListener(2, false, false, "test_listener2", listener2, &listener_callbacks2, alt_address); // Do not allow any connections on this listener. test_listener2->setMaxConnections(0); handler_->addListener(absl::nullopt, *test_listener2, runtime_); @@ -515,8 +520,6 @@ TEST_F(ConnectionHandlerTest, RemoveListener) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -587,8 +590,6 @@ TEST_F(ConnectionHandlerTest, DisableListener) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, false, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); EXPECT_CALL(*listener, disable()); @@ -631,8 +632,6 @@ TEST_F(ConnectionHandlerTest, StopAndDisableStoppedListener) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, false, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); EXPECT_CALL(*listener, onDestroy()); @@ -652,8 +651,6 @@ TEST_F(ConnectionHandlerTest, AddDisabledListener) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, false, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); EXPECT_CALL(*listener, disable()); EXPECT_CALL(*listener, onDestroy()); @@ -691,8 +688,6 @@ TEST_F(ConnectionHandlerTest, SetListenerRejectFraction) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, false, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); EXPECT_CALL(*listener, setRejectFraction(UnitFloat(0.1234f))); @@ -708,8 +703,6 @@ TEST_F(ConnectionHandlerTest, AddListenerSetRejectFraction) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, false, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); EXPECT_CALL(*listener, setRejectFraction(UnitFloat(0.12345f))); EXPECT_CALL(*listener, onDestroy()); @@ -725,8 +718,6 @@ TEST_F(ConnectionHandlerTest, SetsTransportSocketConnectTimeout) { TestListener* test_listener = addListener(1, false, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); auto server_connection = new NiceMock(); @@ -751,8 +742,6 @@ TEST_F(ConnectionHandlerTest, DestroyCloseConnections) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -772,8 +761,6 @@ TEST_F(ConnectionHandlerTest, CloseDuringFilterChainCreate) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get())); @@ -797,8 +784,6 @@ TEST_F(ConnectionHandlerTest, CloseConnectionOnEmptyFilterChain) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get())); @@ -818,22 +803,18 @@ TEST_F(ConnectionHandlerTest, CloseConnectionOnEmptyFilterChain) { TEST_F(ConnectionHandlerTest, NormalRedirect) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); Network::TcpListenerCallbacks* listener_callbacks2; auto listener2 = new NiceMock(); - TestListener* test_listener2 = - addListener(2, false, false, "test_listener2", listener2, &listener_callbacks2); Network::Address::InstanceConstSharedPtr alt_address( new Network::Address::Ipv4Instance("127.0.0.2", 20002)); - EXPECT_CALL(test_listener2->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(alt_address)); + TestListener* test_listener2 = + addListener(2, false, false, "test_listener2", listener2, &listener_callbacks2, alt_address); handler_->addListener(absl::nullopt, *test_listener2, runtime_); auto* test_filter = new NiceMock(); @@ -956,36 +937,30 @@ TEST_F(ConnectionHandlerTest, MatchLatestListener) { auto listener1 = new NiceMock(); TestListener* test_listener1 = addListener(1, true, true, "test_listener1", listener1, &listener_callbacks); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener1, runtime_); // Listener2 will be replaced by Listener3. auto listener2_overridden_filter_chain_manager = std::make_shared>(); auto listener2 = new NiceMock(); - TestListener* test_listener2 = - addListener(2, false, false, "test_listener2", listener2, nullptr, nullptr, nullptr, - Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, - listener2_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr listener2_address( new Network::Address::Ipv4Instance("127.0.0.1", 10002)); - EXPECT_CALL(test_listener2->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(listener2_address)); + TestListener* test_listener2 = + addListener(2, false, false, "test_listener2", listener2, nullptr, listener2_address, nullptr, + nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, + listener2_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *test_listener2, runtime_); // Listener3 will replace the listener2. auto listener3_overridden_filter_chain_manager = std::make_shared>(); auto listener3 = new NiceMock(); - TestListener* test_listener3 = - addListener(3, false, false, "test_listener3", listener3, nullptr, nullptr, nullptr, - Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, - listener3_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr listener3_address( new Network::Address::Ipv4Instance("127.0.0.1", 10002)); - EXPECT_CALL(test_listener3->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(listener3_address)); + TestListener* test_listener3 = + addListener(3, false, false, "test_listener3", listener3, nullptr, listener3_address, nullptr, + nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, + listener3_overridden_filter_chain_manager); // This emulated the case of update listener in-place. Stop the old listener and // add the new listener. @@ -1039,21 +1014,17 @@ TEST_F(ConnectionHandlerTest, EnsureNotMatchStoppedListener) { auto listener1 = new NiceMock(); TestListener* test_listener1 = addListener(1, true, true, "test_listener1", listener1, &listener_callbacks); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto listener2_overridden_filter_chain_manager = std::make_shared>(); auto listener2 = new NiceMock(); - TestListener* test_listener2 = - addListener(2, false, false, "test_listener2", listener2, nullptr, nullptr, nullptr, - Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, - listener2_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr listener2_address( new Network::Address::Ipv4Instance("127.0.0.1", 10002)); - EXPECT_CALL(test_listener2->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(listener2_address)); + TestListener* test_listener2 = + addListener(2, false, false, "test_listener2", listener2, nullptr, listener2_address, nullptr, + nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, + listener2_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *test_listener2, runtime_); // Stop the listener2. @@ -1103,21 +1074,17 @@ TEST_F(ConnectionHandlerTest, EnsureNotMatchStoppedAnyAddressListener) { auto listener1 = new NiceMock(); TestListener* test_listener1 = addListener(1, true, true, "test_listener1", listener1, &listener_callbacks); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto listener2_overridden_filter_chain_manager = std::make_shared>(); auto listener2 = new NiceMock(); - TestListener* test_listener2 = - addListener(2, false, false, "test_listener2", listener2, nullptr, nullptr, nullptr, - Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, - listener2_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr listener2_address( new Network::Address::Ipv4Instance("0.0.0.0", 10002)); - EXPECT_CALL(test_listener2->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(listener2_address)); + TestListener* test_listener2 = + addListener(2, false, false, "test_listener2", listener2, nullptr, listener2_address, nullptr, + nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, + listener2_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *test_listener2, runtime_); // Stop the listener2. @@ -1164,21 +1131,17 @@ TEST_F(ConnectionHandlerTest, EnsureNotMatchStoppedAnyAddressListener) { TEST_F(ConnectionHandlerTest, FallbackToWildcardListener) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); Network::TcpListenerCallbacks* listener_callbacks2; auto listener2 = new NiceMock(); - TestListener* test_listener2 = - addListener(2, false, false, "test_listener2", listener2, &listener_callbacks2); Network::Address::InstanceConstSharedPtr any_address = Network::Utility::getIpv4AnyAddress(); - EXPECT_CALL(test_listener2->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(any_address)); + TestListener* test_listener2 = + addListener(2, false, false, "test_listener2", listener2, &listener_callbacks2, any_address); handler_->addListener(absl::nullopt, *test_listener2, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1218,41 +1181,34 @@ TEST_F(ConnectionHandlerTest, FallbackToWildcardListener) { TEST_F(ConnectionHandlerTest, MatchIPv6WildcardListener) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto ipv4_overridden_filter_chain_manager = std::make_shared>(); Network::TcpListenerCallbacks* ipv4_any_listener_callbacks; auto listener2 = new NiceMock(); - TestListener* ipv4_any_listener = - addListener(2, false, false, "ipv4_any_test_listener", listener2, - &ipv4_any_listener_callbacks, nullptr, nullptr, Network::Socket::Type::Stream, - std::chrono::milliseconds(15000), false, ipv4_overridden_filter_chain_manager); - Network::Address::InstanceConstSharedPtr any_address( new Network::Address::Ipv4Instance("0.0.0.0", 80)); - EXPECT_CALL(ipv4_any_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(any_address)); + TestListener* ipv4_any_listener = addListener( + 2, false, false, "ipv4_any_test_listener", listener2, &ipv4_any_listener_callbacks, + any_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv4_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv4_any_listener, runtime_); auto ipv6_overridden_filter_chain_manager = std::make_shared>(); Network::TcpListenerCallbacks* ipv6_any_listener_callbacks; auto listener3 = new NiceMock(); - TestListener* ipv6_any_listener = - addListener(3, false, false, "ipv6_any_test_listener", listener3, - &ipv6_any_listener_callbacks, nullptr, nullptr, Network::Socket::Type::Stream, - std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr any_address_ipv6( new Network::Address::Ipv6Instance("::", 80)); - EXPECT_CALL(ipv6_any_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(any_address_ipv6)); + TestListener* ipv6_any_listener = addListener( + 3, false, false, "ipv6_any_test_listener", listener3, &ipv6_any_listener_callbacks, + any_address_ipv6, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv6_any_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1301,27 +1257,23 @@ TEST_F(ConnectionHandlerTest, MatchIPv6WildcardListener) { TEST_F(ConnectionHandlerTest, MatchIPv6WildcardListenerWithAnyAddressAndIpv4CompatFlag) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto ipv6_overridden_filter_chain_manager = std::make_shared>(); Network::TcpListenerCallbacks* ipv6_any_listener_callbacks; auto listener2 = new NiceMock(); - TestListener* ipv6_any_listener = - addListener(2, false, false, "ipv6_any_test_listener", listener2, - &ipv6_any_listener_callbacks, nullptr, nullptr, Network::Socket::Type::Stream, - std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); // Set the ipv6only as false. Network::Address::InstanceConstSharedPtr any_address_ipv6( new Network::Address::Ipv6Instance("::", 80, nullptr, false)); - EXPECT_CALL(ipv6_any_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(any_address_ipv6)); + TestListener* ipv6_any_listener = addListener( + 2, false, false, "ipv6_any_test_listener", listener2, &ipv6_any_listener_callbacks, + any_address_ipv6, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv6_any_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1370,27 +1322,23 @@ TEST_F(ConnectionHandlerTest, MatchIPv6WildcardListenerWithAnyAddressAndIpv4Comp TEST_F(ConnectionHandlerTest, MatchhIpv4CompatiableIPv6ListenerWithIpv4CompatFlag) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto ipv6_overridden_filter_chain_manager = std::make_shared>(); Network::TcpListenerCallbacks* ipv6_listener_callbacks; auto listener2 = new NiceMock(); - TestListener* ipv6_listener = - addListener(2, false, false, "ipv6_test_listener", listener2, &ipv6_listener_callbacks, - nullptr, nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), - false, ipv6_overridden_filter_chain_manager); // Set the ipv6only as false. Network::Address::InstanceConstSharedPtr ipv4_mapped_ipv6_address( new Network::Address::Ipv6Instance("::FFFF:192.168.0.1", 80, nullptr, false)); - EXPECT_CALL(ipv6_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(ipv4_mapped_ipv6_address)); + TestListener* ipv6_listener = + addListener(2, false, false, "ipv6_test_listener", listener2, &ipv6_listener_callbacks, + ipv4_mapped_ipv6_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv6_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1439,27 +1387,23 @@ TEST_F(ConnectionHandlerTest, MatchhIpv4CompatiableIPv6ListenerWithIpv4CompatFla TEST_F(ConnectionHandlerTest, NotMatchIPv6WildcardListenerWithoutIpv4CompatFlag) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto ipv6_overridden_filter_chain_manager = std::make_shared>(); Network::TcpListenerCallbacks* ipv6_any_listener_callbacks; auto listener2 = new NiceMock(); - TestListener* ipv6_any_listener = - addListener(2, false, false, "ipv6_any_test_listener", listener2, - &ipv6_any_listener_callbacks, nullptr, nullptr, Network::Socket::Type::Stream, - std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); // not set the ipv6only flag, the default value is true. Network::Address::InstanceConstSharedPtr any_address_ipv6( new Network::Address::Ipv6Instance("::", 80)); - EXPECT_CALL(ipv6_any_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(any_address_ipv6)); + TestListener* ipv6_any_listener = addListener( + 2, false, false, "ipv6_any_test_listener", listener2, &ipv6_any_listener_callbacks, + any_address_ipv6, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv6_any_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1505,12 +1449,10 @@ TEST_F(ConnectionHandlerTest, MatchhIpv4WhenBothIpv4AndIPv6WithIpv4CompatFlag) { // Listener1 is response for redirect the connection. Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); // Listener2 is listening on an ipv4-mapped ipv6 address. @@ -1518,15 +1460,13 @@ TEST_F(ConnectionHandlerTest, MatchhIpv4WhenBothIpv4AndIPv6WithIpv4CompatFlag) { std::make_shared>(); Network::TcpListenerCallbacks* ipv6_any_listener_callbacks; auto listener2 = new NiceMock(); - TestListener* ipv6_listener = - addListener(2, false, false, "ipv6_test_listener", listener2, &ipv6_any_listener_callbacks, - nullptr, nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), - false, ipv6_overridden_filter_chain_manager); // Set the ipv6only as false. Network::Address::InstanceConstSharedPtr ipv6_any_address( new Network::Address::Ipv6Instance("::", 80, nullptr, false)); - EXPECT_CALL(ipv6_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(ipv6_any_address)); + TestListener* ipv6_listener = + addListener(2, false, false, "ipv6_test_listener", listener2, &ipv6_any_listener_callbacks, + ipv6_any_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv6_listener, runtime_); // Listener3 is listening on an ipv4 address. @@ -1534,14 +1474,12 @@ TEST_F(ConnectionHandlerTest, MatchhIpv4WhenBothIpv4AndIPv6WithIpv4CompatFlag) { std::make_shared>(); Network::TcpListenerCallbacks* ipv4_listener_callbacks; auto listener3 = new NiceMock(); - TestListener* ipv4_listener = - addListener(3, false, false, "ipv4_test_listener", listener3, &ipv4_listener_callbacks, - nullptr, nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), - false, ipv4_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr ipv4_address( new Network::Address::Ipv4Instance("0.0.0.0", 80, nullptr)); - EXPECT_CALL(ipv4_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(ipv4_address)); + TestListener* ipv4_listener = + addListener(3, false, false, "ipv4_test_listener", listener3, &ipv4_listener_callbacks, + ipv4_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv4_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv4_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1591,12 +1529,10 @@ TEST_F(ConnectionHandlerTest, MatchhIpv4WhenBothIpv4AndIPv6WithIpv4CompatFlag2) // Listener1 is response for redirect the connection. Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); // Listener3 is listening on an ipv4 address. @@ -1604,14 +1540,12 @@ TEST_F(ConnectionHandlerTest, MatchhIpv4WhenBothIpv4AndIPv6WithIpv4CompatFlag2) std::make_shared>(); Network::TcpListenerCallbacks* ipv4_listener_callbacks; auto listener3 = new NiceMock(); - TestListener* ipv4_listener = - addListener(3, false, false, "ipv4_test_listener", listener3, &ipv4_listener_callbacks, - nullptr, nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), - false, ipv4_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr ipv4_address( new Network::Address::Ipv4Instance("0.0.0.0", 80, nullptr)); - EXPECT_CALL(ipv4_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(ipv4_address)); + TestListener* ipv4_listener = + addListener(3, false, false, "ipv4_test_listener", listener3, &ipv4_listener_callbacks, + ipv4_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv4_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv4_listener, runtime_); // Listener2 is listening on an ipv4-mapped ipv6 address. @@ -1619,15 +1553,13 @@ TEST_F(ConnectionHandlerTest, MatchhIpv4WhenBothIpv4AndIPv6WithIpv4CompatFlag2) std::make_shared>(); Network::TcpListenerCallbacks* ipv6_any_listener_callbacks; auto listener2 = new NiceMock(); - TestListener* ipv6_listener = - addListener(2, false, false, "ipv6_test_listener", listener2, &ipv6_any_listener_callbacks, - nullptr, nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), - false, ipv6_overridden_filter_chain_manager); // Set the ipv6only as false. Network::Address::InstanceConstSharedPtr ipv4_mapped_ipv6_address( new Network::Address::Ipv6Instance("::", 80, nullptr, false)); - EXPECT_CALL(ipv6_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(ipv4_mapped_ipv6_address)); + TestListener* ipv6_listener = + addListener(2, false, false, "ipv6_test_listener", listener2, &ipv6_any_listener_callbacks, + ipv4_mapped_ipv6_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv6_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1678,12 +1610,10 @@ TEST_F(ConnectionHandlerTest, AddIpv4MappedListenerAfterIpv4ListenerStopped) { // Listener1 is response for redirect the connection. Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 10001)); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(normal_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, normal_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); // Listener2 is listening on an Ipv4 address. @@ -1691,14 +1621,12 @@ TEST_F(ConnectionHandlerTest, AddIpv4MappedListenerAfterIpv4ListenerStopped) { std::make_shared>(); Network::TcpListenerCallbacks* ipv4_listener_callbacks; auto listener2 = new NiceMock(); - TestListener* ipv4_listener = - addListener(2, false, false, "ipv4_test_listener", listener2, &ipv4_listener_callbacks, - nullptr, nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), - false, ipv4_overridden_filter_chain_manager); Network::Address::InstanceConstSharedPtr ipv4_address( new Network::Address::Ipv4Instance("0.0.0.0", 80, nullptr)); - EXPECT_CALL(ipv4_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(ipv4_address)); + TestListener* ipv4_listener = + addListener(2, false, false, "ipv4_test_listener", listener2, &ipv4_listener_callbacks, + ipv4_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv4_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv4_listener, runtime_); EXPECT_CALL(*listener2, onDestroy()); @@ -1710,15 +1638,13 @@ TEST_F(ConnectionHandlerTest, AddIpv4MappedListenerAfterIpv4ListenerStopped) { std::make_shared>(); Network::TcpListenerCallbacks* ipv6_any_listener_callbacks; auto listener3 = new NiceMock(); - TestListener* ipv6_listener = - addListener(3, false, false, "ipv6_test_listener", listener3, &ipv6_any_listener_callbacks, - nullptr, nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), - false, ipv6_overridden_filter_chain_manager); // Set the ipv6only as false. Network::Address::InstanceConstSharedPtr ipv4_mapped_ipv6_address( new Network::Address::Ipv6Instance("::", 80, nullptr, false)); - EXPECT_CALL(ipv6_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(ipv4_mapped_ipv6_address)); + TestListener* ipv6_listener = + addListener(3, false, false, "ipv6_test_listener", listener3, &ipv6_any_listener_callbacks, + ipv4_mapped_ipv6_address, nullptr, nullptr, Network::Socket::Type::Stream, + std::chrono::milliseconds(15000), false, ipv6_overridden_filter_chain_manager); handler_->addListener(absl::nullopt, *ipv6_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1784,15 +1710,12 @@ TEST_F(ConnectionHandlerTest, WildcardListenerWithOriginalDstOutbound) { TEST_F(ConnectionHandlerTest, WildcardListenerWithNoOriginalDst) { Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); - TestListener* test_listener1 = - addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1); - Network::Address::InstanceConstSharedPtr normal_address( new Network::Address::Ipv4Instance("127.0.0.1", 80)); Network::Address::InstanceConstSharedPtr any_address = Network::Utility::getAddressWithPort( *Network::Utility::getIpv4AnyAddress(), normal_address->ip()->port()); - EXPECT_CALL(test_listener1->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(any_address)); + TestListener* test_listener1 = + addListener(1, true, true, "test_listener1", listener1, &listener_callbacks1, any_address); handler_->addListener(absl::nullopt, *test_listener1, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1821,8 +1744,6 @@ TEST_F(ConnectionHandlerTest, TransportProtocolDefault) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* accepted_socket = new NiceMock(); @@ -1841,8 +1762,6 @@ TEST_F(ConnectionHandlerTest, TransportProtocolCustom) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); @@ -1876,8 +1795,6 @@ TEST_F(ConnectionHandlerTest, ListenerFilterTimeout) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(512); @@ -1928,9 +1845,7 @@ TEST_F(ConnectionHandlerTest, ContinueOnListenerFilterTimeout) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks, nullptr, nullptr, - Network::Socket::Type::Stream, std::chrono::milliseconds(15000), true); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); + nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), true); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockListenerFilter* test_filter = new NiceMock(128); @@ -1991,8 +1906,6 @@ TEST_F(ConnectionHandlerTest, ListenerFilterTimeoutResetOnSuccess) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(123); @@ -2044,9 +1957,7 @@ TEST_F(ConnectionHandlerTest, ListenerFilterDisabledTimeout) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks, nullptr, nullptr, - Network::Socket::Type::Stream, std::chrono::milliseconds()); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); + nullptr, Network::Socket::Type::Stream, std::chrono::milliseconds()); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(512); @@ -2082,8 +1993,6 @@ TEST_F(ConnectionHandlerTest, ListenerFilterReportError) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockListenerFilter* first_filter = new Network::MockListenerFilter(); @@ -2123,7 +2032,7 @@ TEST_F(ConnectionHandlerTest, UdpListenerNoFilter) { auto listener = new NiceMock(); TestListener* test_listener = - addListener(1, true, false, "test_listener", listener, nullptr, nullptr, nullptr, + addListener(1, true, false, "test_listener", listener, nullptr, nullptr, nullptr, nullptr, Network::Socket::Type::Datagram, std::chrono::milliseconds()); EXPECT_CALL(factory_, createUdpListenerFilterChain(_, _)) .WillOnce(Invoke([&](Network::UdpListenerFilterManager&, @@ -2153,9 +2062,7 @@ TEST_F(ConnectionHandlerTest, TcpListenerInplaceUpdate) { TestListener* old_test_listener = addListener(old_listener_tag, true, false, "test_listener", old_listener, - &old_listener_callbacks, mock_connection_balancer, ¤t_handler); - EXPECT_CALL(old_test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); + &old_listener_callbacks, nullptr, mock_connection_balancer, ¤t_handler); handler_->addListener(absl::nullopt, *old_test_listener, runtime_); ASSERT_NE(old_test_listener, nullptr); @@ -2163,10 +2070,11 @@ TEST_F(ConnectionHandlerTest, TcpListenerInplaceUpdate) { auto overridden_filter_chain_manager = std::make_shared>(); - TestListener* new_test_listener = addListener( - new_listener_tag, true, false, "test_listener", /* Network::Listener */ nullptr, - &new_listener_callbacks, mock_connection_balancer, nullptr, Network::Socket::Type::Stream, - std::chrono::milliseconds(15000), false, overridden_filter_chain_manager); + TestListener* new_test_listener = + addListener(new_listener_tag, true, false, "test_listener", /* Network::Listener */ nullptr, + &new_listener_callbacks, nullptr, mock_connection_balancer, nullptr, + Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, + overridden_filter_chain_manager); handler_->addListener(old_listener_tag, *new_test_listener, runtime_); ASSERT_EQ(new_listener_callbacks, nullptr) << "new listener should be inplace added and callback should not change"; @@ -2240,8 +2148,6 @@ TEST_F(ConnectionHandlerTest, TcpListenerRemoveFilterChain) { auto listener = new NiceMock(); TestListener* test_listener = addListener(listener_tag, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -2289,8 +2195,6 @@ TEST_F(ConnectionHandlerTest, TcpListenerRemoveFilterChainCalledAfterListenerIsR auto listener = new NiceMock(); TestListener* test_listener = addListener(listener_tag, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -2352,8 +2256,6 @@ TEST_F(ConnectionHandlerTest, TcpListenerRemoveListener) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -2382,12 +2284,10 @@ TEST_F(ConnectionHandlerTest, TcpListenerRemoveIpv6AnyAddressWithIpv4CompatListe Network::TcpListenerCallbacks* listener_callbacks; auto listener = new NiceMock(); - TestListener* test_listener = - addListener(1, true, false, "test_listener", listener, &listener_callbacks); Network::Address::InstanceConstSharedPtr any_address_ipv6( new Network::Address::Ipv6Instance("::", 80, nullptr, false)); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(any_address_ipv6)); + TestListener* test_listener = + addListener(1, true, false, "test_listener", listener, &listener_callbacks, any_address_ipv6); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -2413,12 +2313,10 @@ TEST_F(ConnectionHandlerTest, TcpListenerRemoveIpv4CompatAddressListener) { Network::TcpListenerCallbacks* listener_callbacks; auto listener = new NiceMock(); - TestListener* test_listener = - addListener(1, true, false, "test_listener", listener, &listener_callbacks); Network::Address::InstanceConstSharedPtr address_ipv6( new Network::Address::Ipv6Instance("::FFFF:192.168.0.1", 80, nullptr, false)); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(address_ipv6)); + TestListener* test_listener = + addListener(1, true, false, "test_listener", listener, &listener_callbacks, address_ipv6); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -2444,22 +2342,18 @@ TEST_F(ConnectionHandlerTest, TcpListenerRemoveWithBothIpv4AnyAndIpv6Any) { Network::TcpListenerCallbacks* listener_callbacks; auto listener = new NiceMock(); - TestListener* test_listener = - addListener(1, true, false, "test_listener", listener, &listener_callbacks); Network::Address::InstanceConstSharedPtr address_ipv6( new Network::Address::Ipv6Instance("::", 80, nullptr, false)); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(address_ipv6)); + TestListener* test_listener = + addListener(1, true, false, "test_listener", listener, &listener_callbacks, address_ipv6); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::TcpListenerCallbacks* listener_callbacks2; auto listener2 = new NiceMock(); - TestListener* test_listener2 = - addListener(2, true, false, "test_listener2", listener2, &listener_callbacks2); Network::Address::InstanceConstSharedPtr address_ipv4( new Network::Address::Ipv4Instance("0.0.0.0", 80, nullptr)); - EXPECT_CALL(test_listener2->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(address_ipv4)); + TestListener* test_listener2 = + addListener(2, true, false, "test_listener2", listener2, &listener_callbacks2, address_ipv4); handler_->addListener(absl::nullopt, *test_listener2, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -2498,8 +2392,6 @@ TEST_F(ConnectionHandlerTest, TcpListenerGlobalCxLimitReject) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); listener_callbacks->onReject(Network::TcpListenerCallbacks::RejectCause::GlobalCxLimit); @@ -2514,8 +2406,6 @@ TEST_F(ConnectionHandlerTest, TcpListenerOverloadActionReject) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); listener_callbacks->onReject(Network::TcpListenerCallbacks::RejectCause::OverloadAction); @@ -2531,8 +2421,6 @@ TEST_F(ConnectionHandlerTest, ListenerFilterWorks) { auto listener = new NiceMock(); TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); - EXPECT_CALL(test_listener->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener, runtime_); auto all_matcher = std::make_shared(); @@ -2567,7 +2455,7 @@ TEST_F(ConnectionHandlerTest, ShutdownUdpListener) { Network::MockUdpReadFilterCallbacks dummy_callbacks; auto listener = new NiceMock(*this); TestListener* test_listener = - addListener(1, true, false, "test_listener", listener, nullptr, nullptr, nullptr, + addListener(1, true, false, "test_listener", listener, nullptr, nullptr, nullptr, nullptr, Network::Socket::Type::Datagram, std::chrono::milliseconds(), false, nullptr); auto filter = std::make_unique>(*this, dummy_callbacks); From e43ebf171c7bbc69b455c3ca751c94767d9c89d4 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 16 May 2022 22:54:08 +0000 Subject: [PATCH 02/16] enable RemoveListenerDuringRebalance Signed-off-by: He Jie Xu --- test/server/connection_handler_test.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 63929c661a9d6..fa2a552086e89 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -381,14 +381,13 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable Date: Tue, 17 May 2022 01:03:30 +0000 Subject: [PATCH 03/16] Fix the unitest for ConnectionHandlerImplTest Signed-off-by: He Jie Xu --- test/server/connection_handler_test.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index fa2a552086e89..fd8e2021e7841 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -249,7 +249,11 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggablesocketFactory(), socketType()).WillOnce(Return(socket_type)); if (address == nullptr) { - address = local_address_; + EXPECT_CALL(listeners_.back()->socketFactory(), localAddress()) + .WillRepeatedly(ReturnRef(local_address_)); + } else { + EXPECT_CALL(listeners_.back()->socketFactory(), localAddress()) + .WillRepeatedly(ReturnRef(address)); } EXPECT_CALL(listeners_.back()->socketFactory(), localAddress()) .WillRepeatedly(ReturnRef(address)); From 7f48d18109b60c0566641b484f1675e9ee464289 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Tue, 17 May 2022 11:17:06 +0000 Subject: [PATCH 04/16] fix connection handler test Signed-off-by: He Jie Xu --- test/server/connection_handler_test.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index fd8e2021e7841..c1752fe8e89a2 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -255,8 +255,6 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggablesocketFactory(), localAddress()) .WillRepeatedly(ReturnRef(address)); } - EXPECT_CALL(listeners_.back()->socketFactory(), localAddress()) - .WillRepeatedly(ReturnRef(address)); EXPECT_CALL(listeners_.back()->socketFactory(), getListenSocket(_)) .WillOnce(Return(listeners_.back()->socket_)); if (socket_type == Network::Socket::Type::Stream) { From 4a797f0f9aada1d64013542d53ea866bbe399ccd Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 18 May 2022 06:48:58 +0000 Subject: [PATCH 05/16] ConnectionBalancer: back to use getConnectionHandlerByTag since it supports pipe Signed-off-by: He Jie Xu --- envoy/network/connection_handler.h | 10 ++++++++++ source/server/active_tcp_listener.cc | 5 +++-- source/server/connection_handler_impl.cc | 17 +++++++++++++++++ source/server/connection_handler_impl.h | 3 +++ test/server/active_tcp_listener_test.cc | 14 ++++++++------ 5 files changed, 41 insertions(+), 8 deletions(-) diff --git a/envoy/network/connection_handler.h b/envoy/network/connection_handler.h index 7c0e11f936e49..1a0ff3efd02af 100644 --- a/envoy/network/connection_handler.h +++ b/envoy/network/connection_handler.h @@ -177,6 +177,16 @@ class TcpConnectionHandler : public virtual ConnectionHandler { public: virtual Event::Dispatcher& dispatcher() PURE; + /** + * Obtain the rebalancer of the tcp listener. + * @param listener_tag supplies the tag of the tcp listener that was passed to addListener(). + * @param address is used to query the address specific handler. + * @return BalancedConnectionHandlerOptRef the balancer attached to the listener. `nullopt` if + * listener doesn't exist or rebalancer doesn't exist. + */ + virtual BalancedConnectionHandlerOptRef + getBalancedHandlerByTag(uint64_t listener_tag, const Network::Address::Instance& address) PURE; + /** * Obtain the rebalancer of the tcp listener. * @param address supplies the address of the tcp listener. diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc index 89c7882107b03..a169a7a33f0ea 100644 --- a/source/server/active_tcp_listener.cc +++ b/source/server/active_tcp_listener.cc @@ -156,9 +156,10 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) { RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared(); socket_to_rebalance->socket = std::move(socket); - dispatcher().post([socket_to_rebalance, address = address_, &tcp_conn_handler = tcp_conn_handler_, + dispatcher().post([socket_to_rebalance, address = address_, tag = config_->listenerTag(), + &tcp_conn_handler = tcp_conn_handler_, handoff = config_->handOffRestoredDestinationConnections()]() { - auto balanced_handler = tcp_conn_handler.getBalancedHandlerByAddress(*address); + auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag, *address); if (balanced_handler.has_value()) { balanced_handler->get().onAcceptWorker(std::move(socket_to_rebalance->socket), handoff, true); return; diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index f3492bb01b7a7..e6402077b74c2 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -321,6 +321,23 @@ ConnectionHandlerImpl::findActiveListenerByTag(uint64_t listener_tag) { return absl::nullopt; } +Network::BalancedConnectionHandlerOptRef +ConnectionHandlerImpl::getBalancedHandlerByTag(uint64_t listener_tag, + const Network::Address::Instance& address) { + auto active_listener = findActiveListenerByTag(listener_tag); + if (active_listener.has_value()) { + for (auto& details : active_listener->get().per_address_details_) { + if (*details->address_ == address) { + ASSERT(absl::holds_alternative>( + details->typed_listener_) && + details->listener_->listener() != nullptr); + return Network::BalancedConnectionHandlerOptRef(details->tcpListener().value().get()); + } + } + } + return absl::nullopt; +} + Network::BalancedConnectionHandlerOptRef ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) { // Only Ip address can be restored to original address and redirect. diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index 0f020f58ec9a1..79649b06f9dc4 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -59,6 +59,9 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, // Network::TcpConnectionHandler Event::Dispatcher& dispatcher() override { return dispatcher_; } Network::BalancedConnectionHandlerOptRef + getBalancedHandlerByTag(uint64_t listener_tag, + const Network::Address::Instance& address) override; + Network::BalancedConnectionHandlerOptRef getBalancedHandlerByAddress(const Network::Address::Instance& address) override; // Network::UdpConnectionHandler diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc index 9a42c5ed63369..95f4a329387e6 100644 --- a/test/server/active_tcp_listener_test.cc +++ b/test/server/active_tcp_listener_test.cc @@ -35,6 +35,8 @@ class MockTcpConnectionHandler : public Network::TcpConnectionHandler, public Network::MockConnectionHandler { public: MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); + MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByTag, + (uint64_t listener_tag, const Network::Address::Instance&)); MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByAddress, (const Network::Address::Instance& address)); }; @@ -639,12 +641,12 @@ TEST_F(ActiveTcpListenerTest, Rebalance) { })), ReturnRef(*active_listener2))); - EXPECT_CALL(conn_handler_, getBalancedHandlerByAddress) - .WillOnce( - Invoke([&normal_address, &active_listener2](const Network::Address::Instance& address) { - EXPECT_EQ(address, *normal_address); - return Network::BalancedConnectionHandlerOptRef(*active_listener2); - })); + EXPECT_CALL(conn_handler_, getBalancedHandlerByTag) + .WillOnce(Invoke([&normal_address, + &active_listener2](uint64_t, const Network::Address::Instance& address) { + EXPECT_EQ(address, *normal_address); + return Network::BalancedConnectionHandlerOptRef(*active_listener2); + })); auto filter_factory_callback = std::make_shared>(); auto transport_socket_factory = Network::Test::createRawBufferSocketFactory(); filter_chain_ = std::make_shared>(); From af214ea2a6ef15c9a829c033a8302c263212b28f Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 20 Jun 2022 06:20:27 +0000 Subject: [PATCH 06/16] fix Signed-off-by: He Jie Xu --- source/server/connection_handler_impl.cc | 2 +- source/server/listener_impl.cc | 50 +++++++++---------- source/server/listener_impl.h | 13 ++--- source/server/listener_manager_impl.cc | 6 +-- .../active_internal_listener_test.cc | 4 +- test/server/active_tcp_listener_test.cc | 2 +- test/server/connection_handler_test.cc | 11 ++-- 7 files changed, 45 insertions(+), 43 deletions(-) diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index e6402077b74c2..34ceef1f77477 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -326,7 +326,7 @@ ConnectionHandlerImpl::getBalancedHandlerByTag(uint64_t listener_tag, const Network::Address::Instance& address) { auto active_listener = findActiveListenerByTag(listener_tag); if (active_listener.has_value()) { - for (auto& details : active_listener->get().per_address_details_) { + for (auto& details : active_listener->get().per_address_details_list_) { if (*details->address_ == address) { ASSERT(absl::holds_alternative>( details->typed_listener_) && diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index 3d5d8f397809f..74af9899fcb56 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -299,6 +299,7 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config, const std::string& name, bool added_via_api, bool workers_started, uint64_t hash, uint32_t concurrency) : parent_(parent), address_(Network::Address::resolveProtoAddress(config.address())), + socket_type_(Network::Utility::protobufAddressSocketType(config.address())), bind_to_port_(shouldBindToPort(config)), mptcp_enabled_(config.enable_mptcp()), hand_off_restored_destination_connections_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, use_original_dst, false)), @@ -378,16 +379,15 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config, } buildAccessLog(); - auto socket_type = Network::Utility::protobufAddressSocketType(config.address()); - validateConfig(socket_type); + validateConfig(); // buildUdpListenerFactory() must come before buildListenSocketOptions() because the UDP // listener factory can provide additional options. - buildUdpListenerFactory(socket_type, concurrency); - buildListenSocketOptions(socket_type); - createListenerFilterFactories(socket_type); - validateFilterChains(socket_type); + buildUdpListenerFactory(concurrency); + buildListenSocketOptions(); + createListenerFilterFactories(); + validateFilterChains(); buildFilterChains(); - if (socket_type != Network::Socket::Type::Datagram) { + if (socket_type_ != Network::Socket::Type::Datagram) { buildSocketOptions(); buildOriginalDstListenerFilter(); buildProxyProtocolListenerFilter(); @@ -407,8 +407,8 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin, const std::string& version_info, ListenerManagerImpl& parent, const std::string& name, bool added_via_api, bool workers_started, uint64_t hash) - : parent_(parent), address_(origin.address_), bind_to_port_(shouldBindToPort(config)), - mptcp_enabled_(config.enable_mptcp()), + : parent_(parent), address_(origin.address_), socket_type_(origin.socket_type_), + bind_to_port_(shouldBindToPort(config)), mptcp_enabled_(config.enable_mptcp()), hand_off_restored_destination_connections_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, use_original_dst, false)), per_connection_buffer_limit_bytes_( @@ -446,14 +446,13 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin, missing_listener_config_stats_({ALL_MISSING_LISTENER_CONFIG_STATS( POOL_COUNTER(listener_factory_context_->listenerScope()))}) { buildAccessLog(); - auto socket_type = Network::Utility::protobufAddressSocketType(config.address()); - validateConfig(socket_type); - buildListenSocketOptions(socket_type); - createListenerFilterFactories(socket_type); - validateFilterChains(socket_type); + validateConfig(); + buildListenSocketOptions(); + createListenerFilterFactories(); + validateFilterChains(); buildFilterChains(); buildInternalListener(); - if (socket_type == Network::Socket::Type::Stream) { + if (socket_type_ == Network::Socket::Type::Stream) { // Apply the options below only for TCP. buildSocketOptions(); buildOriginalDstListenerFilter(); @@ -462,9 +461,9 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin, } } -void ListenerImpl::validateConfig(Network::Socket::Type socket_type) { +void ListenerImpl::validateConfig() { if (mptcp_enabled_) { - if (socket_type != Network::Socket::Type::Stream) { + if (socket_type_ != Network::Socket::Type::Stream) { throw EnvoyException( fmt::format("listener {}: enable_mptcp can only be used with TCP listeners", name_)); } @@ -529,9 +528,8 @@ void ListenerImpl::buildInternalListener() { } } -void ListenerImpl::buildUdpListenerFactory(Network::Socket::Type socket_type, - uint32_t concurrency) { - if (socket_type != Network::Socket::Type::Datagram) { +void ListenerImpl::buildUdpListenerFactory(uint32_t concurrency) { + if (socket_type_ != Network::Socket::Type::Datagram) { return; } if (!reuse_port_ && concurrency > 1) { @@ -582,7 +580,7 @@ void ListenerImpl::buildUdpListenerFactory(Network::Socket::Type socket_type, } } -void ListenerImpl::buildListenSocketOptions(Network::Socket::Type socket_type) { +void ListenerImpl::buildListenSocketOptions() { // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer: if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) { @@ -601,7 +599,7 @@ void ListenerImpl::buildListenSocketOptions(Network::Socket::Type socket_type) { addListenSocketOptions( Network::SocketOptionFactory::buildLiteralOptions(config_.socket_options())); } - if (socket_type == Network::Socket::Type::Datagram) { + if (socket_type_ == Network::Socket::Type::Datagram) { // Needed for recvmsg to return destination address in IP header. addListenSocketOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions()); // Needed to return receive buffer overflown indicator. @@ -619,9 +617,9 @@ void ListenerImpl::buildListenSocketOptions(Network::Socket::Type socket_type) { } } -void ListenerImpl::createListenerFilterFactories(Network::Socket::Type socket_type) { +void ListenerImpl::createListenerFilterFactories() { if (!config_.listener_filters().empty()) { - switch (socket_type) { + switch (socket_type_) { case Network::Socket::Type::Datagram: udp_listener_filter_factories_ = parent_.factory_.createUdpListenerFilterFactoryList( config_.listener_filters(), *listener_factory_context_); @@ -634,9 +632,9 @@ void ListenerImpl::createListenerFilterFactories(Network::Socket::Type socket_ty } } -void ListenerImpl::validateFilterChains(Network::Socket::Type socket_type) { +void ListenerImpl::validateFilterChains() { if (config_.filter_chains().empty() && !config_.has_default_filter_chain() && - (socket_type == Network::Socket::Type::Stream || + (socket_type_ == Network::Socket::Type::Stream || !udp_listener_config_->listener_factory_->isTransportConnectionless())) { // If we got here, this is a tcp listener or connection-oriented udp listener, so ensure there // is a filter chain specified diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index 59693b4037b9b..bebe2ad861e8e 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -303,7 +303,7 @@ class ListenerImpl final : public Network::ListenerConfig, DrainManager& localDrainManager() const { return listener_factory_context_->listener_factory_context_base_->drainManager(); } - void setSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory); + void addSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory); void setSocketAndOptions(const Network::SocketSharedPtr& socket); const Network::Socket::OptionsSharedPtr& listenSocketOptions() { return listen_socket_options_; } const std::string& versionInfo() const { return version_info_; } @@ -421,11 +421,11 @@ class ListenerImpl final : public Network::ListenerConfig, // Helpers for constructor. void buildAccessLog(); void buildInternalListener(); - void validateConfig(Network::Socket::Type socket_type); - void buildUdpListenerFactory(Network::Socket::Type socket_type, uint32_t concurrency); - void buildListenSocketOptions(Network::Socket::Type socket_type); - void createListenerFilterFactories(Network::Socket::Type socket_type); - void validateFilterChains(Network::Socket::Type socket_type); + void validateConfig(); + void buildUdpListenerFactory(uint32_t concurrency); + void buildListenSocketOptions(); + void createListenerFilterFactories(); + void validateFilterChains(); void buildFilterChains(); void buildConnectionBalancer(const Network::Address::Instance& address); void buildSocketOptions(); @@ -439,6 +439,7 @@ class ListenerImpl final : public Network::ListenerConfig, ListenerManagerImpl& parent_; Network::Address::InstanceConstSharedPtr address_; + const Network::Socket::Type socket_type_; std::vector socket_factories_; const bool bind_to_port_; diff --git a/source/server/listener_manager_impl.cc b/source/server/listener_manager_impl.cc index 9a15d31d54445..db6f115f94ce3 100644 --- a/source/server/listener_manager_impl.cc +++ b/source/server/listener_manager_impl.cc @@ -487,7 +487,7 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal( if (!(*existing_warming_listener)->hasCompatibleAddress(*new_listener)) { setNewOrDrainingSocketFactory(name, config.address(), *new_listener); } else { - new_listener->setSocketFactory((*existing_warming_listener)->getSocketFactory().clone()); + new_listener->addSocketFactory((*existing_warming_listener)->getSocketFactory().clone()); } *existing_warming_listener = std::move(new_listener); } else if (existing_active_listener != active_listeners_.end()) { @@ -496,7 +496,7 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal( if (!(*existing_active_listener)->hasCompatibleAddress(*new_listener)) { setNewOrDrainingSocketFactory(name, config.address(), *new_listener); } else { - new_listener->setSocketFactory((*existing_active_listener)->getSocketFactory().clone()); + new_listener->addSocketFactory((*existing_active_listener)->getSocketFactory().clone()); } if (workers_started_) { new_listener->debugLog("add warming listener"); @@ -1054,7 +1054,7 @@ void ListenerManagerImpl::setNewOrDrainingSocketFactory( } } - listener.setSocketFactory(draining_listen_socket_factory != nullptr + listener.addSocketFactory(draining_listen_socket_factory != nullptr ? draining_listen_socket_factory->clone() : createListenSocketFactory(proto_address, listener)); } diff --git a/test/extensions/bootstrap/internal_listener/active_internal_listener_test.cc b/test/extensions/bootstrap/internal_listener/active_internal_listener_test.cc index c093faadda537..791457ed190bd 100644 --- a/test/extensions/bootstrap/internal_listener/active_internal_listener_test.cc +++ b/test/extensions/bootstrap/internal_listener/active_internal_listener_test.cc @@ -323,7 +323,9 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable& accessLogs() const override { return access_logs_; } diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc index 95f4a329387e6..daf7d0e81f1aa 100644 --- a/test/server/active_tcp_listener_test.cc +++ b/test/server/active_tcp_listener_test.cc @@ -648,7 +648,7 @@ TEST_F(ActiveTcpListenerTest, Rebalance) { return Network::BalancedConnectionHandlerOptRef(*active_listener2); })); auto filter_factory_callback = std::make_shared>(); - auto transport_socket_factory = Network::Test::createRawBufferSocketFactory(); + auto transport_socket_factory = Network::Test::createRawBufferDownstreamSocketFactory(); filter_chain_ = std::make_shared>(); EXPECT_CALL(conn_handler_, incNumConnections()); diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index c1752fe8e89a2..21a409c3ceb2e 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -2107,7 +2107,7 @@ TEST_F(ConnectionHandlerTest, TcpListenerInplaceUpdateWithoutUdpInplaceSupport) TestListener* old_test_listener = addListener(old_listener_tag, true, false, "test_listener", old_listener, - &old_listener_callbacks, mock_connection_balancer, ¤t_handler); + &old_listener_callbacks, nullptr, mock_connection_balancer, ¤t_handler); EXPECT_CALL(old_test_listener->socketFactory(), localAddress()) .WillRepeatedly(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *old_test_listener, runtime_); @@ -2117,10 +2117,11 @@ TEST_F(ConnectionHandlerTest, TcpListenerInplaceUpdateWithoutUdpInplaceSupport) auto overridden_filter_chain_manager = std::make_shared>(); - TestListener* new_test_listener = addListener( - new_listener_tag, true, false, "test_listener", /* Network::Listener */ nullptr, - &new_listener_callbacks, mock_connection_balancer, nullptr, Network::Socket::Type::Stream, - std::chrono::milliseconds(15000), false, overridden_filter_chain_manager); + TestListener* new_test_listener = + addListener(new_listener_tag, true, false, "test_listener", /* Network::Listener */ nullptr, + &new_listener_callbacks, nullptr, mock_connection_balancer, nullptr, + Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, + overridden_filter_chain_manager); EXPECT_CALL(new_test_listener->socketFactory(), socketType()) .WillOnce(Return(Network::Socket::Type::Stream)); handler_->addListener(old_listener_tag, *new_test_listener, runtime_); From 5a214247da32d99f7c4b5dc43d02413b74d6b099 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 20 Jun 2022 09:20:02 +0000 Subject: [PATCH 07/16] fix clang-tidy Signed-off-by: He Jie Xu --- source/server/connection_handler_impl.cc | 2 +- test/integration/fake_upstream.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index 34ceef1f77477..76b8c19f81dfd 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -331,7 +331,7 @@ ConnectionHandlerImpl::getBalancedHandlerByTag(uint64_t listener_tag, ASSERT(absl::holds_alternative>( details->typed_listener_) && details->listener_->listener() != nullptr); - return Network::BalancedConnectionHandlerOptRef(details->tcpListener().value().get()); + return {details->tcpListener().value().get()}; } } } diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index bd25aa00608cf..63281f172a61e 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -811,7 +811,7 @@ class FakeUpstream : Logger::Loggable, const std::string& name() const override { return name_; } Network::UdpListenerConfigOptRef udpListenerConfig() override { return udp_listener_config_; } Network::InternalListenerConfigOptRef internalListenerConfig() override { - return Network::InternalListenerConfigOptRef(); + return {}; } Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { return connection_balancer_; From 529d0d47ae78ec783b4a7b27138fb262eeae8447 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 20 Jun 2022 10:17:40 +0000 Subject: [PATCH 08/16] fix format Signed-off-by: He Jie Xu --- test/integration/fake_upstream.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 63281f172a61e..fe677279fdf94 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -810,9 +810,7 @@ class FakeUpstream : Logger::Loggable, uint64_t listenerTag() const override { return 0; } const std::string& name() const override { return name_; } Network::UdpListenerConfigOptRef udpListenerConfig() override { return udp_listener_config_; } - Network::InternalListenerConfigOptRef internalListenerConfig() override { - return {}; - } + Network::InternalListenerConfigOptRef internalListenerConfig() override { return {}; } Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { return connection_balancer_; } From caf35669d27b99cc4b341bfda014dd2ac3467ed8 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 22 Jun 2022 06:35:23 +0000 Subject: [PATCH 09/16] address comments Signed-off-by: He Jie Xu --- envoy/network/connection_handler.h | 3 +-- source/server/active_tcp_listener.h | 3 +++ source/server/listener_impl.h | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/envoy/network/connection_handler.h b/envoy/network/connection_handler.h index 1a0ff3efd02af..109a13cc50848 100644 --- a/envoy/network/connection_handler.h +++ b/envoy/network/connection_handler.h @@ -3,6 +3,7 @@ #include #include +#include "envoy/network/address.h" #include "envoy/network/connection.h" #include "envoy/network/connection_balancer.h" #include "envoy/network/filter.h" @@ -13,8 +14,6 @@ #include "source/common/common/interval_value.h" -#include "address.h" - namespace Envoy { namespace Network { diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h index f0a0ae49bd4a9..12661452fbf6e 100644 --- a/source/server/active_tcp_listener.h +++ b/source/server/active_tcp_listener.h @@ -87,6 +87,9 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks, std::atomic num_listener_connections_{}; Network::ConnectionBalancer& connection_balancer_; + // This is the address of this listener is listening on. And used for get the correct listener + // when rebalancing. The accepted socket can't be used to get the listening address, since + // the accepted socket's remote address can be another address than the listening address. Network::Address::InstanceConstSharedPtr address_; }; diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index bebe2ad861e8e..c4513966c73b9 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -471,6 +471,8 @@ class ListenerImpl final : public Network::ListenerConfig, const bool continue_on_listener_filters_timeout_; std::shared_ptr udp_listener_config_; std::unique_ptr internal_listener_config_; + // The key is the address string, the value is the address specific connection balancer. + // TODO (soulxu): Add hash support for address, then needn't a string address as key anymore. absl::flat_hash_map connection_balancers_; std::shared_ptr listener_factory_context_; FilterChainManagerImpl filter_chain_manager_; From cb8de67aa739ac144dabe7e326d4f2d81720e6dd Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 22 Jun 2022 09:52:55 +0000 Subject: [PATCH 10/16] Add multiple address test for connection handler Signed-off-by: He Jie Xu --- test/server/connection_handler_test.cc | 223 ++++++++++++++++++++----- 1 file changed, 180 insertions(+), 43 deletions(-) diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 21a409c3ceb2e..2b86a84a51f42 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -63,9 +63,9 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable access_log, std::shared_ptr> filter_chain_manager = nullptr, uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, - Network::ConnectionBalancerSharedPtr connection_balancer = nullptr, bool ignore_global_conn_limit = false, int num_of_socket_factories = 1) - : parent_(parent), socket_(std::make_shared>()), + : parent_(parent), tag_(tag), bind_to_port_(bind_to_port), tcp_backlog_size_(tcp_backlog_size), hand_off_restored_destination_connections_(hand_off_restored_destination_connections), name_(name), listener_filters_timeout_(listener_filters_timeout), continue_on_listener_filters_timeout_(continue_on_listener_filters_timeout), - connection_balancer_(connection_balancer == nullptr - ? std::make_shared() - : connection_balancer), access_logs_({access_log}), inline_filter_chain_manager_(filter_chain_manager), init_manager_(nullptr), ignore_global_conn_limit_(ignore_global_conn_limit) { for (int i = 0; i < num_of_socket_factories; i++) { socket_factories_.emplace_back(std::make_unique()); + sockets_.emplace_back(std::make_shared>()); + ON_CALL(*sockets_.back().get(), socketType()).WillByDefault(Return(socket_type)); } envoy::config::listener::v3::UdpListenerConfig udp_config; udp_listener_config_ = std::make_unique(udp_config); udp_listener_config_->listener_factory_ = std::make_unique(1); udp_listener_config_->writer_factory_ = std::make_unique(); - ON_CALL(*socket_, socketType()).WillByDefault(Return(socket_type)); } struct UdpListenerConfigImpl : public Network::UdpListenerConfig { @@ -149,9 +146,6 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable& accessLogs() const override { return access_logs_; } @@ -165,7 +159,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable> socket_; + std::vector>> sockets_; std::vector socket_factories_; uint64_t tag_; bool bind_to_port_; @@ -175,7 +169,6 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable udp_listener_config_; - Network::ConnectionBalancerSharedPtr connection_balancer_; BasicResourceLimitImpl open_connections_; const std::vector access_logs_; std::shared_ptr> inline_filter_chain_manager_; @@ -185,7 +178,60 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable; + class TestListener : public TestListenerBase { + public: + TestListener( + ConnectionHandlerTest& parent, uint64_t tag, bool bind_to_port, + bool hand_off_restored_destination_connections, const std::string& name, + Network::Socket::Type socket_type, std::chrono::milliseconds listener_filters_timeout, + bool continue_on_listener_filters_timeout, + std::shared_ptr access_log, + std::shared_ptr> filter_chain_manager = nullptr, + uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, + Network::ConnectionBalancerSharedPtr connection_balancer = nullptr, + bool ignore_global_conn_limit = false, int num_of_socket_factories = 1) + : TestListenerBase(parent, tag, bind_to_port, hand_off_restored_destination_connections, name, + socket_type, listener_filters_timeout, continue_on_listener_filters_timeout, access_log, + filter_chain_manager, tcp_backlog_size, ignore_global_conn_limit, num_of_socket_factories), + connection_balancer_(connection_balancer == nullptr + ? std::make_shared() + : connection_balancer) { + } + Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { + return *connection_balancer_; + } + + Network::ConnectionBalancerSharedPtr connection_balancer_; + }; + + class TestMultiAddressesListener : public TestListenerBase { + public: + TestMultiAddressesListener( + ConnectionHandlerTest& parent, uint64_t tag, bool bind_to_port, + bool hand_off_restored_destination_connections, const std::string& name, + Network::Socket::Type socket_type, std::chrono::milliseconds listener_filters_timeout, + bool continue_on_listener_filters_timeout, + std::shared_ptr access_log, + absl::flat_hash_map& connection_balancers, + std::shared_ptr> filter_chain_manager = nullptr, + uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, + bool ignore_global_conn_limit = false, int num_of_socket_factories = 1) + : TestListenerBase(parent, tag, bind_to_port, hand_off_restored_destination_connections, name, + socket_type, listener_filters_timeout, continue_on_listener_filters_timeout, access_log, + filter_chain_manager, tcp_backlog_size, ignore_global_conn_limit, num_of_socket_factories), + connection_balancers_(connection_balancers) { + } + + Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance& address) override { + auto iter = connection_balancers_.find(address.asString()); + EXPECT_NE(iter, connection_balancers_.end()); + return *iter->second; + } + + absl::flat_hash_map connection_balancers_; + }; + + using TestListenerPtr = std::unique_ptr; class MockUpstreamUdpFilter : public Network::UdpListenerReadFilter { public: @@ -236,16 +282,18 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable> overridden_filter_chain_manager = nullptr, uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, bool ignore_global_conn_limit = false) { - listeners_.emplace_back(std::make_unique( + auto test_listener = std::make_unique( *this, tag, bind_to_port, hand_off_restored_destination_connections, name, socket_type, listener_filters_timeout, continue_on_listener_filters_timeout, access_log_, overridden_filter_chain_manager, tcp_backlog_size, connection_balancer, - ignore_global_conn_limit)); + ignore_global_conn_limit); + TestListener* test_listener_raw_ptr = test_listener.get(); + listeners_.emplace_back(std::move(test_listener)); if (listener == nullptr) { // Expecting listener config in place update. // If so, dispatcher would not create new network listener. - return listeners_.back().get(); + return test_listener_raw_ptr; } EXPECT_CALL(listeners_.back()->socketFactory(), socketType()).WillOnce(Return(socket_type)); if (address == nullptr) { @@ -256,7 +304,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggablesocketFactory(), getListenSocket(_)) - .WillOnce(Return(listeners_.back()->socket_)); + .WillOnce(Return(listeners_.back()->sockets_[0])); if (socket_type == Network::Socket::Type::Stream) { EXPECT_CALL(dispatcher_, createListener_(_, _, _, _, _)) .WillOnce(Invoke([listener, listener_callbacks]( @@ -285,18 +333,26 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable& mock_listeners, std::vector& addresses, - Network::TcpListenerCallbacks** listener_callbacks = nullptr, bool disable_listener = false) { - listeners_.emplace_back(std::make_unique( + absl::flat_hash_map& connection_balancers, + absl::flat_hash_map& listener_callbacks_map, bool disable_listener = false) { + if (connection_balancers.empty()) { + for (auto& address : addresses) { + connection_balancers.emplace(address->asString(), std::make_shared()); + } + } + auto test_listener = std::make_unique( *this, tag, bind_to_port, hand_off_restored_destination_connections, name, Network::Socket::Type::Stream, std::chrono::milliseconds(15000), false, access_log_, - nullptr, ENVOY_TCP_BACKLOG_SIZE, nullptr, false, mock_listeners.size())); + connection_balancers, nullptr, ENVOY_TCP_BACKLOG_SIZE, false, mock_listeners.size()); + TestMultiAddressesListener* test_listener_raw_ptr = test_listener.get(); + listeners_.emplace_back(std::move(test_listener)); EXPECT_CALL(listeners_.back()->socketFactory(0), socketType()) .WillOnce(Return(Network::Socket::Type::Stream)); @@ -304,15 +360,16 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggablesocketFactory(i), localAddress()) .WillRepeatedly(ReturnRef(addresses[i])); EXPECT_CALL(listeners_.back()->socketFactory(i), getListenSocket(_)) - .WillOnce(Return(listeners_.back()->socket_)); + .WillOnce(Return(listeners_.back()->sockets_[i])); + test_listener_raw_ptr->sockets_[i]->connection_info_provider_->setLocalAddress(addresses[i]); EXPECT_CALL(dispatcher_, createListener_(_, _, _, _, _)) - .WillOnce(Invoke([i, &mock_listeners, listener_callbacks]( - Network::SocketSharedPtr&&, Network::TcpListenerCallbacks& cb, + .WillOnce(Invoke([i, &mock_listeners, &listener_callbacks_map]( + Network::SocketSharedPtr&& socket, Network::TcpListenerCallbacks& cb, Runtime::Loader&, bool, bool) -> Network::Listener* { - if (listener_callbacks != nullptr) { - *listener_callbacks = &cb; - } + auto listener_callbacks_iter = listener_callbacks_map.find(socket->connectionInfoProvider().localAddress()->asString()); + EXPECT_NE(listener_callbacks_iter, listener_callbacks_map.end()); + *listener_callbacks_iter->second = &cb; return mock_listeners[i]; })) .RetiresOnSaturation(); @@ -322,7 +379,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable(); auto listener2 = new NiceMock(); std::vector mock_listeners; @@ -558,13 +614,19 @@ TEST_F(ConnectionHandlerTest, RemoveListenerWithMultiAddrs) { std::vector addresses; addresses.emplace_back(address1); addresses.emplace_back(address2); - TestListener* test_listener = addMultiAddrsListener( - 1, true, false, "test_listener", mock_listeners, addresses, &listener_callbacks); + Network::TcpListenerCallbacks* listener_callbacks1; + Network::TcpListenerCallbacks* listener_callbacks2; + absl::flat_hash_map listener_callbacks_map; + listener_callbacks_map.emplace(address1->asString(), &listener_callbacks1); + listener_callbacks_map.emplace(address2->asString(), &listener_callbacks2); + absl::flat_hash_map connection_balancers; + TestMultiAddressesListener* test_listener = addMultiAddrsListener( + 1, true, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); EXPECT_CALL(*access_log_, log(_, _, _, _)); - listener_callbacks->onAccept(Network::ConnectionSocketPtr{connection}); + listener_callbacks1->onAccept(Network::ConnectionSocketPtr{connection}); EXPECT_EQ(0UL, handler_->numConnections()); // Test stop/remove of not existent listener. @@ -599,7 +661,6 @@ TEST_F(ConnectionHandlerTest, DisableListener) { } TEST_F(ConnectionHandlerTest, DisableListenerWithMultiAddrs) { - Network::TcpListenerCallbacks* listener_callbacks; auto listener1 = new NiceMock(); auto listener2 = new NiceMock(); std::vector mock_listeners; @@ -612,8 +673,14 @@ TEST_F(ConnectionHandlerTest, DisableListenerWithMultiAddrs) { std::vector addresses; addresses.emplace_back(address1); addresses.emplace_back(address2); - TestListener* test_listener = addMultiAddrsListener( - 1, false, false, "test_listener", mock_listeners, addresses, &listener_callbacks); + Network::TcpListenerCallbacks* listener_callbacks1; + Network::TcpListenerCallbacks* listener_callbacks2; + absl::flat_hash_map listener_callbacks_map; + listener_callbacks_map.emplace(address1->asString(), &listener_callbacks1); + listener_callbacks_map.emplace(address2->asString(), &listener_callbacks2); + absl::flat_hash_map connection_balancers; + TestMultiAddressesListener* test_listener = addMultiAddrsListener( + 1, false, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map); handler_->addListener(absl::nullopt, *test_listener, runtime_); EXPECT_CALL(*listener1, disable()); @@ -624,6 +691,66 @@ TEST_F(ConnectionHandlerTest, DisableListenerWithMultiAddrs) { handler_->disableListeners(); } +TEST_F(ConnectionHandlerTest, RebalanceWithMultiAddressListener) { + auto listener1 = new NiceMock(); + auto listener2 = new NiceMock(); + std::vector mock_listeners; + mock_listeners.emplace_back(listener1); + mock_listeners.emplace_back(listener2); + Network::Address::InstanceConstSharedPtr address1( + new Network::Address::Ipv4Instance("127.0.0.1", 80, nullptr)); + Network::Address::InstanceConstSharedPtr address2( + new Network::Address::Ipv4Instance("127.0.0.2", 80, nullptr)); + std::vector addresses; + addresses.emplace_back(address1); + addresses.emplace_back(address2); + + Network::TcpListenerCallbacks* listener_callbacks1; + Network::TcpListenerCallbacks* listener_callbacks2; + absl::flat_hash_map listener_callbacks_map; + listener_callbacks_map.emplace(address1->asString(), &listener_callbacks1); + listener_callbacks_map.emplace(address2->asString(), &listener_callbacks2); + + absl::flat_hash_map connection_balancers; + auto mock_connection_balancer1 = std::make_shared(); + auto mock_connection_balancer2 = std::make_shared(); + connection_balancers.emplace(address1->asString(), mock_connection_balancer1); + connection_balancers.emplace(address2->asString(), mock_connection_balancer2); + + Network::BalancedConnectionHandler* current_handler1; + Network::BalancedConnectionHandler* current_handler2; + + EXPECT_CALL(*mock_connection_balancer1, registerHandler(_)).WillOnce(SaveArgAddress(¤t_handler1)); + EXPECT_CALL(*mock_connection_balancer2, registerHandler(_)).WillOnce(SaveArgAddress(¤t_handler2)); + + TestMultiAddressesListener* test_listener = addMultiAddrsListener( + 1, false, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map); + handler_->addListener(absl::nullopt, *test_listener, runtime_); + + // Send connection to the first listener, expect mock_connection_balancer1 will be called. + // then mock_connection_balancer1 will rebalance the connection to the same listener. + EXPECT_CALL(*mock_connection_balancer1, pickTargetHandler(_)).WillOnce(ReturnRef(*current_handler1)); + EXPECT_CALL(*access_log_, log(_, _, _, _)); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); + + current_handler1->incNumConnections(); + listener_callbacks1->onAccept(std::make_unique>()); + + // Send connection to the second listener, expect mock_connection_balancer2 will be called. + // then mock_connection_balancer2 will rebalance the connection to the same listener. + EXPECT_CALL(*mock_connection_balancer2, pickTargetHandler(_)).WillOnce(ReturnRef(*current_handler2)); + EXPECT_CALL(*access_log_, log(_, _, _, _)); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); + + current_handler2->incNumConnections(); + listener_callbacks2->onAccept(std::make_unique>()); + + EXPECT_CALL(*mock_connection_balancer1, unregisterHandler(_)); + EXPECT_CALL(*mock_connection_balancer2, unregisterHandler(_)); + EXPECT_CALL(*listener1, onDestroy()); + EXPECT_CALL(*listener2, onDestroy()); +} + // Envoy doesn't have such case yet, just ensure the code won't break with it. TEST_F(ConnectionHandlerTest, StopAndDisableStoppedListener) { InSequence s; @@ -659,7 +786,6 @@ TEST_F(ConnectionHandlerTest, AddDisabledListener) { } TEST_F(ConnectionHandlerTest, AddDisabledListenerWithMultiAddrs) { - Network::TcpListenerCallbacks* listener_callbacks; auto listener1 = new NiceMock(); auto listener2 = new NiceMock(); std::vector mock_listeners; @@ -672,8 +798,14 @@ TEST_F(ConnectionHandlerTest, AddDisabledListenerWithMultiAddrs) { std::vector addresses; addresses.emplace_back(address1); addresses.emplace_back(address2); - TestListener* test_listener = addMultiAddrsListener( - 1, false, false, "test_listener", mock_listeners, addresses, &listener_callbacks, true); + Network::TcpListenerCallbacks* listener_callbacks1; + Network::TcpListenerCallbacks* listener_callbacks2; + absl::flat_hash_map listener_callbacks_map; + listener_callbacks_map.emplace(address1->asString(), &listener_callbacks1); + listener_callbacks_map.emplace(address2->asString(), &listener_callbacks2); + absl::flat_hash_map connection_balancers; + TestMultiAddressesListener* test_listener = addMultiAddrsListener( + 1, false, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map, true); EXPECT_CALL(*listener1, onDestroy()); EXPECT_CALL(*listener2, onDestroy()); @@ -864,7 +996,6 @@ TEST_F(ConnectionHandlerTest, NormalRedirect) { } TEST_F(ConnectionHandlerTest, NormalRedirectWithMultiAddrs) { - Network::TcpListenerCallbacks* listener_callbacks1; auto listener1 = new NiceMock(); auto listener2 = new NiceMock(); std::vector mock_listeners; @@ -877,8 +1008,14 @@ TEST_F(ConnectionHandlerTest, NormalRedirectWithMultiAddrs) { std::vector addresses; addresses.emplace_back(normal_address); addresses.emplace_back(alt_address); - TestListener* test_listener1 = addMultiAddrsListener( - 1, true, true, "test_listener1", mock_listeners, addresses, &listener_callbacks1); + Network::TcpListenerCallbacks* listener_callbacks1; + Network::TcpListenerCallbacks* listener_callbacks2; + absl::flat_hash_map listener_callbacks_map; + listener_callbacks_map.emplace(normal_address->asString(), &listener_callbacks1); + listener_callbacks_map.emplace(alt_address->asString(), &listener_callbacks2); + absl::flat_hash_map connection_balancers; + TestMultiAddressesListener* test_listener1 = addMultiAddrsListener( + 1, true, true, "test_listener1", mock_listeners, addresses, connection_balancers, listener_callbacks_map); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto* test_filter = new NiceMock(); From 23ad91bbec02847ff72093662135af90e3a46149 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 22 Jun 2022 09:53:48 +0000 Subject: [PATCH 11/16] fix format Signed-off-by: He Jie Xu --- test/server/connection_handler_test.cc | 96 +++++++++++++++----------- 1 file changed, 55 insertions(+), 41 deletions(-) diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 2b86a84a51f42..83f49b4c0deca 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -72,10 +72,10 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable access_log, std::shared_ptr> filter_chain_manager = nullptr, - uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, - bool ignore_global_conn_limit = false, int num_of_socket_factories = 1) - : parent_(parent), - tag_(tag), bind_to_port_(bind_to_port), tcp_backlog_size_(tcp_backlog_size), + uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, bool ignore_global_conn_limit = false, + int num_of_socket_factories = 1) + : parent_(parent), tag_(tag), bind_to_port_(bind_to_port), + tcp_backlog_size_(tcp_backlog_size), hand_off_restored_destination_connections_(hand_off_restored_destination_connections), name_(name), listener_filters_timeout_(listener_filters_timeout), continue_on_listener_filters_timeout_(continue_on_listener_filters_timeout), @@ -179,7 +179,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable() - : connection_balancer) { - } + : connection_balancer) {} Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { return *connection_balancer_; } @@ -205,24 +205,26 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable access_log, - absl::flat_hash_map& connection_balancers, + absl::flat_hash_map& + connection_balancers, std::shared_ptr> filter_chain_manager = nullptr, - uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, - bool ignore_global_conn_limit = false, int num_of_socket_factories = 1) - : TestListenerBase(parent, tag, bind_to_port, hand_off_restored_destination_connections, name, - socket_type, listener_filters_timeout, continue_on_listener_filters_timeout, access_log, - filter_chain_manager, tcp_backlog_size, ignore_global_conn_limit, num_of_socket_factories), - connection_balancers_(connection_balancers) { - } - - Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance& address) override { + uint32_t tcp_backlog_size = ENVOY_TCP_BACKLOG_SIZE, bool ignore_global_conn_limit = false, + int num_of_socket_factories = 1) + : TestListenerBase(parent, tag, bind_to_port, hand_off_restored_destination_connections, + name, socket_type, listener_filters_timeout, + continue_on_listener_filters_timeout, access_log, filter_chain_manager, + tcp_backlog_size, ignore_global_conn_limit, num_of_socket_factories), + connection_balancers_(connection_balancers) {} + + Network::ConnectionBalancer& + connectionBalancer(const Network::Address::Instance& address) override { auto iter = connection_balancers_.find(address.asString()); EXPECT_NE(iter, connection_balancers_.end()); return *iter->second; @@ -341,11 +343,13 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable& mock_listeners, std::vector& addresses, absl::flat_hash_map& connection_balancers, - absl::flat_hash_map& listener_callbacks_map, bool disable_listener = false) { + absl::flat_hash_map& listener_callbacks_map, + bool disable_listener = false) { if (connection_balancers.empty()) { - for (auto& address : addresses) { - connection_balancers.emplace(address->asString(), std::make_shared()); - } + for (auto& address : addresses) { + connection_balancers.emplace(address->asString(), + std::make_shared()); + } } auto test_listener = std::make_unique( *this, tag, bind_to_port, hand_off_restored_destination_connections, name, @@ -367,7 +371,8 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable Network::Listener* { - auto listener_callbacks_iter = listener_callbacks_map.find(socket->connectionInfoProvider().localAddress()->asString()); + auto listener_callbacks_iter = listener_callbacks_map.find( + socket->connectionInfoProvider().localAddress()->asString()); EXPECT_NE(listener_callbacks_iter, listener_callbacks_map.end()); *listener_callbacks_iter->second = &cb; return mock_listeners[i]; @@ -620,8 +625,9 @@ TEST_F(ConnectionHandlerTest, RemoveListenerWithMultiAddrs) { listener_callbacks_map.emplace(address1->asString(), &listener_callbacks1); listener_callbacks_map.emplace(address2->asString(), &listener_callbacks2); absl::flat_hash_map connection_balancers; - TestMultiAddressesListener* test_listener = addMultiAddrsListener( - 1, true, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map); + TestMultiAddressesListener* test_listener = + addMultiAddrsListener(1, true, false, "test_listener", mock_listeners, addresses, + connection_balancers, listener_callbacks_map); handler_->addListener(absl::nullopt, *test_listener, runtime_); Network::MockConnectionSocket* connection = new NiceMock(); @@ -679,8 +685,9 @@ TEST_F(ConnectionHandlerTest, DisableListenerWithMultiAddrs) { listener_callbacks_map.emplace(address1->asString(), &listener_callbacks1); listener_callbacks_map.emplace(address2->asString(), &listener_callbacks2); absl::flat_hash_map connection_balancers; - TestMultiAddressesListener* test_listener = addMultiAddrsListener( - 1, false, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map); + TestMultiAddressesListener* test_listener = + addMultiAddrsListener(1, false, false, "test_listener", mock_listeners, addresses, + connection_balancers, listener_callbacks_map); handler_->addListener(absl::nullopt, *test_listener, runtime_); EXPECT_CALL(*listener1, disable()); @@ -720,16 +727,20 @@ TEST_F(ConnectionHandlerTest, RebalanceWithMultiAddressListener) { Network::BalancedConnectionHandler* current_handler1; Network::BalancedConnectionHandler* current_handler2; - EXPECT_CALL(*mock_connection_balancer1, registerHandler(_)).WillOnce(SaveArgAddress(¤t_handler1)); - EXPECT_CALL(*mock_connection_balancer2, registerHandler(_)).WillOnce(SaveArgAddress(¤t_handler2)); + EXPECT_CALL(*mock_connection_balancer1, registerHandler(_)) + .WillOnce(SaveArgAddress(¤t_handler1)); + EXPECT_CALL(*mock_connection_balancer2, registerHandler(_)) + .WillOnce(SaveArgAddress(¤t_handler2)); - TestMultiAddressesListener* test_listener = addMultiAddrsListener( - 1, false, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map); + TestMultiAddressesListener* test_listener = + addMultiAddrsListener(1, false, false, "test_listener", mock_listeners, addresses, + connection_balancers, listener_callbacks_map); handler_->addListener(absl::nullopt, *test_listener, runtime_); // Send connection to the first listener, expect mock_connection_balancer1 will be called. // then mock_connection_balancer1 will rebalance the connection to the same listener. - EXPECT_CALL(*mock_connection_balancer1, pickTargetHandler(_)).WillOnce(ReturnRef(*current_handler1)); + EXPECT_CALL(*mock_connection_balancer1, pickTargetHandler(_)) + .WillOnce(ReturnRef(*current_handler1)); EXPECT_CALL(*access_log_, log(_, _, _, _)); EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); @@ -738,7 +749,8 @@ TEST_F(ConnectionHandlerTest, RebalanceWithMultiAddressListener) { // Send connection to the second listener, expect mock_connection_balancer2 will be called. // then mock_connection_balancer2 will rebalance the connection to the same listener. - EXPECT_CALL(*mock_connection_balancer2, pickTargetHandler(_)).WillOnce(ReturnRef(*current_handler2)); + EXPECT_CALL(*mock_connection_balancer2, pickTargetHandler(_)) + .WillOnce(ReturnRef(*current_handler2)); EXPECT_CALL(*access_log_, log(_, _, _, _)); EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); @@ -804,8 +816,9 @@ TEST_F(ConnectionHandlerTest, AddDisabledListenerWithMultiAddrs) { listener_callbacks_map.emplace(address1->asString(), &listener_callbacks1); listener_callbacks_map.emplace(address2->asString(), &listener_callbacks2); absl::flat_hash_map connection_balancers; - TestMultiAddressesListener* test_listener = addMultiAddrsListener( - 1, false, false, "test_listener", mock_listeners, addresses, connection_balancers, listener_callbacks_map, true); + TestMultiAddressesListener* test_listener = + addMultiAddrsListener(1, false, false, "test_listener", mock_listeners, addresses, + connection_balancers, listener_callbacks_map, true); EXPECT_CALL(*listener1, onDestroy()); EXPECT_CALL(*listener2, onDestroy()); @@ -1014,8 +1027,9 @@ TEST_F(ConnectionHandlerTest, NormalRedirectWithMultiAddrs) { listener_callbacks_map.emplace(normal_address->asString(), &listener_callbacks1); listener_callbacks_map.emplace(alt_address->asString(), &listener_callbacks2); absl::flat_hash_map connection_balancers; - TestMultiAddressesListener* test_listener1 = addMultiAddrsListener( - 1, true, true, "test_listener1", mock_listeners, addresses, connection_balancers, listener_callbacks_map); + TestMultiAddressesListener* test_listener1 = + addMultiAddrsListener(1, true, true, "test_listener1", mock_listeners, addresses, + connection_balancers, listener_callbacks_map); handler_->addListener(absl::nullopt, *test_listener1, runtime_); auto* test_filter = new NiceMock(); From a7c93cdb93ace959f7d3e0e583d4b104204d01a7 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 22 Jun 2022 09:55:07 +0000 Subject: [PATCH 12/16] fix spell Signed-off-by: He Jie Xu --- test/server/connection_handler_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 83f49b4c0deca..b325dfef001bb 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -738,7 +738,7 @@ TEST_F(ConnectionHandlerTest, RebalanceWithMultiAddressListener) { handler_->addListener(absl::nullopt, *test_listener, runtime_); // Send connection to the first listener, expect mock_connection_balancer1 will be called. - // then mock_connection_balancer1 will rebalance the connection to the same listener. + // then mock_connection_balancer1 will balance the connection to the same listener. EXPECT_CALL(*mock_connection_balancer1, pickTargetHandler(_)) .WillOnce(ReturnRef(*current_handler1)); EXPECT_CALL(*access_log_, log(_, _, _, _)); @@ -748,7 +748,7 @@ TEST_F(ConnectionHandlerTest, RebalanceWithMultiAddressListener) { listener_callbacks1->onAccept(std::make_unique>()); // Send connection to the second listener, expect mock_connection_balancer2 will be called. - // then mock_connection_balancer2 will rebalance the connection to the same listener. + // then mock_connection_balancer2 will balance the connection to the same listener. EXPECT_CALL(*mock_connection_balancer2, pickTargetHandler(_)) .WillOnce(ReturnRef(*current_handler2)); EXPECT_CALL(*access_log_, log(_, _, _, _)); From eace31aa7d93dc1f7faf0d296cc18a43578427c0 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 22 Jun 2022 09:59:57 +0000 Subject: [PATCH 13/16] correct the header Signed-off-by: He Jie Xu --- envoy/network/listener.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/envoy/network/listener.h b/envoy/network/listener.h index e90086e61237e..f25707db4b842 100644 --- a/envoy/network/listener.h +++ b/envoy/network/listener.h @@ -12,6 +12,7 @@ #include "envoy/config/listener/v3/udp_listener_config.pb.h" #include "envoy/config/typed_metadata.h" #include "envoy/init/manager.h" +#include "envoy/network/address.h" #include "envoy/network/connection.h" #include "envoy/network/connection_balancer.h" #include "envoy/network/listen_socket.h" @@ -20,8 +21,6 @@ #include "source/common/common/interval_value.h" -#include "address.h" - namespace Envoy { namespace Network { From 95163a43983c68aec3988b8d2e7498340976ee27 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 22 Jun 2022 23:15:35 +0000 Subject: [PATCH 14/16] address comments Signed-off-by: He Jie Xu --- source/server/active_tcp_listener.cc | 12 +++++++----- source/server/active_tcp_listener.h | 8 ++++---- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc index a169a7a33f0ea..66a5f604bd106 100644 --- a/source/server/active_tcp_listener.cc +++ b/source/server/active_tcp_listener.cc @@ -15,25 +15,27 @@ namespace Server { ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config, Runtime::Loader& runtime, Network::SocketSharedPtr&& socket, - Network::Address::InstanceConstSharedPtr& address, + Network::Address::InstanceConstSharedPtr& listen_address, Network::ConnectionBalancer& connection_balancer) : OwnedActiveStreamListenerBase( parent, parent.dispatcher(), parent.dispatcher().createListener(std::move(socket), *this, runtime, config.bindToPort(), config.ignoreGlobalConnLimit()), config), - tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) { + tcp_conn_handler_(parent), connection_balancer_(connection_balancer), + listen_address_(listen_address) { connection_balancer_.registerHandler(*this); } ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, - Network::Address::InstanceConstSharedPtr& address, + Network::Address::InstanceConstSharedPtr& listen_address, Network::ListenerConfig& config, Network::ConnectionBalancer& connection_balancer, Runtime::Loader&) : OwnedActiveStreamListenerBase(parent, parent.dispatcher(), std::move(listener), config), - tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) { + tcp_conn_handler_(parent), connection_balancer_(connection_balancer), + listen_address_(listen_address) { connection_balancer_.registerHandler(*this); } @@ -156,7 +158,7 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) { RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared(); socket_to_rebalance->socket = std::move(socket); - dispatcher().post([socket_to_rebalance, address = address_, tag = config_->listenerTag(), + dispatcher().post([socket_to_rebalance, address = listen_address_, tag = config_->listenerTag(), &tcp_conn_handler = tcp_conn_handler_, handoff = config_->handOffRestoredDestinationConnections()]() { auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag, *address); diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h index 12661452fbf6e..23dfe3a3228fc 100644 --- a/source/server/active_tcp_listener.h +++ b/source/server/active_tcp_listener.h @@ -28,10 +28,10 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks, public: ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config, Runtime::Loader& runtime, Network::SocketSharedPtr&& socket, - Network::Address::InstanceConstSharedPtr& address, + Network::Address::InstanceConstSharedPtr& listen_address, Network::ConnectionBalancer& connection_balancer); ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, - Network::Address::InstanceConstSharedPtr& address, + Network::Address::InstanceConstSharedPtr& listen_address, Network::ListenerConfig& config, Network::ConnectionBalancer& connection_balancer, Runtime::Loader& runtime); ~ActiveTcpListener() override; @@ -87,10 +87,10 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks, std::atomic num_listener_connections_{}; Network::ConnectionBalancer& connection_balancer_; - // This is the address of this listener is listening on. And used for get the correct listener + // This is the address this listener is listening on. It's used to get the correct listener // when rebalancing. The accepted socket can't be used to get the listening address, since // the accepted socket's remote address can be another address than the listening address. - Network::Address::InstanceConstSharedPtr address_; + Network::Address::InstanceConstSharedPtr listen_address_; }; using ActiveTcpListenerOptRef = absl::optional>; From 44b5a027001417f8314bf59a8d500bef051f8a99 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Thu, 23 Jun 2022 02:03:36 +0000 Subject: [PATCH 15/16] fix format Signed-off-by: He Jie Xu --- source/server/listener_impl.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index d543d6b24c116..96f7ed905a261 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -694,7 +694,7 @@ void ListenerImpl::buildConnectionBalancer(const Network::Address::Instance& add switch (config_.connection_balance_config().balance_type_case()) { case envoy::config::listener::v3::Listener_ConnectionBalanceConfig::kExactBalance: { connection_balancers_.emplace(address.asString(), - std::make_shared()); + std::make_shared()); break; } case envoy::config::listener::v3::Listener_ConnectionBalanceConfig::kExtendBalance: { @@ -707,8 +707,10 @@ void ListenerImpl::buildConnectionBalancer(const Network::Address::Instance& add throw EnvoyException(fmt::format("Didn't find a registered implementation for type: '{}'", connection_balance_library_type)); } - connection_balancers_.emplace(address.asString(), factory->createConnectionBalancerFromProto( - config_.connection_balance_config().extend_balance(), *listener_factory_context_)); + connection_balancers_.emplace( + address.asString(), + factory->createConnectionBalancerFromProto( + config_.connection_balance_config().extend_balance(), *listener_factory_context_)); break; } case envoy::config::listener::v3::Listener_ConnectionBalanceConfig::BALANCE_TYPE_NOT_SET: { From a59c318292afb3116e90e69b467397174bf53b6b Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Thu, 23 Jun 2022 08:23:44 +0000 Subject: [PATCH 16/16] fix test Signed-off-by: He Jie Xu --- test/server/listener_manager_impl_test.cc | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/test/server/listener_manager_impl_test.cc b/test/server/listener_manager_impl_test.cc index 9c0b9f3a377ea..3ec4508242e9d 100644 --- a/test/server/listener_manager_impl_test.cc +++ b/test/server/listener_manager_impl_test.cc @@ -6710,10 +6710,15 @@ TEST_P(ListenerManagerImplWithRealFiltersTest, InvalidExtendConnectionBalanceCon extend_balance_config->mutable_typed_config()->set_type_url( "type.googleapis.com/google.protobuf.test"); + auto listener_impl = ListenerImpl(listener, "version", *manager_, "foo", true, false, + /*hash=*/static_cast(0), 1); + auto socket_factory = std::make_unique(); + Network::Address::InstanceConstSharedPtr address( + new Network::Address::Ipv4Instance("192.168.0.1", 80, nullptr)); + EXPECT_CALL(*socket_factory, localAddress()).WillOnce(ReturnRef(address)); EXPECT_THROW_WITH_MESSAGE( - new ListenerImpl(listener, "version", *manager_, "foo", true, false, - /*hash=*/static_cast(0), 1), - EnvoyException, "Didn't find a registered implementation for type: 'google.protobuf.test'"); + listener_impl.addSocketFactory(std::move(socket_factory)), EnvoyException, + "Didn't find a registered implementation for type: 'google.protobuf.test'"); #endif } @@ -6723,8 +6728,13 @@ TEST_P(ListenerManagerImplWithRealFiltersTest, EmptyConnectionBalanceConfig) { auto listener = createIPv4Listener("TCPListener"); listener.mutable_connection_balance_config(); - EXPECT_THROW_WITH_MESSAGE(new ListenerImpl(listener, "version", *manager_, "foo", true, false, - /*hash=*/static_cast(0), 1), + auto listener_impl = ListenerImpl(listener, "version", *manager_, "foo", true, false, + /*hash=*/static_cast(0), 1); + auto socket_factory = std::make_unique(); + Network::Address::InstanceConstSharedPtr address( + new Network::Address::Ipv4Instance("192.168.0.1", 80, nullptr)); + EXPECT_CALL(*socket_factory, localAddress()).WillOnce(ReturnRef(address)); + EXPECT_THROW_WITH_MESSAGE(listener_impl.addSocketFactory(std::move(socket_factory)), EnvoyException, "No valid balance type for connection balance"); #endif }