From ae6c005b87a1b19170a3dcb9736f98cd28225719 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Sat, 27 Jan 2024 19:09:47 +0800 Subject: [PATCH] simplify router --- examples/client/main.cpp | 10 ++-- examples/server/main.cpp | 14 ++--- include/rest_rpc/connection.h | 13 ++++- include/rest_rpc/router.h | 106 ++++++++++++++++------------------ include/rest_rpc/rpc_server.h | 8 +-- 5 files changed, 77 insertions(+), 74 deletions(-) diff --git a/examples/client/main.cpp b/examples/client/main.cpp index c35d60a..bd12645 100644 --- a/examples/client/main.cpp +++ b/examples/client/main.cpp @@ -217,7 +217,7 @@ void test_echo() { } { - auto result = client.call("async_echo", "test"); + auto result = client.call("delay_echo", "test"); std::cout << result << std::endl; } } @@ -349,11 +349,11 @@ void test_callback() { rpc_client client; bool r = client.connect("127.0.0.1", 9000); - for (size_t i = 0; i < 100; i++) { + for (size_t i = 0; i < 10; i++) { std::string test = "test" + std::to_string(i + 1); // set timeout 100ms - client.async_call<100>( - "async_echo", + client.async_call<10000>( + "delay_echo", [](const asio::error_code &ec, string_view data) { if (ec) { std::cout << ec.value() << " timeout" << std::endl; @@ -361,7 +361,7 @@ void test_callback() { } auto str = as(data); - std::cout << "echo " << str << '\n'; + std::cout << "delay echo " << str << '\n'; }, test); diff --git a/examples/server/main.cpp b/examples/server/main.cpp index 3305c90..47da07f 100644 --- a/examples/server/main.cpp +++ b/examples/server/main.cpp @@ -75,11 +75,11 @@ std::string get_name(rpc_conn conn, const person &p) { // if you want to response later, you can use async model, you can control when // to response -void async_echo(rpc_conn conn, const std::string &src) { - auto req_id = - conn.lock()->request_id(); // note: you need keep the request id at that - // time, and pass it into the async thread - +void delay_echo(rpc_conn conn, const std::string &src) { + auto sp = conn.lock(); + sp->set_delay(true); + auto req_id = sp->request_id(); // note: you need keep the request id at that + // time, and pass it into the async thread std::thread thd([conn, req_id, src] { std::this_thread::sleep_for(std::chrono::seconds(1)); auto conn_sp = conn.lock(); @@ -121,7 +121,7 @@ dummy1 get_dummy(rpc_conn conn, dummy1 d) { return d; } int main() { // benchmark_test(); - rpc_server server(9000, std::thread::hardware_concurrency()); + rpc_server server(9000, std::thread::hardware_concurrency(), 3600); dummy d; server.register_handler("add", &dummy::add, &d); @@ -135,7 +135,7 @@ int main() { server.register_handler("upload", upload); server.register_handler("download", download); server.register_handler("get_name", get_name); - server.register_handler("async_echo", async_echo); + server.register_handler("delay_echo", delay_echo); server.register_handler("echo", echo); server.register_handler("get_int", get_int); diff --git a/include/rest_rpc/connection.h b/include/rest_rpc/connection.h index 1690aa3..7219e74 100644 --- a/include/rest_rpc/connection.h +++ b/include/rest_rpc/connection.h @@ -101,6 +101,8 @@ class connection : public std::enable_shared_from_this, callback_ = std::move(callback); } + void set_delay(bool delay) { delay_ = delay; } + void on_network_error(std::function, std::string)> &on_net_err) { on_net_err_ = &on_net_err; @@ -208,8 +210,14 @@ class connection : public std::enable_shared_from_this, if (!ec) { read_head(); if (req_type_ == request_type::req_res) { - router_.route(func_id, body_.data(), length, - this->shared_from_this()); + route_result_t ret = router_.route( + func_id, nonstd::string_view{body_.data(), length}, + this->shared_from_this()); + if (delay_) { + delay_ = false; + } else { + response(req_id_, std::move(ret.result)); + } } else if (req_type_ == request_type::sub_pub) { try { msgpack_codec codec; @@ -420,6 +428,7 @@ class connection : public std::enable_shared_from_this, nullptr; router &router_; nonstd::any user_data_; + bool delay_ = false; }; } // namespace rpc_service } // namespace rest_rpc diff --git a/include/rest_rpc/router.h b/include/rest_rpc/router.h index 1cde1dc..8f7d268 100644 --- a/include/rest_rpc/router.h +++ b/include/rest_rpc/router.h @@ -4,33 +4,38 @@ #include "codec.h" #include "md5.hpp" #include "meta_util.hpp" +#include "string_view.hpp" #include "use_asio.hpp" #include #include #include namespace rest_rpc { -enum class ExecMode { sync, async }; -const constexpr ExecMode Async = ExecMode::async; - namespace rpc_service { class connection; +enum class router_error { ok, no_such_function, has_exception, unkonw }; + +struct route_result_t { + router_error ec = router_error::unkonw; + std::string result; +}; + class router : asio::noncopyable { public: - template + template void register_handler(std::string const &name, Function f) { uint32_t key = MD5::MD5Hash32(name.data()); key2func_name_.emplace(key, name); - return register_nonmember_func(key, std::move(f)); + return register_nonmember_func(key, std::move(f)); } - template + template void register_handler(std::string const &name, const Function &f, Self *self) { uint32_t key = MD5::MD5Hash32(name.data()); key2func_name_.emplace(key, name); - return register_member_func(key, f, self); + return register_member_func(key, f, self); } void remove_handler(std::string const &name) { @@ -48,14 +53,9 @@ class router : asio::noncopyable { } template - void route(uint32_t key, const char *data, std::size_t size, - std::weak_ptr conn) { - auto conn_sp = conn.lock(); - if (!conn_sp) { - return; - } - - auto req_id = conn_sp->request_id(); + route_result_t route(uint32_t key, nonstd::string_view data, + std::weak_ptr conn) { + route_result_t route_result{}; std::string result; try { msgpack_codec codec; @@ -63,26 +63,28 @@ class router : asio::noncopyable { if (it == map_invokers_.end()) { result = codec.pack_args_str( result_code::FAIL, "unknown function: " + get_name_by_key(key)); - conn_sp->response(req_id, std::move(result)); - return; - } - - ExecMode model; - it->second(conn, data, size, result, model); - if (model == ExecMode::sync) { - if (result.size() >= MAX_BUF_LEN) { - result = codec.pack_args_str( - result_code::FAIL, - "the response result is out of range: more than 10M " + - get_name_by_key(key)); - } - conn_sp->response(req_id, std::move(result)); + route_result.ec = router_error::no_such_function; + } else { + it->second(conn, data, result); + route_result.ec = router_error::ok; } } catch (const std::exception &ex) { msgpack_codec codec; - result = codec.pack_args_str(result_code::FAIL, ex.what()); - conn_sp->response(req_id, std::move(result)); + result = codec.pack_args_str( + result_code::FAIL, + std::string("exception occur when call").append(ex.what())); + route_result.ec = router_error::has_exception; + } catch (...) { + msgpack_codec codec; + result = codec.pack_args_str( + result_code::FAIL, std::string("unknown exception occur when call ") + .append(get_name_by_key(key))); + route_result.ec = router_error::no_such_function; } + + route_result.result = std::move(result); + + return route_result; } router() = default; @@ -152,19 +154,15 @@ class router : asio::noncopyable { result = msgpack_codec::pack_args_str(result_code::OK, r); } - template struct invoker { - template + template struct invoker { static inline void apply(const Function &func, - std::weak_ptr conn, const char *data, - size_t size, std::string &result, - ExecMode &exe_model) { + std::weak_ptr conn, + nonstd::string_view str, std::string &result) { using args_tuple = typename function_traits::bare_tuple_type; - exe_model = ExecMode::sync; msgpack_codec codec; try { - auto tp = codec.unpack(data, size); + auto tp = codec.unpack(str.data(), str.size()); call(func, conn, result, std::move(tp)); - exe_model = model; } catch (std::invalid_argument &e) { result = codec.pack_args_str(result_code::FAIL, e.what()); } catch (const std::exception &e) { @@ -172,18 +170,16 @@ class router : asio::noncopyable { } } - template + template static inline void apply_member(const Function &func, Self *self, std::weak_ptr conn, - const char *data, size_t size, - std::string &result, ExecMode &exe_model) { + nonstd::string_view str, + std::string &result) { using args_tuple = typename function_traits::bare_tuple_type; - exe_model = ExecMode::sync; msgpack_codec codec; try { - auto tp = codec.unpack(data, size); + auto tp = codec.unpack(str.data(), str.size()); call_member(func, self, conn, result, std::move(tp)); - exe_model = model; } catch (std::invalid_argument &e) { result = codec.pack_args_str(result_code::FAIL, e.what()); } catch (const std::exception &e) { @@ -192,25 +188,23 @@ class router : asio::noncopyable { } }; - template + template void register_nonmember_func(uint32_t key, Function f) { this->map_invokers_[key] = {std::bind( - &invoker::template apply, std::move(f), - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, - std::placeholders::_4, std::placeholders::_5)}; + &invoker::apply, std::move(f), std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)}; } - template + template void register_member_func(uint32_t key, const Function &f, Self *self) { this->map_invokers_[key] = {std::bind( - &invoker::template apply_member, f, self, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, - std::placeholders::_4, std::placeholders::_5)}; + &invoker::template apply_member, f, self, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)}; } - std::unordered_map< - uint32_t, std::function, const char *, - size_t, std::string &, ExecMode &model)>> + std::unordered_map, + nonstd::string_view, std::string &)>> map_invokers_; std::unordered_map key2func_name_; }; diff --git a/include/rest_rpc/rpc_server.h b/include/rest_rpc/rpc_server.h index 9aab7f2..28f3126 100644 --- a/include/rest_rpc/rpc_server.h +++ b/include/rest_rpc/rpc_server.h @@ -52,15 +52,15 @@ class rpc_server : private asio::noncopyable { void run() { io_service_pool_.run(); } - template + template void register_handler(std::string const &name, const Function &f) { - router_.register_handler(name, f); + router_.register_handler(name, f); } - template + template void register_handler(std::string const &name, const Function &f, Self *self) { - router_.register_handler(name, f, self); + router_.register_handler(name, f, self); } void set_conn_timeout_callback(std::function callback) {