-
Notifications
You must be signed in to change notification settings - Fork 5.3k
io: add iohandle backed with io_uring #19082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a9ca9f5
5ccddde
bf57019
aa6158d
4d1d16b
bb8cce1
ff5494e
a4e7d33
bc06800
8e7bd74
0227a48
980d34a
e743fc2
23fc1ba
d269dae
5b80a6e
620a532
6c41f50
a02ad71
f5708d6
b54ba51
c26d6f6
e30687e
63d0b78
15b2730
268997b
814dbb2
19c23f8
4684321
451af2f
6bc1821
dc4151e
9d365c7
25f1924
89faf66
5ffae83
7dfad5b
0ffeeee
0aabb0e
2e363f7
e6cea15
e0a1ee0
4aef5c9
e86974a
c8233aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| syntax = "proto3"; | ||
|
|
||
| package envoy.extensions.network.socket_interface.v3; | ||
|
|
||
| import "google/protobuf/wrappers.proto"; | ||
|
|
||
| import "udpa/annotations/status.proto"; | ||
| import "validate/validate.proto"; | ||
|
|
||
| option java_package = "io.envoyproxy.envoy.extensions.network.socket_interface.v3"; | ||
| option java_outer_classname = "IoUringSocketInterfaceProto"; | ||
| option java_multiple_files = true; | ||
| option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/network/socket_interface/v3;socket_interfacev3"; | ||
| option (udpa.annotations.file_status).package_version_status = ACTIVE; | ||
|
|
||
| // [#protodoc-title: ``io_uring`` Socket Interface configuration] | ||
|
|
||
| // Configuration for a socket interface that relies on Linux specific ``io_uring`` API to create | ||
| // sockets. | ||
| message IoUringSocketInterface { | ||
| // The size of read buffer. If not set, defaults to 8192. | ||
| google.protobuf.UInt32Value read_buffer_size = 1 [(validate.rules).uint32 = {gte: 4096}]; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 8192 seems a bit small. Also, are there any restrictions on this being a multiple of 4096?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not aware of any limit. I don't run real life clusters and simply decided to start with two pages. |
||
|
|
||
| // The size of both submission and completion queues in queue entries. For heavily loaded | ||
| // processes 300 queue entries is a good enough value. If the load is not high and memory | ||
| // is a constraint then it's safe to have smaller queues. If not set, defaults to 300 | ||
| // queue entries. | ||
| google.protobuf.UInt32Value io_uring_size = 2 [(validate.rules).uint32 = {gte: 16}]; | ||
|
|
||
| // When this flag is specified, a kernel thread is created to perform submission queue | ||
| // polling. An ``io_uring`` instance configured in this way enables ``io_uring`` sockets to | ||
| // issue I/O without ever context switching into the kernel and with better latency. | ||
| // | ||
| // Please note that the polling kernel thread will waste CPU cycles after the ``io_uring`` | ||
| // instance becomes inactive for a grace period which is set to 1 second currently. The | ||
| // polling kernel thread will be started automatically as soon as the ``io_uring`` instance | ||
| // becomes active again. | ||
| bool use_submission_queue_polling = 3; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any performance notes / tradeoffs that should be explained here? |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| .. _config_sock_interface_io_uring: | ||
|
|
||
| io_uring Socket Interface | ||
| ========================= | ||
|
|
||
| * :ref:`v3 API reference <envoy_v3_api_msg_extensions.network.socket_interface.v3.IoUringSocketInterface>` | ||
|
|
||
| .. attention:: | ||
|
|
||
| The io_uring socket interface extension is experimental and is currently under active development. | ||
|
|
||
| io_uring is an asynchronous I/O API implemented in the Linux kernel. | ||
| This socket interface uses [liburing](https://github.com/axboe/liburing) to integrate io_uring | ||
| with Envoy. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be helpful to add some info about performance/latency/cpu/memory/etc tradeoffs that we expect to have when this extension is complete. Or at least some mention of the potential which justifies the development of this extension. |
||
|
|
||
| Example configuration | ||
| --------------------- | ||
|
|
||
| .. code-block:: yaml | ||
|
|
||
| bootstrap_extensions: | ||
| - name: envoy.extensions.io_socket.io_uring | ||
| typed_config: | ||
| "@type": type.googleapis.com/envoy.extensions.network.socket_interface.v3.IoUringSocketInterface | ||
| default_socket_interface: "envoy.extensions.network.socket_interface.io_uring" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ Other features | |
| :maxdepth: 2 | ||
|
|
||
| internal_listener | ||
| io_uring | ||
| rate_limit | ||
| vcl | ||
| wasm | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| load( | ||
| "//bazel:envoy_build_system.bzl", | ||
| "envoy_cc_extension", | ||
| "envoy_cc_library", | ||
| "envoy_extension_package", | ||
| ) | ||
|
|
||
| licenses(["notice"]) # Apache 2 | ||
|
|
||
| envoy_extension_package() | ||
|
|
||
| envoy_cc_extension( | ||
| name = "config", | ||
| srcs = ["config.cc"], | ||
| hdrs = ["config.h"], | ||
| deps = [ | ||
| ":io_handle_impl_lib", | ||
| "//source/common/network:socket_interface_lib", | ||
| "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", | ||
| ] + select({ | ||
| "//bazel:linux": ["//source/common/io:io_uring_impl_lib"], | ||
| "//conditions:default": [], | ||
| }), | ||
| ) | ||
|
|
||
| envoy_cc_library( | ||
| name = "io_handle_impl_lib", | ||
| srcs = [ | ||
| "io_handle_impl.cc", | ||
| ], | ||
| hdrs = [ | ||
| "io_handle_impl.h", | ||
| ], | ||
| deps = [ | ||
| "//envoy/event:dispatcher_interface", | ||
| "//envoy/network:io_handle_interface", | ||
| "//source/common/api:os_sys_calls_lib", | ||
| "//source/common/buffer:buffer_lib", | ||
| "//source/common/io:io_uring_interface", | ||
| "//source/common/network:address_lib", | ||
| "//source/common/network:io_socket_error_lib", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| #include "source/extensions/io_socket/io_uring/config.h" | ||
|
|
||
| #include "envoy/common/platform.h" | ||
| #include "envoy/extensions/network/socket_interface/v3/io_uring_socket_interface.pb.validate.h" | ||
|
|
||
| #include "source/common/api/os_sys_calls_impl.h" | ||
| #include "source/common/io/io_uring_impl.h" | ||
| #include "source/extensions/io_socket/io_uring/io_handle_impl.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Extensions { | ||
| namespace IoSocket { | ||
| namespace IoUring { | ||
|
|
||
| namespace { | ||
|
|
||
| constexpr uint32_t DefaultIoUringSize = 300; | ||
| constexpr uint32_t DefaultReadBufferSize = 8192; | ||
|
|
||
| } // namespace | ||
|
|
||
| void SocketInterfaceExtension::onServerInitialized() { factory_.onServerInitialized(); } | ||
|
|
||
| Network::IoHandlePtr | ||
| SocketInterfaceImpl::socket(Network::Socket::Type socket_type, Network::Address::Type addr_type, | ||
| Network::Address::IpVersion version, bool socket_v6only, | ||
| const Network::SocketCreationOptions& options) const { | ||
| int protocol = 0; | ||
| int flags = 0; | ||
|
|
||
| if (options.mptcp_enabled_) { | ||
| ASSERT(socket_type == Network::Socket::Type::Stream); | ||
| ASSERT(addr_type == Network::Address::Type::Ip); | ||
| protocol = IPPROTO_MPTCP; | ||
| } | ||
|
|
||
| if (socket_type == Network::Socket::Type::Stream) { | ||
| flags |= SOCK_STREAM; | ||
| } else { | ||
| flags |= SOCK_DGRAM; | ||
| } | ||
|
|
||
| int domain; | ||
| if (addr_type == Network::Address::Type::Ip) { | ||
| if (version == Network::Address::IpVersion::v6) { | ||
| domain = AF_INET6; | ||
| } else { | ||
| ASSERT(version == Network::Address::IpVersion::v4); | ||
| domain = AF_INET; | ||
| } | ||
| } else if (addr_type == Network::Address::Type::Pipe) { | ||
| domain = AF_UNIX; | ||
| } else { | ||
| PANIC("not implemented"); | ||
| } | ||
|
|
||
| const Api::SysCallSocketResult result = | ||
| Api::OsSysCallsSingleton::get().socket(domain, flags, protocol); | ||
| RELEASE_ASSERT(SOCKET_VALID(result.return_value_), | ||
| fmt::format("socket(2) failed, got error: {}", errorDetails(result.errno_))); | ||
|
|
||
| ASSERT(io_uring_factory_ != nullptr); | ||
| return std::make_unique<IoUringSocketHandleImpl>(read_buffer_size_, *io_uring_factory_, | ||
| result.return_value_, socket_v6only, domain); | ||
| } | ||
|
|
||
| Network::IoHandlePtr | ||
| SocketInterfaceImpl::socket(Network::Socket::Type socket_type, | ||
| const Network::Address::InstanceConstSharedPtr addr, | ||
| const Network::SocketCreationOptions& options) const { | ||
| Network::Address::IpVersion ip_version = | ||
| addr->ip() ? addr->ip()->version() : Network::Address::IpVersion::v4; | ||
| int v6only = 0; | ||
| if (addr->type() == Network::Address::Type::Ip && ip_version == Network::Address::IpVersion::v6) { | ||
| v6only = addr->ip()->ipv6()->v6only(); | ||
| } | ||
|
|
||
| Network::IoHandlePtr io_handle = | ||
| SocketInterfaceImpl::socket(socket_type, addr->type(), ip_version, v6only, options); | ||
| if (addr->type() == Network::Address::Type::Ip && ip_version == Network::Address::IpVersion::v6) { | ||
| // Setting IPV6_V6ONLY restricts the IPv6 socket to IPv6 connections only. | ||
| const Api::SysCallIntResult result = io_handle->setOption( | ||
| IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char*>(&v6only), sizeof(v6only)); | ||
| RELEASE_ASSERT(!SOCKET_FAILURE(result.return_value_), ""); | ||
| } | ||
| return io_handle; | ||
| } | ||
|
|
||
| bool SocketInterfaceImpl::ipFamilySupported(int domain) { | ||
| Api::OsSysCalls& os_sys_calls = Api::OsSysCallsSingleton::get(); | ||
| const Api::SysCallSocketResult result = os_sys_calls.socket(domain, SOCK_STREAM, 0); | ||
| if (SOCKET_VALID(result.return_value_)) { | ||
| RELEASE_ASSERT( | ||
| os_sys_calls.close(result.return_value_).return_value_ == 0, | ||
| fmt::format("Fail to close fd: response code {}", errorDetails(result.return_value_))); | ||
| } | ||
| return SOCKET_VALID(result.return_value_); | ||
| } | ||
|
|
||
| Server::BootstrapExtensionPtr SocketInterfaceImpl::createBootstrapExtension( | ||
| const Protobuf::Message& message, Server::Configuration::ServerFactoryContext& context) { | ||
| auto config = MessageUtil::downcastAndValidate< | ||
| const envoy::extensions::network::socket_interface::v3::IoUringSocketInterface&>( | ||
| message, context.messageValidationContext().staticValidationVisitor()); | ||
| read_buffer_size_ = | ||
| PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, read_buffer_size, DefaultReadBufferSize); | ||
| io_uring_factory_ = std::make_unique<Io::IoUringFactoryImpl>( | ||
| PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, io_uring_size, DefaultIoUringSize), | ||
| config.use_submission_queue_polling(), context.threadLocal()); | ||
| return std::make_unique<SocketInterfaceExtension>(*this, *io_uring_factory_); | ||
| } | ||
|
|
||
| ProtobufTypes::MessagePtr SocketInterfaceImpl::createEmptyConfigProto() { | ||
| return std::make_unique< | ||
| envoy::extensions::network::socket_interface::v3::IoUringSocketInterface>(); | ||
| } | ||
|
|
||
| REGISTER_FACTORY(SocketInterfaceImpl, Server::Configuration::BootstrapExtensionFactory); | ||
|
|
||
| } // namespace IoUring | ||
| } // namespace IoSocket | ||
| } // namespace Extensions | ||
| } // namespace Envoy |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| #pragma once | ||
|
|
||
| #include "envoy/extensions/network/socket_interface/v3/io_uring_socket_interface.pb.h" | ||
|
|
||
| #include "source/common/network/socket_interface.h" | ||
|
|
||
| namespace Envoy { | ||
|
|
||
| namespace Io { | ||
| class IoUringFactory; | ||
| } // namespace Io | ||
|
|
||
| namespace Extensions { | ||
| namespace IoSocket { | ||
| namespace IoUring { | ||
|
|
||
| class SocketInterfaceExtension : public Network::SocketInterfaceExtension { | ||
| public: | ||
| SocketInterfaceExtension(Network::SocketInterface& sock_interface, Io::IoUringFactory& factory) | ||
| : Network::SocketInterfaceExtension(sock_interface), factory_(factory) {} | ||
|
|
||
| // Server::BootstrapExtension | ||
| void onServerInitialized() override; | ||
|
|
||
| protected: | ||
| Io::IoUringFactory& factory_; | ||
| }; | ||
|
|
||
| class SocketInterfaceImpl : public Network::SocketInterfaceBase { | ||
| public: | ||
| // SocketInterface | ||
| Network::IoHandlePtr socket(Network::Socket::Type socket_type, Network::Address::Type addr_type, | ||
| Network::Address::IpVersion version, bool socket_v6only, | ||
| const Network::SocketCreationOptions& options) const override; | ||
| Network::IoHandlePtr socket(Network::Socket::Type socket_type, | ||
| const Network::Address::InstanceConstSharedPtr addr, | ||
| const Network::SocketCreationOptions& options) const override; | ||
| bool ipFamilySupported(int domain) override; | ||
|
|
||
| // Server::Configuration::BootstrapExtensionFactory | ||
| Server::BootstrapExtensionPtr | ||
| createBootstrapExtension(const Protobuf::Message& message, | ||
| Server::Configuration::ServerFactoryContext& context) override; | ||
|
|
||
| ProtobufTypes::MessagePtr createEmptyConfigProto() override; | ||
| std::string name() const override { | ||
| return "envoy.extensions.network.socket_interface.io_uring"; | ||
| }; | ||
|
|
||
| private: | ||
| uint32_t read_buffer_size_; | ||
| std::unique_ptr<Io::IoUringFactory> io_uring_factory_; | ||
| }; | ||
|
|
||
| DECLARE_FACTORY(SocketInterfaceImpl); | ||
|
|
||
| } // namespace IoUring | ||
| } // namespace IoSocket | ||
| } // namespace Extensions | ||
| } // namespace Envoy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's chat about over slack about CODEOWNERS. I want to make sure I can satisfy this extension's ownership needs if my name is listed here :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Matt suggested the code should be in the core. Then CODEOWNERS will remain untouched :)