-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Io socket handle for internal socket #13418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
f860594
d90cbea
80a9178
622ab92
420e498
d9ef28f
4c53d54
812a288
273447d
9f810f2
6422a4f
a978ced
0cf62b4
09c1adf
b73ced3
49bd8d6
55b9acc
a1e0b2b
fb32d30
9361c49
baf755a
a6686b4
2d93b61
992c377
d1a695d
683ad7f
673bca0
e59fa57
b65e540
9291161
437eb9b
9299130
1b4a204
e3d2839
58971d9
119a9cb
f18fb59
b3e1264
7fb8f79
1af2888
9cdfd1b
6ca32cf
fd7e7ea
bd745ab
2e80156
089526f
f203b6e
34b18ed
83bd452
c98c4f9
8d87a54
a876244
f780926
aefff6a
47cf2a8
c4a336e
6244c65
a58a712
1269e1e
c7ffacc
f2633f4
2c7820a
7c81f18
9e1744a
7c419d1
1993def
bfb6911
92cebbf
c4b1276
9484dc8
ecb54c8
968e09e
04ae3ac
dfc457d
042f44d
c5e7aba
82dc3f1
d2aa295
faf8dc5
f589a4c
5407f4b
cc4f7f6
d87f6b1
03aa2c3
6ad181c
f69ba83
c1290a8
6eaff67
21cc67a
29c1bea
51db809
38822f9
7333584
726e3e9
4adf60c
1f91272
65b590e
8087b0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,10 @@ | |
| #include "common/event/event_impl_base.h" | ||
|
|
||
| namespace Envoy { | ||
|
|
||
| namespace Network { | ||
| class BufferedIoSocketHandleImpl; | ||
| } | ||
| namespace Event { | ||
|
|
||
| /** | ||
|
|
@@ -41,5 +45,119 @@ class FileEventImpl : public FileEvent, ImplBase { | |
| // polling and activating new fd events. | ||
| const bool activate_fd_events_next_event_loop_; | ||
| }; | ||
|
|
||
| // Forward declare for friend class. | ||
| class UserSpaceFileEventFactory; | ||
|
|
||
| // The interface of populating event watcher and obtaining the active events. The events includes | ||
| // Read, Write and Closed. | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| class EventListener { | ||
| public: | ||
| virtual ~EventListener() = default; | ||
|
|
||
| // Provide the activated events. | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| virtual uint32_t triggeredEvents() PURE; | ||
| virtual uint32_t getAndClearEphemeralEvents() PURE; | ||
|
|
||
| // Callbacks of the event operation. | ||
| virtual void onEventEnabled(uint32_t enabled_events) PURE; | ||
| virtual void onEventActivated(uint32_t activated_events) PURE; | ||
| }; | ||
|
|
||
| // Return the enabled events except EV_CLOSED. This implementation is generally good since only | ||
| // epoll supports EV_CLOSED but the entire envoy code base supports another poller. The event owner | ||
| // must assume EV_CLOSED is never activated. Also event owner must tolerate that OS could notify | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| // events which are not actually triggered. | ||
| class DefaultEventListener : public EventListener { | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| public: | ||
| ~DefaultEventListener() override = default; | ||
|
|
||
| // Return both read and write. | ||
| uint32_t triggeredEvents() override { return pending_events_ & (~Event::FileReadyType::Closed); } | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
|
|
||
| void onEventEnabled(uint32_t enabled_events) override { pending_events_ = enabled_events; } | ||
|
|
||
| void onEventActivated(uint32_t activated_events) override { | ||
| ephermal_events_ |= activated_events; | ||
| } | ||
|
|
||
| uint32_t getAndClearEphemeralEvents() override { | ||
| auto res = ephermal_events_; | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| ephermal_events_ = 0; | ||
| return res; | ||
| } | ||
|
|
||
| private: | ||
| // The persisted interested events and ready events. | ||
| uint32_t pending_events_{}; | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| // The events set by activate() and will be cleared after the io callback. | ||
| uint32_t ephermal_events_{}; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not clear from here why we have persistent and ephemeral events. maybe the comments will clear up? In general, the setEnabled filters which events can be triggered on activate. So if you're listening for data to read and there's new data, you get informed but if you're not, those events are never propagated. I'm not clear by naming if that's what you're trying to do here? |
||
| }; | ||
|
|
||
| // A FileEvent implementation which is | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| class UserSpaceFileEventImpl : public FileEvent, Logger::Loggable<Logger::Id::io> { | ||
| public: | ||
| ~UserSpaceFileEventImpl() override { | ||
| if (schedulable_.enabled()) { | ||
| schedulable_.cancel(); | ||
| } | ||
| ASSERT(event_counter_ == 1); | ||
| --event_counter_; | ||
| } | ||
|
|
||
| // Event::FileEvent | ||
| void activate(uint32_t events) override { | ||
| event_listener_.onEventActivated(events); | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| if (!schedulable_.enabled()) { | ||
| schedulable_.scheduleCallbackNextIteration(); | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
|
|
||
| void setEnabled(uint32_t events) override { | ||
| event_listener_.onEventEnabled(events); | ||
| bool was_enabled = schedulable_.enabled(); | ||
| if (!was_enabled) { | ||
| schedulable_.scheduleCallbackNextIteration(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again enabled just indicates what events to watch out for. If there's no pending data to read or write it generally doesn't trigger. |
||
| } | ||
| ENVOY_LOG(trace, "User space file event {} setEnabled {} on {}. Will {} reschedule.", | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| static_cast<void*>(this), was_enabled ? "not " : ""); | ||
| } | ||
|
|
||
| EventListener& getEventListener() { return event_listener_; } | ||
| void onEvents() { cb_(); } | ||
| friend class UserSpaceFileEventFactory; | ||
| friend class Network::BufferedIoSocketHandleImpl; | ||
|
|
||
| private: | ||
| UserSpaceFileEventImpl(Event::FileReadyCb cb, uint32_t events, | ||
| SchedulableCallback& schedulable_cb, int& event_counter) | ||
| : schedulable_(schedulable_cb), cb_([this, cb]() { | ||
| auto all_events = getEventListener().triggeredEvents(); | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| auto ephemeral_events = getEventListener().getAndClearEphemeralEvents(); | ||
| ENVOY_LOG(trace, | ||
| "User space event {} invokes callbacks on allevents = {}, ephermal events = {}", | ||
| static_cast<void*>(this), all_events, ephemeral_events); | ||
| cb(all_events | ephemeral_events); | ||
| }), | ||
| event_counter_(event_counter) { | ||
| event_listener_.onEventEnabled(events); | ||
| } | ||
| DefaultEventListener event_listener_; | ||
| SchedulableCallback& schedulable_; | ||
| std::function<void()> cb_; | ||
| int& event_counter_; | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| }; | ||
|
|
||
| class UserSpaceFileEventFactory { | ||
| public: | ||
| static std::unique_ptr<UserSpaceFileEventImpl> | ||
| createUserSpaceFileEventImpl(Event::Dispatcher&, Event::FileReadyCb cb, Event::FileTriggerType, | ||
| uint32_t events, SchedulableCallback& scheduable_cb, | ||
| int& event_counter) { | ||
| return std::unique_ptr<UserSpaceFileEventImpl>( | ||
| new UserSpaceFileEventImpl(cb, events, scheduable_cb, event_counter)); | ||
| } | ||
| }; | ||
|
|
||
| } // namespace Event | ||
| } // namespace Envoy | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,228 @@ | ||
| #include "common/network/buffered_io_socket_handle_impl.h" | ||
|
|
||
| #include "envoy/buffer/buffer.h" | ||
| #include "envoy/common/platform.h" | ||
|
|
||
| #include "common/api/os_sys_calls_impl.h" | ||
| #include "common/common/assert.h" | ||
| #include "common/common/utility.h" | ||
| #include "common/event/file_event_impl.h" | ||
| #include "common/network/address_impl.h" | ||
|
|
||
| #include "absl/container/fixed_array.h" | ||
| #include "absl/types/optional.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Network { | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::close() { | ||
| ASSERT(!closed_); | ||
| if (!write_shutdown_) { | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| ASSERT(writable_peer_); | ||
| // Notify the peer we won't write more data. shutdown(WRITE). | ||
| writable_peer_->setWriteEnd(); | ||
| // Notify the peer that we no longer accept data. shutdown(RD). | ||
| writable_peer_->onPeerDestroy(); | ||
| writable_peer_->maybeSetNewData(); | ||
| writable_peer_ = nullptr; | ||
| write_shutdown_ = true; | ||
| } | ||
| closed_ = true; | ||
| return Api::ioCallUint64ResultNoError(); | ||
| } | ||
|
|
||
| bool BufferedIoSocketHandleImpl::isOpen() const { return !closed_; } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::readv(uint64_t max_length, | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| Buffer::RawSlice* slices, | ||
| uint64_t num_slice) { | ||
| if (owned_buffer_.length() == 0) { | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| if (read_end_stream_) { | ||
| return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; | ||
| } else { | ||
| return {0, Api::IoErrorPtr(IoSocketError::getIoSocketEagainInstance(), | ||
| IoSocketError::deleteIoError)}; | ||
| } | ||
| } else { | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| absl::FixedArray<iovec> iov(num_slice); | ||
| uint64_t num_slices_to_read = 0; | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| uint64_t num_bytes_to_read = 0; | ||
| for (; num_slices_to_read < num_slice && num_bytes_to_read < max_length; num_slices_to_read++) { | ||
| auto min_len = std::min(std::min(owned_buffer_.length(), max_length) - num_bytes_to_read, | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| uint64_t(slices[num_slices_to_read].len_)); | ||
| owned_buffer_.copyOut(num_bytes_to_read, min_len, slices[num_slices_to_read].mem_); | ||
|
lambdai marked this conversation as resolved.
Outdated
lambdai marked this conversation as resolved.
Outdated
|
||
| num_bytes_to_read += min_len; | ||
| } | ||
| ASSERT(num_bytes_to_read <= max_length); | ||
| owned_buffer_.drain(num_bytes_to_read); | ||
| ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast<void*>(this), num_bytes_to_read); | ||
| return {num_bytes_to_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; | ||
| } | ||
| } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::read(Buffer::Instance& buffer, | ||
| uint64_t max_length) { | ||
| if (owned_buffer_.length() == 0) { | ||
| if (read_end_stream_) { | ||
| return {0, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; | ||
| } else { | ||
| return {0, Api::IoErrorPtr(IoSocketError::getIoSocketEagainInstance(), | ||
| IoSocketError::deleteIoError)}; | ||
| } | ||
| } else { | ||
| // TODO(lambdai): Move at slice boundary to move to reduce the copy. | ||
| uint64_t min_len = std::min(max_length, owned_buffer_.length()); | ||
| buffer.move(owned_buffer_, min_len); | ||
| return {min_len, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; | ||
| } | ||
| } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::writev(const Buffer::RawSlice* slices, | ||
| uint64_t num_slice) { | ||
| if (!writable_peer_) { | ||
| return sysCallResultToIoCallResult(Api::SysCallSizeResult{-1, SOCKET_ERROR_INTR}); | ||
| } | ||
| if (writable_peer_->isWriteEndSet() || !writable_peer_->isWritable()) { | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| return {0, Api::IoErrorPtr(IoSocketError::getIoSocketEagainInstance(), | ||
| IoSocketError::deleteIoError)}; | ||
| } | ||
| // Write along with iteration. Buffer guarantee the fragment is always append-able. | ||
| uint64_t num_bytes_to_write = 0; | ||
| for (uint64_t i = 0; i < num_slice; i++) { | ||
| if (slices[i].mem_ != nullptr && slices[i].len_ != 0) { | ||
| writable_peer_->getWriteBuffer()->add(slices[i].mem_, slices[i].len_); | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| num_bytes_to_write += slices[i].len_; | ||
| } | ||
| } | ||
| writable_peer_->maybeSetNewData(); | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), num_bytes_to_write); | ||
| return {num_bytes_to_write, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; | ||
| } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::write(Buffer::Instance& buffer) { | ||
| if (!writable_peer_) { | ||
| return sysCallResultToIoCallResult(Api::SysCallSizeResult{-1, SOCKET_ERROR_INTR}); | ||
| } | ||
| if (writable_peer_->isWriteEndSet() || !writable_peer_->isWritable()) { | ||
| return {0, Api::IoErrorPtr(IoSocketError::getIoSocketEagainInstance(), | ||
| IoSocketError::deleteIoError)}; | ||
| } | ||
| uint64_t num_bytes_to_write = buffer.length(); | ||
| writable_peer_->getWriteBuffer()->move(buffer); | ||
| writable_peer_->maybeSetNewData(); | ||
| ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), num_bytes_to_write); | ||
| return {num_bytes_to_write, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; | ||
| } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int, | ||
| const Address::Ip*, | ||
| const Address::Instance&) { | ||
| return IoSocketError::ioResultSocketInvalidAddress(); | ||
| } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, | ||
| uint32_t, RecvMsgOutput&) { | ||
| return IoSocketError::ioResultSocketInvalidAddress(); | ||
| } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::recvmmsg(RawSliceArrays&, uint32_t, | ||
| RecvMsgOutput&) { | ||
| return IoSocketError::ioResultSocketInvalidAddress(); | ||
| } | ||
|
|
||
| Api::IoCallUint64Result BufferedIoSocketHandleImpl::recv(void* buffer, size_t length, int flags) { | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| // No data and the writer closed. | ||
| if (owned_buffer_.length() == 0) { | ||
| if (read_end_stream_) { | ||
| return sysCallResultToIoCallResult(Api::SysCallSizeResult{-1, SOCKET_ERROR_INTR}); | ||
| } else { | ||
| return {0, Api::IoErrorPtr(IoSocketError::getIoSocketEagainInstance(), | ||
| IoSocketError::deleteIoError)}; | ||
| } | ||
| } else { | ||
| auto min_len = std::min(owned_buffer_.length(), length); | ||
| owned_buffer_.copyOut(0, min_len, buffer); | ||
| if (!(flags & MSG_PEEK)) { | ||
| owned_buffer_.drain(min_len); | ||
| } | ||
| return {min_len, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})}; | ||
| } | ||
| } | ||
|
|
||
| bool BufferedIoSocketHandleImpl::supportsMmsg() const { return false; } | ||
|
|
||
| bool BufferedIoSocketHandleImpl::supportsUdpGro() const { return false; } | ||
|
|
||
| Api::SysCallIntResult makeInvalidSyscall() { | ||
| return Api::SysCallIntResult{-1, SOCKET_ERROR_NOT_SUP /*SOCKET_ERROR_NOT_SUP*/}; | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| Api::SysCallIntResult BufferedIoSocketHandleImpl::bind(Address::InstanceConstSharedPtr) { | ||
| return makeInvalidSyscall(); | ||
| } | ||
|
|
||
| Api::SysCallIntResult BufferedIoSocketHandleImpl::listen(int) { return makeInvalidSyscall(); } | ||
|
|
||
| IoHandlePtr BufferedIoSocketHandleImpl::accept(struct sockaddr*, socklen_t*) { | ||
| NOT_IMPLEMENTED_GCOVR_EXCL_LINE; | ||
| } | ||
|
|
||
| Api::SysCallIntResult BufferedIoSocketHandleImpl::connect(Address::InstanceConstSharedPtr) { | ||
| // Buffered Io handle should always be considered as connected. Use write to determine if peer is | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| // closed. | ||
| return {0, 0}; | ||
| } | ||
|
|
||
| Api::SysCallIntResult BufferedIoSocketHandleImpl::setOption(int, int, const void*, socklen_t) { | ||
| return makeInvalidSyscall(); | ||
| } | ||
|
|
||
| Api::SysCallIntResult BufferedIoSocketHandleImpl::getOption(int, int, void*, socklen_t*) { | ||
| return makeInvalidSyscall(); | ||
| } | ||
|
|
||
| Api::SysCallIntResult BufferedIoSocketHandleImpl::setBlocking(bool) { return makeInvalidSyscall(); } | ||
|
|
||
| absl::optional<int> BufferedIoSocketHandleImpl::domain() { return absl::nullopt; } | ||
|
|
||
| Address::InstanceConstSharedPtr BufferedIoSocketHandleImpl::localAddress() { | ||
| throw EnvoyException(fmt::format("getsockname failed for BufferedIoSocketHandleImpl")); | ||
| } | ||
|
|
||
| Address::InstanceConstSharedPtr BufferedIoSocketHandleImpl::peerAddress() { | ||
|
|
||
| throw EnvoyException(fmt::format("getsockname failed for BufferedIoSocketHandleImpl")); | ||
| } | ||
|
|
||
| Event::FileEventPtr BufferedIoSocketHandleImpl::createFileEvent(Event::Dispatcher& dispatcher, | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| Event::FileReadyCb cb, | ||
| Event::FileTriggerType trigger_type, | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| uint32_t events) { | ||
| ASSERT(event_counter_ == 0); | ||
| ++event_counter_; | ||
| io_callback_ = dispatcher.createSchedulableCallback([this]() { user_file_event_->onEvents(); }); | ||
| auto res = Event::UserSpaceFileEventFactory::createUserSpaceFileEventImpl( | ||
| dispatcher, cb, trigger_type, events, *io_callback_, event_counter_); | ||
| user_file_event_ = res.get(); | ||
| // Blindly activate the events. | ||
| io_callback_->scheduleCallbackNextIteration(); | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| return res; | ||
| } | ||
|
|
||
| Api::SysCallIntResult BufferedIoSocketHandleImpl::shutdown(int how) { | ||
| // Support shutdown write. | ||
|
lambdai marked this conversation as resolved.
Outdated
|
||
| if ((how == ENVOY_SHUT_WR) || (how == ENVOY_SHUT_RDWR)) { | ||
| ASSERT(!closed_); | ||
| if (!write_shutdown_) { | ||
| ASSERT(writable_peer_); | ||
| // Notify the peer we won't write more data. shutdown(WRITE). | ||
| writable_peer_->setWriteEnd(); | ||
| writable_peer_->maybeSetNewData(); | ||
| write_shutdown_ = true; | ||
| } | ||
| } | ||
| return {0, 0}; | ||
| } | ||
|
|
||
| } // namespace Network | ||
| } // namespace Envoy | ||
Uh oh!
There was an error while loading. Please reload this page.