Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstdint>
#include <memory>

#include "envoy/network/address.h"
#include "envoy/network/connection.h"
#include "envoy/network/connection_balancer.h"
#include "envoy/network/filter.h"
Expand Down Expand Up @@ -178,10 +179,12 @@ class TcpConnectionHandler : public virtual ConnectionHandler {
/**
* Obtain the rebalancer of the tcp listener.
* @param listener_tag supplies the tag of the tcp listener that was passed to addListener().
* @param address is used to query the address specific handler.
* @return BalancedConnectionHandlerOptRef the balancer attached to the listener. `nullopt` if
* listener doesn't exist or rebalancer doesn't exist.
*/
virtual BalancedConnectionHandlerOptRef getBalancedHandlerByTag(uint64_t listener_tag) PURE;
virtual BalancedConnectionHandlerOptRef
getBalancedHandlerByTag(uint64_t listener_tag, const Network::Address::Instance& address) PURE;

/**
* Obtain the rebalancer of the tcp listener.
Expand Down
4 changes: 3 additions & 1 deletion envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "envoy/config/listener/v3/udp_listener_config.pb.h"
#include "envoy/config/typed_metadata.h"
#include "envoy/init/manager.h"
#include "envoy/network/address.h"
#include "envoy/network/connection.h"
#include "envoy/network/connection_balancer.h"
#include "envoy/network/listen_socket.h"
Expand Down Expand Up @@ -221,10 +222,11 @@ class ListenerConfig {
virtual envoy::config::core::v3::TrafficDirection direction() const PURE;

/**
* @param address is used for query the address specific connection balancer.
* @return the connection balancer for this listener. All listeners have a connection balancer,
* though the implementation may be a NOP balancer.
*/
virtual ConnectionBalancer& connectionBalancer() PURE;
virtual ConnectionBalancer& connectionBalancer(const Network::Address::Instance& address) PURE;

/**
* Open connection resources for this listener.
Expand Down
26 changes: 15 additions & 11 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,32 @@ namespace Server {

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerConfig& config, Runtime::Loader& runtime,
Network::SocketSharedPtr&& socket)
Network::SocketSharedPtr&& socket,
Network::Address::InstanceConstSharedPtr& address,
Network::ConnectionBalancer& connection_balancer)
: OwnedActiveStreamListenerBase(
parent, parent.dispatcher(),
parent.dispatcher().createListener(std::move(socket), *this, runtime, config.bindToPort(),
config.ignoreGlobalConnLimit()),
config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) {
connection_balancer_.registerHandler(*this);
}

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerPtr&& listener,
Network::ListenerConfig& config, Runtime::Loader&)
Network::Address::InstanceConstSharedPtr& address,
Network::ListenerConfig& config,
Network::ConnectionBalancer& connection_balancer,
Runtime::Loader&)
: OwnedActiveStreamListenerBase(parent, parent.dispatcher(), std::move(listener), config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) {
connection_balancer_.registerHandler(*this);
}

ActiveTcpListener::~ActiveTcpListener() {
is_deleting_ = true;
config_->connectionBalancer().unregisterHandler(*this);
connection_balancer_.unregisterHandler(*this);

// Purge sockets that have not progressed to connections. This should only happen when
// a listener filter stops iteration and never resumes.
Expand Down Expand Up @@ -64,7 +69,6 @@ ActiveTcpListener::~ActiveTcpListener() {

void ActiveTcpListener::updateListenerConfig(Network::ListenerConfig& config) {
ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag());
ASSERT(&config_->connectionBalancer() == &config.connectionBalancer());
config_ = &config;
}

Expand Down Expand Up @@ -97,7 +101,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
bool rebalanced) {
if (!rebalanced) {
Network::BalancedConnectionHandler& target_handler =
config_->connectionBalancer().pickTargetHandler(*this);
connection_balancer_.pickTargetHandler(*this);
if (&target_handler != this) {
target_handler.post(std::move(socket));
return;
Expand Down Expand Up @@ -152,10 +156,10 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) {
RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared<RebalancedSocket>();
socket_to_rebalance->socket = std::move(socket);

dispatcher().post([socket_to_rebalance, tag = config_->listenerTag(),
dispatcher().post([socket_to_rebalance, address = address_, tag = config_->listenerTag(),
&tcp_conn_handler = tcp_conn_handler_,
handoff = config_->handOffRestoredDestinationConnections()]() {
auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag);
auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag, *address);
if (balanced_handler.has_value()) {
balanced_handler->get().onAcceptWorker(std::move(socket_to_rebalance->socket), handoff, true);
return;
Expand Down
14 changes: 12 additions & 2 deletions source/server/active_tcp_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks,
public Network::BalancedConnectionHandler {
public:
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config,
Runtime::Loader& runtime, Network::SocketSharedPtr&& socket);
Runtime::Loader& runtime, Network::SocketSharedPtr&& socket,
Network::Address::InstanceConstSharedPtr& address,
Network::ConnectionBalancer& connection_balancer);
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
Network::ListenerConfig& config, Runtime::Loader& runtime);
Network::Address::InstanceConstSharedPtr& address,
Network::ListenerConfig& config,
Network::ConnectionBalancer& connection_balancer, Runtime::Loader& runtime);
~ActiveTcpListener() override;

bool listenerConnectionLimitReached() const {
Expand Down Expand Up @@ -81,6 +85,12 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks,
// The number of connections currently active on this listener. This is typically used for
// connection balancing across per-handler listeners.
std::atomic<uint64_t> num_listener_connections_{};

Network::ConnectionBalancer& connection_balancer_;
// This is the address of this listener is listening on. And used for get the correct listener
Comment thread
soulxu marked this conversation as resolved.
Outdated
// when rebalancing. The accepted socket can't be used to get the listening address, since
// the accepted socket's remote address can be another address than the listening address.
Network::Address::InstanceConstSharedPtr address_;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This listener must know it's address because it has a socket, right? It seems like this shouldn't be necessary?

@soulxu soulxu Jun 22, 2022

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have think about another two ways:

  • We can get the address from the accepted socket, but it won't work with the case of the connection is redirected by the iptable
  • The socket object is held by the socket listener, then we can add an interface to the listener to return the socket address.
    class Listener {

The second way works also. But let me know if the second way you think better.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Network::Address::InstanceConstSharedPtr address_;
Network::Address::InstanceConstSharedPtr listen_address_;

Can't you get this from the config though? Is't that stored in ActiveListenerImplBase in config_ or somewhere else?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After add multiple addresses support for ListenerImpl, then the ListenerImpl's address() will changes to addresses() (also need to move address() interface to base ListenerConfig), there still no way for ActiveListener knows which address is listening. I have tried to pass an index of addresses() to the ActiveListener, but I thought the index isn't readable in the end, hold a Network::Address::InstanceConstSharedPtr is more readable.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK this makes sense, thanks.

};

using ActiveTcpListenerOptRef = absl::optional<std::reference_wrapper<ActiveTcpListener>>;
Expand Down
4 changes: 3 additions & 1 deletion source/server/admin/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ class AdminImpl : public Admin,
envoy::config::core::v3::TrafficDirection direction() const override {
return envoy::config::core::v3::UNSPECIFIED;
}
Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; }
Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override {
return connection_balancer_;
}
ResourceLimit& openConnections() override { return open_connections_; }
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const override {
return empty_access_logs_;
Expand Down
32 changes: 19 additions & 13 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ void ConnectionHandlerImpl::addListener(absl::optional<uint64_t> overridden_list
local_registry->createActiveInternalListener(*this, config, dispatcher());
// TODO(soulxu): support multiple internal addresses in listener in the future.
ASSERT(config.listenSocketFactories().size() == 1);
details->addActiveListener(config, config.listenSocketFactories()[0], listener_reject_fraction_,
disable_listeners_, std::move(internal_listener));
details->addActiveListener(config, config.listenSocketFactories()[0]->localAddress(),
listener_reject_fraction_, disable_listeners_,
std::move(internal_listener));
} else if (config.listenSocketFactories()[0]->socketType() == Network::Socket::Type::Stream) {
if (!support_udp_in_place_filter_chain_update && overridden_listener.has_value()) {
if (auto iter = listener_map_by_tag_.find(overridden_listener.value());
Expand All @@ -82,20 +83,22 @@ void ConnectionHandlerImpl::addListener(absl::optional<uint64_t> overridden_list
IS_ENVOY_BUG("unexpected");
}
for (auto& socket_factory : config.listenSocketFactories()) {
auto address = socket_factory->localAddress();
// worker_index_ doesn't have a value on the main thread for the admin server.
details->addActiveListener(
config, socket_factory, listener_reject_fraction_, disable_listeners_,
config, address, listener_reject_fraction_, disable_listeners_,
std::make_unique<ActiveTcpListener>(
*this, config, runtime,
socket_factory->getListenSocket(worker_index_.has_value() ? *worker_index_ : 0)));
socket_factory->getListenSocket(worker_index_.has_value() ? *worker_index_ : 0),
address, config.connectionBalancer(*address)));
}

} else {
ASSERT(config.udpListenerConfig().has_value(), "UDP listener factory is not initialized.");
ASSERT(worker_index_.has_value());
for (auto& socket_factory : config.listenSocketFactories()) {
auto address = socket_factory->localAddress();
details->addActiveListener(
config, socket_factory, listener_reject_fraction_, disable_listeners_,
config, address, listener_reject_fraction_, disable_listeners_,
config.udpListenerConfig()->listenerFactory().createActiveUdpListener(
runtime, *worker_index_, *this,
config.listenSocketFactories()[0]->getListenSocket(*worker_index_), dispatcher_,
Expand Down Expand Up @@ -319,15 +322,18 @@ ConnectionHandlerImpl::findActiveListenerByTag(uint64_t listener_tag) {
}

Network::BalancedConnectionHandlerOptRef
ConnectionHandlerImpl::getBalancedHandlerByTag(uint64_t listener_tag) {
ConnectionHandlerImpl::getBalancedHandlerByTag(uint64_t listener_tag,
const Network::Address::Instance& address) {
auto active_listener = findActiveListenerByTag(listener_tag);
if (active_listener.has_value()) {
// TODO(soulxu): return first listener here, this will be changed
// when ConnectionBalancer supports the multiple addresses.
ASSERT(absl::holds_alternative<std::reference_wrapper<ActiveTcpListener>>(
active_listener->get().per_address_details_list_[0]->typed_listener_) &&
active_listener->get().per_address_details_list_[0]->listener_->listener() != nullptr);
return active_listener->get().per_address_details_list_[0]->tcpListener().value().get();
for (auto& details : active_listener->get().per_address_details_list_) {
if (*details->address_ == address) {
ASSERT(absl::holds_alternative<std::reference_wrapper<ActiveTcpListener>>(
details->typed_listener_) &&
details->listener_->listener() != nullptr);
return {details->tcpListener().value().get()};
}
}
}
return absl::nullopt;
}
Expand Down
8 changes: 5 additions & 3 deletions source/server/connection_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler,

// Network::TcpConnectionHandler
Event::Dispatcher& dispatcher() override { return dispatcher_; }
Network::BalancedConnectionHandlerOptRef getBalancedHandlerByTag(uint64_t listener_tag) override;
Network::BalancedConnectionHandlerOptRef
getBalancedHandlerByTag(uint64_t listener_tag,
const Network::Address::Instance& address) override;
Network::BalancedConnectionHandlerOptRef
getBalancedHandlerByAddress(const Network::Address::Instance& address) override;

Expand Down Expand Up @@ -107,13 +109,13 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler,
*/
template <class ActiveListener>
void addActiveListener(Network::ListenerConfig& config,
const Network::ListenSocketFactoryPtr& socket_factory,
const Network::Address::InstanceConstSharedPtr& address,
UnitFloat& listener_reject_fraction, bool disable_listeners,
ActiveListener&& listener) {
auto per_address_details = std::make_shared<PerAddressActiveListenerDetails>();
per_address_details->typed_listener_ = *listener;
per_address_details->listener_ = std::move(listener);
per_address_details->address_ = socket_factory->localAddress();
per_address_details->address_ = address;
if (disable_listeners) {
per_address_details->listener_->pauseListening();
}
Expand Down
Loading