Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,19 @@ class Dispatcher : public DispatcherBase, public ScopeTracker {
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) PURE;

/**
* Register an internal listener manager for this dispatcher.
*/
virtual void
registerInternalListenerManager(Network::InternalListenerManager& internal_listener_manager) PURE;

/**
* @brief Get the Internal Listener Manager object.
*
* @return the registered internal istener manager or nullopt.
*/
virtual Network::InternalListenerManagerOptRef getInternalListenerManager() PURE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid use of the dispatcher interface as a registration mechanism for factories like this one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there was slight agreement on this so I stopped investing on bootstrap extension.

My opinion is that adding this interface to dispatcher is more straightforward, well, at the cost of adding getter/setter

CC @yanavlasov

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a client connection factory extension point that connection pools and other things can use to create connections. An eventual goal may be to completely remove the client connection creation methods from the dispatcher. But this is something that @envoyproxy/senior-maintainers may want to chime in on.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two arguments here,
the first is to store the internal listener registry, here I choose dispatcher over bootstrap extension(where we can add a threadlocal registery), because the life time of dispatcher and listener manager is cleaner. The dispatcher is guaranteed existing when an internal listener is ready to add and use.

I think we can add the complexity in booting the server, and offer a hook for internal listener registry if we choose to store the internal listeners there. I feel we can do that after.

the second is to remove the thin dispatcher::createClientConnection. Again we can do that along with this PR but I prefer to do it later when signed off.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach of adding the internal listener registry to the dispatcher does not generalize to the next extension that needs to hook into the code that creates client connections. I recommend introducing a factory mechanism for client connections and transition to it, eventually deprecating the current Dispatcher::createClientConnection

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @antoniovicente that the extension behavior has leaked into core interfaces. II think it would be cleaner if the mechanism for finding the listener was completely encapsulated in the factory. For example when internal listener is created it is registering itself in the factory's thread local storage, where it can later be looked up for establishing client connections. In this way you can avoid needing to change the dispatcher interface and ConnectionHandlerImpl


/**
* @return Filesystem::WatcherPtr a filesystem watcher owned by the caller.
*/
Expand Down
12 changes: 12 additions & 0 deletions envoy/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "client_connection_factory",
hdrs = ["client_connection_factory.h"],
deps = [
":address_interface",
":connection_interface",
":listen_socket_interface",
":transport_socket_interface",
"//envoy/config:typed_config_interface",
],
)

