Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
f860594
format
lambdai Oct 6, 2020
d90cbea
amend
lambdai Oct 6, 2020
80a9178
Merge remote-tracking branch 'origin/master' into bsock
lambdai Oct 6, 2020
622ab92
clean up and add impl ::read()
lambdai Oct 7, 2020
420e498
clang tidy
lambdai Oct 8, 2020
d9ef28f
Merge remote-tracking branch 'origin/master' into bsock
lambdai Oct 8, 2020
4c53d54
impl BufferedIoSocketHandleImpl:write
lambdai Oct 8, 2020
812a288
comment
lambdai Oct 8, 2020
273447d
remove posix header file
lambdai Oct 8, 2020
9f810f2
fix error code on read EOS
lambdai Oct 9, 2020
6422a4f
address comments
lambdai Oct 13, 2020
a978ced
fix TestShutdown and close
lambdai Oct 13, 2020
0cf62b4
fix
lambdai Oct 13, 2020
09c1adf
fix log and comments
lambdai Oct 14, 2020
b73ced3
addressing comment
lambdai Oct 14, 2020
49bd8d6
remove event counter
lambdai Oct 14, 2020
55b9acc
more test and event counter fix
lambdai Oct 14, 2020
a1e0b2b
Merge branch 'bsock' of github.com:lambdai/envoy-dai into bsock
lambdai Oct 14, 2020
fb32d30
format
lambdai Oct 14, 2020
9361c49
add missing
lambdai Oct 15, 2020
baf755a
ASSERT_DEBUG_DEATH
lambdai Oct 15, 2020
a6686b4
coverage
lambdai Oct 15, 2020
2d93b61
coverage, cont
lambdai Oct 15, 2020
992c377
coverage, cont
lambdai Oct 15, 2020
d1a695d
more
lambdai Oct 15, 2020
683ad7f
split userspacefileeventimpl
lambdai Oct 21, 2020
673bca0
add missing files forgot to add to git
lambdai Oct 21, 2020
e59fa57
add scaffold of user space event test
lambdai Oct 22, 2020
b65e540
various fix: file event test not ready yet
lambdai Oct 22, 2020
9291161
Merge branch 'master' into bsock
lambdai Oct 22, 2020
437eb9b
complete user space file event test
lambdai Oct 22, 2020
9299130
fix destroy order in test
lambdai Oct 22, 2020
1b4a204
address comment: no schedule next, typo
lambdai Oct 27, 2020
e3d2839
Merge branch 'master' into bsock
lambdai Oct 28, 2020
58971d9
file event owned by io handle
lambdai Oct 28, 2020
119a9cb
fix format
lambdai Oct 28, 2020
f18fb59
address comment: also add fails_on_windows to track
lambdai Oct 28, 2020
b3e1264
fix cc format
lambdai Oct 28, 2020
7fb8f79
declare UserSpaceFileEventImpl final
lambdai Oct 28, 2020
1af2888
remove onEvents
lambdai Oct 29, 2020
9cdfd1b
fix user space event test
lambdai Oct 29, 2020
6ca32cf
fix buffer io socket handle test
lambdai Oct 29, 2020
fd7e7ea
clang tidy
lambdai Oct 29, 2020
bd745ab
clang tidy and test coverage
lambdai Oct 29, 2020
2e80156
ct
lambdai Oct 30, 2020
089526f
address comments
lambdai Oct 30, 2020
f203b6e
simplify test on new dtor
lambdai Oct 30, 2020
34b18ed
erase triggered events
lambdai Oct 30, 2020
83bd452
final and remove EventListener interface
lambdai Nov 2, 2020
c98c4f9
move to test extensions
lambdai Nov 2, 2020
8d87a54
ns Extensions::IoSocket::BufferedIoSocket
lambdai Nov 2, 2020
a876244
test: ns Extensions::IoSocket::BufferedIoSocket
lambdai Nov 2, 2020
f780926
codeformat and owners
lambdai Nov 2, 2020
aefff6a
moving peer buffer to extension
lambdai Nov 2, 2020
47cf2a8
save file
lambdai Nov 2, 2020
c4a336e
Merge remote-tracking branch 'origin/master' into bsock
lambdai Nov 2, 2020
6244c65
clangtidy wellknown names
lambdai Nov 3, 2020
a58a712
dup counter
lambdai Nov 3, 2020
1269e1e
extension and LT
lambdai Nov 5, 2020
c7ffacc
add staging
lambdai Nov 5, 2020
f2633f4
revert
lambdai Nov 5, 2020
2c7820a
Merge remote-tracking branch 'origin/master' into bsock
lambdai Nov 5, 2020
7c81f18
revert cont
lambdai Nov 5, 2020
9e1744a
destroy cluster info on main thread
lambdai Nov 14, 2020
7c419d1
close test
lambdai Nov 16, 2020
1993def
rewrite recv
lambdai Nov 17, 2020
bfb6911
flow control test
lambdai Nov 17, 2020
92cebbf
Events test
lambdai Nov 17, 2020
c4b1276
move close
lambdai Nov 17, 2020
9484dc8
shutdown test
lambdai Nov 17, 2020
ecb54c8
move writev
lambdai Nov 17, 2020
968e09e
user space event test
lambdai Nov 17, 2020
04ae3ac
typo
lambdai Nov 17, 2020
dfc457d
watermark func, inlineNewDataAvailable, split userspace event file
lambdai Nov 18, 2020
042f44d
more events test
lambdai Nov 18, 2020
c5e7aba
lint
lambdai Nov 19, 2020
82dc3f1
Merge branch 'master' into bsock
lambdai Dec 21, 2020
d2aa295
move test to cc_extension_test
lambdai Dec 21, 2020
faf8dc5
typo and remove Test prefix at test case name
lambdai Dec 21, 2020
f589a4c
renaming and format fix
lambdai Dec 21, 2020
5407f4b
impl partial write in userspace io socket
lambdai Dec 21, 2020
cc4f7f6
typo extension
lambdai Dec 21, 2020
d87f6b1
mark io_socket extension undocumented
lambdai Dec 22, 2020
03aa2c3
fix OwnedBufferFragmentImpl usage in asan test
lambdai Dec 22, 2020
6ad181c
add specialized file covr and fix mac build
lambdai Dec 22, 2020
f69ba83
parent lcov annotation
lambdai Dec 23, 2020
c1290a8
Merge branch 'master' into bsock
lambdai Dec 23, 2020
6eaff67
add ReadyType::Closed support and
lambdai Jan 11, 2021
21cc67a
optimization: add poll method to deliver enabled events unless explic…
lambdai Jan 11, 2021
29c1bea
massage merge master
lambdai Jan 13, 2021
51db809
add coverage by adding more Closed ready type
lambdai Jan 14, 2021
38822f9
massage CI
lambdai Jan 14, 2021
7333584
Merge branch 'main' into bsock
lambdai Feb 2, 2021
726e3e9
fixing build
lambdai Feb 2, 2021
4adf60c
fix test build
lambdai Feb 2, 2021
1f91272
fix build
lambdai Feb 2, 2021
65b590e
fix tests
lambdai Feb 2, 2021
8087b0d
cover read(buf, 0)
lambdai Feb 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions source/extensions/io_socket/user_space/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,19 @@ envoy_cc_library(
"//include/envoy/event:dispatcher_interface",
],
)

