Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "tests/third_party/corral"]
path = tests/third_party/corral
url = https://github.com/hudson-trading/corral
25 changes: 24 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
OPTION (COVERAGE "Enable gcda file generation needed by lcov" OFF)
OPTION (CPPZMQ_TEST_COROUTINE "Enable C++20 coroutine support test cases. This requires Boost.Asio" ON)

find_package(Threads)

find_package(Catch2 QUIET)
Expand Down Expand Up @@ -32,6 +35,27 @@ add_executable(
utilities.cpp
)

if (CPPZMQ_TEST_COROUTINE)
target_compile_features(
unit_tests PRIVATE cxx_std_23
)
target_compile_definitions(
unit_tests PRIVATE
CPPZMQ_ENABLE_CORRAL_COROUTINE
)
find_package(Boost CONFIG REQUIRED COMPONENTS asio)
target_include_directories(unit_tests PRIVATE third_party/corral)
target_link_libraries(
unit_tests
PRIVATE Boost::asio
)
target_sources(
unit_tests PRIVATE
async/corral/message.cpp
async/corral/common.hpp
)
endif()

target_include_directories(unit_tests PUBLIC ${CATCH_MODULE_PATH})
target_link_libraries(
unit_tests
Expand All @@ -40,7 +64,6 @@ target_link_libraries(
PRIVATE ${CMAKE_THREAD_LIBS_INIT}
)

OPTION (COVERAGE "Enable gcda file generation needed by lcov" OFF)

if (COVERAGE)
target_compile_options(unit_tests PRIVATE --coverage)
Expand Down
60 changes: 60 additions & 0 deletions tests/async/corral/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#pragma once

#include <boost/asio/io_context.hpp>
#include <catch2/catch_all.hpp>
#include <catch2/catch_test_macros.hpp>

#include <chrono>
#include <zmq_async.hpp>
#include <corral/corral.h>
#include <boost/asio/steady_timer.hpp>

template<> struct corral::EventLoopTraits<boost::asio::io_context>
{
using T = boost::asio::io_context;
/// Returns a value identifying the event loop.
/// Traits for any sort of wrappers should return ID
/// of the wrapped event loop.
static EventLoopID eventLoopID(T &ex) { return EventLoopID{std::addressof(ex)}; }

/// Runs the event loop.
static void run(T &ex) { ex.run(); }

/// Tells the event loop to exit.
/// run() should return shortly thereafter.
static void stop(T &ex) { ex.stop(); }
};


struct [[nodiscard]] Timer
{
explicit Timer(std::chrono::milliseconds sec, boost::asio::io_context &io) :
timer{io, sec}
{
}
boost::asio::steady_timer timer;

public /* awaitable */:
inline auto await_ready() const noexcept
{
return timer.expiry() <= std::chrono::steady_clock::now();
}

inline auto await_suspend(std::coroutine_handle<> h) noexcept
{
timer.async_wait([h](boost::system::error_code e) {
if (!e)
h.resume();
});
}

inline auto await_resume() {}

public /* corral extensions */:
inline bool await_cancel(std::coroutine_handle<>) noexcept
{
return timer.cancel();
}

inline std::false_type await_must_resume() const noexcept { return {}; }
};
177 changes: 177 additions & 0 deletions tests/async/corral/message.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#include "common.hpp"
#include "corral/Nursery.h"
#include "corral/wait.h"
#include "zmq.hpp"
#include "zmq_addon.hpp"
#include <catch2/catch_test_macros.hpp>
#include <utility>

using namespace std::string_literals;
using namespace std::string_view_literals;
using namespace std::chrono_literals;
using zmq::async::corral::socket_t, zmq::message_t, zmq::context_t;


TEST_CASE("basic REQ and REP", "[async_corral]")
{
boost::asio::io_context io;
context_t ctx;

constexpr auto req_msg = "Hi"sv;
constexpr auto rep_msg = "There"sv;
constexpr auto inproc_addr = "inproc://async_corral-basic";

corral::run(io, [&] -> corral::Task<> {
co_await corral::allOf(
[&] -> corral::Task<> {
socket_t socket{io, ctx, zmq::socket_type::req};
socket.connect(inproc_addr);
co_await socket.send(message_t{req_msg});
auto msg = co_await socket.recv();
REQUIRE(msg.to_string() == rep_msg);
},
[&] -> corral::Task<> {
socket_t socket{io, ctx, zmq::socket_type::rep};
socket.bind(inproc_addr);
auto r = co_await socket.recv();
REQUIRE(r.to_string() == req_msg);
co_await socket.send(message_t{rep_msg});
});
});
}

TEST_CASE("simple ROUTER and DEALER", "[async_corral]")
{
boost::asio::io_context io;
context_t ctx;

constexpr auto request_msg1 = "Test"sv;
constexpr auto request_msg2 = "ing"sv;
constexpr auto response_msg = "42!"sv;
constexpr auto response_repeat = 2;
constexpr auto inproc_addr = "inproc://async_corral-router_dealer";

auto server = [&] -> corral::Task<> {
auto external = socket_t{io, ctx, zmq::socket_type::router};
external.bind(inproc_addr);

for (;;) {
auto msg = co_await external.recv_multipart();
REQUIRE(msg.size() == 3);
REQUIRE(msg[1].to_string_view() == request_msg1);
REQUIRE(msg[2].to_string_view() == request_msg2);
auto routing_id = msg.pop();

for (auto i = 0; i < response_repeat; ++i) {
zmq::multipart_t response;
response.add(std::move(message_t{routing_id.to_string_view()}));
response.add(message_t{response_msg});
co_await external.send(std::move(response));
co_await Timer{5ms, io};
}
}
};


auto client = [&] -> corral::Task<> {
auto socket = socket_t{io.get_executor(), ctx, zmq::socket_type::dealer};
socket.connect(inproc_addr);

for (auto i = 0; i < 3; ++i) {
zmq::multipart_t msg;
msg.add(message_t{request_msg1});
msg.add(message_t{request_msg2});
co_await socket.send(std::move(msg));

for (auto i = 0; i < response_repeat; ++i) {
auto response = co_await socket.recv_multipart();
REQUIRE(response.size() == 1);
REQUIRE(response[0].to_string_view() == response_msg);
}
}
};

corral::run(io, corral::anyOf(client(), server()));
}

TEST_CASE("ROUTER forwarding", "[async_corral]")
{
// dealer client -> external router
// external router -> work dispatcher (spawn a new worker)
// worker -> internal router
// (forward) internal router -> external router


boost::asio::io_context io;
context_t ctx;

constexpr auto request_msg1 = "Test"sv;
constexpr auto request_msg2 = "ing"sv;
constexpr auto response_msg = "42!"sv;
constexpr auto response_repeat = 2;
constexpr auto inproc_external_addr =
"inproc://async_corral-router_forwarding-router";
constexpr auto inproc_internal_addr =
"inproc://async_corral-router_forwarding-rep";

auto worker = [&](socket_t dealer, zmq::multipart_t msg) -> corral::Task<> {
REQUIRE(msg.size() == 2);
REQUIRE(msg[0].to_string_view() == request_msg1);
REQUIRE(msg[1].to_string_view() == request_msg2);
for (auto i = 0; i < response_repeat; ++i) {
co_await dealer.send(message_t{response_msg});
co_await Timer{50ms, io};
}
};

auto work_dispatcher = [&](socket_t &external) -> corral::Task<> {
CORRAL_WITH_NURSERY(n)
{
for (;;) {
auto msg = co_await external.recv_multipart();

auto worker_socket = socket_t{io, ctx, zmq::socket_type::dealer};
worker_socket.set(zmq::sockopt::routing_id, msg[0].to_string_view());
worker_socket.connect(inproc_internal_addr);
msg.pop();
n.start(worker, std::move(worker_socket), std::move(msg));
}
};
};

auto forward = [&](socket_t &external, socket_t &internal) -> corral::Task<> {
for (;;) {
auto msg_from_internal = co_await internal.recv_multipart();
co_await external.send(std::move(msg_from_internal));
}
};

auto server = [&] -> corral::Task<> {
auto external = socket_t{io, ctx, zmq::socket_type::router};
auto internal = socket_t{io, ctx, zmq::socket_type::router};

external.bind(inproc_external_addr);
internal.bind(inproc_internal_addr);

co_await corral::anyOf(forward(external, internal),
work_dispatcher(external));
};

auto client = [&] -> corral::Task<> {
auto socket = socket_t{io.get_executor(), ctx, zmq::socket_type::dealer};
socket.connect(inproc_external_addr);

zmq::multipart_t msg;
msg.add(message_t{request_msg1});
msg.add(message_t{request_msg2});
co_await socket.send(std::move(msg));

for (auto i = 0; i < response_repeat; ++i) {
auto response = co_await socket.recv_multipart();
REQUIRE(response.size() == 1);
REQUIRE(response[0].to_string_view() == response_msg);
}
};

corral::run(io, corral::anyOf(client(), server()));
}
1 change: 1 addition & 0 deletions tests/third_party/corral
Submodule corral added at 2739ae
14 changes: 14 additions & 0 deletions vcpkg-configuration.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"default-registry": {
"kind": "git",
"baseline": "856505bb767458c99d8e3c3ed441f59a058d3687",
"repository": "https://github.com/microsoft/vcpkg"
},
"registries": [
{
"kind": "artifact",
"location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip",
"name": "microsoft"
}
]
}
21 changes: 21 additions & 0 deletions vcpkg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "cppzmq",
"version-semver": "4.10.0",
"features": {
"coroutine": {
"description": "Dependencies for enabling C++ 20 coroutine support",
"dependencies": [
"boost-asio"
]
},
"test": {
"description": "Dependencies for testing",
"dependencies": [
{
"name": "catch2",
"version>=": "3.5.3"
}
]
}
}
}
3 changes: 3 additions & 0 deletions zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
#if CPPZMQ_LANG >= 201703L
#define ZMQ_CPP17
#endif
#if CPPZMQ_LANG >= 202002L
#define ZMQ_CPP20
#endif

#if defined(ZMQ_CPP14) && !defined(_MSC_VER)
#define ZMQ_DEPRECATED(msg) [[deprecated(msg)]]
Expand Down
Loading