envoy_cc_library(
name = "connection_handler_interface",
hdrs = ["connection_handler.h"],
Expand Down
29 changes: 29 additions & 0 deletions envoy/network/client_connection_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include "envoy/config/typed_config.h"
#include "envoy/network/address.h"
#include "envoy/network/connection.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/transport_socket.h"

namespace Envoy {
namespace Network {

class ClientConnectionFactory : public Config::UntypedFactory {
public:
ClientConnectionFactory() = default;
~ClientConnectionFactory() override = default;

// Config::UntypedFactory
std::string category() const override { return "network.connection"; }

virtual Network::ClientConnectionPtr
createClientConnection(Event::Dispatcher& dispatcher,
Network::Address::InstanceConstSharedPtr address,
Network::Address::InstanceConstSharedPtr source_address,
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) PURE;
};

} // namespace Network
} // namespace Envoy
3 changes: 3 additions & 0 deletions source/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ envoy_cc_library(
"//envoy/common:scope_tracker_interface",
"//envoy/common:time_interface",
"//envoy/event:signal_interface",
"//envoy/network:client_connection_factory",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_interface",
"//source/common/common:assert_lib",
"//source/common/common:thread_lib",
"//source/common/config:utility_lib",
"//source/common/filesystem:watcher_lib",
"//source/common/network:address_lib",
"//source/common/network:connection_lib",
"//source/common/network:listener_lib",
"@envoy_api//envoy/config/overload/v3:pkg_cc_proto",
Expand Down
11 changes: 9 additions & 2 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@
#include "envoy/api/api.h"
#include "envoy/common/scope_tracker.h"
#include "envoy/config/overload/v3/overload.pb.h"
#include "envoy/network/client_connection_factory.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"
#include "source/common/common/lock_guard.h"
#include "source/common/common/thread.h"
#include "source/common/config/utility.h"
#include "source/common/event/file_event_impl.h"
#include "source/common/event/libevent_scheduler.h"
#include "source/common/event/scaled_range_timer_manager_impl.h"
#include "source/common/event/signal_impl.h"
#include "source/common/event/timer_impl.h"
#include "source/common/filesystem/watcher_impl.h"
#include "source/common/network/address_impl.h"
#include "source/common/network/connection_impl.h"
#include "source/common/network/tcp_listener_impl.h"
#include "source/common/network/udp_listener_impl.h"
Expand Down Expand Up @@ -148,8 +151,12 @@ DispatcherImpl::createClientConnection(Network::Address::InstanceConstSharedPtr
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) {
ASSERT(isThreadSafe());
return std::make_unique<Network::ClientConnectionImpl>(*this, address, source_address,
std::move(transport_socket), options);

auto factory = Config::Utility::getFactoryByName<Network::ClientConnectionFactory>(
std::string(Network::Address::addressType(address)));
// The caller expects a non-null connection as of today. A unsupported address will crash any way.
return factory->createClientConnection(*this, address, source_address,
std::move(transport_socket), options);
}

FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
Expand Down
12 changes: 12 additions & 0 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
Network::Address::InstanceConstSharedPtr source_address,
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) override;

void registerInternalListenerManager(
Network::InternalListenerManager& internal_listener_manager) override {
ASSERT(!internal_listener_manager_.has_value());
internal_listener_manager_ = internal_listener_manager;
}

Network::InternalListenerManagerOptRef getInternalListenerManager() override {
return internal_listener_manager_;
}

FileEventPtr createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
uint32_t events) override;
Filesystem::WatcherPtr createFilesystemWatcher() override;
Expand Down Expand Up @@ -172,6 +183,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
MonotonicTime approximate_monotonic_time_;
WatchdogRegistrationPtr watchdog_registration_;
const ScaledRangeTimerManagerPtr scaled_timer_manager_;
Network::InternalListenerManagerOptRef internal_listener_manager_;
};

} // namespace Event
Expand Down
25 changes: 25 additions & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ envoy_cc_library(
deps = [
":address_lib",
":connection_base_lib",
":default_client_connection_factory",
":raw_buffer_socket_lib",
":utility_lib",
"//envoy/event:timer_interface",
Expand Down Expand Up @@ -108,6 +109,30 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "default_client_connection_factory",
srcs = [
"default_client_connection_factory.cc",
],
hdrs = [
"connection_impl.h",
"default_client_connection_factory.h",
],
deps = [
":address_lib",
"//envoy/network:client_connection_factory",
"//envoy/network:connection_interface",
"//envoy/network:transport_socket_interface",
"//envoy/registry",

# required by connection_impl.h
# will organize below when default client factory is confirmed to use dedicated target.
"//source/common/buffer:watermark_buffer_lib",
":connection_base_lib",
"//source/common/stream_info:stream_info_lib",
],
)

envoy_cc_library(
name = "filter_lib",
hdrs = ["filter_impl.h"],
Expand Down
12 changes: 12 additions & 0 deletions source/common/network/address_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ addressFromSockAddrOrDie(const sockaddr_storage& ss, socklen_t ss_len, os_fd_t f
return *address;
}

absl::string_view addressType(const Network::Address::InstanceConstSharedPtr& addr) {
ASSERT(addr != nullptr);
switch (addr->type()) {
case Network::Address::Type::Ip:
case Network::Address::Type::Pipe:
return "default";
case Network::Address::Type::EnvoyInternal:
return "EnvoyInternal";
}
NOT_REACHED_GCOVR_EXCL_LINE;
};

Ipv4Instance::Ipv4Instance(const sockaddr_in* address, const SocketInterface* sock_interface)
: InstanceBase(Type::Ip, sockInterfaceOrDefault(sock_interface)) {
throwOnError(validateProtocolSupported());
Expand Down
6 changes: 6 additions & 0 deletions source/common/network/address_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ InstanceConstSharedPtr addressFromSockAddrOrThrow(const sockaddr_storage& ss, so
InstanceConstSharedPtr addressFromSockAddrOrDie(const sockaddr_storage& ss, socklen_t ss_len,
os_fd_t fd, bool v6only = true);

/**
* Return the address type in string_view. The returned type name is used by calling
* ClientConnectionFactory.
*/
absl::string_view addressType(const Network::Address::InstanceConstSharedPtr& addr);

/**
* Base class for all address types.
*/
Expand Down
24 changes: 24 additions & 0 deletions source/common/network/default_client_connection_factory.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include "source/common/network/default_client_connection_factory.h"

#include "envoy/registry/registry.h"

#include "source/common/network/address_impl.h"
#include "source/common/network/connection_impl.h"

namespace Envoy {

namespace Network {

Network::ClientConnectionPtr DefaultClientConnectionFactory::createClientConnection(
Event::Dispatcher& dispatcher, Network::Address::InstanceConstSharedPtr address,
Network::Address::InstanceConstSharedPtr source_address,
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) {
ASSERT(address->ip() || address->pipe());
return std::make_unique<Network::ClientConnectionImpl>(dispatcher, address, source_address,
std::move(transport_socket), options);
}
REGISTER_FACTORY(DefaultClientConnectionFactory, Network::ClientConnectionFactory);

} // namespace Network
} // namespace Envoy
24 changes: 24 additions & 0 deletions source/common/network/default_client_connection_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "envoy/common/pure.h"
#include "envoy/network/client_connection_factory.h"
#include "envoy/network/connection.h"

namespace Envoy {

namespace Network {

class DefaultClientConnectionFactory : public Network::ClientConnectionFactory {
public:
~DefaultClientConnectionFactory() override = default;
std::string name() const override { return "default"; }
Network::ClientConnectionPtr
createClientConnection(Event::Dispatcher& dispatcher,
Network::Address::InstanceConstSharedPtr address,
Network::Address::InstanceConstSharedPtr source_address,
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) override;
};

} // namespace Network
} // namespace Envoy
19 changes: 19 additions & 0 deletions source/extensions/io_socket/user_space/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ envoy_cc_extension(
name = "config",
srcs = ["config.h"],
deps = [
":client_connection_factory",
":io_handle_impl_lib",
],
)
Expand Down Expand Up @@ -57,3 +58,21 @@ envoy_cc_library(
"//source/common/network:default_socket_interface_lib",
],
)

envoy_cc_library(
name = "client_connection_factory",
srcs = [
"client_connection_factory.cc",
],
hdrs = [
"client_connection_factory.h",
],
deps = [
":io_handle_impl_lib",
"//envoy/network:client_connection_factory",
"//envoy/network:connection_interface",
"//envoy/registry",
"//source/common/network:connection_lib",
"//source/common/network:listen_socket_lib",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "source/extensions/io_socket/user_space/client_connection_factory.h"

#include "envoy/registry/registry.h"

#include "source/common/network/address_impl.h"
#include "source/common/network/connection_impl.h"
#include "source/common/network/listen_socket_impl.h"
#include "source/extensions/io_socket/user_space/io_handle_impl.h"

namespace Envoy {

namespace Extensions {
namespace IoSocket {
namespace UserSpace {

Network::ClientConnectionPtr InternalClientConnectionFactory::createClientConnection(
Event::Dispatcher& dispatcher, Network::Address::InstanceConstSharedPtr address,
Network::Address::InstanceConstSharedPtr source_address,
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) {

auto [io_handle_client, io_handle_server] =
Extensions::IoSocket::UserSpace::IoHandleFactory::createIoHandlePair();

auto client_conn = std::make_unique<Network::ClientConnectionImpl>(
dispatcher,
std::make_unique<Network::ConnectionSocketImpl>(std::move(io_handle_client), source_address,
address),
source_address, std::move(transport_socket), options);

// It's either in the main thread or the worker is not yet started.
auto internal_listener_manager = dispatcher.getInternalListenerManager();
if (!internal_listener_manager.has_value()) {
io_handle_server->close();
return client_conn;
}

// The request internal listener may not exist.
auto internal_listener = internal_listener_manager.value().get().findByAddress(address);
if (!internal_listener.has_value()) {
io_handle_server->close();
return client_conn;
}

auto accepted_socket = std::make_unique<Network::AcceptedSocketImpl>(std::move(io_handle_server),
address, source_address);
internal_listener->onAccept(std::move(accepted_socket));
return client_conn;
}

REGISTER_FACTORY(InternalClientConnectionFactory, Network::ClientConnectionFactory);

} // namespace UserSpace
} // namespace IoSocket
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <memory>
#include <string>

#include "envoy/common/pure.h"
#include "envoy/network/client_connection_factory.h"
#include "envoy/network/connection.h"

#include "source/common/common/logger.h"

namespace Envoy {

namespace Extensions {
namespace IoSocket {
namespace UserSpace {

class InternalClientConnectionFactory : public Network::ClientConnectionFactory,
Logger::Loggable<Logger::Id::connection> {
public:
~InternalClientConnectionFactory() override = default;
std::string name() const override { return "EnvoyInternal"; }
Network::ClientConnectionPtr
createClientConnection(Event::Dispatcher& dispatcher,
Network::Address::InstanceConstSharedPtr address,
Network::Address::InstanceConstSharedPtr source_address,
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) override;
};

} // namespace UserSpace
} // namespace IoSocket
} // namespace Extensions
} // namespace Envoy
1 change: 1 addition & 0 deletions source/server/worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overl
Event::DispatcherPtr dispatcher(
api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
auto conn_handler = std::make_unique<ConnectionHandlerImpl>(*dispatcher, index);
dispatcher->registerInternalListenerManager(*conn_handler);
return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
overload_manager, api_, stat_names_);
}
Expand Down
Loading