envoy_cc_library(
name = "io_socket_handle_lib",
srcs = [
"io_socket_handle_impl.cc",
],
hdrs = [
"io_socket_handle_impl.h",
],
deps = [
":file_event_lib",
":io_handle_lib",
"//source/common/event:dispatcher_includes",
"//source/common/network:default_socket_interface_lib",
],
)
330 changes: 330 additions & 0 deletions source/extensions/io_socket/user_space/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
#include "extensions/io_socket/user_space/io_socket_handle_impl.h"

#include "envoy/buffer/buffer.h"
#include "envoy/common/platform.h"

#include "common/api/os_sys_calls_impl.h"
#include "common/common/assert.h"
#include "common/common/utility.h"
#include "common/network/address_impl.h"

#include "extensions/io_socket/user_space/file_event_impl.h"

#include "absl/types/optional.h"

namespace Envoy {

namespace Extensions {
namespace IoSocket {
namespace UserSpace {
namespace {
Api::SysCallIntResult makeInvalidSyscallResult() {
return Api::SysCallIntResult{-1, SOCKET_ERROR_NOT_SUP};
}
} // namespace

IoSocketHandleImpl::IoSocketHandleImpl()
: pending_received_data_([&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); }, []() -> void {}) {}

IoSocketHandleImpl::~IoSocketHandleImpl() {
if (!closed_) {
close();
}
}

Api::IoCallUint64Result IoSocketHandleImpl::close() {
ASSERT(!closed_);
if (!closed_) {
if (peer_handle_) {
ENVOY_LOG(trace, "socket {} close before peer {} closes.", static_cast<void*>(this),
static_cast<void*>(peer_handle_));
// Notify the peer we won't write more data. shutdown(WRITE).
peer_handle_->setWriteEnd();
// Notify the peer that we no longer accept data. shutdown(RD).
peer_handle_->onPeerDestroy();
peer_handle_ = nullptr;
} else {
ENVOY_LOG(trace, "socket {} close after peer closed.", static_cast<void*>(this));
}
}
closed_ = true;
return Api::ioCallUint64ResultNoError();
}

bool IoSocketHandleImpl::isOpen() const { return !closed_; }

Api::IoCallUint64Result IoSocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
uint64_t num_slice) {
if (!isOpen()) {
return {0,
// TODO(lambdai): Add EBADF in Network::IoSocketError and adopt it here.
Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
if (pending_received_data_.length() == 0) {
if (receive_data_end_stream_) {
return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
} else {
return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(),
Network::IoSocketError::deleteIoError)};
}
}
uint64_t bytes_offset = 0;
for (uint64_t i = 0; i < num_slice && bytes_offset < max_length; i++) {
auto bytes_to_read_in_this_slice =
std::min(std::min(pending_received_data_.length(), max_length) - bytes_offset,
uint64_t(slices[i].len_));
// Copy and drain, so pending_received_data_ always copy from offset 0.
pending_received_data_.copyOut(0, bytes_to_read_in_this_slice, slices[i].mem_);
pending_received_data_.drain(bytes_to_read_in_this_slice);
bytes_offset += bytes_to_read_in_this_slice;
}
auto bytes_read = bytes_offset;
ASSERT(bytes_read <= max_length);
ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast<void*>(this), bytes_read);
return {bytes_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
}

Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer,
absl::optional<uint64_t> max_length_opt) {
const uint64_t max_length = max_length_opt.value_or(UINT64_MAX);
if (max_length == 0) {
return Api::ioCallUint64ResultNoError();
}
if (!isOpen()) {
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
if (pending_received_data_.length() == 0) {
if (receive_data_end_stream_) {
return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
} else {
return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(),
Network::IoSocketError::deleteIoError)};
}
}
// TODO(lambdai): Move slice by slice until high watermark.
uint64_t max_bytes_to_read = std::min(max_length, pending_received_data_.length());
buffer.move(pending_received_data_, max_bytes_to_read);
return {max_bytes_to_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
}

Api::IoCallUint64Result IoSocketHandleImpl::writev(const Buffer::RawSlice* slices,
uint64_t num_slice) {
// Empty input is allowed even though the peer is shutdown.
bool is_input_empty = true;
for (uint64_t i = 0; i < num_slice; i++) {
if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
is_input_empty = false;
break;
}
}
if (is_input_empty) {
return Api::ioCallUint64ResultNoError();
};
if (!isOpen()) {
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
// Closed peer.
if (!peer_handle_) {
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
// Error: write after close.
if (peer_handle_->isPeerShutDownWrite()) {
// TODO(lambdai): `EPIPE` or `ENOTCONN`.
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
// The peer is valid but temporary not accepts new data. Likely due to flow control.
if (!peer_handle_->isWritable()) {
return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(),
Network::IoSocketError::deleteIoError)};
}

auto* const dest_buffer = peer_handle_->getWriteBuffer();
// Write along with iteration. Buffer guarantee the fragment is always append-able.
uint64_t bytes_written = 0;
for (uint64_t i = 0; i < num_slice && !dest_buffer->highWatermarkTriggered(); i++) {
if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
dest_buffer->add(slices[i].mem_, slices[i].len_);
bytes_written += slices[i].len_;
}
}
peer_handle_->setNewDataAvailable();
ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), bytes_written);
return {bytes_written, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
}

Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) {
// Empty input is allowed even though the peer is shutdown.
if (buffer.length() == 0) {
return Api::ioCallUint64ResultNoError();
}
if (!isOpen()) {
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
// Closed peer.
if (!peer_handle_) {
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
// Error: write after close.
if (peer_handle_->isPeerShutDownWrite()) {
// TODO(lambdai): `EPIPE` or `ENOTCONN`.
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
// The peer is valid but temporary not accepts new data. Likely due to flow control.
if (!peer_handle_->isWritable()) {
return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(),
Network::IoSocketError::deleteIoError)};
}
uint64_t total_bytes_to_write = 0;
const uint64_t max_bytes_to_write = buffer.length();
while (peer_handle_->isWritable()) {
const auto& front_slice = buffer.frontSlice();
if (front_slice.len_ == 0) {
break;
} else {
peer_handle_->getWriteBuffer()->move(buffer, front_slice.len_);
total_bytes_to_write += front_slice.len_;
}
}
peer_handle_->setNewDataAvailable();
ENVOY_LOG(trace, "socket {} writev {} bytes of {}", static_cast<void*>(this),
total_bytes_to_write, max_bytes_to_write);
return {total_bytes_to_write, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
}

Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int,
const Network::Address::Ip*,
const Network::Address::Instance&) {
return Network::IoSocketError::ioResultSocketInvalidAddress();
}

Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, uint32_t,
RecvMsgOutput&) {
return Network::IoSocketError::ioResultSocketInvalidAddress();
}

Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays&, uint32_t, RecvMsgOutput&) {
return Network::IoSocketError::ioResultSocketInvalidAddress();
}

Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) {
if (!isOpen()) {
return {0, Api::IoErrorPtr(new Network::IoSocketError(SOCKET_ERROR_INVAL),
Network::IoSocketError::deleteIoError)};
}
// No data and the writer closed.
if (pending_received_data_.length() == 0) {
if (receive_data_end_stream_) {
return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
} else {
return {0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(),
Network::IoSocketError::deleteIoError)};
}
}
// Specify uint64_t since the latter length may not have the same type.
auto max_bytes_to_read = std::min<uint64_t>(pending_received_data_.length(), length);
pending_received_data_.copyOut(0, max_bytes_to_read, buffer);
if (!(flags & MSG_PEEK)) {
pending_received_data_.drain(max_bytes_to_read);
}
return {max_bytes_to_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})};
}

