From 212b6e1e19aa85c00352e446a8461104ab60416a Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 29 Sep 2021 09:18:43 +0000 Subject: [PATCH 1/3] socket: mimic recv peek by read Signed-off-by: He Jie Xu --- source/common/network/BUILD | 1 + .../common/network/io_socket_handle_impl.cc | 36 +++++++++++++++++++ source/common/network/io_socket_handle_impl.h | 5 ++- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 114142e7040a8..8920fafc685ea 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -235,6 +235,7 @@ envoy_cc_library( "//envoy/event:dispatcher_interface", "//envoy/network:io_handle_interface", "//source/common/api:os_sys_calls_lib", + "//source/common/buffer:buffer_lib", "//source/common/event:dispatcher_includes", "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", ], diff --git a/source/common/network/io_socket_handle_impl.cc b/source/common/network/io_socket_handle_impl.cc index 452f57711b5e4..186f3aba24ebe 100644 --- a/source/common/network/io_socket_handle_impl.cc +++ b/source/common/network/io_socket_handle_impl.cc @@ -114,6 +114,13 @@ Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, if (max_length == 0) { return Api::ioCallUint64ResultNoError(); } + if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { + if (buffer_->length() > 0) { + auto move_length = std::min(buffer_->length(), max_length); + buffer_->move(buffer); + return Api::IoCallUint64Result(move_length, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + } + } Buffer::Reservation reservation = buffer.reserveForRead(); Api::IoCallUint64Result result = readv(std::min(reservation.length(), max_length), reservation.slices(), reservation.numSlices()); @@ -502,6 +509,35 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin } Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) { + if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { + if (flags & MSG_PEEK) { + Buffer::Reservation reservation = buffer_->reserveForRead(); + Api::IoCallUint64Result result = readv(std::min(reservation.length(), length), + reservation.slices(), reservation.numSlices()); + uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; + reservation.commit(bytes_to_commit); + // Emulated edge events need to registered if the socket operation did not complete + // because the socket would block. + if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { + if (result.wouldBlock() && file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } + } + buffer_->copyOut(0, bytes_to_commit, buffer); + return result; + } else { + if (buffer_->length() > 0) { + auto copy_size = std::min(buffer_->length(), length); + buffer_->copyOut(0, copy_size, buffer); + buffer_->drain(copy_size); + if (copy_size < length) { + length = length - copy_size; + } else { + return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + } + } + } + } const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recv(fd_, buffer, length, flags); auto io_result = sysCallResultToIoCallResult(result); diff --git a/source/common/network/io_socket_handle_impl.h b/source/common/network/io_socket_handle_impl.h index ae4129de73e5a..a964b480e4874 100644 --- a/source/common/network/io_socket_handle_impl.h +++ b/source/common/network/io_socket_handle_impl.h @@ -6,6 +6,7 @@ #include "envoy/event/dispatcher.h" #include "envoy/network/io_handle.h" +#include "source/common/buffer/buffer_impl.h" #include "source/common/common/logger.h" #include "source/common/network/io_socket_error_impl.h" @@ -19,7 +20,8 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable domain = absl::nullopt) - : fd_(fd), socket_v6only_(socket_v6only), domain_(domain) {} + : fd_(fd), socket_v6only_(socket_v6only), domain_(domain), + buffer_(std::make_unique()) {} // Close underlying socket if close() hasn't been call yet. ~IoSocketHandleImpl() override; @@ -104,6 +106,7 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable domain_; Event::FileEventPtr file_event_{nullptr}; + std::unique_ptr buffer_; // The minimum cmsg buffer size to filled in destination address, packets dropped and gso // size when receiving a packet. It is possible for a received packet to contain both IPv4 From 0ecf9d9a069d44b82e5a271ce09b05011c2bced9 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 29 Sep 2021 14:40:16 +0000 Subject: [PATCH 2/3] fix Signed-off-by: He Jie Xu --- .../common/network/io_socket_handle_impl.cc | 36 +++++++++++++------ source/common/network/io_socket_handle_impl.h | 8 +++-- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/source/common/network/io_socket_handle_impl.cc b/source/common/network/io_socket_handle_impl.cc index 186f3aba24ebe..3ba5c012e8bb3 100644 --- a/source/common/network/io_socket_handle_impl.cc +++ b/source/common/network/io_socket_handle_impl.cc @@ -117,7 +117,10 @@ Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { if (buffer_->length() > 0) { auto move_length = std::min(buffer_->length(), max_length); - buffer_->move(buffer); + buffer.move(*buffer_); + if (file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } return Api::IoCallUint64Result(move_length, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } } @@ -510,21 +513,29 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) { if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { + // Mimic the MSG_PEEK by read data. if (flags & MSG_PEEK) { + // Actually, this drain the data out of socket buffer. Although the `buffer_` may + // already has the enough data, but we have to drain the socket to ensure there is + // no more `Read` event. Buffer::Reservation reservation = buffer_->reserveForRead(); - Api::IoCallUint64Result result = readv(std::min(reservation.length(), length), - reservation.slices(), reservation.numSlices()); + auto length_to_read = std::min(reservation.length(), length); + // TODO(soulxu): this should be in a while loop until eagain returned. + Api::IoCallUint64Result result = + readv(length_to_read, reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; reservation.commit(bytes_to_commit); - // Emulated edge events need to registered if the socket operation did not complete - // because the socket would block. - if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { - if (result.wouldBlock() && file_event_) { - file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); - } + if (file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } + // The read failed, then we return the failure also. + if (bytes_to_commit == 0) { + return result; } - buffer_->copyOut(0, bytes_to_commit, buffer); - return result; + auto copy_size = std::min(buffer_->length(), length); + buffer_->copyOut(0, copy_size, buffer); + return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + ; } else { if (buffer_->length() > 0) { auto copy_size = std::min(buffer_->length(), length); @@ -533,6 +544,9 @@ Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, in if (copy_size < length) { length = length - copy_size; } else { + if (file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } } diff --git a/source/common/network/io_socket_handle_impl.h b/source/common/network/io_socket_handle_impl.h index a964b480e4874..fe7fbc26fb730 100644 --- a/source/common/network/io_socket_handle_impl.h +++ b/source/common/network/io_socket_handle_impl.h @@ -6,7 +6,7 @@ #include "envoy/event/dispatcher.h" #include "envoy/network/io_handle.h" -#include "source/common/buffer/buffer_impl.h" +#include "source/common/buffer/watermark_buffer.h" #include "source/common/common/logger.h" #include "source/common/network/io_socket_error_impl.h" @@ -21,11 +21,15 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable domain = absl::nullopt) : fd_(fd), socket_v6only_(socket_v6only), domain_(domain), - buffer_(std::make_unique()) {} + buffer_(dispatcher.getWatermarkFactory().createBuffer( + [this]() -> void { this->onReadBufferLowWatermark(); }, + [this]() -> void { this->onReadBufferHighWatermark(); }, []() -> void {})) {} // Close underlying socket if close() hasn't been call yet. ~IoSocketHandleImpl() override; + void onReadBufferLowWatermark(); + void onReadBufferHighWatermark(); // TODO(sbelair2) To be removed when the fd is fully abstracted from clients. os_fd_t fdDoNotUse() const override { return fd_; } From b27392d497423d9a1d5ca0690e5de3b0c1b843ce Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 29 Sep 2021 14:49:41 +0000 Subject: [PATCH 3/3] fix Signed-off-by: He Jie Xu --- source/common/network/io_socket_handle_impl.h | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/source/common/network/io_socket_handle_impl.h b/source/common/network/io_socket_handle_impl.h index fe7fbc26fb730..a964b480e4874 100644 --- a/source/common/network/io_socket_handle_impl.h +++ b/source/common/network/io_socket_handle_impl.h @@ -6,7 +6,7 @@ #include "envoy/event/dispatcher.h" #include "envoy/network/io_handle.h" -#include "source/common/buffer/watermark_buffer.h" +#include "source/common/buffer/buffer_impl.h" #include "source/common/common/logger.h" #include "source/common/network/io_socket_error_impl.h" @@ -21,15 +21,11 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable domain = absl::nullopt) : fd_(fd), socket_v6only_(socket_v6only), domain_(domain), - buffer_(dispatcher.getWatermarkFactory().createBuffer( - [this]() -> void { this->onReadBufferLowWatermark(); }, - [this]() -> void { this->onReadBufferHighWatermark(); }, []() -> void {})) {} + buffer_(std::make_unique()) {} // Close underlying socket if close() hasn't been call yet. ~IoSocketHandleImpl() override; - void onReadBufferLowWatermark(); - void onReadBufferHighWatermark(); // TODO(sbelair2) To be removed when the fd is fully abstracted from clients. os_fd_t fdDoNotUse() const override { return fd_; }