diff --git a/include/crow/http_connection.h b/include/crow/http_connection.h index 9a7fa71ce..1543eebae 100644 --- a/include/crow/http_connection.h +++ b/include/crow/http_connection.h @@ -186,7 +186,8 @@ namespace crow std::tuple* middlewares, std::function& get_cached_date_str_f, detail::task_timer& task_timer, - typename Adaptor::context* adaptor_ctx_): + typename Adaptor::context* adaptor_ctx_, + int &queue_length_): adaptor_(io_service, adaptor_ctx_), handler_(handler), parser_(this), @@ -194,7 +195,8 @@ namespace crow middlewares_(middlewares), get_cached_date_str(get_cached_date_str_f), task_timer_(task_timer), - res_stream_threshold_(handler->stream_threshold()) + res_stream_threshold_(handler->stream_threshold()), + queue_length(queue_length_) { #ifdef CROW_ENABLE_DEBUG connectionCount++; @@ -404,6 +406,8 @@ namespace crow { do_write_general(); } + + queue_length -= 1; } private: @@ -700,6 +704,8 @@ namespace crow detail::task_timer& task_timer_; size_t res_stream_threshold_; + + int &queue_length; }; } // namespace crow diff --git a/include/crow/http_server.h b/include/crow/http_server.h index 9e1d787dd..deea364ec 100644 --- a/include/crow/http_server.h +++ b/include/crow/http_server.h @@ -63,6 +63,7 @@ namespace crow io_service_pool_.emplace_back(new boost::asio::io_service()); get_cached_date_str_pool_.resize(concurrency_); task_timer_pool_.resize(concurrency_); + task_queue_length_pool_.resize(concurrency_); std::vector> v; std::atomic init_count(0); @@ -107,6 +108,7 @@ namespace crow { try { + task_queue_length_pool_[i] = 0; if (io_service_pool_[i]->run() == 0) { // when io_service.run returns 0, there are no more works to do. @@ -175,21 +177,27 @@ namespace crow } private: - asio::io_service& pick_io_service() + int pick_io_service_idx() { - // TODO load balancing - roundrobin_index_++; - if (roundrobin_index_ >= io_service_pool_.size()) - roundrobin_index_ = 0; - return *io_service_pool_[roundrobin_index_]; + int min_queue_idx = 0; + + // TODO improve load balancing + for (uint i = 1; i < task_queue_length_pool_.size(); i += 1) + { + if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx]) + min_queue_idx = i; + } + return min_queue_idx; } void do_accept() { - asio::io_service& is = pick_io_service(); + int service_idx = pick_io_service_idx(); + asio::io_service& is = *io_service_pool_[service_idx]; auto p = new Connection( is, handler_, server_name_, middlewares_, - get_cached_date_str_pool_[roundrobin_index_], *task_timer_pool_[roundrobin_index_], adaptor_ctx_); + get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]); + task_queue_length_pool_[service_idx] += 1; acceptor_.async_accept( p->socket(), [this, p, &is](boost::system::error_code ec) { @@ -213,6 +221,7 @@ namespace crow std::vector> io_service_pool_; std::vector task_timer_pool_; std::vector> get_cached_date_str_pool_; + std::vector task_queue_length_pool_; tcp::acceptor acceptor_; boost::asio::signal_set signals_; boost::asio::deadline_timer tick_timer_;