diff --git a/envoy/network/listener.h b/envoy/network/listener.h index 2e4598de04516..9969c4198fa93 100644 --- a/envoy/network/listener.h +++ b/envoy/network/listener.h @@ -107,6 +107,16 @@ class UdpListenerConfig { using UdpListenerConfigOptRef = OptRef; +/** + * Configuration for an internal listener. + */ +class InternalListenerConfig { +public: + virtual ~InternalListenerConfig() = default; +}; + +using InternalListenerConfigOptRef = OptRef; + /** * A configuration for an individual listener. */ @@ -185,6 +195,11 @@ class ListenerConfig { */ virtual UdpListenerConfigOptRef udpListenerConfig() PURE; + /** + * @return the internal configuration for the listener IFF it is an internal listener. + */ + virtual InternalListenerConfigOptRef internalListenerConfig() PURE; + /** * @return traffic direction of the listener. */ @@ -428,6 +443,40 @@ class UdpListener : public virtual Listener { using UdpListenerPtr = std::unique_ptr; +/** + * Internal listener callbacks. + */ +class InternalListener { +public: + virtual ~InternalListener() = default; + + /** + * Called when a new connection is accepted. + * @param socket supplies the socket that is moved into the callee. + */ + virtual void onAccept(ConnectionSocketPtr&& socket) PURE; +}; +using InternalListenerOptRef = OptRef; + +/** + * The query interface of the registered internal listener callbacks. + */ +class InternalListenerManager { +public: + virtual ~InternalListenerManager() = default; + + /** + * Return the internal listener callbacks binding the listener address. + * + * @param listen_address the internal address of the expected internal listener. + */ + virtual InternalListenerOptRef + findByAddress(const Address::InstanceConstSharedPtr& listen_address) PURE; +}; + +using InternalListenerManagerOptRef = + absl::optional>; + /** * Handles delivering datagrams to the correct worker. */ diff --git a/source/common/network/BUILD b/source/common/network/BUILD index b75b299dc7c8f..226da4bcc3128 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -64,6 +64,7 @@ envoy_cc_library( hdrs = ["connection_impl_base.h"], deps = [ ":filter_manager_lib", + ":listen_socket_lib", "//envoy/common:scope_tracker_interface", "//envoy/event:dispatcher_interface", "//source/common/common:assert_lib", diff --git a/source/common/network/listen_socket_impl.h b/source/common/network/listen_socket_impl.h index b555224ac489e..f48c04959b246 100644 --- a/source/common/network/listen_socket_impl.h +++ b/source/common/network/listen_socket_impl.h @@ -153,6 +153,30 @@ class UdsListenSocket : public ListenSocketImpl { Socket::Type socketType() const override { return Socket::Type::Stream; } }; +// This socket type adapts the ListenerComponentFactory. +class InternalListenSocket : public ListenSocketImpl { +public: + InternalListenSocket(const Address::InstanceConstSharedPtr& address) + : ListenSocketImpl(/* io_handle= */ nullptr, address) {} + Socket::Type socketType() const override { return Socket::Type::Stream; } + + // InternalListenSocket cannot be duplicated. + SocketPtr duplicate() override { + return std::make_unique(connectionInfoProvider().localAddress()); + } + + Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr) override { + // internal listener socket does not support bind semantic. + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } + + void close() override { ASSERT(io_handle_ == nullptr); } + bool isOpen() const override { + ASSERT(io_handle_ == nullptr); + return false; + } +}; + class ConnectionSocketImpl : public SocketImpl, public ConnectionSocket { public: ConnectionSocketImpl(IoHandlePtr&& io_handle, diff --git a/source/common/network/socket_impl.cc b/source/common/network/socket_impl.cc index 2de31a2fe7e7e..3e16eeddfd3c0 100644 --- a/source/common/network/socket_impl.cc +++ b/source/common/network/socket_impl.cc @@ -30,6 +30,11 @@ SocketImpl::SocketImpl(IoHandlePtr&& io_handle, return; } + if (connection_info_provider_->remoteAddress() != nullptr) { + addr_type_ = connection_info_provider_->remoteAddress()->type(); + return; + } + // Should not happen but some tests inject -1 fds if (!io_handle_->isOpen()) { return; diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 7cd7b19c00be4..8138b1bcf41a8 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -534,6 +534,9 @@ Utility::protobufAddressSocketType(const envoy::config::core::v3::Address& proto } case envoy::config::core::v3::Address::AddressCase::kPipe: return Socket::Type::Stream; + case envoy::config::core::v3::Address::AddressCase::kEnvoyInternalAddress: + // Currently internal address supports stream operation only. + return Socket::Type::Stream; default: NOT_REACHED_GCOVR_EXCL_LINE; } diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 4b060373c4e5b..1110422b548e8 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -69,6 +69,7 @@ constexpr const char* runtime_features[] = { "envoy.reloadable_features.http_reject_path_with_fragment", "envoy.reloadable_features.http_strip_fragment_from_path_unsafe_if_disabled", "envoy.reloadable_features.http_transport_failure_reason_in_body", + "envoy.reloadable_features.internal_address", "envoy.reloadable_features.internal_redirects_with_body", "envoy.reloadable_features.listener_reuse_port_default_enabled", "envoy.reloadable_features.listener_wildcard_match_ip_family", diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index b22fa5e956e5f..4dd925c34cdc0 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -531,6 +531,8 @@ Network::FilterStatus Filter::onNewConnection() { } void Filter::onDownstreamEvent(Network::ConnectionEvent event) { + ENVOY_CONN_LOG(trace, "on downstream event {}, has upstream = {}", read_callbacks_->connection(), + static_cast(event), upstream_ == nullptr); if (upstream_) { Tcp::ConnectionPool::ConnectionDataPtr conn_data(upstream_->onDownstreamEvent(event)); if (conn_data != nullptr && @@ -699,6 +701,7 @@ Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedP const Upstream::HostDescriptionConstSharedPtr& upstream_host) : parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)), timer_(std::move(idle_timer)), upstream_host_(upstream_host), config_(config) { + ENVOY_CONN_LOG(trace, "draining the upstream connection", upstream_conn_data_->connection()); config_->stats().upstream_flush_total_.inc(); config_->stats().upstream_flush_active_.inc(); } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index ccd01ac8c098e..22a49739c4ebe 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -380,7 +380,7 @@ class Filter : public Network::ReadFilter, // This class deals with an upstream connection that needs to finish flushing, when the downstream // connection has been closed. The TcpProxy is destroyed when the downstream connection is closed, // so handling the upstream connection here allows it to finish draining or timeout. -class Drainer : public Event::DeferredDeletable { +class Drainer : public Event::DeferredDeletable, protected Logger::Loggable { public: Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config, const std::shared_ptr& callbacks, diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 81e8654110d57..4a57f8d18b203 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -317,7 +317,10 @@ Network::ClientConnectionPtr HostImpl::createConnection( } else { connection_options = options; } - ASSERT(!address->envoyInternalAddress()); + + ASSERT(!address->envoyInternalAddress() || + Runtime::runtimeFeatureEnabled("envoy.reloadable_features.internal_address")); + Network::ClientConnectionPtr connection = address_list.size() > 1 ? std::make_unique( diff --git a/source/server/BUILD b/source/server/BUILD index fe5c9056f6558..03d7aee0d8354 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -64,6 +64,7 @@ envoy_cc_library( envoy_cc_library( name = "connection_handler_lib", deps = [ + ":active_internal_listener", ":active_tcp_listener", ":active_udp_listener", ":connection_handler_impl", @@ -77,6 +78,7 @@ envoy_cc_library( "connection_handler_impl.h", ], deps = [ + ":active_internal_listener", ":active_tcp_listener", "//envoy/common:time_interface", "//envoy/event:deferred_deletable", @@ -179,6 +181,36 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "active_internal_listener", + srcs = ["active_internal_listener.cc"], + hdrs = [ + "active_internal_listener.h", + ], + deps = [ + ":active_listener_base", + ":active_tcp_listener_headers", + "//envoy/common:time_interface", + "//envoy/event:deferred_deletable", + "//envoy/event:dispatcher_interface", + "//envoy/event:timer_interface", + "//envoy/network:connection_handler_interface", + "//envoy/network:connection_interface", + "//envoy/network:exception_interface", + "//envoy/network:filter_interface", + "//envoy/network:listen_socket_interface", + "//envoy/network:listener_interface", + "//envoy/server:listener_manager_interface", + "//envoy/stats:timespan_interface", + "//source/common/common:linked_object", + "//source/common/common:non_copyable", + "//source/common/event:deferred_task", + "//source/common/network:connection_lib", + "//source/common/stats:timespan_lib", + "//source/common/stream_info:stream_info_lib", + ], +) + envoy_cc_library( name = "active_tcp_socket", srcs = ["active_tcp_socket.cc"], diff --git a/source/server/active_internal_listener.cc b/source/server/active_internal_listener.cc new file mode 100644 index 0000000000000..701d9376175e7 --- /dev/null +++ b/source/server/active_internal_listener.cc @@ -0,0 +1,79 @@ +#include "source/server/active_internal_listener.h" + +#include "envoy/network/filter.h" +#include "envoy/stats/scope.h" + +#include "source/common/network/address_impl.h" +#include "source/common/stats/timespan_impl.h" + +#include "active_stream_listener_base.h" + +namespace Envoy { +namespace Server { + +ActiveInternalListener::ActiveInternalListener(Network::ConnectionHandler& conn_handler, + Event::Dispatcher& dispatcher, + Network::ListenerConfig& config) + : OwnedActiveStreamListenerBase( + conn_handler, dispatcher, + std::make_unique(), config) {} + +ActiveInternalListener::ActiveInternalListener(Network::ConnectionHandler& conn_handler, + Event::Dispatcher& dispatcher, + Network::ListenerPtr listener, + Network::ListenerConfig& config) + : OwnedActiveStreamListenerBase(conn_handler, dispatcher, std::move(listener), config) {} + +ActiveInternalListener::~ActiveInternalListener() { + is_deleting_ = true; + // Purge sockets that have not progressed to connections. This should only happen when + // a listener filter stops iteration and never resumes. + while (!sockets_.empty()) { + auto removed = sockets_.front()->removeFromList(sockets_); + dispatcher().deferredDelete(std::move(removed)); + } + + for (auto& [chain, active_connections] : connections_by_context_) { + ASSERT(active_connections != nullptr); + auto& connections = active_connections->connections_; + while (!connections.empty()) { + connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush); + } + } + dispatcher().clearDeferredDeleteList(); +} + +void ActiveInternalListener::updateListenerConfig(Network::ListenerConfig& config) { + ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag()); + config_ = &config; +} + +void ActiveInternalListener::onAccept(Network::ConnectionSocketPtr&& socket) { + // Unlike tcp listener, no rebalancer is applied and won't call pickTargetHandler to account + // connections. + incNumConnections(); + + auto active_socket = std::make_unique( + *this, std::move(socket), false /* do not hand off at internal listener */); + + onSocketAccepted(std::move(active_socket)); +} + +void ActiveInternalListener::newActiveConnection( + const Network::FilterChain& filter_chain, Network::ServerConnectionPtr server_conn_ptr, + std::unique_ptr stream_info) { + auto& active_connections = getOrCreateActiveConnections(filter_chain); + auto active_connection = + std::make_unique(active_connections, std::move(server_conn_ptr), + dispatcher().timeSource(), std::move(stream_info)); + // If the connection is already closed, we can just let this connection immediately die. + if (active_connection->connection_->state() != Network::Connection::State::Closed) { + ENVOY_CONN_LOG( + debug, "new connection from {}", *active_connection->connection_, + active_connection->connection_->connectionInfoProvider().remoteAddress()->asString()); + active_connection->connection_->addConnectionCallbacks(*active_connection); + LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_); + } +} +} // namespace Server +} // namespace Envoy diff --git a/source/server/active_internal_listener.h b/source/server/active_internal_listener.h new file mode 100644 index 0000000000000..f455162a6c9c3 --- /dev/null +++ b/source/server/active_internal_listener.h @@ -0,0 +1,96 @@ +#pragma once +#include +#include +#include +#include + +#include "envoy/common/time.h" +#include "envoy/event/deferred_deletable.h" +#include "envoy/event/dispatcher.h" +#include "envoy/network/connection.h" +#include "envoy/network/connection_handler.h" +#include "envoy/network/filter.h" +#include "envoy/network/listen_socket.h" +#include "envoy/network/listener.h" +#include "envoy/server/listener_manager.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/timespan.h" + +#include "source/common/common/linked_object.h" +#include "source/common/common/non_copyable.h" +#include "source/common/stream_info/stream_info_impl.h" +#include "source/server/active_stream_listener_base.h" + +#include "spdlog/spdlog.h" + +namespace Envoy { +namespace Server { + +class ActiveInternalListener : public OwnedActiveStreamListenerBase, + public Network::InternalListener { +public: + ActiveInternalListener(Network::ConnectionHandler& conn_handler, Event::Dispatcher& dispatcher, + Network::ListenerConfig& config); + ActiveInternalListener(Network::ConnectionHandler& conn_handler, Event::Dispatcher& dispatcher, + Network::ListenerPtr listener, Network::ListenerConfig& config); + ~ActiveInternalListener() override; + + class NetworkInternalListener : public Network::Listener { + + void disable() override { + // Similar to the listeners that does not bind to port. Accept is not driven by OS io event so + // the disable is not working. + // TODO(lambdai): Explore the approach to elegantly disable internal listener. Maybe an user + // space accept queue should be put here. + ENVOY_LOG(debug, "Warning: the internal listener cannot be disabled."); + } + + void enable() override { + ENVOY_LOG(debug, "Warning: the internal listener is always enabled."); + } + + void setRejectFraction(UnitFloat) override {} + }; + + // ActiveListenerImplBase + Network::Listener* listener() override { return listener_.get(); } + + // Network::TcpConnectionHandler + Network::BalancedConnectionHandlerOptRef + getBalancedHandlerByAddress(const Network::Address::Instance&) override { + // Internal listener doesn't support migrate connection to another worker. + // TODO(lambdai): implement the function of handling off to another listener of the same worker. + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } + + void pauseListening() override { + if (listener_ != nullptr) { + listener_->disable(); + } + } + void resumeListening() override { + if (listener_ != nullptr) { + listener_->enable(); + } + } + void shutdownListener() override { listener_.reset(); } + + // Network::InternalListener + void onAccept(Network::ConnectionSocketPtr&& socket) override; + + // Network::BalancedConnectionHandler + void incNumConnections() override { config_->openConnections().inc(); } + void decNumConnections() override { config_->openConnections().dec(); } + + void newActiveConnection(const Network::FilterChain& filter_chain, + Network::ServerConnectionPtr server_conn_ptr, + std::unique_ptr stream_info) override; + /** + * Update the listener config. The follow up connections will see the new config. The existing + * connections are not impacted. + */ + void updateListenerConfig(Network::ListenerConfig& config) override; +}; + +} // namespace Server +} // namespace Envoy diff --git a/source/server/admin/admin.h b/source/server/admin/admin.h index e99330c2c8fc7..4219418426c38 100644 --- a/source/server/admin/admin.h +++ b/source/server/admin/admin.h @@ -360,6 +360,9 @@ class AdminImpl : public Admin, Network::UdpListenerConfigOptRef udpListenerConfig() override { return Network::UdpListenerConfigOptRef(); } + Network::InternalListenerConfigOptRef internalListenerConfig() override { + return Network::InternalListenerConfigOptRef(); + } envoy::config::core::v3::TrafficDirection direction() const override { return envoy::config::core::v3::UNSPECIFIED; } diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index 4bcdcd9cc16aa..24789e5feb5aa 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -5,9 +5,11 @@ #include "envoy/event/dispatcher.h" #include "envoy/network/filter.h" +#include "source/common/common/logger.h" #include "source/common/event/deferred_task.h" #include "source/common/network/utility.h" #include "source/common/runtime/runtime_features.h" +#include "source/server/active_internal_listener.h" #include "source/server/active_tcp_listener.h" namespace Envoy { @@ -38,7 +40,20 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list } ActiveListenerDetails details; - if (config.listenSocketFactory().socketType() == Network::Socket::Type::Stream) { + if (config.internalListenerConfig().has_value()) { + if (overridden_listener.has_value()) { + for (auto& listener : listeners_) { + if (listener.second.listener_->listenerTag() == overridden_listener) { + listener.second.internalListener()->get().updateListenerConfig(config); + return; + } + } + NOT_REACHED_GCOVR_EXCL_LINE; + } + auto internal_listener = std::make_unique(*this, dispatcher(), config); + details.typed_listener_ = *internal_listener; + details.listener_ = std::move(internal_listener); + } else if (config.listenSocketFactory().socketType() == Network::Socket::Type::Stream) { if (!support_udp_in_place_filter_chain_update && overridden_listener.has_value()) { for (auto& listener : listeners_) { if (listener.second.listener_->listenerTag() == overridden_listener) { @@ -144,6 +159,25 @@ void ConnectionHandlerImpl::setListenerRejectFraction(UnitFloat reject_fraction) } } +Network::InternalListenerOptRef +ConnectionHandlerImpl::findByAddress(const Network::Address::InstanceConstSharedPtr& address) { + ASSERT(address->type() == Network::Address::Type::EnvoyInternal); + auto listener_it = + std::find_if(listeners_.begin(), listeners_.end(), + [&address](std::pair& p) { + return p.second.internalListener().has_value() && + p.second.listener_->listener() != nullptr && + p.first->type() == Network::Address::Type::EnvoyInternal && + *(p.first) == *address; + }); + + if (listener_it != listeners_.end()) { + return Network::InternalListenerOptRef(listener_it->second.internalListener().value().get()); + } + return OptRef(); +} + ConnectionHandlerImpl::ActiveTcpListenerOptRef ConnectionHandlerImpl::ActiveListenerDetails::tcpListener() { auto* val = absl::get_if>(&typed_listener_); @@ -156,6 +190,12 @@ ConnectionHandlerImpl::ActiveListenerDetails::udpListener() { return (val != nullptr) ? absl::make_optional(*val) : absl::nullopt; } +ConnectionHandlerImpl::ActiveInternalListenerOptRef +ConnectionHandlerImpl::ActiveListenerDetails::internalListener() { + auto* val = absl::get_if>(&typed_listener_); + return (val != nullptr) ? absl::make_optional(*val) : absl::nullopt; +} + ConnectionHandlerImpl::ActiveListenerDetailsOptRef ConnectionHandlerImpl::findActiveListenerByTag(uint64_t listener_tag) { // TODO(mattklein123): We should probably use a hash table here to lookup the tag diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index f10424b7c1a2d..23a39659c55d1 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -21,6 +21,7 @@ namespace Server { class ActiveUdpListenerBase; class ActiveTcpListener; +class ActiveInternalListener; /** * Server side connection handler. This is used both by workers as well as the @@ -28,12 +29,15 @@ class ActiveTcpListener; */ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, public Network::UdpConnectionHandler, + public Network::InternalListenerManager, NonCopyable, Logger::Loggable { public: using UdpListenerCallbacksOptRef = absl::optional>; using ActiveTcpListenerOptRef = absl::optional>; + using ActiveInternalListenerOptRef = + absl::optional>; ConnectionHandlerImpl(Event::Dispatcher& dispatcher, absl::optional worker_index); @@ -63,18 +67,24 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, // Network::UdpConnectionHandler Network::UdpListenerCallbacksOptRef getUdpListenerCallbacks(uint64_t listener_tag) override; + // Network::InternalListenerManager + Network::InternalListenerOptRef + findByAddress(const Network::Address::InstanceConstSharedPtr& listen_address) override; + private: struct ActiveListenerDetails { // Strong pointer to the listener, whether TCP, UDP, QUIC, etc. Network::ConnectionHandler::ActiveListenerPtr listener_; absl::variant, - std::reference_wrapper> + std::reference_wrapper, + std::reference_wrapper> typed_listener_; // Helpers for accessing the data in the variant for cleaner code. ActiveTcpListenerOptRef tcpListener(); UdpListenerCallbacksOptRef udpListener(); + ActiveInternalListenerOptRef internalListener(); }; using ActiveListenerDetailsOptRef = absl::optional>; ActiveListenerDetailsOptRef findActiveListenerByTag(uint64_t listener_tag); diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index 4be05bd5329e8..1ed954ba5a9c3 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -83,12 +83,16 @@ ListenSocketFactoryImpl::ListenSocketFactoryImpl( ASSERT(bind_type_ == ListenerComponentFactory::BindType::ReusePort); } } else { - ASSERT(local_address_->type() == Network::Address::Type::Pipe); - // Listeners with Unix domain socket always use shared socket. - // TODO(mattklein123): This should be blocked at the config parsing layer instead of getting - // here and disabling reuse_port. - if (bind_type_ == ListenerComponentFactory::BindType::ReusePort) { - bind_type_ = ListenerComponentFactory::BindType::NoReusePort; + if (local_address_->type() == Network::Address::Type::Pipe) { + // Listeners with Unix domain socket always use shared socket. + // TODO(mattklein123): This should be blocked at the config parsing layer instead of getting + // here and disabling reuse_port. + if (bind_type_ == ListenerComponentFactory::BindType::ReusePort) { + bind_type_ = ListenerComponentFactory::BindType::NoReusePort; + } + } else { + ASSERT(local_address_->type() == Network::Address::Type::EnvoyInternal); + bind_type_ = ListenerComponentFactory::BindType::NoBind; } } @@ -366,6 +370,7 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config, buildSocketOptions(); buildOriginalDstListenerFilter(); buildProxyProtocolListenerFilter(); + buildInternalListener(); } if (!workers_started_) { // Initialize dynamic_init_manager_ from Server's init manager if it's not initialized. @@ -423,7 +428,7 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin, createListenerFilterFactories(socket_type); validateFilterChains(socket_type); buildFilterChains(); - + buildInternalListener(); if (socket_type == Network::Socket::Type::Stream) { // Apply the options below only for TCP. buildSocketOptions(); @@ -459,6 +464,38 @@ void ListenerImpl::buildAccessLog() { } } +void ListenerImpl::buildInternalListener() { + if (config_.address().has_envoy_internal_address()) { + internal_listener_config_ = std::make_unique(); + if (config_.has_api_listener()) { + throw EnvoyException( + fmt::format("error adding listener '{}': internal address cannot be used in api listener", + address_->asString())); + } + if ((config_.has_connection_balance_config() && + config_.connection_balance_config().has_exact_balance()) || + config_.enable_mptcp() || + config_.has_enable_reuse_port() // internal listener doesn't use physical l4 port. + || (config_.has_freebind() && config_.freebind().value()) || + config_.has_tcp_backlog_size() || config_.has_tcp_fast_open_queue_length() || + (config_.has_transparent() && config_.transparent().value())) { + throw EnvoyException( + fmt::format("error adding listener '{}': has unsupported tcp listener feature", + address_->asString())); + } + if (!config_.socket_options().empty()) { + throw EnvoyException(fmt::format("error adding listener '{}': does not support socket option", + address_->asString())); + } + } else { + if (config_.has_internal_listener()) { + throw EnvoyException(fmt::format("error adding listener '{}': address is not an internal " + "address but an internal listener config is provided", + address_->asString())); + } + } +} + void ListenerImpl::buildUdpListenerFactory(Network::Socket::Type socket_type, uint32_t concurrency) { if (socket_type != Network::Socket::Type::Datagram) { diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index a6b8166d61a91..14153bfaa7d6c 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -326,6 +326,10 @@ class ListenerImpl final : public Network::ListenerConfig, return udp_listener_config_ != nullptr ? *udp_listener_config_ : Network::UdpListenerConfigOptRef(); } + Network::InternalListenerConfigOptRef internalListenerConfig() override { + return internal_listener_config_ != nullptr ? *internal_listener_config_ + : Network::InternalListenerConfigOptRef(); + } Network::ConnectionBalancer& connectionBalancer() override { return *connection_balancer_; } ResourceLimit& openConnections() override { return *open_connections_; } const std::vector& accessLogs() const override { @@ -372,6 +376,13 @@ class ListenerImpl final : public Network::ListenerConfig, Network::UdpListenerWorkerRouterPtr listener_worker_router_; }; + struct InternalListenerConfigImpl : public Network::InternalListenerConfig { + InternalListenerConfigImpl( + const envoy::config::listener::v3::Listener_InternalListenerConfig config) + : config_(config) {} + const envoy::config::listener::v3::Listener_InternalListenerConfig config_; + }; + /** * Create a new listener from an existing listener and the new config message if the in place * filter chain update is decided. Should be called only by newListenerWithFilterChain(). @@ -381,6 +392,7 @@ class ListenerImpl final : public Network::ListenerConfig, const std::string& name, bool added_via_api, bool workers_started, uint64_t hash); // Helpers for constructor. void buildAccessLog(); + void buildInternalListener(); void validateConfig(Network::Socket::Type socket_type); void buildUdpListenerFactory(Network::Socket::Type socket_type, uint32_t concurrency); void buildListenSocketOptions(Network::Socket::Type socket_type); @@ -428,6 +440,7 @@ class ListenerImpl final : public Network::ListenerConfig, const std::chrono::milliseconds listener_filters_timeout_; const bool continue_on_listener_filters_timeout_; std::shared_ptr udp_listener_config_; + std::unique_ptr internal_listener_config_; Network::ConnectionBalancerSharedPtr connection_balancer_; std::shared_ptr listener_factory_context_; FilterChainManagerImpl filter_chain_manager_; diff --git a/source/server/listener_manager_impl.cc b/source/server/listener_manager_impl.cc index 2fbc9ab7172cf..545645f01cc29 100644 --- a/source/server/listener_manager_impl.cc +++ b/source/server/listener_manager_impl.cc @@ -170,8 +170,6 @@ Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket( Network::Address::InstanceConstSharedPtr address, Network::Socket::Type socket_type, const Network::Socket::OptionsSharedPtr& options, BindType bind_type, const Network::SocketCreationOptions& creation_options, uint32_t worker_index) { - ASSERT(address->type() == Network::Address::Type::Ip || - address->type() == Network::Address::Type::Pipe); ASSERT(socket_type == Network::Socket::Type::Stream || socket_type == Network::Socket::Type::Datagram); @@ -192,6 +190,11 @@ Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket( return std::make_shared(std::move(io_handle), address); } return std::make_shared(address); + } else if (address->type() == Network::Address::Type::EnvoyInternal) { + // Listener manager should have validated that envoy internal address doesn't work with udp + // listener yet. + ASSERT(socket_type == Network::Socket::Type::Stream); + return std::make_shared(address); } const std::string scheme = (socket_type == Network::Socket::Type::Stream) @@ -329,11 +332,12 @@ ListenerManagerStats ListenerManagerImpl::generateStats(Stats::Scope& scope) { bool ListenerManagerImpl::addOrUpdateListener(const envoy::config::listener::v3::Listener& config, const std::string& version_info, bool added_via_api) { - RELEASE_ASSERT( - !config.address().has_envoy_internal_address(), - fmt::format("listener {} has envoy internal address {}. Internal address cannot be used by " - "listener yet", - config.name(), config.address().envoy_internal_address().DebugString())); + if (!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.internal_address")) { + RELEASE_ASSERT( + !config.address().has_envoy_internal_address(), + fmt::format("listener {} has envoy internal address {}. This runtime feature is disabled.", + config.name(), config.address().envoy_internal_address().DebugString())); + } // TODO(junr03): currently only one ApiListener can be installed via bootstrap to avoid having to // build a collection of listeners, and to have to be able to warm and drain the listeners. In the // future allow multiple ApiListeners, and allow them to be created via LDS as well as bootstrap. diff --git a/test/common/network/BUILD b/test/common/network/BUILD index 228068e269aa8..34fcb5c4a8536 100644 --- a/test/common/network/BUILD +++ b/test/common/network/BUILD @@ -91,6 +91,7 @@ envoy_cc_test( "//test/test_common:environment_lib", "//test/test_common:network_utility_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:test_runtime_lib", "//test/test_common:test_time_lib", "//test/test_common:threadsafe_singleton_injector_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index b666ded243ef4..af7d2786eddcb 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -6,6 +6,7 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/event/scaled_range_timer_manager.h" #include "envoy/network/address.h" +#include "envoy/network/listener.h" #include "source/common/api/os_sys_calls_impl.h" #include "source/common/buffer/buffer_impl.h" @@ -29,6 +30,7 @@ #include "test/test_common/network_utility.h" #include "test/test_common/printers.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/test_runtime.h" #include "test/test_common/threadsafe_singleton_injector.h" #include "test/test_common/utility.h" @@ -54,6 +56,11 @@ namespace Envoy { namespace Network { namespace { +class MockInternalListenerManager : public InternalListenerManager { +public: + MOCK_METHOD(InternalListenerOptRef, findByAddress, (const Address::InstanceConstSharedPtr&), ()); +}; + TEST(RawBufferSocket, TestBasics) { TransportSocketPtr raw_buffer_socket(Network::Test::createRawBufferSocket()); EXPECT_FALSE(raw_buffer_socket->ssl()); @@ -2961,6 +2968,36 @@ TEST_F(PipeClientConnectionImplTest, SkipSourceAddress) { connection->close(ConnectionCloseType::NoFlush); } +class InternalClientConnectionImplTest : public testing::Test { +protected: + InternalClientConnectionImplTest() + : api_(Api::createApiForTest()), dispatcher_(api_->allocateDispatcher("test_thread")) {} + + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + StrictMock client_callbacks_; +}; + +TEST_F(InternalClientConnectionImplTest, + CannotCreateConnectionToInternalAddressWithInternalAddressEnabled) { + auto scoped_runtime_guard = std::make_unique(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"envoy.reloadable_features.internal_address", "true"}}); + + const Network::SocketInterface* sock_interface = Network::socketInterface( + "envoy.extensions.network.socket_interface.default_socket_interface"); + Network::Address::InstanceConstSharedPtr address = + std::make_shared("listener_0", sock_interface); + // Not implemented yet. + ASSERT_DEATH( + { + ClientConnectionPtr connection = + dispatcher_->createClientConnection(address, Network::Address::InstanceConstSharedPtr(), + Network::Test::createRawBufferSocket(), nullptr); + }, + "panic: not implemented"); +} + } // namespace } // namespace Network } // namespace Envoy diff --git a/test/common/network/listen_socket_impl_test.cc b/test/common/network/listen_socket_impl_test.cc index 949054479cb9b..b3c45f9b135a2 100644 --- a/test/common/network/listen_socket_impl_test.cc +++ b/test/common/network/listen_socket_impl_test.cc @@ -24,7 +24,12 @@ namespace Envoy { namespace Network { namespace { -TEST(ConnectionSocketImplTest, LowerCaseRequestedServerName) { +class ConnectionSocketImplTest : public testing::TestWithParam {}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, ConnectionSocketImplTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); + +TEST_P(ConnectionSocketImplTest, LowerCaseRequestedServerName) { absl::string_view serverName("www.EXAMPLE.com"); absl::string_view expectedServerName("www.example.com"); auto loopback_addr = Network::Test::getCanonicalLoopbackAddress(Address::IpVersion::v4); @@ -33,6 +38,11 @@ TEST(ConnectionSocketImplTest, LowerCaseRequestedServerName) { EXPECT_EQ(expectedServerName, conn_socket_.requestedServerName()); } +TEST_P(ConnectionSocketImplTest, IpVersion) { + ClientSocketImpl socket(Network::Test::getCanonicalLoopbackAddress(GetParam()), nullptr); + EXPECT_EQ(socket.ipVersion(), GetParam()); +} + template class ListenSocketImplTest : public testing::TestWithParam { using ListenSocketType = NetworkListenSocket>; diff --git a/test/config/utility.cc b/test/config/utility.cc index 46f50346201bf..382946a90e20f 100644 --- a/test/config/utility.cc +++ b/test/config/utility.cc @@ -620,6 +620,18 @@ ConfigHelper::ConfigHelper(const Network::Address::IpVersion version, Api::Api& auto* static_resources = bootstrap_.mutable_static_resources(); for (int i = 0; i < static_resources->listeners_size(); ++i) { auto* listener = static_resources->mutable_listeners(i); + if (listener->mutable_address()->has_envoy_internal_address()) { + ENVOY_LOG_MISC( + debug, "Listener {} has internal address {}. Will not reset to loop back socket address.", + i, listener->mutable_address()->envoy_internal_address().server_listener_name()); + continue; + } + if (listener->mutable_address()->has_pipe()) { + ENVOY_LOG_MISC(debug, + "Listener {} has pipe address {}. Will not reset to loop back socket address.", + i, listener->mutable_address()->pipe().path()); + continue; + } auto* listener_socket_addr = listener->mutable_address()->mutable_socket_address(); if (listener_socket_addr->address() == "0.0.0.0" || listener_socket_addr->address() == "::") { listener_socket_addr->set_address(Network::Test::getAnyAddressString(version)); @@ -904,7 +916,6 @@ void ConfigHelper::setDefaultHostAndRoute(const std::string& domains, const std: void ConfigHelper::setBufferLimits(uint32_t upstream_buffer_limit, uint32_t downstream_buffer_limit) { RELEASE_ASSERT(!finalized_, ""); - RELEASE_ASSERT(bootstrap_.mutable_static_resources()->listeners_size() == 1, ""); auto* listener = bootstrap_.mutable_static_resources()->mutable_listeners(0); listener->mutable_per_connection_buffer_limit_bytes()->set_value(downstream_buffer_limit); const uint32_t stream_buffer_size = std::max( diff --git a/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc b/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc index cb9d97d5593f8..705f80156db01 100644 --- a/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc +++ b/test/extensions/common/proxy_protocol/proxy_protocol_regression_test.cc @@ -72,6 +72,9 @@ class ProxyProtocolRegressionTest : public testing::TestWithParam, uint64_t listenerTag() const override { return 0; } const std::string& name() const override { return name_; } Network::UdpListenerConfigOptRef udpListenerConfig() override { return udp_listener_config_; } + Network::InternalListenerConfigOptRef internalListenerConfig() override { + return Network::InternalListenerConfigOptRef(); + } Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; } envoy::config::core::v3::TrafficDirection direction() const override { return envoy::config::core::v3::UNSPECIFIED; diff --git a/test/integration/integration_tcp_client.cc b/test/integration/integration_tcp_client.cc index 2396d74c06da4..0e145ae12c838 100644 --- a/test/integration/integration_tcp_client.cc +++ b/test/integration/integration_tcp_client.cc @@ -53,7 +53,6 @@ IntegrationTcpClient::IntegrationTcpClient( std::function above_overflow) -> Buffer::Instance* { return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); })); - ; connection_ = dispatcher.createClientConnection( Network::Utility::resolveUrl( diff --git a/test/integration/internal_listener_integration_test.cc b/test/integration/internal_listener_integration_test.cc new file mode 100644 index 0000000000000..11614b84874a9 --- /dev/null +++ b/test/integration/internal_listener_integration_test.cc @@ -0,0 +1,104 @@ +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/core/v3/grpc_service.pb.h" +#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h" +#include "envoy/network/connection.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/config/api_version.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/integration/base_integration_test.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/resources.h" + +#include "absl/strings/str_cat.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +class InternalListenerIntegrationTest : public testing::TestWithParam, + public BaseIntegrationTest { +public: + InternalListenerIntegrationTest() + : BaseIntegrationTest(GetParam(), ConfigHelper::tcpProxyConfig()) {} + + void initialize() override { + config_helper_.renameListener("tcp"); + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto& listener = *bootstrap.mutable_static_resources()->mutable_listeners(0); + listener.mutable_address()->mutable_envoy_internal_address()->set_server_listener_name( + "internal_listener"); + }); + BaseIntegrationTest::initialize(); + } +}; + +TEST_P(InternalListenerIntegrationTest, BasicConfigUpdate) { + initialize(); + EXPECT_EQ(1, test_server_->counter("listener_manager.lds.update_success")->value()); + + ConfigHelper new_config_helper( + version_, *api_, MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap())); + new_config_helper.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0); + (*(*listener->mutable_metadata()->mutable_filter_metadata())["random_filter_name"] + .mutable_fields())["random_key"] + .set_number_value(1); + }); + + new_config_helper.setLds("1"); + + test_server_->waitForCounterEq("listener_manager.listener_modified", 1); + test_server_->waitForGaugeEq("listener_manager.total_listeners_draining", 0); +} + +TEST_P(InternalListenerIntegrationTest, InplaceUpdate) { + initialize(); + EXPECT_EQ(1, test_server_->counter("listener_manager.lds.update_success")->value()); + + ConfigHelper new_config_helper( + version_, *api_, MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap())); + new_config_helper.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0); + auto new_filter_chain = *listener->mutable_filter_chains(0); + listener->mutable_filter_chains()->Add()->MergeFrom(new_filter_chain); + *(listener->mutable_filter_chains(1) + ->mutable_filter_chain_match() + ->mutable_application_protocols() + ->Add()) = "alpn"; + }); + + new_config_helper.setLds("1"); + + test_server_->waitForCounterEq("listener_manager.listener_modified", 1); + test_server_->waitForGaugeEq("listener_manager.total_listeners_draining", 0); +} + +TEST_P(InternalListenerIntegrationTest, DeleteListener) { + initialize(); + EXPECT_EQ(1, test_server_->counter("listener_manager.lds.update_success")->value()); + + ConfigHelper new_config_helper( + version_, *api_, MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap())); + new_config_helper.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + bootstrap.mutable_static_resources()->mutable_listeners()->RemoveLast(); + }); + + new_config_helper.setLds("1"); + + test_server_->waitForCounterEq("listener_manager.listener_removed", 1); + test_server_->waitForGaugeEq("listener_manager.total_listeners_draining", 0); +} + +INSTANTIATE_TEST_SUITE_P(IpVersions, InternalListenerIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +} // namespace +} // namespace Envoy diff --git a/test/integration/socket_interface_integration_test.cc b/test/integration/socket_interface_integration_test.cc index 14bbc28500130..9cae29698a006 100644 --- a/test/integration/socket_interface_integration_test.cc +++ b/test/integration/socket_interface_integration_test.cc @@ -90,7 +90,6 @@ TEST_P(SocketInterfaceIntegrationTest, AddressWithSocketInterface) { } // Test that connecting to internal address will crash. -// TODO(lambdai): Add internal connection implementation to enable the connection creation. TEST_P(SocketInterfaceIntegrationTest, InternalAddressWithSocketInterface) { BaseIntegrationTest::initialize(); @@ -108,7 +107,7 @@ TEST_P(SocketInterfaceIntegrationTest, InternalAddressWithSocketInterface) { } // Test that recv from internal address will crash. -// TODO(lambdai): Add internal socket implementation to enable the io path. +// TODO(lambdai): Add UDP internal listener implementation to enable the io path. TEST_P(SocketInterfaceIntegrationTest, UdpRecvFromInternalAddressWithSocketInterface) { BaseIntegrationTest::initialize(); diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index e4f5d6c18df9e..19000616ee305 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -357,7 +357,6 @@ class MockConnectionSocket : public ConnectionSocket { IoHandlePtr io_handle_; std::shared_ptr connection_info_provider_; - bool is_closed_; }; class MockListenerFilterCallbacks : public ListenerFilterCallbacks { @@ -428,6 +427,7 @@ class MockListenerConfig : public ListenerConfig { MOCK_METHOD(uint64_t, listenerTag, (), (const)); MOCK_METHOD(const std::string&, name, (), (const)); MOCK_METHOD(Network::UdpListenerConfigOptRef, udpListenerConfig, ()); + MOCK_METHOD(InternalListenerConfigOptRef, internalListenerConfig, ()); MOCK_METHOD(ConnectionBalancer&, connectionBalancer, ()); MOCK_METHOD(ResourceLimit&, openConnections, ()); MOCK_METHOD(uint32_t, tcpBacklogSize, (), (const)); diff --git a/test/server/BUILD b/test/server/BUILD index 3195047b65ac4..ff737d73f7808 100644 --- a/test/server/BUILD +++ b/test/server/BUILD @@ -88,8 +88,6 @@ envoy_cc_test( "//test/test_common:network_utility_lib", "//test/test_common:test_runtime_lib", "//test/test_common:threadsafe_singleton_injector_lib", - "@envoy_api//envoy/config/core/v3:pkg_cc_proto", - "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", ], ) @@ -102,14 +100,25 @@ envoy_cc_test( "//source/common/network:address_lib", "//source/common/network:connection_balancer_lib", "//source/common/stats:stats_lib", - "//source/server:active_raw_udp_listener_config", "//source/server:connection_handler_lib", - "//test/mocks/access_log:access_log_mocks", - "//test/mocks/api:api_mocks", "//test/mocks/network:io_handle_mocks", "//test/mocks/network:network_mocks", "//test/test_common:network_utility_lib", - "//test/test_common:threadsafe_singleton_injector_lib", + ], +) + +envoy_cc_test( + name = "active_internal_listener_test", + srcs = ["active_internal_listener_test.cc"], + deps = [ + ":utility_lib", + "//source/common/network:address_lib", + "//source/common/network:listen_socket_lib", + "//source/common/network:utility_lib", + "//source/common/stats:stats_lib", + "//source/extensions/transport_sockets/raw_buffer:config", + "//source/server:connection_handler_lib", + "//test/mocks/network:network_mocks", ], ) diff --git a/test/server/active_internal_listener_test.cc b/test/server/active_internal_listener_test.cc new file mode 100644 index 0000000000000..a2e60f3bc8c89 --- /dev/null +++ b/test/server/active_internal_listener_test.cc @@ -0,0 +1,230 @@ +#include + +#include "envoy/network/filter.h" +#include "envoy/network/listener.h" +#include "envoy/stats/scope.h" + +#include "source/common/network/address_impl.h" +#include "source/common/network/raw_buffer_socket.h" +#include "source/server/active_internal_listener.h" +#include "source/server/connection_handler_impl.h" + +#include "test/mocks/common.h" +#include "test/mocks/network/mocks.h" +#include "test/test_common/network_utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Server { +namespace { + +class MockInternalListenerCallback : public Network::InternalListener { +public: + MOCK_METHOD(void, onAccept, (Network::ConnectionSocketPtr && socket), ()); + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); +}; +class ActiveInternalListenerTest : public testing::Test, + protected Logger::Loggable { +public: + ActiveInternalListenerTest() { + EXPECT_CALL(listener_config_, listenerScope).Times(testing::AnyNumber()); + EXPECT_CALL(conn_handler_, statPrefix()).WillRepeatedly(ReturnRef(listener_stat_prefix_)); + listener_filter_matcher_ = std::make_shared>(); + } + void addListener() { + EXPECT_CALL(listener_config_, listenerFiltersTimeout()); + EXPECT_CALL(listener_config_, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config_, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config_, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + auto mock_listener_will_be_moved = std::make_unique(); + generic_listener_ = mock_listener_will_be_moved.get(); + internal_listener_ = std::make_shared( + conn_handler_, dispatcher_, std::move(mock_listener_will_be_moved), listener_config_); + } + Network::Listener* addListenerWithRealNetworkListener() { + EXPECT_CALL(listener_config_, listenerFiltersTimeout()); + EXPECT_CALL(listener_config_, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config_, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config_, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + + internal_listener_ = + std::make_shared(conn_handler_, dispatcher_, listener_config_); + return internal_listener_->listener(); + } + void expectFilterChainFactory() { + EXPECT_CALL(listener_config_, filterChainFactory()) + .WillRepeatedly(ReturnRef(filter_chain_factory_)); + } + std::string listener_stat_prefix_{"listener_stat_prefix"}; + NiceMock dispatcher_{"test"}; + BasicResourceLimitImpl resource_limit_; + Network::MockConnectionHandler conn_handler_; + Network::MockListener* generic_listener_; + Network::MockListenerConfig listener_config_; + NiceMock manager_; + NiceMock filter_chain_factory_; + std::shared_ptr filter_chain_; + std::shared_ptr> listener_filter_matcher_; + std::shared_ptr internal_listener_; +}; + +TEST_F(ActiveInternalListenerTest, BasicInternalListener) { + addListener(); + EXPECT_CALL(*generic_listener_, onDestroy()); +} + +TEST_F(ActiveInternalListenerTest, AcceptSocketAndCreateListenerFilter) { + addListener(); + expectFilterChainFactory(); + Network::MockListenerFilter* test_listener_filter = new Network::MockListenerFilter(); + // FIX-ME: replace by mock socket + Network::Address::InstanceConstSharedPtr original_dst_address( + new Network::Address::Ipv4Instance("127.0.0.2", 8080)); + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + + EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { + // Insert the Mock filter. + manager.addAcceptFilter(listener_filter_matcher_, + Network::ListenerFilterPtr{test_listener_filter}); + return true; + })); + EXPECT_CALL(*test_listener_filter, onAccept(_)) + .WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus { + cb.socket().connectionInfoProvider().restoreLocalAddress(original_dst_address); + return Network::FilterStatus::Continue; + })); + EXPECT_CALL(*test_listener_filter, destroy_()); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); + internal_listener_->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + EXPECT_CALL(*generic_listener_, onDestroy()); +} + +TEST_F(ActiveInternalListenerTest, DestroyListenerClosesActiveSocket) { + addListener(); + expectFilterChainFactory(); + Network::MockListenerFilter* test_listener_filter = new Network::MockListenerFilter(); + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + NiceMock io_handle; + EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)); + EXPECT_CALL(io_handle, isOpen()).WillOnce(Return(true)); + + EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { + manager.addAcceptFilter(listener_filter_matcher_, + Network::ListenerFilterPtr{test_listener_filter}); + return true; + })); + EXPECT_CALL(*test_listener_filter, onAccept(_)) + .WillOnce(Invoke([&](Network::ListenerFilterCallbacks&) -> Network::FilterStatus { + return Network::FilterStatus::StopIteration; + })); + + internal_listener_->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + + EXPECT_CALL(*test_listener_filter, destroy_()); + EXPECT_CALL(*generic_listener_, onDestroy()); + internal_listener_.reset(); +} + +TEST_F(ActiveInternalListenerTest, AcceptSocketAndCreateNetworkFilter) { + addListener(); + expectFilterChainFactory(); + + Network::MockListenerFilter* test_listener_filter = new Network::MockListenerFilter(); + // FIX-ME: replace by mock socket + Network::Address::InstanceConstSharedPtr original_dst_address( + new Network::Address::Ipv4Instance("127.0.0.2", 8080)); + + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + + EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { + // Insert the Mock filter. + manager.addAcceptFilter(listener_filter_matcher_, + Network::ListenerFilterPtr{test_listener_filter}); + return true; + })); + EXPECT_CALL(*test_listener_filter, onAccept(_)) + .WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus { + cb.socket().connectionInfoProvider().restoreLocalAddress(original_dst_address); + return Network::FilterStatus::Continue; + })); + EXPECT_CALL(*test_listener_filter, destroy_()); + auto filter_factory_callback = std::make_shared>(); + filter_chain_ = std::make_shared>(); + auto transport_socket_factory = Network::Test::createRawBufferSocketFactory(); + + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get())); + EXPECT_CALL(*filter_chain_, transportSocketFactory) + .WillOnce(testing::ReturnRef(*transport_socket_factory)); + EXPECT_CALL(*filter_chain_, networkFilterFactories).WillOnce(ReturnRef(*filter_factory_callback)); + auto* connection = new NiceMock(); + EXPECT_CALL(dispatcher_, createServerConnection_()).WillOnce(Return(connection)); + EXPECT_CALL(conn_handler_, incNumConnections()); + EXPECT_CALL(filter_chain_factory_, createNetworkFilterChain(_, _)).WillOnce(Return(true)); + EXPECT_CALL(listener_config_, perConnectionBufferLimitBytes()); + internal_listener_->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + EXPECT_CALL(conn_handler_, decNumConnections()); + connection->close(Network::ConnectionCloseType::NoFlush); + dispatcher_.clearDeferredDeleteList(); + EXPECT_CALL(*generic_listener_, onDestroy()); +} + +TEST_F(ActiveInternalListenerTest, StopListener) { + addListener(); + EXPECT_CALL(*generic_listener_, onDestroy()); + internal_listener_->shutdownListener(); +} + +TEST_F(ActiveInternalListenerTest, PausedListenerAcceptNewSocket) { + addListenerWithRealNetworkListener(); + internal_listener_->pauseListening(); + + expectFilterChainFactory(); + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + + EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager&) -> bool { return true; })); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); + internal_listener_->onAccept(Network::ConnectionSocketPtr{accepted_socket}); +} + +TEST_F(ActiveInternalListenerTest, DestroyListenerCloseAllConnections) { + addListenerWithRealNetworkListener(); + internal_listener_->pauseListening(); + + expectFilterChainFactory(); + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + + auto filter_factory_callback = std::make_shared>(); + filter_chain_ = std::make_shared>(); + auto transport_socket_factory = Network::Test::createRawBufferSocketFactory(); + + EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager&) -> bool { return true; })); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get())); + EXPECT_CALL(*filter_chain_, transportSocketFactory) + .WillOnce(testing::ReturnRef(*transport_socket_factory)); + EXPECT_CALL(*filter_chain_, networkFilterFactories).WillOnce(ReturnRef(*filter_factory_callback)); + auto* connection = new NiceMock(); + EXPECT_CALL(dispatcher_, createServerConnection_()).WillOnce(Return(connection)); + EXPECT_CALL(conn_handler_, incNumConnections()); + EXPECT_CALL(filter_chain_factory_, createNetworkFilterChain(_, _)).WillOnce(Return(true)); + EXPECT_CALL(listener_config_, perConnectionBufferLimitBytes()); + internal_listener_->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + + EXPECT_CALL(conn_handler_, decNumConnections()); + internal_listener_.reset(); +} +} // namespace +} // namespace Server +} // namespace Envoy diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc index 2e6e2b807b03f..2217c3f64020d 100644 --- a/test/server/active_tcp_listener_test.cc +++ b/test/server/active_tcp_listener_test.cc @@ -10,7 +10,6 @@ #include "source/common/network/utility.h" #include "source/server/active_tcp_listener.h" -#include "test/mocks/api/mocks.h" #include "test/mocks/common.h" #include "test/mocks/network/io_handle.h" #include "test/mocks/network/mocks.h" diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 672f24e901648..74a5ca332db0b 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -1,6 +1,9 @@ -#include "envoy/config/core/v3/base.pb.h" -#include "envoy/config/listener/v3/udp_listener_config.pb.h" -#include "envoy/network/exception.h" +#include +#include +#include +#include +#include + #include "envoy/network/filter.h" #include "envoy/stats/scope.h" @@ -104,6 +107,8 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable udp_listener_config_; + std::unique_ptr internal_listener_config_; Network::ConnectionBalancerSharedPtr connection_balancer_; BasicResourceLimitImpl open_connections_; const std::vector access_logs_; @@ -256,6 +269,22 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable> overridden_filter_chain_manager = + nullptr) { + listeners_.emplace_back(std::make_unique( + *this, tag, /*bind_to_port*/ false, /*hand_off_restored_destination_connections*/ false, + name, Network::Socket::Type::Stream, listener_filters_timeout, + continue_on_listener_filters_timeout, access_log_, overridden_filter_chain_manager, + ENVOY_TCP_BACKLOG_SIZE, nullptr)); + listeners_.back()->internal_listener_config_ = + std::make_unique(); + return listeners_.back().get(); + } + void validateOriginalDst(Network::TcpListenerCallbacks** listener_callbacks, TestListener* test_listener, Network::MockListener* listener) { Network::Address::InstanceConstSharedPtr normal_address( @@ -300,7 +329,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable dispatcher_{"test"}; std::list listeners_; - Network::ConnectionHandlerPtr handler_; + std::unique_ptr handler_; NiceMock manager_; NiceMock factory_; const std::shared_ptr filter_chain_; @@ -1546,6 +1575,68 @@ TEST_F(ConnectionHandlerTest, ShutdownUdpListener) { << "The read_filter_ should be deleted before the udp_listener_ is deleted."; } +TEST_F(ConnectionHandlerTest, DisableInternalListener) { + InSequence s; + Network::Address::InstanceConstSharedPtr local_address{ + new Network::Address::EnvoyInternalInstance("server_internal_address")}; + + TestListener* internal_listener = + addInternalListener(1, "test_internal_listener", std::chrono::milliseconds(), false, nullptr); + EXPECT_CALL(internal_listener->socket_factory_, localAddress()) + .WillOnce(ReturnRef(local_address)); + handler_->addListener(absl::nullopt, *internal_listener); + auto internal_listener_cb = handler_->findByAddress(local_address); + ASSERT_TRUE(internal_listener_cb.has_value()); + + handler_->disableListeners(); + auto internal_listener_cb_disabled = handler_->findByAddress(local_address); + ASSERT_TRUE(internal_listener_cb_disabled.has_value()); + ASSERT_EQ(&internal_listener_cb_disabled.value().get(), &internal_listener_cb.value().get()); + + handler_->enableListeners(); + auto internal_listener_cb_enabled = handler_->findByAddress(local_address); + ASSERT_TRUE(internal_listener_cb_enabled.has_value()); + ASSERT_EQ(&internal_listener_cb_enabled.value().get(), &internal_listener_cb.value().get()); +} + +TEST_F(ConnectionHandlerTest, InternalListenerInplaceUpdate) { + InSequence s; + uint64_t old_listener_tag = 1; + uint64_t new_listener_tag = 2; + Network::Address::InstanceConstSharedPtr local_address{ + new Network::Address::EnvoyInternalInstance("server_internal_address")}; + + TestListener* internal_listener = addInternalListener( + old_listener_tag, "test_internal_listener", std::chrono::milliseconds(), false, nullptr); + EXPECT_CALL(internal_listener->socket_factory_, localAddress()) + .WillOnce(ReturnRef(local_address)); + handler_->addListener(absl::nullopt, *internal_listener); + + ASSERT_NE(internal_listener, nullptr); + + auto overridden_filter_chain_manager = + std::make_shared>(); + TestListener* new_test_listener = + addInternalListener(new_listener_tag, "test_internal_listener", std::chrono::milliseconds(), + false, overridden_filter_chain_manager); + + handler_->addListener(old_listener_tag, *new_test_listener); + + Network::MockConnectionSocket* connection = new NiceMock(); + + auto internal_listener_cb = handler_->findByAddress(local_address); + + EXPECT_CALL(manager_, findFilterChain(_)).Times(0); + EXPECT_CALL(*overridden_filter_chain_manager, findFilterChain(_)).WillOnce(Return(nullptr)); + EXPECT_CALL(*access_log_, log(_, _, _, _)); + internal_listener_cb.value().get().onAccept(Network::ConnectionSocketPtr{connection}); + EXPECT_EQ(0UL, handler_->numConnections()); + + testing::MockFunction completion; + handler_->removeFilterChains(old_listener_tag, {}, completion.AsStdFunction()); + EXPECT_CALL(completion, Call()); + dispatcher_.clearDeferredDeleteList(); +} } // namespace } // namespace Server } // namespace Envoy diff --git a/test/server/listener_manager_impl_test.cc b/test/server/listener_manager_impl_test.cc index 3798dea0a84fe..8002094c46467 100644 --- a/test/server/listener_manager_impl_test.cc +++ b/test/server/listener_manager_impl_test.cc @@ -519,15 +519,93 @@ bind_to_port: false } TEST_F(ListenerManagerImplTest, UnsupportedInternalListener) { + auto scoped_runtime_guard = std::make_unique(); + // Workaround of triggering death at windows platform. + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"envoy.reloadable_features.internal_address", "false"}}); + const std::string yaml = R"EOF( -address: - envoy_internal_address: - server_listener_name: a_listener_name -filter_chains: -- filters: [] + name: "foo" + address: + envoy_internal_address: + server_listener_name: a_listener_name + filter_chains: + - filters: [] + )EOF"; + + EXPECT_DEATH(manager_->addOrUpdateListener(parseListenerFromV3Yaml(yaml), "", true), ".*"); +} + +TEST_F(ListenerManagerImplTest, RejectListenerWithSocketAddressWithInternalListenerConfig) { + auto scoped_runtime_guard = std::make_unique(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"envoy.reloadable_features.internal_address", "true"}}); + + const std::string yaml = R"EOF( + name: "foo" + address: + socket_address: + address: 127.0.0.1 + port_value: 1234 + internal_listener: {} + filter_chains: + - filters: [] )EOF"; - ASSERT_DEATH(manager_->addOrUpdateListener(parseListenerFromV3Yaml(yaml), "", true), ""); + EXPECT_THROW_WITH_MESSAGE(manager_->addOrUpdateListener(parseListenerFromV3Yaml(yaml), "", true), + EnvoyException, + "error adding listener '127.0.0.1:1234': address is not an internal " + "address but an internal listener config is provided"); +} + +TEST_F(ListenerManagerImplTest, RejectTcpOptionsWithInternalListenerConfig) { + auto scoped_runtime_guard = std::make_unique(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"envoy.reloadable_features.internal_address", "true"}}); + + const std::string yaml = R"EOF( + name: "foo" + address: + envoy_internal_address: + server_listener_name: test_internal_listener_name + filter_chains: + - filters: [] + )EOF"; + + auto listener = parseListenerFromV3Yaml(yaml); + auto listener_mutators = std::vector>{ + [](envoy::config::listener::v3::Listener& l) { + l.mutable_connection_balance_config()->mutable_exact_balance(); + }, + [](envoy::config::listener::v3::Listener& l) { l.mutable_enable_reuse_port(); }, + [](envoy::config::listener::v3::Listener& l) { l.mutable_freebind()->set_value(true); }, + [](envoy::config::listener::v3::Listener& l) { l.mutable_tcp_backlog_size(); }, + [](envoy::config::listener::v3::Listener& l) { l.mutable_tcp_fast_open_queue_length(); }, + [](envoy::config::listener::v3::Listener& l) { l.mutable_transparent()->set_value(true); }, + + }; + for (const auto& f : listener_mutators) { + auto new_listener = listener; + f(new_listener); + EXPECT_THROW_WITH_MESSAGE(new ListenerImpl(new_listener, "version", *manager_, "foo", true, + false, /*hash=*/static_cast(0), 1), + EnvoyException, + "error adding listener 'envoy://test_internal_listener_name': has " + "unsupported tcp listener feature"); + } + { + auto new_listener = listener; + new_listener.mutable_socket_options()->Add(); + EXPECT_THROW_WITH_MESSAGE(manager_->addOrUpdateListener(new_listener, "", true), EnvoyException, + "error adding listener 'envoy://test_internal_listener_name': does " + "not support socket option") + } + { + auto new_listener = listener; + new_listener.set_enable_mptcp(true); + EXPECT_THROW_WITH_MESSAGE(manager_->addOrUpdateListener(new_listener, "", true), EnvoyException, + "listener foo: enable_mptcp can only be used with IP addresses") + } } TEST_F(ListenerManagerImplTest, NotDefaultListenerFiltersTimeout) { @@ -4768,6 +4846,303 @@ name: test_api_listener_2 EXPECT_EQ("test_api_listener", manager_->apiListener()->get().name()); } +TEST_F(ListenerManagerImplWithRealFiltersTest, AddOrUpdateInternalListener) { + auto scoped_runtime_guard = std::make_unique(); + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"envoy.reloadable_features.internal_address", "true"}}); + time_system_.setSystemTime(std::chrono::milliseconds(1001001001001)); + + InSequence s; + + auto* lds_api = new MockLdsApi(); + EXPECT_CALL(listener_factory_, createLdsApi_(_, _)).WillOnce(Return(lds_api)); + envoy::config::core::v3::ConfigSource lds_config; + manager_->createLdsApi(lds_config, nullptr); + + EXPECT_CALL(*lds_api, versionInfo()).WillOnce(Return("")); + checkConfigDump(R"EOF( +static_listeners: +)EOF"); + + // Add foo listener. The internal listener does not need explicit internal listener field. + const std::string listener_foo_yaml = R"EOF( +name: test_internal_listener +address: + envoy_internal_address: + server_listener_name: test_internal_listener_name +filter_chains: {} + )EOF"; + + ListenerHandle* listener_foo = expectListenerCreate(false, true); + EXPECT_TRUE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "version1", true)); + checkStats(__LINE__, 1, 0, 0, 0, 1, 0, 0); + EXPECT_CALL(*lds_api, versionInfo()).WillOnce(Return("version1")); + checkConfigDump(R"EOF( +version_info: version1 +static_listeners: +dynamic_listeners: + - name: test_internal_listener + warming_state: + version_info: version1 + listener: + "@type": type.googleapis.com/envoy.config.listener.v3.Listener + name: test_internal_listener + address: + envoy_internal_address: + server_listener_name: test_internal_listener_name + filter_chains: {} + last_updated: + seconds: 1001001001 + nanos: 1000000 +)EOF"); + + // Update duplicate should be a NOP. + EXPECT_FALSE(manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "", true)); + checkStats(__LINE__, 1, 0, 0, 0, 1, 0, 0); + + // Update foo listener. Should share socket. + const std::string listener_foo_update1_yaml = R"EOF( +name: test_internal_listener +address: + envoy_internal_address: + server_listener_name: test_internal_listener_name +filter_chains: {} +per_connection_buffer_limit_bytes: 10 + )EOF"; + + time_system_.setSystemTime(std::chrono::milliseconds(2002002002002)); + + ListenerHandle* listener_foo_update1 = expectListenerCreate(false, true); + EXPECT_CALL(*listener_foo, onDestroy()); + EXPECT_TRUE(manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_update1_yaml), + "version2", true)); + checkStats(__LINE__, 1, 1, 0, 0, 1, 0, 0); + EXPECT_CALL(*lds_api, versionInfo()).WillOnce(Return("version2")); + checkConfigDump(R"EOF( + version_info: version2 + static_listeners: + dynamic_listeners: + - name: test_internal_listener + warming_state: + version_info: version2 + listener: + "@type": type.googleapis.com/envoy.config.listener.v3.Listener + name: test_internal_listener + address: + envoy_internal_address: + server_listener_name: test_internal_listener_name + filter_chains: {} + per_connection_buffer_limit_bytes: 10 + last_updated: + seconds: 2002002002 + nanos: 2000000 + )EOF"); + + // Validate that workers_started stat is zero before calling startWorkers. + EXPECT_EQ(0, server_.stats_store_ + .gauge("listener_manager.workers_started", Stats::Gauge::ImportMode::NeverImport) + .value()); + + // Start workers. + EXPECT_CALL(*worker_, addListener(_, _, _)); + EXPECT_CALL(*worker_, start(_, _)); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); + // Validate that workers_started stat is still zero before workers set the status via + // completion callback. + EXPECT_EQ(0, server_.stats_store_ + .gauge("listener_manager.workers_started", Stats::Gauge::ImportMode::NeverImport) + .value()); + worker_->callAddCompletion(); + + // Validate that workers_started stat is set to 1 after workers have responded with initialization + // status. + EXPECT_EQ(1, server_.stats_store_ + .gauge("listener_manager.workers_started", Stats::Gauge::ImportMode::NeverImport) + .value()); + + // Update duplicate should be a NOP. + EXPECT_FALSE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_update1_yaml), "", true)); + checkStats(__LINE__, 1, 1, 0, 0, 1, 0, 0); + + time_system_.setSystemTime(std::chrono::milliseconds(3003003003003)); + + // Update foo. Should go into warming, have an immediate warming callback, and start immediate + // removal. + ListenerHandle* listener_foo_update2 = expectListenerCreate(false, true); + EXPECT_CALL(*worker_, addListener(_, _, _)); + EXPECT_CALL(*worker_, stopListener(_, _)); + EXPECT_CALL(*listener_foo_update1->drain_manager_, startDrainSequence(_)); + EXPECT_TRUE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "version3", true)); + worker_->callAddCompletion(); + checkStats(__LINE__, 1, 2, 0, 0, 1, 1, 0); + EXPECT_CALL(*lds_api, versionInfo()).WillOnce(Return("version3")); + checkConfigDump(R"EOF( + version_info: version3 + static_listeners: + dynamic_listeners: + - name: test_internal_listener + active_state: + version_info: version3 + listener: + "@type": type.googleapis.com/envoy.config.listener.v3.Listener + name: test_internal_listener + address: + envoy_internal_address: + server_listener_name: test_internal_listener_name + filter_chains: {} + last_updated: + seconds: 3003003003 + nanos: 3000000 + draining_state: + version_info: version2 + listener: + "@type": type.googleapis.com/envoy.config.listener.v3.Listener + name: test_internal_listener + address: + envoy_internal_address: + server_listener_name: test_internal_listener_name + filter_chains: {} + per_connection_buffer_limit_bytes: 10 + last_updated: + seconds: 2002002002 + nanos: 2000000 + )EOF"); + + EXPECT_CALL(*worker_, removeListener(_, _)); + listener_foo_update1->drain_manager_->drain_sequence_completion_(); + checkStats(__LINE__, 1, 2, 0, 0, 1, 1, 0); + EXPECT_CALL(*listener_foo_update1, onDestroy()); + worker_->callRemovalCompletion(); + checkStats(__LINE__, 1, 2, 0, 0, 1, 0, 0); + + time_system_.setSystemTime(std::chrono::milliseconds(4004004004004)); + + // Add bar listener. + const std::string listener_bar_yaml = R"EOF( + name: test_internal_listener_bar + address: + envoy_internal_address: + server_listener_name: test_internal_listener_bar + filter_chains: {} + internal_listener: {} + )EOF"; + + ListenerHandle* listener_bar = expectListenerCreate(false, true); + EXPECT_CALL(*worker_, addListener(_, _, _)); + EXPECT_TRUE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_bar_yaml), "version4", true)); + EXPECT_EQ(2UL, manager_->listeners().size()); + worker_->callAddCompletion(); + checkStats(__LINE__, 2, 2, 0, 0, 2, 0, 0); + + time_system_.setSystemTime(std::chrono::milliseconds(5005005005005)); + + // Add baz listener, this time requiring initializing. + const std::string listener_baz_yaml = R"EOF( + name: test_internal_listener_baz + address: + envoy_internal_address: + server_listener_name: test_internal_listener_baz + filter_chains: {} + internal_listener: {} + )EOF"; + + ListenerHandle* listener_baz = expectListenerCreate(true, true); + EXPECT_CALL(listener_baz->target_, initialize()); + EXPECT_TRUE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_baz_yaml), "version5", true)); + EXPECT_EQ(2UL, manager_->listeners().size()); + checkStats(__LINE__, 3, 2, 0, 1, 2, 0, 0); + EXPECT_CALL(*lds_api, versionInfo()).WillOnce(Return("version5")); + checkConfigDump(R"EOF( + version_info: version5 + dynamic_listeners: + - name: test_internal_listener + active_state: + version_info: version3 + listener: + "@type": type.googleapis.com/envoy.config.listener.v3.Listener + name: test_internal_listener + address: + envoy_internal_address: + server_listener_name: test_internal_listener_name + filter_chains: {} + last_updated: + seconds: 3003003003 + nanos: 3000000 + - name: test_internal_listener_bar + active_state: + version_info: version4 + listener: + "@type": type.googleapis.com/envoy.config.listener.v3.Listener + name: test_internal_listener_bar + address: + envoy_internal_address: + server_listener_name: test_internal_listener_bar + filter_chains: {} + internal_listener: {} + last_updated: + seconds: 4004004004 + nanos: 4000000 + - name: test_internal_listener_baz + warming_state: + version_info: version5 + listener: + "@type": type.googleapis.com/envoy.config.listener.v3.Listener + name: test_internal_listener_baz + address: + envoy_internal_address: + server_listener_name: test_internal_listener_baz + filter_chains: {} + internal_listener: {} + last_updated: + seconds: 5005005005 + nanos: 5000000 + )EOF"); + + // Update a duplicate baz that is currently warming. + EXPECT_FALSE(manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_baz_yaml), "", true)); + checkStats(__LINE__, 3, 2, 0, 1, 2, 0, 0); + + // Update baz while it is warming. + const std::string listener_baz_update1_yaml = R"EOF( + name: test_internal_listener_baz + address: + envoy_internal_address: + server_listener_name: test_internal_listener_baz + internal_listener: {} + filter_chains: + - filters: + - name: fake + typed_config: {} + )EOF"; + + ListenerHandle* listener_baz_update1 = expectListenerCreate(true, true); + EXPECT_CALL(*listener_baz, onDestroy()).WillOnce(Invoke([listener_baz]() -> void { + // Call the initialize callback during destruction like RDS will. + listener_baz->target_.ready(); + })); + EXPECT_CALL(listener_baz_update1->target_, initialize()); + EXPECT_TRUE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_baz_update1_yaml), "", true)); + EXPECT_EQ(2UL, manager_->listeners().size()); + checkStats(__LINE__, 3, 3, 0, 1, 2, 0, 0); + + // Finish initialization for baz which should make it active. + EXPECT_CALL(*worker_, addListener(_, _, _)); + listener_baz_update1->target_.ready(); + EXPECT_EQ(3UL, manager_->listeners().size()); + worker_->callAddCompletion(); + checkStats(__LINE__, 3, 3, 0, 0, 3, 0, 0); + + EXPECT_CALL(*listener_foo_update2, onDestroy()); + EXPECT_CALL(*listener_bar, onDestroy()); + EXPECT_CALL(*listener_baz_update1, onDestroy()); +} + TEST_F(ListenerManagerImplTest, StopInplaceWarmingListener) { InSequence s;