Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close websockets when app is terminated #269

Merged
merged 9 commits into from
May 18, 2022
4 changes: 2 additions & 2 deletions examples/websocket/templates/ws.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
sock.onerror = (e)=>{
console.log('error',e)
}
sock.onclose = ()=>{
console.log('close')
sock.onclose = (e)=>{
console.log('close', e)
}
sock.onmessage = (e)=>{
$("#log").val(
Expand Down
9 changes: 8 additions & 1 deletion include/crow/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ namespace crow
Crow()
{}


std::atomic<int> websocket_count{0};

/// Process an Upgrade request

///
Expand Down Expand Up @@ -115,6 +118,11 @@ namespace crow
return *this;
}

std::vector<int> signals()
{
return signals_;
}

/// Set the port that Crow will handle requests on
self_t& port(std::uint16_t port)
{
Expand Down Expand Up @@ -300,7 +308,6 @@ namespace crow
{
server_ = std::move(std::unique_ptr<server_t>(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, timeout_, nullptr)));
server_->set_tick_function(tick_interval_, tick_function_);
server_->signal_clear();
for (auto snum : signals_)
{
server_->signal_add(snum);
Expand Down
74 changes: 45 additions & 29 deletions include/crow/http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,21 @@ namespace crow

void stop()
{
io_service_.stop();
shutting_down_ = true; //Prevent the acceptor from taking new connections
while (handler_->websocket_count.load(std::memory_order_release) != 0) //Wait for the websockets to close properly
{
}
for (auto& io_service : io_service_pool_)
io_service->stop();
{
if (io_service != nullptr)
{
CROW_LOG_INFO << "Closing IO service " << &io_service;
io_service->stop(); //Close all io_services (and HTTP connections)
}
}

CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
io_service_.stop(); //Close main io_service
}

void signal_clear()
Expand Down Expand Up @@ -195,33 +207,36 @@ namespace crow

void do_accept()
{
uint16_t service_idx = pick_io_service_idx();
asio::io_service& is = *io_service_pool_[service_idx];
task_queue_length_pool_[service_idx]++;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];

auto p = new Connection<Adaptor, Handler, Middlewares...>(
is, handler_, server_name_, middlewares_,
get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);

acceptor_.async_accept(
p->socket(),
[this, p, &is, service_idx](boost::system::error_code ec) {
if (!ec)
{
is.post(
[p] {
p->start();
});
}
else
{
task_queue_length_pool_[service_idx]--;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
delete p;
}
do_accept();
});
if (!shutting_down_)
{
uint16_t service_idx = pick_io_service_idx();
asio::io_service& is = *io_service_pool_[service_idx];
task_queue_length_pool_[service_idx]++;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];

auto p = new Connection<Adaptor, Handler, Middlewares...>(
is, handler_, server_name_, middlewares_,
get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);

acceptor_.async_accept(
p->socket(),
[this, p, &is, service_idx](boost::system::error_code ec) {
if (!ec)
{
is.post(
[p] {
p->start();
});
}
else
{
task_queue_length_pool_[service_idx]--;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
delete p;
}
do_accept();
});
}
}

private:
Expand All @@ -230,6 +245,7 @@ namespace crow
std::vector<detail::task_timer*> task_timer_pool_;
std::vector<std::function<std::string()>> get_cached_date_str_pool_;
tcp::acceptor acceptor_;
bool shutting_down_ = false;
boost::asio::signal_set signals_;
boost::asio::deadline_timer tick_timer_;

Expand Down
30 changes: 29 additions & 1 deletion include/crow/websocket.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <boost/algorithm/string/predicate.hpp>
#include <boost/array.hpp>
#include "crow/logging.h"
#include "crow/socket_adaptors.h"
#include "crow/http_request.h"
#include "crow/TinySHA1.hpp"
Expand Down Expand Up @@ -60,6 +61,7 @@ namespace crow
//

/// A websocket connection.

template<typename Adaptor, typename Handler>
class Connection : public connection
{
Expand All @@ -77,11 +79,13 @@ namespace crow
std::function<bool(const crow::request&)> accept_handler):
adaptor_(std::move(adaptor)),
handler_(handler),
websocket_count_(handler_->websocket_count),
open_handler_(std::move(open_handler)),
message_handler_(std::move(message_handler)),
close_handler_(std::move(close_handler)),
error_handler_(std::move(error_handler)),
accept_handler_(std::move(accept_handler))
accept_handler_(std::move(accept_handler)),
signals_(adaptor_.get_io_service())
{
if (!boost::iequals(req.get_header_value("upgrade"), "websocket"))
{
Expand All @@ -100,13 +104,27 @@ namespace crow
}
}


signals_.clear();
for (auto snum : handler_->signals())
signals_.add(snum);

// Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
// Sec-WebSocket-Version: 13
std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
sha1::SHA1 s;
s.processBytes(magic.data(), magic.size());
uint8_t digest[20];
s.getDigestBytes(digest);

signals_.async_wait(
[&](const boost::system::error_code& e, int /*signal_number*/) {
if (!e)
{
CROW_LOG_INFO << "Quitting Websocket: " << this;
close("Server Application Terminated");
}
});
start(crow::utility::base64encode((unsigned char*)digest, 20));
}

Expand Down Expand Up @@ -290,6 +308,7 @@ namespace crow
has_mask_ = false;
#else
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
Expand All @@ -315,6 +334,7 @@ namespace crow
else
{
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
Expand Down Expand Up @@ -352,6 +372,7 @@ namespace crow
else
{
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
Expand Down Expand Up @@ -386,6 +407,7 @@ namespace crow
else
{
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
Expand Down Expand Up @@ -422,6 +444,7 @@ namespace crow
close_connection_ = true;
if (error_handler_)
error_handler_(*this);
adaptor_.shutdown_readwrite();
adaptor_.close();
}
});
Expand Down Expand Up @@ -460,6 +483,7 @@ namespace crow
close_connection_ = true;
if (error_handler_)
error_handler_(*this);
adaptor_.shutdown_readwrite();
adaptor_.close();
}
});
Expand Down Expand Up @@ -539,6 +563,7 @@ namespace crow
}
else
{
adaptor_.shutdown_readwrite();
adaptor_.close();
close_connection_ = true;
if (!is_close_handler_called_)
Expand Down Expand Up @@ -608,6 +633,7 @@ namespace crow
if (!is_close_handler_called_)
if (close_handler_)
close_handler_(*this, "uncleanly");
websocket_count_--;
if (sending_buffers_.empty() && !is_reading)
delete this;
}
Expand Down Expand Up @@ -636,12 +662,14 @@ namespace crow
bool error_occured_{false};
bool pong_received_{false};
bool is_close_handler_called_{false};
std::atomic<int>& websocket_count_;

std::function<void(crow::websocket::connection&)> open_handler_;
std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
std::function<void(crow::websocket::connection&)> error_handler_;
std::function<bool(const crow::request&)> accept_handler_;
boost::asio::signal_set signals_;
};
} // namespace websocket
} // namespace crow