diff --git a/source/server/BUILD b/source/server/BUILD index 911137aeea517..ac6804ffe92aa 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -61,8 +61,9 @@ envoy_cc_library( envoy_cc_library( name = "connection_handler_lib", deps = [ - ":active_udp_listener", - ":connection_handler_impl", + "//source/server:active_tcp_listener", + "//source/server:active_udp_listener", + "//source/server:connection_handler_impl", ], ) @@ -72,6 +73,37 @@ envoy_cc_library( hdrs = [ "connection_handler_impl.h", ], + deps = [ + "//include/envoy/common:time_interface", + "//include/envoy/event:deferred_deletable", + "//include/envoy/event:dispatcher_interface", + "//include/envoy/network:connection_handler_interface", + "//include/envoy/network:connection_interface", + "//include/envoy/network:filter_interface", + "//include/envoy/network:listen_socket_interface", + "//include/envoy/network:listener_interface", + "//include/envoy/server:active_udp_listener_config_interface", + "//include/envoy/server:listener_manager_interface", + "//include/envoy/stats:timespan_interface", + "//source/common/common:linked_object", + "//source/common/common:non_copyable", + "//source/common/event:deferred_task", + "//source/common/network:connection_lib", + "//source/common/stream_info:stream_info_lib", + "//source/server:active_tcp_listener_header", + ], +) + +envoy_cc_library( + # Currently both `active_tcp_listener` and `connection_handler_impl` need below headers + # while addressing https://github.com/envoyproxy/envoy/issues/15126 + # TODO(lambdai): Remove the definition of ActiveTcpListener from dependency of ConnectionHandlerImpl + # and delete this target. + name = "active_tcp_listener_header", + hdrs = [ + "active_tcp_listener.h", + "connection_handler_impl.h", + ], deps = [ "//include/envoy/common:time_interface", "//include/envoy/event:deferred_deletable", @@ -79,7 +111,6 @@ envoy_cc_library( "//include/envoy/event:timer_interface", "//include/envoy/network:connection_handler_interface", "//include/envoy/network:connection_interface", - "//include/envoy/network:exception_interface", "//include/envoy/network:filter_interface", "//include/envoy/network:listen_socket_interface", "//include/envoy/network:listener_interface", @@ -92,7 +123,37 @@ envoy_cc_library( "//source/common/network:connection_lib", "//source/common/stats:timespan_lib", "//source/common/stream_info:stream_info_lib", + ], +) + +envoy_cc_library( + name = "active_tcp_listener", + srcs = ["active_tcp_listener.cc"], + hdrs = [ + "active_tcp_listener.h", + ], + deps = [ + "//include/envoy/common:time_interface", + "//include/envoy/event:deferred_deletable", + "//include/envoy/event:dispatcher_interface", + "//include/envoy/event:timer_interface", + "//include/envoy/network:connection_handler_interface", + "//include/envoy/network:connection_interface", + "//include/envoy/network:filter_interface", + "//include/envoy/network:listen_socket_interface", + "//include/envoy/network:listener_interface", + "//include/envoy/server:active_udp_listener_config_interface", + "//include/envoy/server:listener_manager_interface", + "//include/envoy/stats:timespan_interface", + "//source/common/common:linked_object", + "//source/common/common:non_copyable", + "//source/common/common:safe_memcpy_lib", + "//source/common/event:deferred_task", + "//source/common/network:connection_lib", + "//source/common/stats:timespan_lib", + "//source/common/stream_info:stream_info_lib", "//source/extensions/transport_sockets:well_known_names", + "//source/server:active_tcp_listener_header", ], ) diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc new file mode 100644 index 0000000000000..e5af541135ebf --- /dev/null +++ b/source/server/active_tcp_listener.cc @@ -0,0 +1,435 @@ +#include "server/active_tcp_listener.h" + +#include + +#include "envoy/event/dispatcher.h" +#include "envoy/event/timer.h" +#include "envoy/network/filter.h" +#include "envoy/stats/scope.h" + +#include "common/event/deferred_task.h" +#include "common/network/connection_impl.h" +#include "common/network/utility.h" +#include "common/stats/timespan_impl.h" + +#include "server/connection_handler_impl.h" + +#include "extensions/transport_sockets/well_known_names.h" + +namespace Envoy { +namespace Server { + +namespace { +void emitLogs(Network::ListenerConfig& config, StreamInfo::StreamInfo& stream_info) { + stream_info.onRequestComplete(); + for (const auto& access_log : config.accessLogs()) { + access_log->log(nullptr, nullptr, nullptr, stream_info); + } +} +} // namespace + +void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) { + ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_); + ActiveConnections& active_connections = connection.active_connections_; + ActiveTcpConnectionPtr removed = connection.removeFromList(active_connections.connections_); + parent_.dispatcher().deferredDelete(std::move(removed)); + // Delete map entry only iff connections becomes empty. + if (active_connections.connections_.empty()) { + auto iter = connections_by_context_.find(&active_connections.filter_chain_); + ASSERT(iter != connections_by_context_.end()); + // To cover the lifetime of every single connection, Connections need to be deferred deleted + // because the previously contained connection is deferred deleted. + parent_.dispatcher().deferredDelete(std::move(iter->second)); + // The erase will break the iteration over the connections_by_context_ during the deletion. + if (!is_deleting_) { + connections_by_context_.erase(iter); + } + } +} + +ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, + Network::ListenerConfig& config) + : ActiveTcpListener( + parent, + parent.dispatcher().createListener(config.listenSocketFactory().getListenSocket(), *this, + config.bindToPort(), config.tcpBacklogSize()), + config) {} + +ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, + Network::ListenerPtr&& listener, + Network::ListenerConfig& config) + : ActiveListenerImplBase(parent, &config), parent_(parent), listener_(std::move(listener)), + listener_filters_timeout_(config.listenerFiltersTimeout()), + continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()) { + config.connectionBalancer().registerHandler(*this); +} + +void ActiveTcpListener::updateListenerConfig(Network::ListenerConfig& config) { + ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag()); + ASSERT(&config_->connectionBalancer() == &config.connectionBalancer()); + config_ = &config; +} + +ActiveTcpListener::~ActiveTcpListener() { + is_deleting_ = true; + config_->connectionBalancer().unregisterHandler(*this); + + // Purge sockets that have not progressed to connections. This should only happen when + // a listener filter stops iteration and never resumes. + while (!sockets_.empty()) { + ActiveTcpSocketPtr removed = sockets_.front()->removeFromList(sockets_); + parent_.dispatcher().deferredDelete(std::move(removed)); + } + + for (auto& chain_and_connections : connections_by_context_) { + ASSERT(chain_and_connections.second != nullptr); + auto& connections = chain_and_connections.second->connections_; + while (!connections.empty()) { + connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush); + } + } + parent_.dispatcher().clearDeferredDeleteList(); + + // By the time a listener is destroyed, in the common case, there should be no connections. + // However, this is not always true if there is an in flight rebalanced connection that is + // being posted. This assert is extremely useful for debugging the common path so we will leave it + // for now. If it becomes a problem (developers hitting this assert when using debug builds) we + // can revisit. This case, if it happens, should be benign on production builds. This case is + // covered in ConnectionHandlerTest::RemoveListenerDuringRebalance. + ASSERT(num_listener_connections_ == 0); +} + +void ActiveTcpSocket::onTimeout() { + listener_.stats_.downstream_pre_cx_timeout_.inc(); + ASSERT(inserted()); + ENVOY_LOG(debug, "listener filter times out after {} ms", + listener_.listener_filters_timeout_.count()); + + if (listener_.continue_on_listener_filters_timeout_) { + ENVOY_LOG(debug, "fallback to default listener filter"); + newConnection(); + } + unlink(); +} + +void ActiveTcpSocket::startTimer() { + if (listener_.listener_filters_timeout_.count() > 0) { + timer_ = listener_.parent_.dispatcher().createTimer([this]() -> void { onTimeout(); }); + timer_->enableTimer(listener_.listener_filters_timeout_); + } +} + +void ActiveTcpSocket::unlink() { + ActiveTcpSocketPtr removed = removeFromList(listener_.sockets_); + if (removed->timer_ != nullptr) { + removed->timer_->disableTimer(); + } + // Emit logs if a connection is not established. + if (!connected_) { + emitLogs(*listener_.config_, *stream_info_); + } + listener_.parent_.dispatcher().deferredDelete(std::move(removed)); +} + +void ActiveTcpSocket::continueFilterChain(bool success) { + if (success) { + bool no_error = true; + if (iter_ == accept_filters_.end()) { + iter_ = accept_filters_.begin(); + } else { + iter_ = std::next(iter_); + } + + for (; iter_ != accept_filters_.end(); iter_++) { + Network::FilterStatus status = (*iter_)->onAccept(*this); + if (status == Network::FilterStatus::StopIteration) { + // The filter is responsible for calling us again at a later time to continue the filter + // chain from the next filter. + if (!socket().ioHandle().isOpen()) { + // break the loop but should not create new connection + no_error = false; + break; + } else { + // Blocking at the filter but no error + return; + } + } + } + // Successfully ran all the accept filters. + if (no_error) { + newConnection(); + } else { + // Signal the caller that no extra filter chain iteration is needed. + iter_ = accept_filters_.end(); + } + } + + // Filter execution concluded, unlink and delete this ActiveTcpSocket if it was linked. + if (inserted()) { + unlink(); + } +} + +void ActiveTcpSocket::setDynamicMetadata(const std::string& name, + const ProtobufWkt::Struct& value) { + stream_info_->setDynamicMetadata(name, value); +} + +void ActiveTcpSocket::newConnection() { + connected_ = true; + + // Check if the socket may need to be redirected to another listener. + Network::BalancedConnectionHandlerOptRef new_listener; + + if (hand_off_restored_destination_connections_ && + socket_->addressProvider().localAddressRestored()) { + // Find a listener associated with the original destination address. + new_listener = + listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress()); + } + if (new_listener.has_value()) { + // Hands off connections redirected by iptables to the listener associated with the + // original destination address. Pass 'hand_off_restored_destination_connections' as false to + // prevent further redirection as well as 'rebalanced' as true since the connection has + // already been balanced if applicable inside onAcceptWorker() when the connection was + // initially accepted. Note also that we must account for the number of connections properly + // across both listeners. + // TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better. + listener_.decNumConnections(); + new_listener.value().get().incNumConnections(); + new_listener.value().get().onAcceptWorker(std::move(socket_), false, true); + } else { + // Set default transport protocol if none of the listener filters did it. + if (socket_->detectedTransportProtocol().empty()) { + socket_->setDetectedTransportProtocol( + Extensions::TransportSockets::TransportProtocolNames::get().RawBuffer); + } + // TODO(lambdai): add integration test + // TODO: Address issues in wider scope. See https://github.com/envoyproxy/envoy/issues/8925 + // Erase accept filter states because accept filters may not get the opportunity to clean up. + // Particularly the assigned events need to reset before assigning new events in the follow up. + accept_filters_.clear(); + // Create a new connection on this listener. + listener_.newConnection(std::move(socket_), std::move(stream_info_)); + } +} + +void ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) { + if (listenerConnectionLimitReached()) { + ENVOY_LOG(trace, "closing connection: listener connection limit reached for {}", + config_->name()); + socket->close(); + stats_.downstream_cx_overflow_.inc(); + return; + } + + onAcceptWorker(std::move(socket), config_->handOffRestoredDestinationConnections(), false); +} + +void ActiveTcpListener::onReject(RejectCause cause) { + switch (cause) { + case RejectCause::GlobalCxLimit: + stats_.downstream_global_cx_overflow_.inc(); + break; + case RejectCause::OverloadAction: + stats_.downstream_cx_overload_reject_.inc(); + break; + } +} + +void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket, + bool hand_off_restored_destination_connections, + bool rebalanced) { + if (!rebalanced) { + Network::BalancedConnectionHandler& target_handler = + config_->connectionBalancer().pickTargetHandler(*this); + if (&target_handler != this) { + target_handler.post(std::move(socket)); + return; + } + } + + auto active_socket = std::make_unique(*this, std::move(socket), + hand_off_restored_destination_connections); + + // Create and run the filters + config_->filterChainFactory().createListenerFilterChain(*active_socket); + active_socket->continueFilterChain(true); + + // Move active_socket to the sockets_ list if filter iteration needs to continue later. + // Otherwise we let active_socket be destructed when it goes out of scope. + if (active_socket->iter_ != active_socket->accept_filters_.end()) { + active_socket->startTimer(); + LinkedList::moveIntoListBack(std::move(active_socket), sockets_); + } else { + // If active_socket is about to be destructed, emit logs if a connection is not created. + if (!active_socket->connected_) { + if (active_socket->stream_info_ != nullptr) { + emitLogs(*config_, *active_socket->stream_info_); + } else { + // If the active_socket is not connected, this socket is not promoted to active connection. + // Thus the stream_info_ is owned by this active socket. + ENVOY_BUG(active_socket->stream_info_ != nullptr, + "the unconnected active socket must have stream info."); + } + } + } +} + +void ActiveTcpListener::pauseListening() { + if (listener_ != nullptr) { + listener_->disable(); + } +} + +void ActiveTcpListener::resumeListening() { + if (listener_ != nullptr) { + listener_->enable(); + } +} + +void ActiveTcpListener::newConnection(Network::ConnectionSocketPtr&& socket, + std::unique_ptr stream_info) { + + // Find matching filter chain. + const auto filter_chain = config_->filterChainManager().findFilterChain(*socket); + if (filter_chain == nullptr) { + ENVOY_LOG(debug, "closing connection: no matching filter chain found"); + stats_.no_filter_chain_match_.inc(); + stream_info->setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound); + stream_info->setResponseCodeDetails(StreamInfo::ResponseCodeDetails::get().FilterChainNotFound); + emitLogs(*config_, *stream_info); + socket->close(); + return; + } + + stream_info->setFilterChainName(filter_chain->name()); + auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr); + stream_info->setDownstreamSslConnection(transport_socket->ssl()); + auto& active_connections = getOrCreateActiveConnections(*filter_chain); + auto server_conn_ptr = parent_.dispatcher().createServerConnection( + std::move(socket), std::move(transport_socket), *stream_info); + if (const auto timeout = filter_chain->transportSocketConnectTimeout(); + timeout != std::chrono::milliseconds::zero()) { + server_conn_ptr->setTransportSocketConnectTimeout(timeout); + } + ActiveTcpConnectionPtr active_connection( + new ActiveTcpConnection(active_connections, std::move(server_conn_ptr), + parent_.dispatcher().timeSource(), std::move(stream_info))); + active_connection->connection_->setBufferLimits(config_->perConnectionBufferLimitBytes()); + + const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain( + *active_connection->connection_, filter_chain->networkFilterFactories()); + if (empty_filter_chain) { + ENVOY_CONN_LOG(debug, "closing connection: no filters", *active_connection->connection_); + active_connection->connection_->close(Network::ConnectionCloseType::NoFlush); + } + + // If the connection is already closed, we can just let this connection immediately die. + if (active_connection->connection_->state() != Network::Connection::State::Closed) { + ENVOY_CONN_LOG(debug, "new connection", *active_connection->connection_); + active_connection->connection_->addConnectionCallbacks(*active_connection); + LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_); + } +} + +ActiveConnections& +ActiveTcpListener::getOrCreateActiveConnections(const Network::FilterChain& filter_chain) { + ActiveConnectionsPtr& connections = connections_by_context_[&filter_chain]; + if (connections == nullptr) { + connections = std::make_unique(*this, filter_chain); + } + return *connections; +} + +void ActiveTcpListener::deferredRemoveFilterChains( + const std::list& draining_filter_chains) { + // Need to recover the original deleting state. + const bool was_deleting = is_deleting_; + is_deleting_ = true; + for (const auto* filter_chain : draining_filter_chains) { + auto iter = connections_by_context_.find(filter_chain); + if (iter == connections_by_context_.end()) { + // It is possible when listener is stopping. + } else { + auto& connections = iter->second->connections_; + while (!connections.empty()) { + connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush); + } + // Since is_deleting_ is on, we need to manually remove the map value and drive the iterator. + // Defer delete connection container to avoid race condition in destroying connection. + parent_.dispatcher().deferredDelete(std::move(iter->second)); + connections_by_context_.erase(iter); + } + } + is_deleting_ = was_deleting; +} + +void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) { + // It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must + // bundle the socket inside a shared_ptr that can be captured. + // TODO(mattklein123): It may be possible to change the post() API such that the lambda is only + // moved, but this is non-trivial and needs investigation. + RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared(); + socket_to_rebalance->socket = std::move(socket); + + parent_.dispatcher().post([socket_to_rebalance, tag = config_->listenerTag(), &parent = parent_, + handoff = config_->handOffRestoredDestinationConnections()]() { + auto balanced_handler = parent.getBalancedHandlerByTag(tag); + if (balanced_handler.has_value()) { + balanced_handler->get().onAcceptWorker(std::move(socket_to_rebalance->socket), handoff, true); + return; + } + }); +} + +ActiveConnections::ActiveConnections(ActiveTcpListener& listener, + const Network::FilterChain& filter_chain) + : listener_(listener), filter_chain_(filter_chain) {} + +ActiveConnections::~ActiveConnections() { + // connections should be defer deleted already. + ASSERT(connections_.empty()); +} + +ActiveTcpConnection::ActiveTcpConnection(ActiveConnections& active_connections, + Network::ConnectionPtr&& new_connection, + TimeSource& time_source, + std::unique_ptr&& stream_info) + : stream_info_(std::move(stream_info)), active_connections_(active_connections), + connection_(std::move(new_connection)), + conn_length_(new Stats::HistogramCompletableTimespanImpl( + active_connections_.listener_.stats_.downstream_cx_length_ms_, time_source)) { + // We just universally set no delay on connections. Theoretically we might at some point want + // to make this configurable. + connection_->noDelay(true); + auto& listener = active_connections_.listener_; + listener.stats_.downstream_cx_total_.inc(); + listener.stats_.downstream_cx_active_.inc(); + listener.per_worker_stats_.downstream_cx_total_.inc(); + listener.per_worker_stats_.downstream_cx_active_.inc(); + stream_info_->setConnectionID(connection_->id()); + + // Active connections on the handler (not listener). The per listener connections have already + // been incremented at this point either via the connection balancer or in the socket accept + // path if there is no configured balancer. + listener.parent_.incNumConnections(); +} + +ActiveTcpConnection::~ActiveTcpConnection() { + emitLogs(*active_connections_.listener_.config_, *stream_info_); + auto& listener = active_connections_.listener_; + listener.stats_.downstream_cx_active_.dec(); + listener.stats_.downstream_cx_destroy_.inc(); + listener.per_worker_stats_.downstream_cx_active_.dec(); + conn_length_->complete(); + + // Active listener connections (not handler). + listener.decNumConnections(); + + // Active handler connections (not listener). + listener.parent_.decNumConnections(); +} + +} // namespace Server +} // namespace Envoy \ No newline at end of file diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h new file mode 100644 index 0000000000000..ebac7edbc6aca --- /dev/null +++ b/source/server/active_tcp_listener.h @@ -0,0 +1,259 @@ +#pragma once + +#include "envoy/stats/timespan.h" + +#include "common/common/linked_object.h" +#include "common/stream_info/stream_info_impl.h" + +#include "server/connection_handler_impl.h" + +namespace Envoy { +namespace Server { + +struct ActiveTcpConnection; +using ActiveTcpConnectionPtr = std::unique_ptr; +struct ActiveTcpSocket; +using ActiveTcpSocketPtr = std::unique_ptr; +class ActiveConnections; +using ActiveConnectionsPtr = std::unique_ptr; + +namespace { +// Structure used to allow a unique_ptr to be captured in a posted lambda. See below. +struct RebalancedSocket { + Network::ConnectionSocketPtr socket; +}; +using RebalancedSocketSharedPtr = std::shared_ptr; +} // namespace + +/** + * Wrapper for an active tcp listener owned by this handler. + */ +class ActiveTcpListener : public Network::TcpListenerCallbacks, + public ConnectionHandlerImpl::ActiveListenerImplBase, + public Network::BalancedConnectionHandler, + Logger::Loggable { +public: + ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config); + ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, + Network::ListenerConfig& config); + ~ActiveTcpListener() override; + bool listenerConnectionLimitReached() const { + // TODO(tonya11en): Delegate enforcement of per-listener connection limits to overload + // manager. + return !config_->openConnections().canCreate(); + } + + void decNumConnections() { + ASSERT(num_listener_connections_ > 0); + --num_listener_connections_; + config_->openConnections().dec(); + } + + // Network::TcpListenerCallbacks + void onAccept(Network::ConnectionSocketPtr&& socket) override; + void onReject(RejectCause) override; + + // ActiveListenerImplBase + Network::Listener* listener() override { return listener_.get(); } + void pauseListening() override; + void resumeListening() override; + void shutdownListener() override { listener_.reset(); } + + // Network::BalancedConnectionHandler + uint64_t numConnections() const override { return num_listener_connections_; } + void incNumConnections() override { + ++num_listener_connections_; + config_->openConnections().inc(); + } + void post(Network::ConnectionSocketPtr&& socket) override; + void onAcceptWorker(Network::ConnectionSocketPtr&& socket, + bool hand_off_restored_destination_connections, bool rebalanced) override; + + /** + * Remove and destroy an active connection. + * @param connection supplies the connection to remove. + */ + void removeConnection(ActiveTcpConnection& connection); + + /** + * Create a new connection from a socket accepted by the listener. + */ + void newConnection(Network::ConnectionSocketPtr&& socket, + std::unique_ptr stream_info); + + /** + * Return the active connections container attached with the given filter chain. + */ + ActiveConnections& getOrCreateActiveConnections(const Network::FilterChain& filter_chain); + + /** + * Schedule to remove and destroy the active connections which are not tracked by listener + * config. Caution: The connection are not destroyed yet when function returns. + */ + void + deferredRemoveFilterChains(const std::list& draining_filter_chains); + + /** + * Update the listener config. The follow up connections will see the new config. The existing + * connections are not impacted. + */ + void updateListenerConfig(Network::ListenerConfig& config); + + Network::TcpConnectionHandler& parent_; + Network::ListenerPtr listener_; + const std::chrono::milliseconds listener_filters_timeout_; + const bool continue_on_listener_filters_timeout_; + std::list sockets_; + absl::node_hash_map connections_by_context_; + + // The number of connections currently active on this listener. This is typically used for + // connection balancing across per-handler listeners. + std::atomic num_listener_connections_{}; + bool is_deleting_{false}; +}; + +/** + * Wrapper for a group of active connections which are attached to the same filter chain context. + */ +class ActiveConnections : public Event::DeferredDeletable { +public: + ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain); + ~ActiveConnections() override; + + // listener filter chain pair is the owner of the connections + ActiveTcpListener& listener_; + const Network::FilterChain& filter_chain_; + // Owned connections + std::list connections_; +}; + +/** + * Wrapper for an active TCP connection owned by this handler. + */ +struct ActiveTcpConnection : LinkedObject, + public Event::DeferredDeletable, + public Network::ConnectionCallbacks { + ActiveTcpConnection(ActiveConnections& active_connections, + Network::ConnectionPtr&& new_connection, TimeSource& time_system, + std::unique_ptr&& stream_info); + ~ActiveTcpConnection() override; + + // Network::ConnectionCallbacks + void onEvent(Network::ConnectionEvent event) override { + // Any event leads to destruction of the connection. + if (event == Network::ConnectionEvent::LocalClose || + event == Network::ConnectionEvent::RemoteClose) { + active_connections_.listener_.removeConnection(*this); + } + } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + std::unique_ptr stream_info_; + ActiveConnections& active_connections_; + Network::ConnectionPtr connection_; + Stats::TimespanPtr conn_length_; +}; + +/** + * Wrapper for an active accepted TCP socket owned by this handler. + */ +struct ActiveTcpSocket : public Network::ListenerFilterManager, + public Network::ListenerFilterCallbacks, + LinkedObject, + public Event::DeferredDeletable, + Logger::Loggable { + ActiveTcpSocket(ActiveTcpListener& listener, Network::ConnectionSocketPtr&& socket, + bool hand_off_restored_destination_connections) + : listener_(listener), socket_(std::move(socket)), + hand_off_restored_destination_connections_(hand_off_restored_destination_connections), + iter_(accept_filters_.end()), + stream_info_(std::make_unique( + listener_.parent_.dispatcher().timeSource(), socket_->addressProviderSharedPtr(), + StreamInfo::FilterState::LifeSpan::Connection)) { + listener_.stats_.downstream_pre_cx_active_.inc(); + } + ~ActiveTcpSocket() override { + accept_filters_.clear(); + listener_.stats_.downstream_pre_cx_active_.dec(); + + // If the underlying socket is no longer attached, it means that it has been transferred to + // an active connection. In this case, the active connection will decrement the number + // of listener connections. + // TODO(mattklein123): In general the way we account for the number of listener connections + // is incredibly fragile. Revisit this by potentially merging ActiveTcpSocket and + // ActiveTcpConnection, having a shared object which does accounting (but would require + // another allocation, etc.). + if (socket_ != nullptr) { + listener_.decNumConnections(); + } + } + + void onTimeout(); + void startTimer(); + void unlink(); + void newConnection(); + + class GenericListenerFilter : public Network::ListenerFilter { + public: + GenericListenerFilter(const Network::ListenerFilterMatcherSharedPtr& matcher, + Network::ListenerFilterPtr listener_filter) + : listener_filter_(std::move(listener_filter)), matcher_(std::move(matcher)) {} + Network::FilterStatus onAccept(ListenerFilterCallbacks& cb) override { + if (isDisabled(cb)) { + return Network::FilterStatus::Continue; + } + return listener_filter_->onAccept(cb); + } + /** + * Check if this filter filter should be disabled on the incoming socket. + * @param cb the callbacks the filter instance can use to communicate with the filter chain. + **/ + bool isDisabled(ListenerFilterCallbacks& cb) { + if (matcher_ == nullptr) { + return false; + } else { + return matcher_->matches(cb); + } + } + + private: + const Network::ListenerFilterPtr listener_filter_; + const Network::ListenerFilterMatcherSharedPtr matcher_; + }; + using ListenerFilterWrapperPtr = std::unique_ptr; + + // Network::ListenerFilterManager + void addAcceptFilter(const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher, + Network::ListenerFilterPtr&& filter) override { + accept_filters_.emplace_back( + std::make_unique(listener_filter_matcher, std::move(filter))); + } + + // Network::ListenerFilterCallbacks + Network::ConnectionSocket& socket() override { return *socket_.get(); } + Event::Dispatcher& dispatcher() override { return listener_.parent_.dispatcher(); } + void continueFilterChain(bool success) override; + void setDynamicMetadata(const std::string& name, const ProtobufWkt::Struct& value) override; + envoy::config::core::v3::Metadata& dynamicMetadata() override { + return stream_info_->dynamicMetadata(); + }; + const envoy::config::core::v3::Metadata& dynamicMetadata() const override { + return stream_info_->dynamicMetadata(); + }; + + StreamInfo::FilterState& filterState() override { return *stream_info_->filterState().get(); } + + ActiveTcpListener& listener_; + Network::ConnectionSocketPtr socket_; + const bool hand_off_restored_destination_connections_; + std::list accept_filters_; + std::list::iterator iter_; + Event::TimerPtr timer_; + std::unique_ptr stream_info_; + bool connected_{false}; +}; +using ActiveTcpListenerOptRef = absl::optional>; + +} // namespace Server +} // namespace Envoy \ No newline at end of file diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index c1318fb554ebd..37983bc08331c 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -3,30 +3,16 @@ #include #include "envoy/event/dispatcher.h" -#include "envoy/event/timer.h" -#include "envoy/network/exception.h" #include "envoy/network/filter.h" -#include "envoy/stats/scope.h" #include "common/event/deferred_task.h" -#include "common/network/connection_impl.h" #include "common/network/utility.h" -#include "common/stats/timespan_impl.h" -#include "extensions/transport_sockets/well_known_names.h" +#include "server/active_tcp_listener.h" namespace Envoy { namespace Server { -namespace { -void emitLogs(Network::ListenerConfig& config, StreamInfo::StreamInfo& stream_info) { - stream_info.onRequestComplete(); - for (const auto& access_log : config.accessLogs()) { - access_log->log(nullptr, nullptr, nullptr, stream_info); - } -} -} // namespace - ConnectionHandlerImpl::ConnectionHandlerImpl(Event::Dispatcher& dispatcher, absl::optional worker_index) : worker_index_(worker_index), dispatcher_(dispatcher), @@ -52,6 +38,7 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list } NOT_REACHED_GCOVR_EXCL_LINE; } + // TODO(lambdai): Remove the dependency of ActiveTcpListener. auto tcp_listener = std::make_unique(*this, config); details.typed_listener_ = *tcp_listener; details.listener_ = std::move(tcp_listener); @@ -145,8 +132,7 @@ void ConnectionHandlerImpl::setListenerRejectFraction(UnitFloat reject_fraction) } } -ConnectionHandlerImpl::ActiveTcpListenerOptRef -ConnectionHandlerImpl::ActiveListenerDetails::tcpListener() { +ActiveTcpListenerOptRef ConnectionHandlerImpl::ActiveListenerDetails::tcpListener() { auto* val = absl::get_if>(&typed_listener_); return (val != nullptr) ? absl::make_optional(*val) : absl::nullopt; } @@ -210,8 +196,7 @@ ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Insta std::find_if(listeners_.begin(), listeners_.end(), [&address](const std::pair& p) { - return absl::holds_alternative< - std::reference_wrapper>( + return absl::holds_alternative>( p.second.typed_listener_) && p.second.listener_->listener() != nullptr && p.first->type() == Network::Address::Type::Ip && @@ -220,9 +205,8 @@ ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Insta }); return (listener_it != listeners_.end()) ? Network::BalancedConnectionHandlerOptRef( - ConnectionHandlerImpl::ActiveTcpListenerOptRef( - absl::get>( - listener_it->second.typed_listener_)) + ActiveTcpListenerOptRef(absl::get>( + listener_it->second.typed_listener_)) .value() .get()) : absl::nullopt; @@ -238,418 +222,5 @@ ConnectionHandlerImpl::ActiveListenerImplBase::ActiveListenerImplBase( POOL_GAUGE_PREFIX(config->listenerScope(), parent.statPrefix()))}), config_(config) {} -void ConnectionHandlerImpl::ActiveTcpListener::removeConnection( - ConnectionHandlerImpl::ActiveTcpConnection& connection) { - ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_); - ActiveConnections& active_connections = connection.active_connections_; - ActiveTcpConnectionPtr removed = connection.removeFromList(active_connections.connections_); - parent_.dispatcher().deferredDelete(std::move(removed)); - // Delete map entry only iff connections becomes empty. - if (active_connections.connections_.empty()) { - auto iter = connections_by_context_.find(&active_connections.filter_chain_); - ASSERT(iter != connections_by_context_.end()); - // To cover the lifetime of every single connection, Connections need to be deferred deleted - // because the previously contained connection is deferred deleted. - parent_.dispatcher().deferredDelete(std::move(iter->second)); - // The erase will break the iteration over the connections_by_context_ during the deletion. - if (!is_deleting_) { - connections_by_context_.erase(iter); - } - } -} - -ConnectionHandlerImpl::ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, - Network::ListenerConfig& config) - : ActiveTcpListener( - parent, - parent.dispatcher().createListener(config.listenSocketFactory().getListenSocket(), *this, - config.bindToPort(), config.tcpBacklogSize()), - config) {} - -ConnectionHandlerImpl::ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent, - Network::ListenerPtr&& listener, - Network::ListenerConfig& config) - : ActiveListenerImplBase(parent, &config), parent_(parent), listener_(std::move(listener)), - listener_filters_timeout_(config.listenerFiltersTimeout()), - continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()) { - config.connectionBalancer().registerHandler(*this); -} - -void ConnectionHandlerImpl::ActiveTcpListener::updateListenerConfig( - Network::ListenerConfig& config) { - ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag()); - ASSERT(&config_->connectionBalancer() == &config.connectionBalancer()); - config_ = &config; -} - -ConnectionHandlerImpl::ActiveTcpListener::~ActiveTcpListener() { - is_deleting_ = true; - config_->connectionBalancer().unregisterHandler(*this); - - // Purge sockets that have not progressed to connections. This should only happen when - // a listener filter stops iteration and never resumes. - while (!sockets_.empty()) { - ActiveTcpSocketPtr removed = sockets_.front()->removeFromList(sockets_); - parent_.dispatcher().deferredDelete(std::move(removed)); - } - - for (auto& chain_and_connections : connections_by_context_) { - ASSERT(chain_and_connections.second != nullptr); - auto& connections = chain_and_connections.second->connections_; - while (!connections.empty()) { - connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush); - } - } - parent_.dispatcher().clearDeferredDeleteList(); - - // By the time a listener is destroyed, in the common case, there should be no connections. - // However, this is not always true if there is an in flight rebalanced connection that is - // being posted. This assert is extremely useful for debugging the common path so we will leave it - // for now. If it becomes a problem (developers hitting this assert when using debug builds) we - // can revisit. This case, if it happens, should be benign on production builds. This case is - // covered in ConnectionHandlerTest::RemoveListenerDuringRebalance. - ASSERT(num_listener_connections_ == 0); -} - -void ConnectionHandlerImpl::ActiveTcpSocket::onTimeout() { - listener_.stats_.downstream_pre_cx_timeout_.inc(); - ASSERT(inserted()); - ENVOY_LOG(debug, "listener filter times out after {} ms", - listener_.listener_filters_timeout_.count()); - - if (listener_.continue_on_listener_filters_timeout_) { - ENVOY_LOG(debug, "fallback to default listener filter"); - newConnection(); - } - unlink(); -} - -void ConnectionHandlerImpl::ActiveTcpSocket::startTimer() { - if (listener_.listener_filters_timeout_.count() > 0) { - timer_ = listener_.parent_.dispatcher().createTimer([this]() -> void { onTimeout(); }); - timer_->enableTimer(listener_.listener_filters_timeout_); - } -} - -void ConnectionHandlerImpl::ActiveTcpSocket::unlink() { - ConnectionHandlerImpl::ActiveTcpSocketPtr removed = removeFromList(listener_.sockets_); - if (removed->timer_ != nullptr) { - removed->timer_->disableTimer(); - } - // Emit logs if a connection is not established. - if (!connected_) { - emitLogs(*listener_.config_, *stream_info_); - } - listener_.parent_.dispatcher().deferredDelete(std::move(removed)); -} - -void ConnectionHandlerImpl::ActiveTcpSocket::continueFilterChain(bool success) { - if (success) { - bool no_error = true; - if (iter_ == accept_filters_.end()) { - iter_ = accept_filters_.begin(); - } else { - iter_ = std::next(iter_); - } - - for (; iter_ != accept_filters_.end(); iter_++) { - Network::FilterStatus status = (*iter_)->onAccept(*this); - if (status == Network::FilterStatus::StopIteration) { - // The filter is responsible for calling us again at a later time to continue the filter - // chain from the next filter. - if (!socket().ioHandle().isOpen()) { - // break the loop but should not create new connection - no_error = false; - break; - } else { - // Blocking at the filter but no error - return; - } - } - } - // Successfully ran all the accept filters. - if (no_error) { - newConnection(); - } else { - // Signal the caller that no extra filter chain iteration is needed. - iter_ = accept_filters_.end(); - } - } - - // Filter execution concluded, unlink and delete this ActiveTcpSocket if it was linked. - if (inserted()) { - unlink(); - } -} - -void ConnectionHandlerImpl::ActiveTcpSocket::setDynamicMetadata(const std::string& name, - const ProtobufWkt::Struct& value) { - stream_info_->setDynamicMetadata(name, value); -} - -void ConnectionHandlerImpl::ActiveTcpSocket::newConnection() { - connected_ = true; - - // Check if the socket may need to be redirected to another listener. - Network::BalancedConnectionHandlerOptRef new_listener; - - if (hand_off_restored_destination_connections_ && - socket_->addressProvider().localAddressRestored()) { - // Find a listener associated with the original destination address. - new_listener = - listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress()); - } - if (new_listener.has_value()) { - // Hands off connections redirected by iptables to the listener associated with the - // original destination address. Pass 'hand_off_restored_destination_connections' as false to - // prevent further redirection as well as 'rebalanced' as true since the connection has - // already been balanced if applicable inside onAcceptWorker() when the connection was - // initially accepted. Note also that we must account for the number of connections properly - // across both listeners. - // TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better. - listener_.decNumConnections(); - new_listener.value().get().incNumConnections(); - new_listener.value().get().onAcceptWorker(std::move(socket_), false, true); - } else { - // Set default transport protocol if none of the listener filters did it. - if (socket_->detectedTransportProtocol().empty()) { - socket_->setDetectedTransportProtocol( - Extensions::TransportSockets::TransportProtocolNames::get().RawBuffer); - } - // TODO(lambdai): add integration test - // TODO: Address issues in wider scope. See https://github.com/envoyproxy/envoy/issues/8925 - // Erase accept filter states because accept filters may not get the opportunity to clean up. - // Particularly the assigned events need to reset before assigning new events in the follow up. - accept_filters_.clear(); - // Create a new connection on this listener. - listener_.newConnection(std::move(socket_), std::move(stream_info_)); - } -} - -void ConnectionHandlerImpl::ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) { - if (listenerConnectionLimitReached()) { - ENVOY_LOG(trace, "closing connection: listener connection limit reached for {}", - config_->name()); - socket->close(); - stats_.downstream_cx_overflow_.inc(); - return; - } - - onAcceptWorker(std::move(socket), config_->handOffRestoredDestinationConnections(), false); -} - -void ConnectionHandlerImpl::ActiveTcpListener::onReject(RejectCause cause) { - switch (cause) { - case RejectCause::GlobalCxLimit: - stats_.downstream_global_cx_overflow_.inc(); - break; - case RejectCause::OverloadAction: - stats_.downstream_cx_overload_reject_.inc(); - break; - } -} - -void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker( - Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections, - bool rebalanced) { - if (!rebalanced) { - Network::BalancedConnectionHandler& target_handler = - config_->connectionBalancer().pickTargetHandler(*this); - if (&target_handler != this) { - target_handler.post(std::move(socket)); - return; - } - } - - auto active_socket = std::make_unique(*this, std::move(socket), - hand_off_restored_destination_connections); - - // Create and run the filters - config_->filterChainFactory().createListenerFilterChain(*active_socket); - active_socket->continueFilterChain(true); - - // Move active_socket to the sockets_ list if filter iteration needs to continue later. - // Otherwise we let active_socket be destructed when it goes out of scope. - if (active_socket->iter_ != active_socket->accept_filters_.end()) { - active_socket->startTimer(); - LinkedList::moveIntoListBack(std::move(active_socket), sockets_); - } else { - // If active_socket is about to be destructed, emit logs if a connection is not created. - if (!active_socket->connected_) { - if (active_socket->stream_info_ != nullptr) { - emitLogs(*config_, *active_socket->stream_info_); - } else { - // If the active_socket is not connected, this socket is not promoted to active connection. - // Thus the stream_info_ is owned by this active socket. - ENVOY_BUG(active_socket->stream_info_ != nullptr, - "the unconnected active socket must have stream info."); - } - } - } -} - -void ConnectionHandlerImpl::ActiveTcpListener::pauseListening() { - if (listener_ != nullptr) { - listener_->disable(); - } -} - -void ConnectionHandlerImpl::ActiveTcpListener::resumeListening() { - if (listener_ != nullptr) { - listener_->enable(); - } -} - -void ConnectionHandlerImpl::ActiveTcpListener::newConnection( - Network::ConnectionSocketPtr&& socket, std::unique_ptr stream_info) { - - // Find matching filter chain. - const auto filter_chain = config_->filterChainManager().findFilterChain(*socket); - if (filter_chain == nullptr) { - ENVOY_LOG(debug, "closing connection: no matching filter chain found"); - stats_.no_filter_chain_match_.inc(); - stream_info->setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound); - stream_info->setResponseCodeDetails(StreamInfo::ResponseCodeDetails::get().FilterChainNotFound); - emitLogs(*config_, *stream_info); - socket->close(); - return; - } - - stream_info->setFilterChainName(filter_chain->name()); - auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr); - stream_info->setDownstreamSslConnection(transport_socket->ssl()); - auto& active_connections = getOrCreateActiveConnections(*filter_chain); - auto server_conn_ptr = parent_.dispatcher().createServerConnection( - std::move(socket), std::move(transport_socket), *stream_info); - if (const auto timeout = filter_chain->transportSocketConnectTimeout(); - timeout != std::chrono::milliseconds::zero()) { - server_conn_ptr->setTransportSocketConnectTimeout(timeout); - } - ActiveTcpConnectionPtr active_connection( - new ActiveTcpConnection(active_connections, std::move(server_conn_ptr), - parent_.dispatcher().timeSource(), std::move(stream_info))); - active_connection->connection_->setBufferLimits(config_->perConnectionBufferLimitBytes()); - - const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain( - *active_connection->connection_, filter_chain->networkFilterFactories()); - if (empty_filter_chain) { - ENVOY_CONN_LOG(debug, "closing connection: no filters", *active_connection->connection_); - active_connection->connection_->close(Network::ConnectionCloseType::NoFlush); - } - - // If the connection is already closed, we can just let this connection immediately die. - if (active_connection->connection_->state() != Network::Connection::State::Closed) { - ENVOY_CONN_LOG(debug, "new connection", *active_connection->connection_); - active_connection->connection_->addConnectionCallbacks(*active_connection); - LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_); - } -} - -ConnectionHandlerImpl::ActiveConnections& -ConnectionHandlerImpl::ActiveTcpListener::getOrCreateActiveConnections( - const Network::FilterChain& filter_chain) { - ActiveConnectionsPtr& connections = connections_by_context_[&filter_chain]; - if (connections == nullptr) { - connections = std::make_unique(*this, filter_chain); - } - return *connections; -} - -void ConnectionHandlerImpl::ActiveTcpListener::deferredRemoveFilterChains( - const std::list& draining_filter_chains) { - // Need to recover the original deleting state. - const bool was_deleting = is_deleting_; - is_deleting_ = true; - for (const auto* filter_chain : draining_filter_chains) { - auto iter = connections_by_context_.find(filter_chain); - if (iter == connections_by_context_.end()) { - // It is possible when listener is stopping. - } else { - auto& connections = iter->second->connections_; - while (!connections.empty()) { - connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush); - } - // Since is_deleting_ is on, we need to manually remove the map value and drive the iterator. - // Defer delete connection container to avoid race condition in destroying connection. - parent_.dispatcher().deferredDelete(std::move(iter->second)); - connections_by_context_.erase(iter); - } - } - is_deleting_ = was_deleting; -} - -namespace { -// Structure used to allow a unique_ptr to be captured in a posted lambda. See below. -struct RebalancedSocket { - Network::ConnectionSocketPtr socket; -}; -using RebalancedSocketSharedPtr = std::shared_ptr; -} // namespace - -void ConnectionHandlerImpl::ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) { - // It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must - // bundle the socket inside a shared_ptr that can be captured. - // TODO(mattklein123): It may be possible to change the post() API such that the lambda is only - // moved, but this is non-trivial and needs investigation. - RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared(); - socket_to_rebalance->socket = std::move(socket); - - parent_.dispatcher().post([socket_to_rebalance, tag = config_->listenerTag(), &parent = parent_, - handoff = config_->handOffRestoredDestinationConnections()]() { - auto balanced_handler = parent.getBalancedHandlerByTag(tag); - if (balanced_handler.has_value()) { - balanced_handler->get().onAcceptWorker(std::move(socket_to_rebalance->socket), handoff, true); - return; - } - }); -} - -ConnectionHandlerImpl::ActiveConnections::ActiveConnections( - ActiveTcpListener& listener, const Network::FilterChain& filter_chain) - : listener_(listener), filter_chain_(filter_chain) {} - -ConnectionHandlerImpl::ActiveConnections::~ActiveConnections() { - // connections should be defer deleted already. - ASSERT(connections_.empty()); -} - -ConnectionHandlerImpl::ActiveTcpConnection::ActiveTcpConnection( - ActiveConnections& active_connections, Network::ConnectionPtr&& new_connection, - TimeSource& time_source, std::unique_ptr&& stream_info) - : stream_info_(std::move(stream_info)), active_connections_(active_connections), - connection_(std::move(new_connection)), - conn_length_(new Stats::HistogramCompletableTimespanImpl( - active_connections_.listener_.stats_.downstream_cx_length_ms_, time_source)) { - // We just universally set no delay on connections. Theoretically we might at some point want - // to make this configurable. - connection_->noDelay(true); - auto& listener = active_connections_.listener_; - listener.stats_.downstream_cx_total_.inc(); - listener.stats_.downstream_cx_active_.inc(); - listener.per_worker_stats_.downstream_cx_total_.inc(); - listener.per_worker_stats_.downstream_cx_active_.inc(); - stream_info_->setConnectionID(connection_->id()); - - // Active connections on the handler (not listener). The per listener connections have already - // been incremented at this point either via the connection balancer or in the socket accept - // path if there is no configured balancer. - listener.parent_.incNumConnections(); -} - -ConnectionHandlerImpl::ActiveTcpConnection::~ActiveTcpConnection() { - emitLogs(*active_connections_.listener_.config_, *stream_info_); - auto& listener = active_connections_.listener_; - listener.stats_.downstream_cx_active_.dec(); - listener.stats_.downstream_cx_destroy_.inc(); - listener.per_worker_stats_.downstream_cx_active_.dec(); - conn_length_->complete(); - - // Active listener connections (not handler). - listener.decNumConnections(); - - // Active handler connections (not listener). - listener.parent_.decNumConnections(); -} - } // namespace Server } // namespace Envoy diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index e0352039b9a8a..ecfaf9c96b45a 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -5,9 +5,6 @@ #include #include -#include "envoy/common/time.h" -#include "envoy/event/deferred_deletable.h" -#include "envoy/network/connection.h" #include "envoy/network/connection_handler.h" #include "envoy/network/filter.h" #include "envoy/network/listen_socket.h" @@ -15,11 +12,8 @@ #include "envoy/server/active_udp_listener_config.h" #include "envoy/server/listener_manager.h" #include "envoy/stats/scope.h" -#include "envoy/stats/timespan.h" -#include "common/common/linked_object.h" #include "common/common/non_copyable.h" -#include "common/stream_info/stream_info_impl.h" #include "spdlog/spdlog.h" @@ -70,6 +64,8 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, public: using UdpListenerCallbacksOptRef = absl::optional>; + using ActiveTcpListenerOptRef = absl::optional>; + ConnectionHandlerImpl(Event::Dispatcher& dispatcher, absl::optional worker_index); // Network::ConnectionHandler @@ -113,249 +109,14 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, Network::ListenerConfig* config_{}; }; - struct ActiveTcpConnection; - using ActiveTcpConnectionPtr = std::unique_ptr; - struct ActiveTcpSocket; - using ActiveTcpSocketPtr = std::unique_ptr; - class ActiveConnections; - using ActiveConnectionsPtr = std::unique_ptr; - - /** - * Wrapper for an active tcp listener owned by this handler. - */ - class ActiveTcpListener : public Network::TcpListenerCallbacks, - public ActiveListenerImplBase, - public Network::BalancedConnectionHandler, - Logger::Loggable { - public: - ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config); - ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, - Network::ListenerConfig& config); - ~ActiveTcpListener() override; - bool listenerConnectionLimitReached() const { - // TODO(tonya11en): Delegate enforcement of per-listener connection limits to overload - // manager. - return !config_->openConnections().canCreate(); - } - - void decNumConnections() { - ASSERT(num_listener_connections_ > 0); - --num_listener_connections_; - config_->openConnections().dec(); - } - - // Network::TcpListenerCallbacks - void onAccept(Network::ConnectionSocketPtr&& socket) override; - void onReject(RejectCause) override; - - // ActiveListenerImplBase - Network::Listener* listener() override { return listener_.get(); } - void pauseListening() override; - void resumeListening() override; - void shutdownListener() override { listener_.reset(); } - - // Network::BalancedConnectionHandler - uint64_t numConnections() const override { return num_listener_connections_; } - void incNumConnections() override { - ++num_listener_connections_; - config_->openConnections().inc(); - } - void post(Network::ConnectionSocketPtr&& socket) override; - void onAcceptWorker(Network::ConnectionSocketPtr&& socket, - bool hand_off_restored_destination_connections, bool rebalanced) override; - - /** - * Remove and destroy an active connection. - * @param connection supplies the connection to remove. - */ - void removeConnection(ActiveTcpConnection& connection); - - /** - * Create a new connection from a socket accepted by the listener. - */ - void newConnection(Network::ConnectionSocketPtr&& socket, - std::unique_ptr stream_info); - - /** - * Return the active connections container attached with the given filter chain. - */ - ActiveConnections& getOrCreateActiveConnections(const Network::FilterChain& filter_chain); - - /** - * Schedule to remove and destroy the active connections which are not tracked by listener - * config. Caution: The connection are not destroyed yet when function returns. - */ - void deferredRemoveFilterChains( - const std::list& draining_filter_chains); - - /** - * Update the listener config. The follow up connections will see the new config. The existing - * connections are not impacted. - */ - void updateListenerConfig(Network::ListenerConfig& config); - - Network::TcpConnectionHandler& parent_; - Network::ListenerPtr listener_; - const std::chrono::milliseconds listener_filters_timeout_; - const bool continue_on_listener_filters_timeout_; - std::list sockets_; - absl::node_hash_map connections_by_context_; - - // The number of connections currently active on this listener. This is typically used for - // connection balancing across per-handler listeners. - std::atomic num_listener_connections_{}; - bool is_deleting_{false}; - }; - - /** - * Wrapper for a group of active connections which are attached to the same filter chain context. - */ - class ActiveConnections : public Event::DeferredDeletable { - public: - ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain); - ~ActiveConnections() override; - - // listener filter chain pair is the owner of the connections - ActiveTcpListener& listener_; - const Network::FilterChain& filter_chain_; - // Owned connections - std::list connections_; - }; - - /** - * Wrapper for an active TCP connection owned by this handler. - */ - struct ActiveTcpConnection : LinkedObject, - public Event::DeferredDeletable, - public Network::ConnectionCallbacks { - ActiveTcpConnection(ActiveConnections& active_connections, - Network::ConnectionPtr&& new_connection, TimeSource& time_system, - std::unique_ptr&& stream_info); - ~ActiveTcpConnection() override; - - // Network::ConnectionCallbacks - void onEvent(Network::ConnectionEvent event) override { - // Any event leads to destruction of the connection. - if (event == Network::ConnectionEvent::LocalClose || - event == Network::ConnectionEvent::RemoteClose) { - active_connections_.listener_.removeConnection(*this); - } - } - void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} - - std::unique_ptr stream_info_; - ActiveConnections& active_connections_; - Network::ConnectionPtr connection_; - Stats::TimespanPtr conn_length_; - }; - - /** - * Wrapper for an active accepted TCP socket owned by this handler. - */ - struct ActiveTcpSocket : public Network::ListenerFilterManager, - public Network::ListenerFilterCallbacks, - LinkedObject, - public Event::DeferredDeletable, - Logger::Loggable { - ActiveTcpSocket(ActiveTcpListener& listener, Network::ConnectionSocketPtr&& socket, - bool hand_off_restored_destination_connections) - : listener_(listener), socket_(std::move(socket)), - hand_off_restored_destination_connections_(hand_off_restored_destination_connections), - iter_(accept_filters_.end()), - stream_info_(std::make_unique( - listener_.parent_.dispatcher().timeSource(), socket_->addressProviderSharedPtr(), - StreamInfo::FilterState::LifeSpan::Connection)) { - listener_.stats_.downstream_pre_cx_active_.inc(); - } - ~ActiveTcpSocket() override { - accept_filters_.clear(); - listener_.stats_.downstream_pre_cx_active_.dec(); - - // If the underlying socket is no longer attached, it means that it has been transferred to - // an active connection. In this case, the active connection will decrement the number - // of listener connections. - // TODO(mattklein123): In general the way we account for the number of listener connections - // is incredibly fragile. Revisit this by potentially merging ActiveTcpSocket and - // ActiveTcpConnection, having a shared object which does accounting (but would require - // another allocation, etc.). - if (socket_ != nullptr) { - listener_.decNumConnections(); - } - } - - void onTimeout(); - void startTimer(); - void unlink(); - void newConnection(); - - class GenericListenerFilter : public Network::ListenerFilter { - public: - GenericListenerFilter(const Network::ListenerFilterMatcherSharedPtr& matcher, - Network::ListenerFilterPtr listener_filter) - : listener_filter_(std::move(listener_filter)), matcher_(std::move(matcher)) {} - Network::FilterStatus onAccept(ListenerFilterCallbacks& cb) override { - if (isDisabled(cb)) { - return Network::FilterStatus::Continue; - } - return listener_filter_->onAccept(cb); - } - /** - * Check if this filter filter should be disabled on the incoming socket. - * @param cb the callbacks the filter instance can use to communicate with the filter chain. - **/ - bool isDisabled(ListenerFilterCallbacks& cb) { - if (matcher_ == nullptr) { - return false; - } else { - return matcher_->matches(cb); - } - } - - private: - const Network::ListenerFilterPtr listener_filter_; - const Network::ListenerFilterMatcherSharedPtr matcher_; - }; - using ListenerFilterWrapperPtr = std::unique_ptr; - - // Network::ListenerFilterManager - void addAcceptFilter(const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher, - Network::ListenerFilterPtr&& filter) override { - accept_filters_.emplace_back( - std::make_unique(listener_filter_matcher, std::move(filter))); - } - - // Network::ListenerFilterCallbacks - Network::ConnectionSocket& socket() override { return *socket_.get(); } - Event::Dispatcher& dispatcher() override { return listener_.parent_.dispatcher(); } - void continueFilterChain(bool success) override; - void setDynamicMetadata(const std::string& name, const ProtobufWkt::Struct& value) override; - envoy::config::core::v3::Metadata& dynamicMetadata() override { - return stream_info_->dynamicMetadata(); - }; - const envoy::config::core::v3::Metadata& dynamicMetadata() const override { - return stream_info_->dynamicMetadata(); - }; - - StreamInfo::FilterState& filterState() override { return *stream_info_->filterState().get(); } - - ActiveTcpListener& listener_; - Network::ConnectionSocketPtr socket_; - const bool hand_off_restored_destination_connections_; - std::list accept_filters_; - std::list::iterator iter_; - Event::TimerPtr timer_; - std::unique_ptr stream_info_; - bool connected_{false}; - }; - using ActiveTcpListenerOptRef = absl::optional>; - private: struct ActiveListenerDetails { // Strong pointer to the listener, whether TCP, UDP, QUIC, etc. Network::ConnectionHandler::ActiveListenerPtr listener_; - absl::variant, + absl::variant, std::reference_wrapper> typed_listener_;