diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 114142e7040a8..8920fafc685ea 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -235,6 +235,7 @@ envoy_cc_library( "//envoy/event:dispatcher_interface", "//envoy/network:io_handle_interface", "//source/common/api:os_sys_calls_lib", + "//source/common/buffer:buffer_lib", "//source/common/event:dispatcher_includes", "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", ], diff --git a/source/common/network/io_socket_handle_impl.cc b/source/common/network/io_socket_handle_impl.cc index 452f57711b5e4..3ba5c012e8bb3 100644 --- a/source/common/network/io_socket_handle_impl.cc +++ b/source/common/network/io_socket_handle_impl.cc @@ -114,6 +114,16 @@ Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, if (max_length == 0) { return Api::ioCallUint64ResultNoError(); } + if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { + if (buffer_->length() > 0) { + auto move_length = std::min(buffer_->length(), max_length); + buffer.move(*buffer_); + if (file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } + return Api::IoCallUint64Result(move_length, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + } + } Buffer::Reservation reservation = buffer.reserveForRead(); Api::IoCallUint64Result result = readv(std::min(reservation.length(), max_length), reservation.slices(), reservation.numSlices()); @@ -502,6 +512,46 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin } Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) { + if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { + // Mimic the MSG_PEEK by read data. + if (flags & MSG_PEEK) { + // Actually, this drain the data out of socket buffer. Although the `buffer_` may + // already has the enough data, but we have to drain the socket to ensure there is + // no more `Read` event. + Buffer::Reservation reservation = buffer_->reserveForRead(); + auto length_to_read = std::min(reservation.length(), length); + // TODO(soulxu): this should be in a while loop until eagain returned. + Api::IoCallUint64Result result = + readv(length_to_read, reservation.slices(), reservation.numSlices()); + uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; + reservation.commit(bytes_to_commit); + if (file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } + // The read failed, then we return the failure also. + if (bytes_to_commit == 0) { + return result; + } + auto copy_size = std::min(buffer_->length(), length); + buffer_->copyOut(0, copy_size, buffer); + return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + ; + } else { + if (buffer_->length() > 0) { + auto copy_size = std::min(buffer_->length(), length); + buffer_->copyOut(0, copy_size, buffer); + buffer_->drain(copy_size); + if (copy_size < length) { + length = length - copy_size; + } else { + if (file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } + return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + } + } + } + } const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recv(fd_, buffer, length, flags); auto io_result = sysCallResultToIoCallResult(result); diff --git a/source/common/network/io_socket_handle_impl.h b/source/common/network/io_socket_handle_impl.h index ae4129de73e5a..a964b480e4874 100644 --- a/source/common/network/io_socket_handle_impl.h +++ b/source/common/network/io_socket_handle_impl.h @@ -6,6 +6,7 @@ #include "envoy/event/dispatcher.h" #include "envoy/network/io_handle.h" +#include "source/common/buffer/buffer_impl.h" #include "source/common/common/logger.h" #include "source/common/network/io_socket_error_impl.h" @@ -19,7 +20,8 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable domain = absl::nullopt) - : fd_(fd), socket_v6only_(socket_v6only), domain_(domain) {} + : fd_(fd), socket_v6only_(socket_v6only), domain_(domain), + buffer_(std::make_unique()) {} // Close underlying socket if close() hasn't been call yet. ~IoSocketHandleImpl() override; @@ -104,6 +106,7 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable domain_; Event::FileEventPtr file_event_{nullptr}; + std::unique_ptr buffer_; // The minimum cmsg buffer size to filled in destination address, packets dropped and gso // size when receiving a packet. It is possible for a received packet to contain both IPv4