Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ envoy_cc_library(
"//envoy/event:dispatcher_interface",
"//envoy/network:io_handle_interface",
"//source/common/api:os_sys_calls_lib",
"//source/common/buffer:buffer_lib",
"//source/common/event:dispatcher_includes",
"@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto",
],
Expand Down
50 changes: 50 additions & 0 deletions source/common/network/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer,
if (max_length == 0) {
return Api::ioCallUint64ResultNoError();
}
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
if (buffer_->length() > 0) {
auto move_length = std::min(buffer_->length(), max_length);
buffer.move(*buffer_);
if (file_event_) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this is needed. The caller should be reading until EAGAIN is returned

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emm...good point, should we emulate the case if the caller provides a small max_length?

file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
}
return Api::IoCallUint64Result(move_length, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}
}
Buffer::Reservation reservation = buffer.reserveForRead();
Api::IoCallUint64Result result = readv(std::min(reservation.length(), max_length),
reservation.slices(), reservation.numSlices());
Expand Down Expand Up @@ -502,6 +512,46 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
}

Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) {
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should have a dedicated peek function for this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, agree with you.

// Mimic the MSG_PEEK by read data.
if (flags & MSG_PEEK) {
// Actually, this drain the data out of socket buffer. Although the `buffer_` may
// already has the enough data, but we have to drain the socket to ensure there is
// no more `Read` event.
Buffer::Reservation reservation = buffer_->reserveForRead();
auto length_to_read = std::min(reservation.length(), length);
// TODO(soulxu): this should be in a while loop until eagain returned.
Api::IoCallUint64Result result =
readv(length_to_read, reservation.slices(), reservation.numSlices());
uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0;
reservation.commit(bytes_to_commit);
if (file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
}
// The read failed, then we return the failure also.
if (bytes_to_commit == 0) {
return result;
}
auto copy_size = std::min(buffer_->length(), length);
buffer_->copyOut(0, copy_size, buffer);
return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
;
} else {
if (buffer_->length() > 0) {
auto copy_size = std::min(buffer_->length(), length);
buffer_->copyOut(0, copy_size, buffer);
buffer_->drain(copy_size);
if (copy_size < length) {
length = length - copy_size;
} else {
if (file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general it is assumed that it is a responsibility of the caller to re-active the event if they expect more data unless the error is WOULD_BLOCK

In the case of tls/proxy inspectors they should be able to determine that the data they currently have are not available to do their work and they should re-enable read. This is something that connection_impl does well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, same as the previous one, whether we should emulate all the cases. or we can just add assert here, assume people always read the data until WOULD_BLOCK return.

}
return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}
}
}
}
const Api::SysCallSizeResult result =
Api::OsSysCallsSingleton::get().recv(fd_, buffer, length, flags);
auto io_result = sysCallResultToIoCallResult(result);
Expand Down
5 changes: 4 additions & 1 deletion source/common/network/io_socket_handle_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/event/dispatcher.h"
#include "envoy/network/io_handle.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/common/network/io_socket_error_impl.h"

Expand All @@ -19,7 +20,8 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable<Logger::I
public:
explicit IoSocketHandleImpl(os_fd_t fd = INVALID_SOCKET, bool socket_v6only = false,
absl::optional<int> domain = absl::nullopt)
: fd_(fd), socket_v6only_(socket_v6only), domain_(domain) {}
: fd_(fd), socket_v6only_(socket_v6only), domain_(domain),
buffer_(std::make_unique<Buffer::OwnedImpl>()) {}

// Close underlying socket if close() hasn't been call yet.
~IoSocketHandleImpl() override;
Expand Down Expand Up @@ -104,6 +106,7 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable<Logger::I
int socket_v6only_{false};
const absl::optional<int> domain_;
Event::FileEventPtr file_event_{nullptr};
std::unique_ptr<Buffer::Instance> buffer_;

// The minimum cmsg buffer size to filled in destination address, packets dropped and gso
// size when receiving a packet. It is possible for a received packet to contain both IPv4
Expand Down