diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..11baf1b1 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "tests/third_party/corral"] + path = tests/third_party/corral + url = https://github.com/hudson-trading/corral diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e20aaaaf..ee7b02bb 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,3 +1,6 @@ +OPTION (COVERAGE "Enable gcda file generation needed by lcov" OFF) +OPTION (CPPZMQ_TEST_COROUTINE "Enable C++20 coroutine support test cases. This requires Boost.Asio" ON) + find_package(Threads) find_package(Catch2 QUIET) @@ -32,6 +35,27 @@ add_executable( utilities.cpp ) +if (CPPZMQ_TEST_COROUTINE) + target_compile_features( + unit_tests PRIVATE cxx_std_23 + ) + target_compile_definitions( + unit_tests PRIVATE + CPPZMQ_ENABLE_CORRAL_COROUTINE + ) + find_package(Boost CONFIG REQUIRED COMPONENTS asio) + target_include_directories(unit_tests PRIVATE third_party/corral) + target_link_libraries( + unit_tests + PRIVATE Boost::asio + ) + target_sources( + unit_tests PRIVATE + async/corral/message.cpp + async/corral/common.hpp + ) +endif() + target_include_directories(unit_tests PUBLIC ${CATCH_MODULE_PATH}) target_link_libraries( unit_tests @@ -40,7 +64,6 @@ target_link_libraries( PRIVATE ${CMAKE_THREAD_LIBS_INIT} ) -OPTION (COVERAGE "Enable gcda file generation needed by lcov" OFF) if (COVERAGE) target_compile_options(unit_tests PRIVATE --coverage) diff --git a/tests/async/corral/common.hpp b/tests/async/corral/common.hpp new file mode 100644 index 00000000..c8c78a61 --- /dev/null +++ b/tests/async/corral/common.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +template<> struct corral::EventLoopTraits +{ + using T = boost::asio::io_context; + /// Returns a value identifying the event loop. + /// Traits for any sort of wrappers should return ID + /// of the wrapped event loop. + static EventLoopID eventLoopID(T &ex) { return EventLoopID{std::addressof(ex)}; } + + /// Runs the event loop. + static void run(T &ex) { ex.run(); } + + /// Tells the event loop to exit. + /// run() should return shortly thereafter. + static void stop(T &ex) { ex.stop(); } +}; + + +struct [[nodiscard]] Timer +{ + explicit Timer(std::chrono::milliseconds sec, boost::asio::io_context &io) : + timer{io, sec} + { + } + boost::asio::steady_timer timer; + + public /* awaitable */: + inline auto await_ready() const noexcept + { + return timer.expiry() <= std::chrono::steady_clock::now(); + } + + inline auto await_suspend(std::coroutine_handle<> h) noexcept + { + timer.async_wait([h](boost::system::error_code e) { + if (!e) + h.resume(); + }); + } + + inline auto await_resume() {} + + public /* corral extensions */: + inline bool await_cancel(std::coroutine_handle<>) noexcept + { + return timer.cancel(); + } + + inline std::false_type await_must_resume() const noexcept { return {}; } +}; diff --git a/tests/async/corral/message.cpp b/tests/async/corral/message.cpp new file mode 100644 index 00000000..62973a4d --- /dev/null +++ b/tests/async/corral/message.cpp @@ -0,0 +1,177 @@ +#include "common.hpp" +#include "corral/Nursery.h" +#include "corral/wait.h" +#include "zmq.hpp" +#include "zmq_addon.hpp" +#include +#include + +using namespace std::string_literals; +using namespace std::string_view_literals; +using namespace std::chrono_literals; +using zmq::async::corral::socket_t, zmq::message_t, zmq::context_t; + + +TEST_CASE("basic REQ and REP", "[async_corral]") +{ + boost::asio::io_context io; + context_t ctx; + + constexpr auto req_msg = "Hi"sv; + constexpr auto rep_msg = "There"sv; + constexpr auto inproc_addr = "inproc://async_corral-basic"; + + corral::run(io, [&] -> corral::Task<> { + co_await corral::allOf( + [&] -> corral::Task<> { + socket_t socket{io, ctx, zmq::socket_type::req}; + socket.connect(inproc_addr); + co_await socket.send(message_t{req_msg}); + auto msg = co_await socket.recv(); + REQUIRE(msg.to_string() == rep_msg); + }, + [&] -> corral::Task<> { + socket_t socket{io, ctx, zmq::socket_type::rep}; + socket.bind(inproc_addr); + auto r = co_await socket.recv(); + REQUIRE(r.to_string() == req_msg); + co_await socket.send(message_t{rep_msg}); + }); + }); +} + +TEST_CASE("simple ROUTER and DEALER", "[async_corral]") +{ + boost::asio::io_context io; + context_t ctx; + + constexpr auto request_msg1 = "Test"sv; + constexpr auto request_msg2 = "ing"sv; + constexpr auto response_msg = "42!"sv; + constexpr auto response_repeat = 2; + constexpr auto inproc_addr = "inproc://async_corral-router_dealer"; + + auto server = [&] -> corral::Task<> { + auto external = socket_t{io, ctx, zmq::socket_type::router}; + external.bind(inproc_addr); + + for (;;) { + auto msg = co_await external.recv_multipart(); + REQUIRE(msg.size() == 3); + REQUIRE(msg[1].to_string_view() == request_msg1); + REQUIRE(msg[2].to_string_view() == request_msg2); + auto routing_id = msg.pop(); + + for (auto i = 0; i < response_repeat; ++i) { + zmq::multipart_t response; + response.add(std::move(message_t{routing_id.to_string_view()})); + response.add(message_t{response_msg}); + co_await external.send(std::move(response)); + co_await Timer{5ms, io}; + } + } + }; + + + auto client = [&] -> corral::Task<> { + auto socket = socket_t{io.get_executor(), ctx, zmq::socket_type::dealer}; + socket.connect(inproc_addr); + + for (auto i = 0; i < 3; ++i) { + zmq::multipart_t msg; + msg.add(message_t{request_msg1}); + msg.add(message_t{request_msg2}); + co_await socket.send(std::move(msg)); + + for (auto i = 0; i < response_repeat; ++i) { + auto response = co_await socket.recv_multipart(); + REQUIRE(response.size() == 1); + REQUIRE(response[0].to_string_view() == response_msg); + } + } + }; + + corral::run(io, corral::anyOf(client(), server())); +} + +TEST_CASE("ROUTER forwarding", "[async_corral]") +{ + // dealer client -> external router + // external router -> work dispatcher (spawn a new worker) + // worker -> internal router + // (forward) internal router -> external router + + + boost::asio::io_context io; + context_t ctx; + + constexpr auto request_msg1 = "Test"sv; + constexpr auto request_msg2 = "ing"sv; + constexpr auto response_msg = "42!"sv; + constexpr auto response_repeat = 2; + constexpr auto inproc_external_addr = + "inproc://async_corral-router_forwarding-router"; + constexpr auto inproc_internal_addr = + "inproc://async_corral-router_forwarding-rep"; + + auto worker = [&](socket_t dealer, zmq::multipart_t msg) -> corral::Task<> { + REQUIRE(msg.size() == 2); + REQUIRE(msg[0].to_string_view() == request_msg1); + REQUIRE(msg[1].to_string_view() == request_msg2); + for (auto i = 0; i < response_repeat; ++i) { + co_await dealer.send(message_t{response_msg}); + co_await Timer{50ms, io}; + } + }; + + auto work_dispatcher = [&](socket_t &external) -> corral::Task<> { + CORRAL_WITH_NURSERY(n) + { + for (;;) { + auto msg = co_await external.recv_multipart(); + + auto worker_socket = socket_t{io, ctx, zmq::socket_type::dealer}; + worker_socket.set(zmq::sockopt::routing_id, msg[0].to_string_view()); + worker_socket.connect(inproc_internal_addr); + msg.pop(); + n.start(worker, std::move(worker_socket), std::move(msg)); + } + }; + }; + + auto forward = [&](socket_t &external, socket_t &internal) -> corral::Task<> { + for (;;) { + auto msg_from_internal = co_await internal.recv_multipart(); + co_await external.send(std::move(msg_from_internal)); + } + }; + + auto server = [&] -> corral::Task<> { + auto external = socket_t{io, ctx, zmq::socket_type::router}; + auto internal = socket_t{io, ctx, zmq::socket_type::router}; + + external.bind(inproc_external_addr); + internal.bind(inproc_internal_addr); + + co_await corral::anyOf(forward(external, internal), + work_dispatcher(external)); + }; + + auto client = [&] -> corral::Task<> { + auto socket = socket_t{io.get_executor(), ctx, zmq::socket_type::dealer}; + socket.connect(inproc_external_addr); + + zmq::multipart_t msg; + msg.add(message_t{request_msg1}); + msg.add(message_t{request_msg2}); + co_await socket.send(std::move(msg)); + + for (auto i = 0; i < response_repeat; ++i) { + auto response = co_await socket.recv_multipart(); + REQUIRE(response.size() == 1); + REQUIRE(response[0].to_string_view() == response_msg); + } + }; + + corral::run(io, corral::anyOf(client(), server())); +} diff --git a/tests/third_party/corral b/tests/third_party/corral new file mode 160000 index 00000000..2739aefe --- /dev/null +++ b/tests/third_party/corral @@ -0,0 +1 @@ +Subproject commit 2739aefe6e40bdb4ca7158c222ce7f1ad84c1922 diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json new file mode 100644 index 00000000..69ef8447 --- /dev/null +++ b/vcpkg-configuration.json @@ -0,0 +1,14 @@ +{ + "default-registry": { + "kind": "git", + "baseline": "856505bb767458c99d8e3c3ed441f59a058d3687", + "repository": "https://github.com/microsoft/vcpkg" + }, + "registries": [ + { + "kind": "artifact", + "location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip", + "name": "microsoft" + } + ] +} diff --git a/vcpkg.json b/vcpkg.json new file mode 100644 index 00000000..a4fa8fe3 --- /dev/null +++ b/vcpkg.json @@ -0,0 +1,21 @@ +{ + "name": "cppzmq", + "version-semver": "4.10.0", + "features": { + "coroutine": { + "description": "Dependencies for enabling C++ 20 coroutine support", + "dependencies": [ + "boost-asio" + ] + }, + "test": { + "description": "Dependencies for testing", + "dependencies": [ + { + "name": "catch2", + "version>=": "3.5.3" + } + ] + } + } +} \ No newline at end of file diff --git a/zmq.hpp b/zmq.hpp index 757eeebe..0bfac6e5 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -60,6 +60,9 @@ #if CPPZMQ_LANG >= 201703L #define ZMQ_CPP17 #endif +#if CPPZMQ_LANG >= 202002L +#define ZMQ_CPP20 +#endif #if defined(ZMQ_CPP14) && !defined(_MSC_VER) #define ZMQ_DEPRECATED(msg) [[deprecated(msg)]] diff --git a/zmq_async.hpp b/zmq_async.hpp new file mode 100644 index 00000000..01e945bf --- /dev/null +++ b/zmq_async.hpp @@ -0,0 +1,656 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + + +#if !defined(ZMQ_CPP20) && defined(CPPZMQ_ENABLE_CORRAL_COROUTINE) +#error "Coroutine support enabled with a C++ standard lower than C++20" +#endif +#if defined(ZMQ_CPP20) && defined(CPPZMQ_ENABLE_CORRAL_COROUTINE) +// As coroutine support is only avaiable for C++20 and upward, +// there is no point in using the ZMQ_XXX macros for compatibility. +// Everything that's available in C++20 can be used here (e.g. inline, noexcept). + + +namespace zmq +{ + +namespace async::corral +{ + +using native_fd_watcher_t = +#if defined(_WIN32) || defined(_WIN64) + boost::asio::windows::stream_handle; +#else + boost::asio::posix::stream_descriptor; +#endif + +class recv_error_t : public std::exception +{ + public: + inline explicit recv_error_t() noexcept = default; + inline virtual const char *what() const noexcept override + { + return "Failed receiving ZeroMQ message."; + } +}; + +class send_error_t : public std::exception +{ + public: + inline explicit send_error_t() noexcept = default; + inline virtual const char *what() const noexcept override + { + return "Failed sending ZeroMQ message."; + } +}; + +namespace details +{ + +template Fn> struct [[nodiscard]] defer_guard +{ + [[no_unique_address, msvc::no_unique_address]] Fn fn; + + constexpr defer_guard(Fn &&f) noexcept( + ::std::is_nothrow_move_constructible_v) : + fn(::std::move(f)) + { + } + + constexpr ~defer_guard() noexcept(::std::is_nothrow_invocable_v) { fn(); } +}; + +/// +/// \brief Asynchronously send message to a ZeroMQ socket. +/// \return [zmq::message_t] - The message to be sent. +/// \throw [send_error_t] - On send error. +/// +struct [[nodiscard]] async_send_awaitable_t +{ + public /* ctor */: + inline async_send_awaitable_t(native_fd_watcher_t &watcher, + zmq::socket_ref socket_ref, + zmq::message_t message, + zmq::send_flags flags, + bool &is_sending_multipart) : + m_wait(watcher), + m_socket(socket_ref), + m_msg(std::move(message)), + m_flags(flags), + m_is_sending_multipart(is_sending_multipart) + { + } + + public /* awaitable */: + /// + /// \brief An optimization which avoids suspending the coroutine if it isn't necessary. + /// \return true - when there are already poll-out events and `await_resume` will be + /// immediately invoked. + /// + inline bool await_ready() const noexcept + { + return !m_is_sending_multipart && has_pollout_events(); + } + + inline auto await_resume() + { + auto result = + m_socket.send(std::move(m_msg), zmq::send_flags::dontwait | m_flags); + if (!result) [[unlikely]] + throw send_error_t{}; + } + + inline void await_suspend(std::coroutine_handle<> h) noexcept + { + m_wait.async_wait(native_fd_watcher_t::wait_write, + [this, h](boost::system::error_code err) -> void { + if (!err) { + if (!m_is_sending_multipart + && has_pollout_events()) { + if (h) + h.resume(); + } else { + // schedule for another awake. + this->await_suspend(h); + } + } + }); + } + + public /* corral extensions */: + inline bool await_cancel(std::coroutine_handle<> h) noexcept + { + m_wait.cancel(); + return true; + } + + inline bool await_must_resume() const noexcept { return false; } + + private: + inline bool has_pollout_events() const noexcept + { + return m_socket.get(zmq::sockopt::events) & ZMQ_POLLOUT; + } + + private: + native_fd_watcher_t &m_wait; + zmq::socket_ref m_socket; + zmq::message_t m_msg; + zmq::send_flags m_flags; + bool const &m_is_sending_multipart; +}; + +/// +/// \brief Asynchronously receive message from a ZeroMQ socket. +/// \return [zmq::message_t] - The received message. +/// \throw [recv_error_t] - On receive error. +/// +struct [[nodiscard]] async_recv_awaitable_t +{ + public /* ctor */: + inline async_recv_awaitable_t(native_fd_watcher_t &watcher, + zmq::socket_ref socket_ref, + zmq::recv_flags flags, + bool &is_receiving_multipart) : + m_wait(watcher), + m_socket(socket_ref), + m_flags(flags), + m_is_receiving_multipart(is_receiving_multipart) + { + } + + public /* awaitable */: + /// + /// \brief An optimization which avoids suspending the coroutine if it isn't necessary. + /// \return true - when there are already poll-in events and `await_resume` will be + /// immediately invoked. + /// + inline bool await_ready() const noexcept + { + return !m_is_receiving_multipart && has_pollin_events(); + } + + [[nodiscard]] inline auto await_resume() -> zmq::message_t + { + zmq::message_t msg{}; + auto result = m_socket.recv(msg, zmq::recv_flags::dontwait | m_flags); + if (!result) [[unlikely]] + throw recv_error_t{}; + return msg; + } + + inline void await_suspend(std::coroutine_handle<> h) noexcept + { + m_wait.async_wait( + native_fd_watcher_t::wait_read, + [this, h](boost::system::error_code err) -> void { + if (!err) { + // only resume if there are poll-in events + if (!m_is_receiving_multipart && has_pollin_events()) { + if (h) + h.resume(); + } else { + // If the file descriptor can be read but socket doesn't + // have poll-in event, schedule for another awake. + this->await_suspend(h); + } + } + }); + } + + public /* corral extensions */: + inline bool await_cancel(std::coroutine_handle<> h) noexcept + { + m_wait.cancel(); + return true; + } + + [[nodiscard]] inline bool await_must_resume() const noexcept { return false; } + + private: + [[nodiscard]] inline bool has_pollin_events() const noexcept + { + return m_socket.get(zmq::sockopt::events) & ZMQ_POLLIN; + } + + private: + native_fd_watcher_t &m_wait; + zmq::socket_ref m_socket; + zmq::recv_flags m_flags; + bool const &m_is_receiving_multipart; +}; + +/// +/// \brief Asynchronously send message to a ZeroMQ socket. +/// \return [zmq::multipart_t] - The message to be sent. +/// \throw [send_error_t] - On send error. +/// +struct [[nodiscard]] async_send_multipart_awaitable_t +{ + public /* ctor */: + inline async_send_multipart_awaitable_t(native_fd_watcher_t &watcher, + zmq::socket_ref socket_ref, + zmq::multipart_t message, + zmq::send_flags flags, + bool &is_sending_multipart) : + m_wait(watcher), + m_socket(socket_ref), + m_message(std::move(message)), + // `sndmore` flag should not be controlled by the user. + m_flags(flags & ~(zmq::send_flags::sndmore)), + m_is_sending_multipart(is_sending_multipart) + { + } + + public /* awaitable */: + /// + /// \note The early-resume optimization isn't available here: + /// The filter must be invoked to properly check if it's OK to submit message + /// into the send queue. + /// + inline bool await_ready() const noexcept { return false; } + + inline void await_resume() + { + if (!m_succeed) [[unlikely]] + throw send_error_t{}; + } + + /// \note Refer to resume_filter for resumption behavior + inline void await_suspend(std::coroutine_handle<> h) noexcept + { + m_started = true; + m_wait.async_wait(native_fd_watcher_t::wait_write, + [this, h](boost::system::error_code err) -> void { + if (!err) { + this->resume_filter(h, m_message); + } + }); + } + + public /* corral extensions */: + inline bool await_cancel(std::coroutine_handle<> h) noexcept + { + m_wait.cancel(); + // If either the send proccess hasn't started, or it has finished, + // Cancellation does not require any additional efforts. + if (!m_started || m_succeed) [[likely]] + return true; + + // Otherwise: + // The multipart message should never be queued half-way without being sent, + // continue sending the message + this->await_suspend(h); + return false; + } + + inline bool await_must_resume() const noexcept + { + return m_started && !m_succeed; + } + + private: + inline bool has_pollout_events() const noexcept + { + return m_socket.get(zmq::sockopt::events) & ZMQ_POLLOUT; + } + + /// + /// \brief This function is used as a callback. It's called when the file descriptor + /// can be written with data. + /// + inline void resume_filter(std::coroutine_handle<> h, + zmq::multipart_t &msg) noexcept + { + if (!m_is_sending_multipart && has_pollout_events()) { + m_is_sending_multipart = true; + defer_guard defer = [&] { m_is_sending_multipart = false; }; + + /// + /// \brief This boolean is used to indicate if this function had sent anything: + /// On sending failure, it can either be: + /// + /// - that the queue is full (and therefore should be suspended + /// and scheduled for another awake). + /// - or that the socket is disconnected. + /// + bool sent_something = false; + + // Instead of repeatedly suspending and awaking, try to fill the send queue + // once, and fallback to suspension if no message can be submitted to the send + // queue at the moment. + while (true) { + auto m = msg.pop(); + const bool more = !msg.empty(); + + const auto result = m_socket.send( + std::move(m), + (more ? zmq::send_flags::sndmore : zmq::send_flags::none) + | zmq::send_flags::dontwait | m_flags); + + if (result) [[likely]] { + sent_something = true; + } else [[unlikely]] { + if (sent_something) + goto schedule_another_awake; + + // Resume without setting the success flag, + // to indicate that the operation has failed. + goto resume_coroutine; + } + + if (!more) { + m_succeed = true; + goto resume_coroutine; + } + } + } + + schedule_another_awake: + /// If the above operations neither success nor fail, it means that + /// the multipart message haven't been sent. Schedule for another awake. + this->await_suspend(h); + return; + + resume_coroutine: + if (h) + h.resume(); + return; + } + + private: + native_fd_watcher_t &m_wait; + zmq::socket_ref m_socket; + zmq::multipart_t m_message; + bool m_started = false; + bool m_succeed = false; + zmq::send_flags m_flags; + bool &m_is_sending_multipart; +}; + +/// +/// \brief Asynchronously receive message from a ZeroMQ socket. +/// \return [zmq::multipart_t] - The received message. +/// \throw [recv_error_t] - On receive error. +/// +struct [[nodiscard]] async_recv_multipart_awaitable_t +{ + public /* ctor */: + inline async_recv_multipart_awaitable_t(native_fd_watcher_t &watcher, + zmq::socket_ref socket_ref, + zmq::recv_flags flags, + bool &is_receiving_multipart) : + m_wait(watcher), + m_socket(socket_ref), + m_flags(flags), + m_is_receiving_multipart(is_receiving_multipart) + { + } + + public /* awaitable */: + /// + /// \brief An optimization which avoids suspending the coroutine if it isn't necessary. + /// \return true - when there are already poll-in events and `await_resume` will be + /// immediately invoked. + /// + inline bool await_ready() const noexcept + { + return !m_is_receiving_multipart && has_pollin_events(); + } + + /// + /// \note ZeroMQ's multipart message is not equivalent to streams in other libraries. + /// A multipart message is atomic: either it's fully received or it's not. + /// It's like sending a std::deque in a `zmq::message_t`. + /// Therefore it's not necessary to repeatedly suspend the coroutine in the receive loop: + /// When poll-in event comes in, the whole multipart message has already been received by + /// ZeroMQ. + /// + [[nodiscard]] inline auto await_resume() -> zmq::multipart_t + { + if (m_is_receiving_multipart) [[unlikely]] + throw std::runtime_error{ + "Internal error: This socket has multiple readers"}; + + m_is_receiving_multipart = true; + defer_guard defer = [&] { m_is_receiving_multipart = false; }; + zmq::multipart_t msg; + while (true) { + zmq::message_t m; + auto result = m_socket.recv(m, zmq::recv_flags::dontwait | m_flags); + if (!result) [[unlikely]] + throw recv_error_t{}; + if (m.more()) { + msg.add(std::move(m)); + continue; + } else { + msg.add(std::move(m)); + break; + } + } + return msg; + } + + inline void await_suspend(std::coroutine_handle<> h) noexcept + { + m_wait.async_wait( + native_fd_watcher_t::wait_read, + [this, h](boost::system::error_code err) -> void { + if (!err) { + if (!m_is_receiving_multipart && has_pollin_events()) { + if (h) + h.resume(); + } else { + // If the file descriptor can be read but socket doesn't + // have poll-in event, schedule for another awake. + this->await_suspend(h); + } + } + }); + } + + public /* corral extensions */: + inline bool await_cancel(std::coroutine_handle<> h) noexcept + { + m_wait.cancel(); + return true; + } + + [[nodiscard]] inline bool await_must_resume() const noexcept { return false; } + + private: + inline bool has_pollin_events() const noexcept + { + return m_socket.get(zmq::sockopt::events) & ZMQ_POLLIN; + } + + private: + native_fd_watcher_t &m_wait; + zmq::socket_ref m_socket; + zmq::recv_flags m_flags; + bool &m_is_receiving_multipart; +}; +} + + +struct socket_t +{ + public /* constructors */: + inline socket_t(boost::asio::any_io_executor io_executor, + ::zmq::context_t &context, + ::zmq::socket_type type) : + m_socket(context, type), + m_watcher(io_executor, m_socket.get(::zmq::sockopt::fd)) + { + } + + /// syntax sugar to automatically get executor from io_context + inline socket_t(boost::asio::io_context &io_context, + zmq::context_t &context, + zmq::socket_type type) : + socket_t(io_context.get_executor(), context, type) + { + } + + socket_t(socket_t &&) noexcept = default; + + + /// + /// \brief Releases the file descriptor without closing it. + /// stream_descriptor attempts to close the file descriptor on destruction, + /// this prevents that from happening because the file descriptor should be + /// managed by ZeroMQ instead. + /// + inline ~socket_t() { m_watcher.release(); } + + public /* interfaces */: + inline auto send(zmq::message_t msg, + zmq::send_flags flags = zmq::send_flags::none) + -> details::async_send_awaitable_t + { + return {m_watcher, m_socket, std::move(msg), flags, m_is_sending_multipart}; + } + + inline auto recv(zmq::recv_flags flags = zmq::recv_flags::none) + -> details::async_recv_awaitable_t + { + return {m_watcher, m_socket, flags, m_is_receiving_multipart}; + } + + inline auto send_multipart(zmq::multipart_t msg, + zmq::send_flags flags = zmq::send_flags::none) + -> details::async_send_multipart_awaitable_t + { + return {m_watcher, m_socket, std::move(msg), flags, m_is_sending_multipart}; + } + + inline auto send(zmq::multipart_t msg, + zmq::send_flags flags = zmq::send_flags::none) + { + return this->send_multipart(std::move(msg), flags); + } + + inline auto recv_multipart(zmq::recv_flags flags = zmq::recv_flags::none) + -> details::async_recv_multipart_awaitable_t + { + return {m_watcher, m_socket, flags, m_is_receiving_multipart}; + } + + public /* proxy */: + inline decltype(auto) bind(const char *addr) { return m_socket.bind(addr); } + inline decltype(auto) bind(std::string addr) + { + return m_socket.bind(std::move(addr)); + } + inline decltype(auto) connect(const char *addr) + { + return m_socket.connect(addr); + } + inline decltype(auto) connect(std::string addr) + { + return m_socket.connect(std::move(addr)); + } + inline decltype(auto) close() { return m_socket.close(); } + + inline decltype(auto) swap(zmq::socket_t &other) { return m_socket.swap(other); } + inline decltype(auto) disconnect(const char *addr) + { + return m_socket.disconnect(addr); + } + inline decltype(auto) disconnect(std::string addr) + { + return m_socket.disconnect(std::move(addr)); + } + + template + inline decltype(auto) get(sockopt::integral_option _) + { + return m_socket.get(_); + } + + inline decltype(auto) handle() { return m_socket.handle(); } + inline decltype(auto) join(const char *group) { return m_socket.join(group); } + inline decltype(auto) leave(const char *group) { return m_socket.leave(group); } + + // Set integral socket option, e.g. + // `socket.set(zmq::sockopt::linger, 0)` + template + inline void set(sockopt::integral_option _, const T &val) + { + m_socket.set(_, val); + } + + // Set integral socket option from boolean, e.g. + // `socket.set(zmq::sockopt::immediate, false)` + template + inline void set(sockopt::integral_option _, bool val) + { + m_socket.set(_, val); + } + + // Set array socket option, e.g. + // `socket.set(zmq::sockopt::plain_username, "foo123")` + template + inline void set(sockopt::array_option _, const char *buf) + { + m_socket.set(_, buf); + } + + // Set array socket option, e.g. + // `socket.set(zmq::sockopt::routing_id, zmq::buffer(id))` + template + inline void set(sockopt::array_option _, const_buffer buf) + { + m_socket.set(_, buf); + } + + // Set array socket option, e.g. + // `socket.set(zmq::sockopt::routing_id, id_str)` + template + inline void set(sockopt::array_option _, const std::string &buf) + { + m_socket.set(_, buf); + } + + // Set array socket option, e.g. + // `socket.set(zmq::sockopt::routing_id, id_str)` + template + inline void set(sockopt::array_option _, std::string_view buf) + { + m_socket.set(_, buf); + } + + inline decltype(auto) unbind(const char *addr) { return m_socket.unbind(addr); } + inline decltype(auto) unbind(std::string addr) + { + return m_socket.unbind(std::move(addr)); + } + + inline auto &get_socket() { return m_socket; } + + private: + zmq::socket_t m_socket; + native_fd_watcher_t m_watcher; + /// \brief true - if this socket is currently processing a multipart message. + /// This blocks the socket from being requested to process another multipart + /// message at the same time. + bool m_is_sending_multipart = false; + bool m_is_receiving_multipart = false; +}; + + +} + +#endif + +} // namespace zmq + +#undef RESULT \ No newline at end of file