diff --git a/include/envoy/network/BUILD b/include/envoy/network/BUILD index aa0e567416575..0427cc307df44 100644 --- a/include/envoy/network/BUILD +++ b/include/envoy/network/BUILD @@ -100,6 +100,7 @@ envoy_cc_library( ":address_interface", "//include/envoy/api:io_error_interface", "//include/envoy/api:os_sys_calls_interface", + "//include/envoy/event:file_event_interface", "//source/common/common:assert_lib", ], ) diff --git a/include/envoy/network/io_handle.h b/include/envoy/network/io_handle.h index 6e98a54280387..b676385a8796c 100644 --- a/include/envoy/network/io_handle.h +++ b/include/envoy/network/io_handle.h @@ -5,6 +5,7 @@ #include "envoy/api/io_error.h" #include "envoy/common/platform.h" #include "envoy/common/pure.h" +#include "envoy/event/file_event.h" #include "envoy/network/address.h" #include "absl/container/fixed_array.h" @@ -15,6 +16,10 @@ namespace Buffer { struct RawSlice; } // namespace Buffer +namespace Event { +class Dispatcher; +} // namespace Event + using RawSliceArrays = absl::FixedArray>; namespace Network { @@ -223,6 +228,18 @@ class IoHandle { * @return peer's address as @ref Address::InstanceConstSharedPtr */ virtual Address::InstanceConstSharedPtr peerAddress() PURE; + + /** + * Creates a file event that will signal when the io handle is readable, writable or closed. + * @param dispatcher dispatcher to be used to allocate the file event. + * @param cb supplies the callback to fire when the handle is ready. + * @param trigger specifies whether to edge or level trigger. + * @param events supplies a logical OR of @ref Event::FileReadyType events that the file event + * should initially listen on. + * @return @ref Event::FileEventPtr + */ + virtual Event::FileEventPtr createFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) PURE; }; using IoHandlePtr = std::unique_ptr; diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 928938b4b0d1f..11b6e565bd890 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -191,6 +191,7 @@ envoy_cc_library( ":io_socket_error_lib", ":socket_interface_lib", ":socket_lib", + "//include/envoy/event:dispatcher_interface", "//include/envoy/network:io_handle_interface", "//source/common/api:os_sys_calls_lib", "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 5e542caae0f3e..083802e092574 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -69,9 +69,9 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt // We never ask for both early close and read at the same time. If we are reading, we want to // consume all available data. - file_event_ = dispatcher_.createFileEvent( - ConnectionImpl::ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); }, - trigger, Event::FileReadyType::Read | Event::FileReadyType::Write); + file_event_ = socket_->ioHandle().createFileEvent( + dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger, + Event::FileReadyType::Read | Event::FileReadyType::Write); transport_socket_->setTransportSocketCallbacks(*this); } diff --git a/source/common/network/io_socket_handle_impl.cc b/source/common/network/io_socket_handle_impl.cc index fb0ecf4615e81..9f95869d82985 100644 --- a/source/common/network/io_socket_handle_impl.cc +++ b/source/common/network/io_socket_handle_impl.cc @@ -472,5 +472,12 @@ Address::InstanceConstSharedPtr IoSocketHandleImpl::peerAddress() { return Address::addressFromSockAddr(ss, ss_len); } +Event::FileEventPtr IoSocketHandleImpl::createFileEvent(Event::Dispatcher& dispatcher, + Event::FileReadyCb cb, + Event::FileTriggerType trigger, + uint32_t events) { + return dispatcher.createFileEvent(fd_, cb, trigger, events); +} + } // namespace Network } // namespace Envoy diff --git a/source/common/network/io_socket_handle_impl.h b/source/common/network/io_socket_handle_impl.h index 8be4b5aa92521..c5ced3e2ae37b 100644 --- a/source/common/network/io_socket_handle_impl.h +++ b/source/common/network/io_socket_handle_impl.h @@ -3,6 +3,7 @@ #include "envoy/api/io_error.h" #include "envoy/api/os_sys_calls.h" #include "envoy/common/platform.h" +#include "envoy/event/dispatcher.h" #include "envoy/network/io_handle.h" #include "common/common/logger.h" @@ -58,6 +59,8 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable domain() override; Address::InstanceConstSharedPtr localAddress() override; Address::InstanceConstSharedPtr peerAddress() override; + Event::FileEventPtr createFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override; protected: // Converts a SysCallSizeResult to IoCallUint64Result. diff --git a/source/common/network/listener_impl.cc b/source/common/network/listener_impl.cc index c1722c88239c6..5a733c9c4dfa1 100644 --- a/source/common/network/listener_impl.cc +++ b/source/common/network/listener_impl.cc @@ -95,8 +95,8 @@ void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& // Although onSocketEvent drains to completion, use level triggered mode to avoid potential // loss of the trigger due to transient accept errors. - file_event_ = dispatcher.createFileEvent( - socket.ioHandle().fd(), [this](uint32_t events) -> void { onSocketEvent(events); }, + file_event_ = socket.ioHandle().createFileEvent( + dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); }, Event::FileTriggerType::Level, Event::FileReadyType::Read); if (!Network::Socket::applyOptions(socket.options(), socket, diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index 84d42555f1b4d..d2cbd0feb7337 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -31,8 +31,8 @@ namespace Network { UdpListenerImpl::UdpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket, UdpListenerCallbacks& cb, TimeSource& time_source) : BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), time_source_(time_source) { - file_event_ = dispatcher_.createFileEvent( - socket_->ioHandle().fd(), [this](uint32_t events) -> void { onSocketEvent(events); }, + file_event_ = socket_->ioHandle().createFileEvent( + dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); }, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read | Event::FileReadyType::Write); ASSERT(file_event_); diff --git a/source/extensions/filters/listener/http_inspector/http_inspector.cc b/source/extensions/filters/listener/http_inspector/http_inspector.cc index 3a7f46addb47f..db1c634bcdb20 100644 --- a/source/extensions/filters/listener/http_inspector/http_inspector.cc +++ b/source/extensions/filters/listener/http_inspector/http_inspector.cc @@ -61,8 +61,8 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) { case ParseState::Continue: // do nothing but create the event ASSERT(file_event_ == nullptr); - file_event_ = cb.dispatcher().createFileEvent( - socket.ioHandle().fd(), + file_event_ = cb.socket().ioHandle().createFileEvent( + cb.dispatcher(), [this](uint32_t events) { ENVOY_LOG(trace, "http inspector event: {}", events); // inspector is always peeking and can never determine EOF. diff --git a/source/extensions/filters/listener/proxy_protocol/proxy_protocol.cc b/source/extensions/filters/listener/proxy_protocol/proxy_protocol.cc index 9c474bc8b4207..f532fa3e57367 100644 --- a/source/extensions/filters/listener/proxy_protocol/proxy_protocol.cc +++ b/source/extensions/filters/listener/proxy_protocol/proxy_protocol.cc @@ -68,8 +68,8 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) { ENVOY_LOG(debug, "proxy_protocol: New connection accepted"); Network::ConnectionSocket& socket = cb.socket(); ASSERT(file_event_.get() == nullptr); - file_event_ = cb.dispatcher().createFileEvent( - socket.ioHandle().fd(), + file_event_ = socket.ioHandle().createFileEvent( + cb.dispatcher(), [this](uint32_t events) { ASSERT(events == Event::FileReadyType::Read); onRead(); diff --git a/source/extensions/filters/listener/tls_inspector/tls_inspector.cc b/source/extensions/filters/listener/tls_inspector/tls_inspector.cc index 87095b65c9702..89b37b0775191 100644 --- a/source/extensions/filters/listener/tls_inspector/tls_inspector.cc +++ b/source/extensions/filters/listener/tls_inspector/tls_inspector.cc @@ -92,8 +92,8 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) { return Network::FilterStatus::Continue; case ParseState::Continue: // do nothing but create the event - file_event_ = cb.dispatcher().createFileEvent( - socket.ioHandle().fd(), + file_event_ = socket.ioHandle().createFileEvent( + cb.dispatcher(), [this](uint32_t events) { if (events & Event::FileReadyType::Closed) { config_->stats().connection_closed_.inc(); diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 07dc6a49c5efd..37a60c34493cb 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -158,8 +158,9 @@ UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster, // NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port // is bound until the first packet is sent to the upstream host. io_handle_(cluster.filter_.createIoHandle(host)), - socket_event_(cluster.filter_.read_callbacks_->udpListener().dispatcher().createFileEvent( - io_handle_->fd(), [this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType, + socket_event_(io_handle_->createFileEvent( + cluster.filter_.read_callbacks_->udpListener().dispatcher(), + [this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read)) { ENVOY_LOG(debug, "creating new session: downstream={} local={} upstream={}", addresses_.peer_->asStringView(), addresses_.local_->asStringView(), diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.cc b/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.cc index bb3c172536dfb..830b28e286344 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.cc @@ -78,10 +78,9 @@ uint64_t EnvoyQuicClientConnection::maxPacketSize() const { void EnvoyQuicClientConnection::setUpConnectionSocket() { if (connectionSocket()->ioHandle().isOpen()) { - file_event_ = dispatcher_.createFileEvent( - connectionSocket()->ioHandle().fd(), - [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, - Event::FileReadyType::Read | Event::FileReadyType::Write); + file_event_ = connectionSocket()->ioHandle().createFileEvent( + dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, + Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write); if (!Network::Socket::applyOptions(connectionSocket()->options(), *connectionSocket(), envoy::config::core::v3::SocketOption::STATE_LISTENING)) { diff --git a/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h b/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h index d58d4a1060446..b4e9c25d11aa5 100644 --- a/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h +++ b/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h @@ -92,6 +92,10 @@ class QuicIoHandleWrapper : public Network::IoHandle { Network::Address::InstanceConstSharedPtr peerAddress() override { return io_handle_.peerAddress(); } + Event::FileEventPtr createFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override { + return io_handle_.createFileEvent(dispatcher, cb, trigger, events); + } private: Network::IoHandle& io_handle_; diff --git a/test/extensions/filters/listener/http_inspector/http_inspector_test.cc b/test/extensions/filters/listener/http_inspector/http_inspector_test.cc index e685ceb98211e..713c2aa6902d9 100644 --- a/test/extensions/filters/listener/http_inspector/http_inspector_test.cc +++ b/test/extensions/filters/listener/http_inspector/http_inspector_test.cc @@ -41,6 +41,7 @@ class HttpInspectorTest : public testing::Test { EXPECT_CALL(socket_, detectedTransportProtocol()).WillRepeatedly(Return("raw_buffer")); EXPECT_CALL(cb_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(testing::Const(socket_), ioHandle()).WillRepeatedly(ReturnRef(*io_handle_)); + EXPECT_CALL(socket_, ioHandle()).WillRepeatedly(ReturnRef(*io_handle_)); if (include_inline_recv) { EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK)) @@ -72,6 +73,7 @@ TEST_F(HttpInspectorTest, SkipHttpInspectForTLS) { filter_ = std::make_unique(cfg_); EXPECT_CALL(cb_, socket()).WillRepeatedly(ReturnRef(socket_)); + EXPECT_CALL(socket_, ioHandle()).WillRepeatedly(ReturnRef(*io_handle_)); EXPECT_CALL(socket_, detectedTransportProtocol()).WillRepeatedly(Return("TLS")); EXPECT_EQ(filter_->onAccept(cb_), Network::FilterStatus::Continue); } diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index dcaf3846fb004..80f957d67509b 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -162,10 +162,8 @@ class UdpProxyFilterTest : public testing::Test { new_session.idle_timer_ = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_); EXPECT_CALL(*filter_, createIoHandle(_)) .WillOnce(Return(ByMove(Network::IoHandlePtr{test_sessions_.back().io_handle_}))); - EXPECT_CALL(*new_session.io_handle_, fd()); - EXPECT_CALL( - callbacks_.udp_listener_.dispatcher_, - createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read)) + EXPECT_CALL(*new_session.io_handle_, createFileEvent_(_, _, Event::PlatformDefaultTriggerType, + Event::FileReadyType::Read)) .WillOnce(DoAll(SaveArg<1>(&new_session.file_event_cb_), Return(nullptr))); // Internal Buffer is Empty, flush will be a no-op ON_CALL(callbacks_.udp_listener_, flush()) diff --git a/test/mocks/network/BUILD b/test/mocks/network/BUILD index 020e4b6db4041..ce0fac882b21b 100644 --- a/test/mocks/network/BUILD +++ b/test/mocks/network/BUILD @@ -25,6 +25,7 @@ envoy_cc_mock( srcs = ["io_handle.cc"], hdrs = ["io_handle.h"], deps = [ + "//include/envoy/event:dispatcher_interface", "//include/envoy/network:io_handle_interface", "//source/common/buffer:buffer_lib", ], diff --git a/test/mocks/network/io_handle.h b/test/mocks/network/io_handle.h index 5bfdcc23ac45f..42b621767455f 100644 --- a/test/mocks/network/io_handle.h +++ b/test/mocks/network/io_handle.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/buffer/buffer.h" +#include "envoy/event/dispatcher.h" #include "envoy/network/io_handle.h" #include "gmock/gmock.h" @@ -13,6 +14,11 @@ class MockIoHandle : public IoHandle { MockIoHandle(); ~MockIoHandle() override; + Event::FileEventPtr createFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override { + return Event::FileEventPtr{createFileEvent_(dispatcher, cb, trigger, events)}; + } + MOCK_METHOD(os_fd_t, fd, (), (const)); MOCK_METHOD(Api::IoCallUint64Result, close, ()); MOCK_METHOD(bool, isOpen, (), (const)); @@ -42,6 +48,9 @@ class MockIoHandle : public IoHandle { MOCK_METHOD(absl::optional, domain, ()); MOCK_METHOD(Address::InstanceConstSharedPtr, localAddress, ()); MOCK_METHOD(Address::InstanceConstSharedPtr, peerAddress, ()); + MOCK_METHOD(Event::FileEvent*, createFileEvent_, + (Event::Dispatcher & dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events)); }; } // namespace Network