bool IoSocketHandleImpl::supportsMmsg() const { return false; }

bool IoSocketHandleImpl::supportsUdpGro() const { return false; }

Api::SysCallIntResult IoSocketHandleImpl::bind(Network::Address::InstanceConstSharedPtr) {
return makeInvalidSyscallResult();
}

Api::SysCallIntResult IoSocketHandleImpl::listen(int) { return makeInvalidSyscallResult(); }

Network::IoHandlePtr IoSocketHandleImpl::accept(struct sockaddr*, socklen_t*) {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Api::SysCallIntResult IoSocketHandleImpl::connect(Network::Address::InstanceConstSharedPtr) {
// Buffered Io handle should always be considered as connected.
// Use write or read to determine if peer is closed.
return {0, 0};
}

Api::SysCallIntResult IoSocketHandleImpl::setOption(int, int, const void*, socklen_t) {
return makeInvalidSyscallResult();
}

Api::SysCallIntResult IoSocketHandleImpl::getOption(int, int, void*, socklen_t*) {
return makeInvalidSyscallResult();
}

Api::SysCallIntResult IoSocketHandleImpl::setBlocking(bool) { return makeInvalidSyscallResult(); }

absl::optional<int> IoSocketHandleImpl::domain() { return absl::nullopt; }

Network::Address::InstanceConstSharedPtr IoSocketHandleImpl::localAddress() {
// TODO(lambdai): Rewrite when caller accept error as the return value.
throw EnvoyException(fmt::format("getsockname failed for IoSocketHandleImpl"));
}

Network::Address::InstanceConstSharedPtr IoSocketHandleImpl::peerAddress() {
// TODO(lambdai): Rewrite when caller accept error as the return value.
throw EnvoyException(fmt::format("getsockname failed for IoSocketHandleImpl"));
}

void IoSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
Event::FileTriggerType trigger, uint32_t events) {
ASSERT(user_file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
"file descriptor. This is not allowed.");
ASSERT(trigger != Event::FileTriggerType::Level, "Native level trigger is not supported yet.");
user_file_event_ = std::make_unique<FileEventImpl>(dispatcher, cb, events, *this);
}

Network::IoHandlePtr IoSocketHandleImpl::duplicate() {
// duplicate() is supposed to be used on listener io handle while this implementation doesn't
// support listen.
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void IoSocketHandleImpl::activateFileEvents(uint32_t events) {
if (user_file_event_) {
user_file_event_->activate(events);
} else {
ENVOY_BUG(false, "Null user_file_event_");
}
}

void IoSocketHandleImpl::enableFileEvents(uint32_t events) {
if (user_file_event_) {
user_file_event_->setEnabled(events);
} else {
ENVOY_BUG(false, "Null user_file_event_");
}
}

void IoSocketHandleImpl::resetFileEvents() { user_file_event_.reset(); }

Api::SysCallIntResult IoSocketHandleImpl::shutdown(int how) {
// Support only shutdown write.
ASSERT(how == ENVOY_SHUT_WR);
ASSERT(!closed_);
if (!write_shutdown_) {
ASSERT(peer_handle_);
// Notify the peer we won't write more data.
peer_handle_->setWriteEnd();
write_shutdown_ = true;
}
return {0, 0};
}
} // namespace UserSpace
} // namespace IoSocket
} // namespace Extensions
} // namespace Envoy
Loading