diff --git a/source/extensions/io_socket/user_space/BUILD b/source/extensions/io_socket/user_space/BUILD index e449d227ef684..420943a7b005a 100644 --- a/source/extensions/io_socket/user_space/BUILD +++ b/source/extensions/io_socket/user_space/BUILD @@ -42,3 +42,19 @@ envoy_cc_library( "//include/envoy/event:dispatcher_interface", ], ) + +envoy_cc_library( + name = "io_socket_handle_lib", + srcs = [ + "io_socket_handle_impl.cc", + ], + hdrs = [ + "io_socket_handle_impl.h", + ], + deps = [ + ":file_event_lib", + ":io_handle_lib", + "//source/common/event:dispatcher_includes", + "//source/common/network:default_socket_interface_lib", + ], +) diff --git a/source/extensions/io_socket/user_space/io_socket_handle_impl.cc b/source/extensions/io_socket/user_space/io_socket_handle_impl.cc new file mode 100644 index 0000000000000..d54bed4da9659 --- /dev/null +++ b/source/extensions/io_socket/user_space/io_socket_handle_impl.cc @@ -0,0 +1,330 @@ +#include "extensions/io_socket/user_space/io_socket_handle_impl.h" + +#include "envoy/buffer/buffer.h" +#include "envoy/common/platform.h" + +#include "common/api/os_sys_calls_impl.h" +#include "common/common/assert.h" +#include "common/common/utility.h" +#include "common/network/address_impl.h" + +#include "extensions/io_socket/user_space/file_event_impl.h" + +#include "absl/types/optional.h" + +namespace Envoy { + +namespace Extensions { +namespace IoSocket { +namespace UserSpace { +namespace { +Api::SysCallIntResult makeInvalidSyscallResult() { + return Api::SysCallIntResult{-1, SOCKET_ERROR_NOT_SUP}; +} +} // namespace + +IoSocketHandleImpl::IoSocketHandleImpl() + : pending_received_data_([&]() -> void { this->onBelowLowWatermark(); }, + [&]() -> void { this->onAboveHighWatermark(); }, []() -> void {}) {} + +IoSocketHandleImpl::~IoSocketHandleImpl() { + if (!closed_) { + close(); + } +} + +Api::IoCallUint64Result IoSocketHandleImpl::close() { + ASSERT(!closed_); + if (!closed_) { + if (peer_handle_) { + ENVOY_LOG(trace, "socket {} 
close before peer {} closes.", static_cast(this), + static_cast(peer_handle_)); + // Notify the peer we won't write more data. shutdown(WRITE). + peer_handle_->setWriteEnd(); + // Notify the peer that we no longer accept data. shutdown(RD). + peer_handle_->onPeerDestroy(); + peer_handle_ = nullptr; + } else { + ENVOY_LOG(trace, "socket {} close after peer closed.", static_cast(this)); + } + } + closed_ = true; + return Api::ioCallUint64ResultNoError(); +} + +bool IoSocketHandleImpl::isOpen() const { return !closed_; } + +Api::IoCallUint64Result IoSocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices, + uint64_t num_slice) { + if (!isOpen()) { + return {0, + // TODO(lambdai): Add EBADF in Network::IoSocketError and adopt it here. + Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + if (pending_received_data_.length() == 0) { + if (receive_data_end_stream_) { + return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; + } else { + return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)}; + } + } + uint64_t bytes_offset = 0; + for (uint64_t i = 0; i < num_slice && bytes_offset < max_length; i++) { + auto bytes_to_read_in_this_slice = + std::min(std::min(pending_received_data_.length(), max_length) - bytes_offset, + uint64_t(slices[i].len_)); + // Copy and drain, so pending_received_data_ always copy from offset 0. 
+ pending_received_data_.copyOut(0, bytes_to_read_in_this_slice, slices[i].mem_); + pending_received_data_.drain(bytes_to_read_in_this_slice); + bytes_offset += bytes_to_read_in_this_slice; + } + auto bytes_read = bytes_offset; + ASSERT(bytes_read <= max_length); + ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast(this), bytes_read); + return {bytes_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; +} + +Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, + absl::optional max_length_opt) { + const uint64_t max_length = max_length_opt.value_or(UINT64_MAX); + if (max_length == 0) { + return Api::ioCallUint64ResultNoError(); + } + if (!isOpen()) { + return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + if (pending_received_data_.length() == 0) { + if (receive_data_end_stream_) { + return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; + } else { + return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)}; + } + } + // TODO(lambdai): Move slice by slice until high watermark. + uint64_t max_bytes_to_read = std::min(max_length, pending_received_data_.length()); + buffer.move(pending_received_data_, max_bytes_to_read); + return {max_bytes_to_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; +} + +Api::IoCallUint64Result IoSocketHandleImpl::writev(const Buffer::RawSlice* slices, + uint64_t num_slice) { + // Empty input is allowed even though the peer is shutdown. + bool is_input_empty = true; + for (uint64_t i = 0; i < num_slice; i++) { + if (slices[i].mem_ != nullptr && slices[i].len_ != 0) { + is_input_empty = false; + break; + } + } + if (is_input_empty) { + return Api::ioCallUint64ResultNoError(); + }; + if (!isOpen()) { + return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + // Closed peer. 
+ if (!peer_handle_) { + return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + // Error: write after close. + if (peer_handle_->isPeerShutDownWrite()) { + // TODO(lambdai): `EPIPE` or `ENOTCONN`. + return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + // The peer is valid but temporary not accepts new data. Likely due to flow control. + if (!peer_handle_->isWritable()) { + return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)}; + } + + auto* const dest_buffer = peer_handle_->getWriteBuffer(); + // Write along with iteration. Buffer guarantee the fragment is always append-able. + uint64_t bytes_written = 0; + for (uint64_t i = 0; i < num_slice && !dest_buffer->highWatermarkTriggered(); i++) { + if (slices[i].mem_ != nullptr && slices[i].len_ != 0) { + dest_buffer->add(slices[i].mem_, slices[i].len_); + bytes_written += slices[i].len_; + } + } + peer_handle_->setNewDataAvailable(); + ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast(this), bytes_written); + return {bytes_written, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; +} + +Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) { + // Empty input is allowed even though the peer is shutdown. + if (buffer.length() == 0) { + return Api::ioCallUint64ResultNoError(); + } + if (!isOpen()) { + return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + // Closed peer. + if (!peer_handle_) { + return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + // Error: write after close. + if (peer_handle_->isPeerShutDownWrite()) { + // TODO(lambdai): `EPIPE` or `ENOTCONN`. 
+ return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + // The peer is valid but temporary not accepts new data. Likely due to flow control. + if (!peer_handle_->isWritable()) { + return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)}; + } + uint64_t total_bytes_to_write = 0; + const uint64_t max_bytes_to_write = buffer.length(); + while (peer_handle_->isWritable()) { + const auto& front_slice = buffer.frontSlice(); + if (front_slice.len_ == 0) { + break; + } else { + peer_handle_->getWriteBuffer()->move(buffer, front_slice.len_); + total_bytes_to_write += front_slice.len_; + } + } + peer_handle_->setNewDataAvailable(); + ENVOY_LOG(trace, "socket {} writev {} bytes of {}", static_cast(this), + total_bytes_to_write, max_bytes_to_write); + return {total_bytes_to_write, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; +} + +Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int, + const Network::Address::Ip*, + const Network::Address::Instance&) { + return Network::IoSocketError::ioResultSocketInvalidAddress(); +} + +Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, uint32_t, + RecvMsgOutput&) { + return Network::IoSocketError::ioResultSocketInvalidAddress(); +} + +Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays&, uint32_t, RecvMsgOutput&) { + return Network::IoSocketError::ioResultSocketInvalidAddress(); +} + +Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) { + if (!isOpen()) { + return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL), + Network::IoSocketError::deleteIoError)}; + } + // No data and the writer closed. 
+ if (pending_received_data_.length() == 0) { + if (receive_data_end_stream_) { + return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; + } else { + return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)}; + } + } + // Specify uint64_t since the latter length may not have the same type. + auto max_bytes_to_read = std::min(pending_received_data_.length(), length); + pending_received_data_.copyOut(0, max_bytes_to_read, buffer); + if (!(flags & MSG_PEEK)) { + pending_received_data_.drain(max_bytes_to_read); + } + return {max_bytes_to_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; +} + +bool IoSocketHandleImpl::supportsMmsg() const { return false; } + +bool IoSocketHandleImpl::supportsUdpGro() const { return false; } + +Api::SysCallIntResult IoSocketHandleImpl::bind(Network::Address::InstanceConstSharedPtr) { + return makeInvalidSyscallResult(); +} + +Api::SysCallIntResult IoSocketHandleImpl::listen(int) { return makeInvalidSyscallResult(); } + +Network::IoHandlePtr IoSocketHandleImpl::accept(struct sockaddr*, socklen_t*) { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; +} + +Api::SysCallIntResult IoSocketHandleImpl::connect(Network::Address::InstanceConstSharedPtr) { + // Buffered Io handle should always be considered as connected. + // Use write or read to determine if peer is closed. + return {0, 0}; +} + +Api::SysCallIntResult IoSocketHandleImpl::setOption(int, int, const void*, socklen_t) { + return makeInvalidSyscallResult(); +} + +Api::SysCallIntResult IoSocketHandleImpl::getOption(int, int, void*, socklen_t*) { + return makeInvalidSyscallResult(); +} + +Api::SysCallIntResult IoSocketHandleImpl::setBlocking(bool) { return makeInvalidSyscallResult(); } + +absl::optional IoSocketHandleImpl::domain() { return absl::nullopt; } + +Network::Address::InstanceConstSharedPtr IoSocketHandleImpl::localAddress() { + // TODO(lambdai): Rewrite when caller accept error as the return value. 
+  throw EnvoyException(fmt::format("getsockname failed for IoSocketHandleImpl"));
+}
+
+Network::Address::InstanceConstSharedPtr IoSocketHandleImpl::peerAddress() {
+  // TODO(lambdai): Rewrite when caller accept error as the return value.
+  throw EnvoyException(fmt::format("getsockname failed for IoSocketHandleImpl"));
+}
+
+void IoSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
+                                             Event::FileTriggerType trigger, uint32_t events) {
+  ASSERT(user_file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
+                                      "file descriptor. This is not allowed.");
+  ASSERT(trigger != Event::FileTriggerType::Level, "Native level trigger is not supported yet.");
+  user_file_event_ = std::make_unique<FileEventImpl>(dispatcher, cb, events, *this);
+}
+
+Network::IoHandlePtr IoSocketHandleImpl::duplicate() {
+  // duplicate() is supposed to be used on listener io handle while this implementation doesn't
+  // support listen.
+  NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
+}
+
+void IoSocketHandleImpl::activateFileEvents(uint32_t events) {
+  if (user_file_event_) {
+    user_file_event_->activate(events);
+  } else {
+    ENVOY_BUG(false, "Null user_file_event_");
+  }
+}
+
+void IoSocketHandleImpl::enableFileEvents(uint32_t events) {
+  if (user_file_event_) {
+    user_file_event_->setEnabled(events);
+  } else {
+    ENVOY_BUG(false, "Null user_file_event_");
+  }
+}
+
+void IoSocketHandleImpl::resetFileEvents() { user_file_event_.reset(); }
+
+Api::SysCallIntResult IoSocketHandleImpl::shutdown(int how) {
+  // Support only shutdown write.
+  ASSERT(how == ENVOY_SHUT_WR);
+  ASSERT(!closed_);
+  if (!write_shutdown_) {
+    ASSERT(peer_handle_);
+    // Notify the peer we won't write more data.
+ peer_handle_->setWriteEnd(); + write_shutdown_ = true; + } + return {0, 0}; +} +} // namespace UserSpace +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/source/extensions/io_socket/user_space/io_socket_handle_impl.h b/source/extensions/io_socket/user_space/io_socket_handle_impl.h new file mode 100644 index 0000000000000..8f3ff3bd6ccef --- /dev/null +++ b/source/extensions/io_socket/user_space/io_socket_handle_impl.h @@ -0,0 +1,168 @@ +#pragma once + +#include + +#include "envoy/api/io_error.h" +#include "envoy/api/os_sys_calls.h" +#include "envoy/common/platform.h" +#include "envoy/event/dispatcher.h" +#include "envoy/network/io_handle.h" + +#include "common/buffer/watermark_buffer.h" +#include "common/common/logger.h" +#include "common/network/io_socket_error_impl.h" + +#include "extensions/io_socket/user_space/file_event_impl.h" +#include "extensions/io_socket/user_space/io_handle.h" + +namespace Envoy { +namespace Extensions { +namespace IoSocket { +namespace UserSpace { +/** + * Network::IoHandle implementation which provides a buffer as data source. It is designed to used + * by Network::ConnectionImpl. Some known limitations include + * 1. It doesn't include a file descriptor. Do not use "fdDoNotUse". + * 2. It doesn't support socket options. Wrap this in ConnectionSocket and implement the socket + * getter/setter options. + * 3. It doesn't support UDP interface. + * 4. The peer BufferedIoSocket must be scheduled in the same thread to avoid data race because + * BufferedIoSocketHandle mutates the state of peer handle and no lock is introduced. 
+ */ +class IoSocketHandleImpl final : public Network::IoHandle, + public UserSpace::IoHandle, + protected Logger::Loggable { +public: + IoSocketHandleImpl(); + + ~IoSocketHandleImpl() override; + + // Network::IoHandle + os_fd_t fdDoNotUse() const override { + ASSERT(false, "not supported"); + return INVALID_SOCKET; + } + Api::IoCallUint64Result close() override; + bool isOpen() const override; + Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, + uint64_t num_slice) override; + Api::IoCallUint64Result read(Buffer::Instance& buffer, + absl::optional max_length_opt) override; + Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; + Api::IoCallUint64Result write(Buffer::Instance& buffer) override; + Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags, + const Network::Address::Ip* self_ip, + const Network::Address::Instance& peer_address) override; + Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice, + uint32_t self_port, RecvMsgOutput& output) override; + Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port, + RecvMsgOutput& output) override; + Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override; + bool supportsMmsg() const override; + bool supportsUdpGro() const override; + Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override; + Api::SysCallIntResult listen(int backlog) override; + Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; + Api::SysCallIntResult connect(Network::Address::InstanceConstSharedPtr address) override; + Api::SysCallIntResult setOption(int level, int optname, const void* optval, + socklen_t optlen) override; + Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override; + Api::SysCallIntResult setBlocking(bool blocking) override; + absl::optional domain() 
override;
+  Network::Address::InstanceConstSharedPtr localAddress() override;
+  Network::Address::InstanceConstSharedPtr peerAddress() override;
+
+  void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
+                           Event::FileTriggerType trigger, uint32_t events) override;
+  Network::IoHandlePtr duplicate() override;
+  void activateFileEvents(uint32_t events) override;
+  void enableFileEvents(uint32_t events) override;
+  void resetFileEvents() override;
+
+  Api::SysCallIntResult shutdown(int how) override;
+  absl::optional<std::chrono::milliseconds> lastRoundTripTime() override { return absl::nullopt; }
+
+  void setWatermarks(uint32_t watermark) { pending_received_data_.setWatermarks(watermark); }
+  void onBelowLowWatermark() {
+    if (peer_handle_) {
+      ENVOY_LOG(debug, "Socket {} switches to low watermark. Notify {}.", static_cast<void*>(this),
+                static_cast<void*>(peer_handle_));
+      peer_handle_->onPeerBufferLowWatermark();
+    }
+  }
+  void onAboveHighWatermark() {
+    // Low to high is checked by peer after peer writes data.
+  }
+
+  // UserSpace::IoHandle
+  void setWriteEnd() override {
+    receive_data_end_stream_ = true;
+    setNewDataAvailable();
+  }
+  void setNewDataAvailable() override {
+    ENVOY_LOG(trace, "{} on socket {}", __FUNCTION__, static_cast<void*>(this));
+    if (user_file_event_) {
+      user_file_event_->activateIfEnabled(
+          Event::FileReadyType::Read |
+          // Closed ready type is defined as `end of stream`
+          (receive_data_end_stream_ ? Event::FileReadyType::Closed : 0));
+    }
+  }
+  void onPeerDestroy() override {
+    peer_handle_ = nullptr;
+    write_shutdown_ = true;
+  }
+  void onPeerBufferLowWatermark() override {
+    if (user_file_event_) {
+      user_file_event_->activateIfEnabled(Event::FileReadyType::Write);
+    }
+  }
+  bool isWritable() const override { return !pending_received_data_.highWatermarkTriggered(); }
+  bool isPeerShutDownWrite() const override { return receive_data_end_stream_; }
+  bool isPeerWritable() const override {
+    return peer_handle_ != nullptr && !peer_handle_->isPeerShutDownWrite() &&
+           peer_handle_->isWritable();
+  }
+  Buffer::Instance* getWriteBuffer() override { return &pending_received_data_; }
+
+  // `UserspaceIoHandle`
+  bool isReadable() const override {
+    return isPeerShutDownWrite() || pending_received_data_.length() > 0;
+  }
+
+  // Set the peer which will populate the owned pending_received_data.
+  void setPeerHandle(UserSpace::IoHandle* writable_peer) {
+    // Swapping writable peer is undefined behavior.
+    ASSERT(!peer_handle_);
+    ASSERT(!write_shutdown_);
+    peer_handle_ = writable_peer;
+  }
+
+private:
+  // Support isOpen() and close(). Network::IoHandle owner must invoke close() to avoid potential
+  // resource leak.
+  bool closed_{false};
+
+  // The attached file event with this socket. The event is not owned by the socket in the current
+  // Envoy model. Multiple events can be created during the life time of this IO handle but at any
+  // moment at most 1 event is attached.
+  std::unique_ptr<FileEventImpl> user_file_event_;
+
+  // True if pending_received_data_ is not addable. Note that pending_received_data_ may have
+  // pending data to drain.
+  bool receive_data_end_stream_{false};
+
+  // The buffer owned by this socket. This buffer is populated by the write operations of the peer
+  // socket and drained by read operations of this socket.
+  Buffer::WatermarkBuffer pending_received_data_;
+
+  // Destination of the write(). The value remains non-null until the peer is closed.
+ UserSpace::IoHandle* peer_handle_{nullptr}; + + // The flag whether the peer is valid. Any write attempt must check this flag. + bool write_shutdown_{false}; +}; +} // namespace UserSpace +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/test/extensions/io_socket/user_space/BUILD b/test/extensions/io_socket/user_space/BUILD index 3111862fe0107..829d9bacf9f14 100644 --- a/test/extensions/io_socket/user_space/BUILD +++ b/test/extensions/io_socket/user_space/BUILD @@ -27,3 +27,28 @@ envoy_extension_cc_test( "//test/test_common:utility_lib", ], ) + +envoy_extension_cc_test( + name = "io_socket_handle_impl_test", + srcs = ["io_socket_handle_impl_test.cc"], + extension_name = "envoy.io_socket.user_space", + deps = [ + "//source/common/common:utility_lib", + "//source/common/network:address_lib", + "//source/extensions/io_socket/user_space:io_socket_handle_lib", + "//test/mocks/event:event_mocks", + ], +) + +envoy_extension_cc_test( + name = "io_socket_handle_impl_platform_test", + srcs = ["io_socket_handle_impl_platform_test.cc"], + extension_name = "envoy.io_socket.user_space", + tags = ["fails_on_windows"], + deps = [ + "//source/common/common:utility_lib", + "//source/common/network:address_lib", + "//source/extensions/io_socket/user_space:io_socket_handle_lib", + "//test/mocks/event:event_mocks", + ], +) diff --git a/test/extensions/io_socket/user_space/io_socket_handle_impl_platform_test.cc b/test/extensions/io_socket/user_space/io_socket_handle_impl_platform_test.cc new file mode 100644 index 0000000000000..e5b1b808bfbbf --- /dev/null +++ b/test/extensions/io_socket/user_space/io_socket_handle_impl_platform_test.cc @@ -0,0 +1,63 @@ +#include "envoy/common/platform.h" +#include "envoy/event/file_event.h" + +#include "extensions/io_socket/user_space/io_socket_handle_impl.h" + +#include "test/mocks/event/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace 
Extensions {
+namespace IoSocket {
+namespace UserSpace {
+namespace {
+
+using testing::NiceMock;
+
+class MockFileEventCallback {
+public:
+  MOCK_METHOD(void, called, (uint32_t arg));
+};
+
+// Explicitly mark the test failing on windows and will be fixed.
+class BufferedIoSocketHandlePlatformTest : public testing::Test {
+public:
+  BufferedIoSocketHandlePlatformTest() {
+    first_io_handle_ = std::make_unique<IoSocketHandleImpl>();
+    second_io_handle_ = std::make_unique<IoSocketHandleImpl>();
+    first_io_handle_->setPeerHandle(second_io_handle_.get());
+    second_io_handle_->setPeerHandle(first_io_handle_.get());
+  }
+
+  ~BufferedIoSocketHandlePlatformTest() override {
+    if (first_io_handle_->isOpen()) {
+      first_io_handle_->close();
+    }
+    if (second_io_handle_->isOpen()) {
+      second_io_handle_->close();
+    }
+  }
+
+  std::unique_ptr<IoSocketHandleImpl> first_io_handle_;
+  std::unique_ptr<IoSocketHandleImpl> second_io_handle_;
+  NiceMock<Event::MockDispatcher> dispatcher_;
+  MockFileEventCallback cb_;
+};
+
+TEST_F(BufferedIoSocketHandlePlatformTest, CreatePlatformDefaultTriggerTypeFailOnWindows) {
+  // schedulable_cb will be destroyed by IoHandle.
+ auto schedulable_cb = new Event::MockSchedulableCallback(&dispatcher_); + EXPECT_CALL(*schedulable_cb, enabled()); + EXPECT_CALL(*schedulable_cb, cancel()); + first_io_handle_->initializeFileEvent( + dispatcher_, [this](uint32_t events) { cb_.called(events); }, + Event::PlatformDefaultTriggerType, Event::FileReadyType::Read); +} + +} // namespace +} // namespace UserSpace +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/io_socket/user_space/io_socket_handle_impl_test.cc b/test/extensions/io_socket/user_space/io_socket_handle_impl_test.cc new file mode 100644 index 0000000000000..093f12b10c5ce --- /dev/null +++ b/test/extensions/io_socket/user_space/io_socket_handle_impl_test.cc @@ -0,0 +1,1028 @@ +#include "envoy/buffer/buffer.h" +#include "envoy/event/file_event.h" + +#include "common/buffer/buffer_impl.h" +#include "common/common/fancy_logger.h" +#include "common/network/address_impl.h" + +#include "extensions/io_socket/user_space/io_socket_handle_impl.h" + +#include "test/mocks/event/mocks.h" + +#include "absl/container/fixed_array.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::NiceMock; + +namespace Envoy { +namespace Extensions { +namespace IoSocket { +namespace UserSpace { +namespace { + +MATCHER(IsInvalidAddress, "") { + return arg.err_->getErrorCode() == Api::IoError::IoErrorCode::NoSupport; +} + +MATCHER(IsNotSupportedResult, "") { return arg.errno_ == SOCKET_ERROR_NOT_SUP; } + +ABSL_MUST_USE_RESULT std::pair allocateOneSlice(uint64_t size) { + Buffer::Slice mutable_slice(size); + auto slice = mutable_slice.reserve(size); + EXPECT_NE(nullptr, slice.mem_); + EXPECT_EQ(size, slice.len_); + return {std::move(mutable_slice), slice}; +} + +class MockFileEventCallback { +public: + MOCK_METHOD(void, called, (uint32_t arg)); +}; + +class BufferedIoSocketHandleTest : public testing::Test { +public: + BufferedIoSocketHandleTest() : buf_(1024) { + io_handle_ = std::make_unique(); + 
io_handle_peer_ = std::make_unique(); + io_handle_->setPeerHandle(io_handle_peer_.get()); + io_handle_peer_->setPeerHandle(io_handle_.get()); + } + + ~BufferedIoSocketHandleTest() override = default; + + Buffer::WatermarkBuffer& getWatermarkBufferHelper(IoSocketHandleImpl& io_handle) { + return dynamic_cast(*io_handle.getWriteBuffer()); + } + + NiceMock dispatcher_; + + // Owned by BufferedIoSocketHandle. + NiceMock* schedulable_cb_; + MockFileEventCallback cb_; + std::unique_ptr io_handle_; + std::unique_ptr io_handle_peer_; + absl::FixedArray buf_; +}; + +// Test recv side effects. +TEST_F(BufferedIoSocketHandleTest, BasicRecv) { + auto& internal_buffer = getWatermarkBufferHelper(*io_handle_); + internal_buffer.add("0123456789"); + { + auto result = io_handle_->recv(buf_.data(), buf_.size(), 0); + ASSERT_EQ(10, result.rc_); + ASSERT_EQ("0123456789", absl::string_view(buf_.data(), result.rc_)); + } + { + auto result = io_handle_->recv(buf_.data(), buf_.size(), 0); + // `EAGAIN`. + EXPECT_FALSE(result.ok()); + EXPECT_EQ(Api::IoError::IoErrorCode::Again, result.err_->getErrorCode()); + } + { + io_handle_->setWriteEnd(); + auto result = io_handle_->recv(buf_.data(), buf_.size(), 0); + EXPECT_TRUE(result.ok()); + } +} + +// Test recv side effects. +TEST_F(BufferedIoSocketHandleTest, RecvPeek) { + auto& internal_buffer = getWatermarkBufferHelper(*io_handle_); + internal_buffer.add("0123456789"); + { + ::memset(buf_.data(), 1, buf_.size()); + auto result = io_handle_->recv(buf_.data(), 5, MSG_PEEK); + ASSERT_EQ(5, result.rc_); + ASSERT_EQ("01234", absl::string_view(buf_.data(), result.rc_)); + // The data beyond the boundary is untouched. + ASSERT_EQ(std::string(buf_.size() - 5, 1), absl::string_view(buf_.data() + 5, buf_.size() - 5)); + } + { + auto result = io_handle_->recv(buf_.data(), buf_.size(), MSG_PEEK); + ASSERT_EQ(10, result.rc_); + ASSERT_EQ("0123456789", absl::string_view(buf_.data(), result.rc_)); + } + { + // Drain the pending buffer. 
+ auto recv_result = io_handle_->recv(buf_.data(), buf_.size(), 0); + EXPECT_TRUE(recv_result.ok()); + EXPECT_EQ(10, recv_result.rc_); + ASSERT_EQ("0123456789", absl::string_view(buf_.data(), recv_result.rc_)); + auto peek_result = io_handle_->recv(buf_.data(), buf_.size(), 0); + // `EAGAIN`. + EXPECT_FALSE(peek_result.ok()); + EXPECT_EQ(Api::IoError::IoErrorCode::Again, peek_result.err_->getErrorCode()); + } + { + // Peek upon shutdown. + io_handle_->setWriteEnd(); + auto result = io_handle_->recv(buf_.data(), buf_.size(), MSG_PEEK); + EXPECT_EQ(0, result.rc_); + ASSERT(result.ok()); + } +} + +TEST_F(BufferedIoSocketHandleTest, RecvPeekWhenPendingDataButShutdown) { + auto& internal_buffer = getWatermarkBufferHelper(*io_handle_); + internal_buffer.add("0123456789"); + auto result = io_handle_->recv(buf_.data(), buf_.size(), MSG_PEEK); + ASSERT_EQ(10, result.rc_); + ASSERT_EQ("0123456789", absl::string_view(buf_.data(), result.rc_)); +} + +TEST_F(BufferedIoSocketHandleTest, MultipleRecvDrain) { + auto& internal_buffer = getWatermarkBufferHelper(*io_handle_); + internal_buffer.add("abcd"); + { + auto result = io_handle_->recv(buf_.data(), 1, 0); + EXPECT_TRUE(result.ok()); + EXPECT_EQ(1, result.rc_); + EXPECT_EQ("a", absl::string_view(buf_.data(), 1)); + } + { + auto result = io_handle_->recv(buf_.data(), buf_.size(), 0); + EXPECT_TRUE(result.ok()); + EXPECT_EQ(3, result.rc_); + + EXPECT_EQ("bcd", absl::string_view(buf_.data(), 3)); + EXPECT_EQ(0, internal_buffer.length()); + } +} + +// Test read side effects. +TEST_F(BufferedIoSocketHandleTest, ReadEmpty) { + Buffer::OwnedImpl buf; + auto result = io_handle_->read(buf, 10); + EXPECT_FALSE(result.ok()); + EXPECT_EQ(Api::IoError::IoErrorCode::Again, result.err_->getErrorCode()); + io_handle_->setWriteEnd(); + result = io_handle_->read(buf, 10); + EXPECT_TRUE(result.ok()); + EXPECT_EQ(0, result.rc_); +} + +// Read allows max_length value 0 and returns no error. 
+// Read with a zero-length cap must succeed and transfer nothing.
+TEST_F(BufferedIoSocketHandleTest, ReadWhileProvidingNoCapacity) {
+  Buffer::OwnedImpl buf;
+  absl::optional<uint64_t> max_length_opt{0};
+  auto result = io_handle_->read(buf, max_length_opt);
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(0, result.rc_);
+}
+
+// Test read side effects.
+TEST_F(BufferedIoSocketHandleTest, ReadContent) {
+  Buffer::OwnedImpl buf;
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_);
+  internal_buffer.add("abcdefg");
+  auto result = io_handle_->read(buf, 3);
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(3, result.rc_);
+  ASSERT_EQ(3, buf.length());
+  ASSERT_EQ(4, internal_buffer.length());
+  result = io_handle_->read(buf, 10);
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(4, result.rc_);
+  ASSERT_EQ(7, buf.length());
+  ASSERT_EQ(0, internal_buffer.length());
+}
+
+// Test readv behavior.
+TEST_F(BufferedIoSocketHandleTest, BasicReadv) {
+  Buffer::OwnedImpl buf_to_write("abc");
+  io_handle_peer_->write(buf_to_write);
+
+  Buffer::OwnedImpl buf;
+  auto reservation = buf.reserveSingleSlice(1024);
+  auto slice = reservation.slice();
+  auto result = io_handle_->readv(1024, &slice, 1);
+
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(3, result.rc_);
+
+  // No more data while the write end is still open: `EAGAIN`.
+  result = io_handle_->readv(1024, &slice, 1);
+
+  EXPECT_FALSE(result.ok());
+  EXPECT_EQ(Api::IoError::IoErrorCode::Again, result.err_->getErrorCode());
+
+  io_handle_->setWriteEnd();
+  result = io_handle_->readv(1024, &slice, 1);
+  // EOF
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(0, result.rc_);
+}
+
+// Watermark-driven readable/writable flags flip as data accumulates and drains.
+TEST_F(BufferedIoSocketHandleTest, FlowControl) {
+  io_handle_->setWatermarks(128);
+  EXPECT_FALSE(io_handle_->isReadable());
+  EXPECT_TRUE(io_handle_->isWritable());
+
+  // Populate the data for io_handle_.
+  Buffer::OwnedImpl buffer(std::string(256, 'a'));
+  io_handle_peer_->write(buffer);
+
+  EXPECT_TRUE(io_handle_->isReadable());
+  EXPECT_FALSE(io_handle_->isWritable());
+
+  bool writable_flipped = false;
+  // During the repeated recv, the writable flag must switch to true.
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_);
+  while (internal_buffer.length() > 0) {
+    SCOPED_TRACE(internal_buffer.length());
+    FANCY_LOG(debug, "internal buffer length = {}", internal_buffer.length());
+    EXPECT_TRUE(io_handle_->isReadable());
+    bool writable = io_handle_->isWritable();
+    FANCY_LOG(debug, "internal buffer length = {}, writable = {}", internal_buffer.length(),
+              writable);
+    if (writable) {
+      writable_flipped = true;
+    } else {
+      // Once writable flips to true it must never flip back while draining.
+      ASSERT_FALSE(writable_flipped);
+    }
+    auto result = io_handle_->recv(buf_.data(), 32, 0);
+    EXPECT_TRUE(result.ok());
+    EXPECT_EQ(32, result.rc_);
+  }
+  ASSERT_EQ(0, internal_buffer.length());
+  ASSERT_TRUE(writable_flipped);
+
+  // Finally the buffer is empty.
+  EXPECT_FALSE(io_handle_->isReadable());
+  EXPECT_TRUE(io_handle_->isWritable());
+}
+
+// Consistent with other IoHandle: allow write empty data when handle is closed.
+TEST_F(BufferedIoSocketHandleTest, NoErrorWriteZeroDataToClosedIoHandle) {
+  io_handle_->close();
+  {
+    Buffer::OwnedImpl buf;
+    auto result = io_handle_->write(buf);
+    ASSERT_EQ(0, result.rc_);
+    ASSERT(result.ok());
+  }
+  {
+    Buffer::RawSlice slice{nullptr, 0};
+    auto result = io_handle_->writev(&slice, 1);
+    ASSERT_EQ(0, result.rc_);
+    ASSERT(result.ok());
+  }
+}
+
+// Every non-trivial read/write variant fails on a closed handle.
+TEST_F(BufferedIoSocketHandleTest, ErrorOnClosedIoHandle) {
+  io_handle_->close();
+  {
+    auto [guard, slice] = allocateOneSlice(1024);
+    auto result = io_handle_->recv(slice.mem_, slice.len_, 0);
+    ASSERT(!result.ok());
+    ASSERT_EQ(Api::IoError::IoErrorCode::UnknownError, result.err_->getErrorCode());
+  }
+  {
+    Buffer::OwnedImpl buf;
+    auto result = io_handle_->read(buf, 10);
+    ASSERT(!result.ok());
+    ASSERT_EQ(Api::IoError::IoErrorCode::UnknownError, result.err_->getErrorCode());
+  }
+  {
+    auto [guard, slice] = allocateOneSlice(1024);
+    auto result = io_handle_->readv(1024, &slice, 1);
+    ASSERT(!result.ok());
+    ASSERT_EQ(Api::IoError::IoErrorCode::UnknownError, result.err_->getErrorCode());
+  }
+  {
+    Buffer::OwnedImpl buf("0123456789");
+    auto result = io_handle_->write(buf);
+    ASSERT(!result.ok());
+    ASSERT_EQ(Api::IoError::IoErrorCode::UnknownError, result.err_->getErrorCode());
+  }
+  {
+    Buffer::OwnedImpl buf("0123456789");
+    auto slices = buf.getRawSlices();
+    ASSERT(!slices.empty());
+    auto result = io_handle_->writev(slices.data(), slices.size());
+    ASSERT(!result.ok());
+    ASSERT_EQ(Api::IoError::IoErrorCode::UnknownError, result.err_->getErrorCode());
+  }
+}
+
+// shutdown(SHUT_WR) is idempotent.
+TEST_F(BufferedIoSocketHandleTest, RepeatedShutdownWR) {
+  EXPECT_EQ(io_handle_peer_->shutdown(ENVOY_SHUT_WR).rc_, 0);
+  EXPECT_EQ(io_handle_peer_->shutdown(ENVOY_SHUT_WR).rc_, 0);
+}
+
+// Only SHUT_WR is supported; the read-side variants hit a debug assert.
+TEST_F(BufferedIoSocketHandleTest, ShutDownOptionsNotSupported) {
+  ASSERT_DEBUG_DEATH(io_handle_peer_->shutdown(ENVOY_SHUT_RD), "");
+  ASSERT_DEBUG_DEATH(io_handle_peer_->shutdown(ENVOY_SHUT_RDWR), "");
+}
+
+// write() moves (not copies) the source buffer into the peer's internal buffer.
+TEST_F(BufferedIoSocketHandleTest, WriteByMove) {
+  Buffer::OwnedImpl buf("0123456789");
+  auto result = io_handle_peer_->write(buf);
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(10, result.rc_);
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_);
+  EXPECT_EQ("0123456789", internal_buffer.toString());
+  EXPECT_EQ(0, buf.length());
+}
+
+// Test write return error code. Ignoring the side effect of event scheduling.
+TEST_F(BufferedIoSocketHandleTest, WriteAgain) {
+  // Populate write destination with massive data so that it is not writable.
+  io_handle_peer_->setWatermarks(128);
+  Buffer::OwnedImpl pending_data(std::string(256, 'a'));
+  io_handle_->write(pending_data);
+  EXPECT_FALSE(io_handle_peer_->isWritable());
+
+  Buffer::OwnedImpl buf("0123456789");
+  auto result = io_handle_->write(buf);
+  ASSERT_EQ(result.err_->getErrorCode(), Api::IoError::IoErrorCode::Again);
+  EXPECT_EQ(10, buf.length());
+}
+
+// Test write() moves the fragments in front until the destination is over high watermark.
+TEST_F(BufferedIoSocketHandleTest, PartialWrite) {
+  // Populate write destination with massive data so that it is not writable.
+  io_handle_peer_->setWatermarks(128);
+  // Fragment contents | a |`bbbb...b`|`ccc`|
+  // Len per fragment | 1 | 255 | 3 |
+  // Watermark boundary at b area | low | high |
+  // Write | 1st | 2nd |
+  Buffer::OwnedImpl pending_data("a");
+  auto long_frag = Buffer::OwnedBufferFragmentImpl::create(
+      std::string(255, 'b'),
+      [](const Buffer::OwnedBufferFragmentImpl* fragment) { delete fragment; });
+  auto tail_frag = Buffer::OwnedBufferFragmentImpl::create(
+      "ccc", [](const Buffer::OwnedBufferFragmentImpl* fragment) { delete fragment; });
+  pending_data.addBufferFragment(*long_frag.release());
+  pending_data.addBufferFragment(*tail_frag.release());
+
+  // Partial write: the first two slices are moved because the second slice move reaches the high
+  // watermark.
+  auto result = io_handle_->write(pending_data);
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(result.rc_, 256);
+  EXPECT_EQ(pending_data.length(), 3);
+  EXPECT_FALSE(io_handle_peer_->isWritable());
+
+  // Confirm that a further write returns `EAGAIN`.
+  auto result2 = io_handle_->write(pending_data);
+  ASSERT_EQ(result2.err_->getErrorCode(), Api::IoError::IoErrorCode::Again);
+
+  // Make the peer writable again.
+  Buffer::OwnedImpl black_hole_buffer;
+  io_handle_peer_->read(black_hole_buffer, 10240);
+  EXPECT_TRUE(io_handle_peer_->isWritable());
+  auto result3 = io_handle_->write(pending_data);
+  EXPECT_EQ(result3.rc_, 3);
+  EXPECT_EQ(0, pending_data.length());
+}
+
+// Write on a locally shutdown handle fails and leaves the source buffer intact.
+TEST_F(BufferedIoSocketHandleTest, WriteErrorAfterShutdown) {
+  Buffer::OwnedImpl buf("0123456789");
+  // Write after shutdown.
+  io_handle_->shutdown(ENVOY_SHUT_WR);
+  auto result = io_handle_->write(buf);
+  ASSERT_EQ(result.err_->getErrorCode(), Api::IoError::IoErrorCode::UnknownError);
+  EXPECT_EQ(10, buf.length());
+}
+
+// Write fails after the peer handle is closed, even though this handle stays open.
+TEST_F(BufferedIoSocketHandleTest, WriteErrorAfterClose) {
+  Buffer::OwnedImpl buf("0123456789");
+  io_handle_peer_->close();
+  EXPECT_TRUE(io_handle_->isOpen());
+  auto result = io_handle_->write(buf);
+  ASSERT_EQ(result.err_->getErrorCode(), Api::IoError::IoErrorCode::UnknownError);
+}
+
+// Test writev return error code. Ignoring the side effect of event scheduling.
+TEST_F(BufferedIoSocketHandleTest, WritevAgain) {
+  auto [guard, slice] = allocateOneSlice(128);
+  // Populate write destination with massive data so that it is not writable.
+  io_handle_peer_->setWatermarks(128);
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_peer_);
+  internal_buffer.add(std::string(256, ' '));
+  auto result = io_handle_->writev(&slice, 1);
+  ASSERT_EQ(result.err_->getErrorCode(), Api::IoError::IoErrorCode::Again);
+}
+
+// Test writev() copies the slices in front until the destination is over high watermark.
+TEST_F(BufferedIoSocketHandleTest, PartialWritev) {
+  // Populate write destination with massive data so that it is not writable.
+  io_handle_peer_->setWatermarks(128);
+  // Slices contents | a |`bbbb...b`|`ccc`|
+  // Len per slice | 1 | 255 | 3 |
+  // Watermark boundary at b area | low | high |
+  // Writev | 1st | 2nd |
+  Buffer::OwnedImpl pending_data("a");
+  auto long_frag = Buffer::OwnedBufferFragmentImpl::create(
+      std::string(255, 'b'),
+      [](const Buffer::OwnedBufferFragmentImpl* fragment) { delete fragment; });
+  auto tail_frag = Buffer::OwnedBufferFragmentImpl::create(
+      "ccc", [](const Buffer::OwnedBufferFragmentImpl* fragment) { delete fragment; });
+  pending_data.addBufferFragment(*long_frag.release());
+  pending_data.addBufferFragment(*tail_frag.release());
+
+  // Partial write: the first two slices are copied because the second slice copy reaches the high
+  // watermark.
+  auto slices = pending_data.getRawSlices();
+  EXPECT_EQ(3, slices.size());
+  auto result = io_handle_->writev(slices.data(), slices.size());
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(result.rc_, 256);
+  pending_data.drain(result.rc_);
+  EXPECT_EQ(pending_data.length(), 3);
+  EXPECT_FALSE(io_handle_peer_->isWritable());
+
+  // Confirm that a further write returns `EAGAIN`.
+  auto slices2 = pending_data.getRawSlices();
+  auto result2 = io_handle_->writev(slices2.data(), slices2.size());
+  ASSERT_EQ(result2.err_->getErrorCode(), Api::IoError::IoErrorCode::Again);
+
+  // Make the peer writable again.
+  Buffer::OwnedImpl black_hole_buffer;
+  io_handle_peer_->read(black_hole_buffer, 10240);
+  EXPECT_TRUE(io_handle_peer_->isWritable());
+  auto slices3 = pending_data.getRawSlices();
+  auto result3 = io_handle_->writev(slices3.data(), slices3.size());
+  EXPECT_EQ(result3.rc_, 3);
+  pending_data.drain(result3.rc_);
+  EXPECT_EQ(0, pending_data.length());
+}
+
+// Writev on a locally shutdown handle fails.
+TEST_F(BufferedIoSocketHandleTest, WritevErrorAfterShutdown) {
+  auto [guard, slice] = allocateOneSlice(128);
+  // Writev after shutdown.
+  io_handle_->shutdown(ENVOY_SHUT_WR);
+  auto result = io_handle_->writev(&slice, 1);
+  ASSERT_EQ(result.err_->getErrorCode(), Api::IoError::IoErrorCode::UnknownError);
+}
+
+// Writev fails after the peer handle is closed.
+TEST_F(BufferedIoSocketHandleTest, WritevErrorAfterClose) {
+  auto [guard, slice] = allocateOneSlice(1024);
+  // Close the peer.
+  io_handle_peer_->close();
+  EXPECT_TRUE(io_handle_->isOpen());
+  auto result = io_handle_->writev(&slice, 1);
+  ASSERT_EQ(result.err_->getErrorCode(), Api::IoError::IoErrorCode::UnknownError);
+}
+
+// Null or zero-length slices are skipped; only real bytes are transferred.
+TEST_F(BufferedIoSocketHandleTest, WritevToPeer) {
+  std::string raw_data("0123456789");
+  absl::InlinedVector<Buffer::RawSlice, 4> slices{
+      // Contains 1 byte.
+      Buffer::RawSlice{static_cast<void*>(raw_data.data()), 1},
+      // Contains 0 bytes.
+      Buffer::RawSlice{nullptr, 1},
+      // Contains 0 bytes.
+      Buffer::RawSlice{raw_data.data() + 1, 0},
+      // Contains 2 bytes.
+      Buffer::RawSlice{raw_data.data() + 1, 2},
+  };
+  io_handle_peer_->writev(slices.data(), slices.size());
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_);
+  EXPECT_EQ(3, internal_buffer.length());
+  EXPECT_EQ("012", internal_buffer.toString());
+}
+
+// Registering with write interest on a writable handle schedules a Write event.
+TEST_F(BufferedIoSocketHandleTest, EventScheduleBasic) {
+  auto schedulable_cb = new Event::MockSchedulableCallback(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb, enabled());
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+  io_handle_->initializeFileEvent(
+      dispatcher_, [this](uint32_t events) { cb_.called(events); }, Event::FileTriggerType::Edge,
+      Event::FileReadyType::Read | Event::FileReadyType::Write);
+
+  EXPECT_CALL(cb_, called(Event::FileReadyType::Write));
+  schedulable_cb->invokeCallback();
+  io_handle_->resetFileEvents();
+}
+
+// enableFileEvents() only schedules the callback for events that are currently ready.
+TEST_F(BufferedIoSocketHandleTest, SetEnabledTriggerEventSchedule) {
+  auto schedulable_cb = new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
+  // No data is available to read. Will not schedule read.
+  {
+    SCOPED_TRACE("enable read but no readable.");
+    EXPECT_CALL(*schedulable_cb, enabled());
+    EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration()).Times(0);
+    io_handle_->initializeFileEvent(
+        dispatcher_, [this](uint32_t events) { cb_.called(events); }, Event::FileTriggerType::Edge,
+        Event::FileReadyType::Read);
+    testing::Mock::VerifyAndClearExpectations(schedulable_cb);
+  }
+  {
+    SCOPED_TRACE("enable readwrite but only writable.");
+    EXPECT_CALL(*schedulable_cb, enabled());
+    EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+    io_handle_->enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
+    ASSERT_TRUE(schedulable_cb->enabled_);
+    EXPECT_CALL(cb_, called(Event::FileReadyType::Write));
+    schedulable_cb->invokeCallback();
+    ASSERT_FALSE(schedulable_cb->enabled_);
+    testing::Mock::VerifyAndClearExpectations(schedulable_cb);
+  }
+  {
+    SCOPED_TRACE("enable write and writable.");
+    EXPECT_CALL(*schedulable_cb, enabled());
+    EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+    io_handle_->enableFileEvents(Event::FileReadyType::Write);
+    ASSERT_TRUE(schedulable_cb->enabled_);
+    EXPECT_CALL(cb_, called(Event::FileReadyType::Write));
+    schedulable_cb->invokeCallback();
+    ASSERT_FALSE(schedulable_cb->enabled_);
+    testing::Mock::VerifyAndClearExpectations(schedulable_cb);
+  }
+  // Close io_handle_ first to prevent events originated from peer close.
+  io_handle_->close();
+  io_handle_peer_->close();
+}
+
+// A drained-but-not-empty buffer does not re-raise Read under edge trigger.
+TEST_F(BufferedIoSocketHandleTest, ReadAndWriteAreEdgeTriggered) {
+  auto schedulable_cb = new Event::MockSchedulableCallback(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb, enabled());
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+  io_handle_->initializeFileEvent(
+      dispatcher_, [this](uint32_t events) { cb_.called(events); }, Event::FileTriggerType::Edge,
+      Event::FileReadyType::Read | Event::FileReadyType::Write);
+
+  EXPECT_CALL(cb_, called(Event::FileReadyType::Write));
+  schedulable_cb->invokeCallback();
+
+  Buffer::OwnedImpl buf("abcd");
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+  io_handle_peer_->write(buf);
+
+  EXPECT_CALL(cb_, called(Event::FileReadyType::Read));
+  schedulable_cb->invokeCallback();
+
+  // Drain 1 byte.
+  auto result = io_handle_->recv(buf_.data(), 1, 0);
+  EXPECT_TRUE(result.ok());
+  EXPECT_EQ(1, result.rc_);
+
+  ASSERT_FALSE(schedulable_cb->enabled_);
+  io_handle_->resetFileEvents();
+}
+
+// Dropping write interest cancels the pending Write callback.
+TEST_F(BufferedIoSocketHandleTest, SetDisabledBlockEventSchedule) {
+  auto schedulable_cb = new Event::MockSchedulableCallback(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb, enabled());
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+  io_handle_->initializeFileEvent(
+      dispatcher_, [this](uint32_t events) { cb_.called(events); }, Event::FileTriggerType::Edge,
+      Event::FileReadyType::Write);
+  ASSERT_TRUE(schedulable_cb->enabled_);
+
+  // The write event is cleared and the read event is not ready.
+  EXPECT_CALL(*schedulable_cb, enabled());
+  EXPECT_CALL(*schedulable_cb, cancel());
+  io_handle_->enableFileEvents(Event::FileReadyType::Read);
+  testing::Mock::VerifyAndClearExpectations(schedulable_cb);
+
+  ASSERT_FALSE(schedulable_cb->enabled_);
+  io_handle_->resetFileEvents();
+}
+
+// resetFileEvents() tears down the event and its scheduled callback.
+TEST_F(BufferedIoSocketHandleTest, EventResetClearCallback) {
+  auto schedulable_cb = new Event::MockSchedulableCallback(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb, enabled());
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+  io_handle_->initializeFileEvent(
+      dispatcher_, [this](uint32_t events) { cb_.called(events); }, Event::FileTriggerType::Edge,
+      Event::FileReadyType::Write);
+  ASSERT_TRUE(schedulable_cb->enabled_);
+  io_handle_->resetFileEvents();
+}
+
+// Draining below the low watermark raises the peer's Write event.
+TEST_F(BufferedIoSocketHandleTest, DrainToLowWaterMarkTriggerReadEvent) {
+  io_handle_->setWatermarks(128);
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_);
+
+  EXPECT_FALSE(io_handle_->isReadable());
+  EXPECT_TRUE(io_handle_peer_->isWritable());
+
+  std::string big_chunk(256, 'a');
+  internal_buffer.add(big_chunk);
+  EXPECT_TRUE(io_handle_->isReadable());
+  EXPECT_FALSE(io_handle_->isWritable());
+
+  auto schedulable_cb = new Event::MockSchedulableCallback(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb, enabled());
+  // No event is available.
+  EXPECT_CALL(*schedulable_cb, cancel());
+  io_handle_peer_->initializeFileEvent(
+      dispatcher_, [this](uint32_t events) { cb_.called(events); }, Event::FileTriggerType::Edge,
+      Event::FileReadyType::Read | Event::FileReadyType::Write);
+  // Neither readable nor writable.
+  ASSERT_FALSE(schedulable_cb->enabled_);
+
+  {
+    SCOPED_TRACE("drain very few data.");
+    auto result = io_handle_->recv(buf_.data(), 1, 0);
+    EXPECT_FALSE(io_handle_->isWritable());
+  }
+  {
+    SCOPED_TRACE("drain to low watermark.");
+    EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+    auto result = io_handle_->recv(buf_.data(), 232, 0);
+    EXPECT_TRUE(io_handle_->isWritable());
+    EXPECT_CALL(cb_, called(Event::FileReadyType::Write));
+    schedulable_cb->invokeCallback();
+  }
+  {
+    SCOPED_TRACE("clean up.");
+    EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+    // Important: close before peer.
+    io_handle_->close();
+  }
+}
+
+// Peer close delivers the remaining buffered data followed by EOF to the read callback.
+TEST_F(BufferedIoSocketHandleTest, Close) {
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_);
+  internal_buffer.add("abcd");
+  std::string accumulator;
+  schedulable_cb_ = new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  bool should_close = false;
+  io_handle_->initializeFileEvent(
+      dispatcher_,
+      [this, &should_close, handle = io_handle_.get(), &accumulator](uint32_t events) {
+        if (events & Event::FileReadyType::Read) {
+          while (true) {
+            auto result = io_handle_->recv(buf_.data(), buf_.size(), 0);
+            if (result.ok()) {
+              // Read EOF.
+              if (result.rc_ == 0) {
+                should_close = true;
+                break;
+              } else {
+                accumulator += absl::string_view(buf_.data(), result.rc_);
+              }
+            } else if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
+              ENVOY_LOG_MISC(debug, "read returns EAGAIN");
+              break;
+            } else {
+              ENVOY_LOG_MISC(debug, "will close");
+              should_close = true;
+              break;
+            }
+          }
+        }
+        if (events & Event::FileReadyType::Write) {
+          Buffer::OwnedImpl buf("");
+          auto result = io_handle_->write(buf);
+          if (!result.ok() && result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) {
+            should_close = true;
+          }
+        }
+      },
+      Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
+  schedulable_cb_->invokeCallback();
+
+  // Not closed yet.
+  ASSERT_FALSE(should_close);
+
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  io_handle_peer_->close();
+
+  ASSERT_TRUE(schedulable_cb_->enabled());
+  schedulable_cb_->invokeCallback();
+  ASSERT_TRUE(should_close);
+
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration()).Times(0);
+  io_handle_->close();
+  EXPECT_EQ(4, accumulator.size());
+  io_handle_->resetFileEvents();
+}
+
+// Test that a readable event is raised when peer shutdown write. Also confirm read will return
+// EAGAIN.
+TEST_F(BufferedIoSocketHandleTest, ShutDownRaiseEvent) {
+  auto& internal_buffer = getWatermarkBufferHelper(*io_handle_);
+  internal_buffer.add("abcd");
+
+  std::string accumulator;
+  schedulable_cb_ = new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  bool should_close = false;
+  io_handle_->initializeFileEvent(
+      dispatcher_,
+      [this, &should_close, handle = io_handle_.get(), &accumulator](uint32_t events) {
+        if (events & Event::FileReadyType::Read) {
+          auto result = io_handle_->recv(buf_.data(), buf_.size(), 0);
+          if (result.ok()) {
+            accumulator += absl::string_view(buf_.data(), result.rc_);
+          } else if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
+            ENVOY_LOG_MISC(debug, "read returns EAGAIN");
+          } else {
+            ENVOY_LOG_MISC(debug, "will close");
+            should_close = true;
+          }
+        }
+      },
+      Event::FileTriggerType::Edge, Event::FileReadyType::Read);
+  schedulable_cb_->invokeCallback();
+
+  // Not closed yet.
+  ASSERT_FALSE(should_close);
+
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  io_handle_peer_->shutdown(ENVOY_SHUT_WR);
+
+  ASSERT_TRUE(schedulable_cb_->enabled());
+  schedulable_cb_->invokeCallback();
+  ASSERT_FALSE(should_close);
+  EXPECT_EQ(4, accumulator.size());
+  io_handle_->close();
+  io_handle_->resetFileEvents();
+}
+
+// A write() from the peer schedules this handle's readable event.
+TEST_F(BufferedIoSocketHandleTest, WriteScheduleWritableEvent) {
+  std::string accumulator;
+  schedulable_cb_ = new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  bool should_close = false;
+  io_handle_->initializeFileEvent(
+      dispatcher_,
+      [&should_close, handle = io_handle_.get(), &accumulator](uint32_t events) {
+        if (events & Event::FileReadyType::Read) {
+          Buffer::OwnedImpl buf;
+          auto reservation = buf.reserveSingleSlice(1024);
+          auto slice = reservation.slice();
+          auto result = handle->readv(1024, &slice, 1);
+          if (result.ok()) {
+            accumulator += absl::string_view(static_cast<char*>(slice.mem_), result.rc_);
+          } else if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
+            ENVOY_LOG_MISC(debug, "read returns EAGAIN");
+          } else {
+            ENVOY_LOG_MISC(debug, "will close");
+            should_close = true;
+          }
+        }
+      },
+      Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
+  schedulable_cb_->invokeCallback();
+  EXPECT_FALSE(schedulable_cb_->enabled());
+
+  Buffer::OwnedImpl data_to_write("0123456789");
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  io_handle_peer_->write(data_to_write);
+  EXPECT_EQ(0, data_to_write.length());
+
+  EXPECT_TRUE(schedulable_cb_->enabled());
+  schedulable_cb_->invokeCallback();
+  EXPECT_EQ("0123456789", accumulator);
+  EXPECT_FALSE(should_close);
+
+  io_handle_->close();
+}
+
+// A writev() from the peer schedules this handle's readable event.
+TEST_F(BufferedIoSocketHandleTest, WritevScheduleWritableEvent) {
+  std::string accumulator;
+  schedulable_cb_ = new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  bool should_close = false;
+  io_handle_->initializeFileEvent(
+      dispatcher_,
+      [&should_close, handle = io_handle_.get(), &accumulator](uint32_t events) {
+        if (events & Event::FileReadyType::Read) {
+          Buffer::OwnedImpl buf;
+          auto reservation = buf.reserveSingleSlice(1024);
+          auto slice = reservation.slice();
+          auto result = handle->readv(1024, &slice, 1);
+          if (result.ok()) {
+            accumulator += absl::string_view(static_cast<char*>(slice.mem_), result.rc_);
+          } else if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
+            ENVOY_LOG_MISC(debug, "read returns EAGAIN");
+          } else {
+            ENVOY_LOG_MISC(debug, "will close");
+            should_close = true;
+          }
+        }
+      },
+      Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
+  schedulable_cb_->invokeCallback();
+  EXPECT_FALSE(schedulable_cb_->enabled());
+
+  std::string raw_data("0123456789");
+  Buffer::RawSlice slice{static_cast<void*>(raw_data.data()), raw_data.size()};
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  io_handle_peer_->writev(&slice, 1);
+
+  EXPECT_TRUE(schedulable_cb_->enabled());
+  schedulable_cb_->invokeCallback();
+  EXPECT_EQ("0123456789", accumulator);
+  EXPECT_FALSE(should_close);
+
+  io_handle_->close();
+}
+
+// Data written before the reader's shutdown(WR) can still be read out.
+TEST_F(BufferedIoSocketHandleTest, ReadAfterShutdownWrite) {
+  io_handle_peer_->shutdown(ENVOY_SHUT_WR);
+  ENVOY_LOG_MISC(debug, "after {} shutdown write ", static_cast<void*>(io_handle_peer_.get()));
+  std::string accumulator;
+  schedulable_cb_ = new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
+  bool should_close = false;
+  io_handle_peer_->initializeFileEvent(
+      dispatcher_,
+      [&should_close, handle = io_handle_peer_.get(), &accumulator](uint32_t events) {
+        if (events & Event::FileReadyType::Read) {
+          Buffer::OwnedImpl buf;
+          auto reservation = buf.reserveSingleSlice(1024);
+          auto slice = reservation.slice();
+          auto result = handle->readv(1024, &slice, 1);
+          if (result.ok()) {
+            if (result.rc_ == 0) {
+              should_close = true;
+            } else {
+              accumulator += absl::string_view(static_cast<char*>(slice.mem_), result.rc_);
+            }
+          } else if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
+            ENVOY_LOG_MISC(debug, "read returns EAGAIN");
+          } else {
+            ENVOY_LOG_MISC(debug, "will close");
+            should_close = true;
+          }
+        }
+      },
+      Event::FileTriggerType::Edge, Event::FileReadyType::Read);
+
+  EXPECT_FALSE(schedulable_cb_->enabled());
+  std::string raw_data("0123456789");
+  Buffer::RawSlice slice{static_cast<void*>(raw_data.data()), raw_data.size()};
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  io_handle_->writev(&slice, 1);
+  EXPECT_TRUE(schedulable_cb_->enabled());
+
+  schedulable_cb_->invokeCallback();
+  EXPECT_FALSE(schedulable_cb_->enabled());
+  EXPECT_EQ(raw_data, accumulator);
+
+  EXPECT_CALL(*schedulable_cb_, scheduleCallbackNextIteration());
+  io_handle_->close();
+  io_handle_->resetFileEvents();
+}
+
+// Pending data stays readable after the writer shuts down; EOF follows and can re-raise Read.
+TEST_F(BufferedIoSocketHandleTest, NotifyWritableAfterShutdownWrite) {
+  io_handle_peer_->setWatermarks(128);
+
+  Buffer::OwnedImpl buf(std::string(256, 'a'));
+  io_handle_->write(buf);
+  EXPECT_FALSE(io_handle_peer_->isWritable());
+
+  io_handle_->shutdown(ENVOY_SHUT_WR);
+  FANCY_LOG(debug, "after {} shutdown write", static_cast<void*>(io_handle_.get()));
+
+  auto schedulable_cb = new Event::MockSchedulableCallback(&dispatcher_);
+  EXPECT_CALL(*schedulable_cb, enabled());
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+  io_handle_peer_->initializeFileEvent(
+      dispatcher_, [this](uint32_t events) { cb_.called(events); }, Event::FileTriggerType::Edge,
+      Event::FileReadyType::Read);
+  EXPECT_CALL(cb_, called(Event::FileReadyType::Read));
+  schedulable_cb->invokeCallback();
+  EXPECT_FALSE(schedulable_cb->enabled_);
+
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration()).Times(0);
+  auto result = io_handle_peer_->recv(buf_.data(), buf_.size(), 0);
+  EXPECT_EQ(256, result.rc_);
+  // Readable event is not activated due to edge trigger type.
+  EXPECT_FALSE(schedulable_cb->enabled_);
+
+  // The `end of stream` is delivered.
+  auto result_at_eof = io_handle_peer_->recv(buf_.data(), buf_.size(), 0);
+  EXPECT_EQ(0, result_at_eof.rc_);
+
+  // Also confirm `EOS` can trigger the read ready event.
+  EXPECT_CALL(*schedulable_cb, enabled());
+  EXPECT_CALL(*schedulable_cb, scheduleCallbackNextIteration());
+  io_handle_peer_->enableFileEvents(Event::FileReadyType::Read);
+  EXPECT_CALL(cb_, called(Event::FileReadyType::Read));
+  schedulable_cb->invokeCallback();
+
+  io_handle_peer_->close();
+}
+
+TEST_F(BufferedIoSocketHandleTest, NotSupportingMmsg) { EXPECT_FALSE(io_handle_->supportsMmsg()); }
+
+TEST_F(BufferedIoSocketHandleTest, NotSupportsUdpGro) {
+  EXPECT_FALSE(io_handle_->supportsUdpGro());
+}
+
+TEST_F(BufferedIoSocketHandleTest, DomainNullOpt) {
+  EXPECT_FALSE(io_handle_->domain().has_value());
+}
+
+// connect() to an envoy-internal address succeeds trivially.
+TEST_F(BufferedIoSocketHandleTest, Connect) {
+  auto address_is_ignored =
+      std::make_shared<Network::Address::EnvoyInternalInstance>("listener_id");
+  EXPECT_EQ(0, io_handle_->connect(address_is_ignored).rc_);
+}
+
+// activateFileEvents() schedules the callback even when the event is not naturally ready.
+TEST_F(BufferedIoSocketHandleTest, ActivateEvent) {
+  schedulable_cb_ = new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
+  io_handle_->initializeFileEvent(
+      dispatcher_, [&, handle = io_handle_.get()](uint32_t) {}, Event::FileTriggerType::Edge,
+      Event::FileReadyType::Read);
+  EXPECT_FALSE(schedulable_cb_->enabled());
+  io_handle_->activateFileEvents(Event::FileReadyType::Read);
+  ASSERT_TRUE(schedulable_cb_->enabled());
+}
+
+TEST_F(BufferedIoSocketHandleTest, DeathOnActivatingDestroyedEvents) {
+  io_handle_->resetFileEvents();
+  ASSERT_DEBUG_DEATH(io_handle_->activateFileEvents(Event::FileReadyType::Read),
+                     "Null user_file_event_");
+}
+
+TEST_F(BufferedIoSocketHandleTest, DeathOnEnablingDestroyedEvents) {
+  io_handle_->resetFileEvents();
+  ASSERT_DEBUG_DEATH(io_handle_->enableFileEvents(Event::FileReadyType::Read),
+                     "Null user_file_event_");
+}
+
+TEST_F(BufferedIoSocketHandleTest, NotImplementDuplicate) {
+  ASSERT_DEATH(io_handle_->duplicate(), "");
+}
+
+TEST_F(BufferedIoSocketHandleTest, NotImplementAccept) {
+  ASSERT_DEATH(io_handle_->accept(nullptr, nullptr), "");
+}
+
+TEST_F(BufferedIoSocketHandleTest, LastRoundtripTimeNullOpt) {
+  ASSERT_EQ(absl::nullopt, io_handle_->lastRoundTripTime());
+}
+ +class BufferedIoSocketHandleNotImplementedTest : public testing::Test { +public: + BufferedIoSocketHandleNotImplementedTest() { + io_handle_ = std::make_unique(); + io_handle_peer_ = std::make_unique(); + io_handle_->setPeerHandle(io_handle_peer_.get()); + io_handle_peer_->setPeerHandle(io_handle_.get()); + } + + ~BufferedIoSocketHandleNotImplementedTest() override { + if (io_handle_->isOpen()) { + io_handle_->close(); + } + if (io_handle_peer_->isOpen()) { + io_handle_peer_->close(); + } + } + + std::unique_ptr io_handle_; + std::unique_ptr io_handle_peer_; + Buffer::RawSlice slice_; +}; + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnSetBlocking) { + EXPECT_THAT(io_handle_->setBlocking(false), IsNotSupportedResult()); + EXPECT_THAT(io_handle_->setBlocking(true), IsNotSupportedResult()); +} + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnSendmsg) { + EXPECT_THAT(io_handle_->sendmsg(&slice_, 0, 0, nullptr, + Network::Address::EnvoyInternalInstance("listener_id")), + IsInvalidAddress()); +} + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnRecvmsg) { + Network::IoHandle::RecvMsgOutput output_is_ignored(1, nullptr); + EXPECT_THAT(io_handle_->recvmsg(&slice_, 0, 0, output_is_ignored), IsInvalidAddress()); +} + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnRecvmmsg) { + RawSliceArrays slices_is_ignored(1, absl::FixedArray({slice_})); + Network::IoHandle::RecvMsgOutput output_is_ignored(1, nullptr); + EXPECT_THAT(io_handle_->recvmmsg(slices_is_ignored, 0, output_is_ignored), IsInvalidAddress()); +} + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnBind) { + auto address_is_ignored = + std::make_shared("listener_id"); + EXPECT_THAT(io_handle_->bind(address_is_ignored), IsNotSupportedResult()); +} + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnListen) { + int back_log_is_ignored = 0; + EXPECT_THAT(io_handle_->listen(back_log_is_ignored), IsNotSupportedResult()); +} + 
+TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnAddress) { + ASSERT_THROW(io_handle_->peerAddress(), EnvoyException); + ASSERT_THROW(io_handle_->localAddress(), EnvoyException); +} + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnSetOption) { + EXPECT_THAT(io_handle_->setOption(0, 0, nullptr, 0), IsNotSupportedResult()); +} + +TEST_F(BufferedIoSocketHandleNotImplementedTest, ErrorOnGetOption) { + EXPECT_THAT(io_handle_->getOption(0, 0, nullptr, nullptr), IsNotSupportedResult()); +} +} // namespace +} // namespace UserSpace +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy