Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 23 additions & 59 deletions src/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "httpserver.h"

#include "init.h"
#include "chainparamsbase.h"
#include "compat.h"
#include "util.h"
Expand All @@ -20,7 +21,6 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <signal.h>
#include <future>

#include <event2/thread.h>
#include <event2/buffer.h>
Expand All @@ -37,6 +37,10 @@
#endif
#endif

#include <thread>
#include <mutex>
#include <condition_variable>

/** Maximum size of http request (request line + headers) */
static const size_t MAX_HEADERS_SIZE = 8192;

Expand Down Expand Up @@ -73,34 +77,13 @@ class WorkQueue
std::deque<std::unique_ptr<WorkItem>> queue;
bool running;
size_t maxDepth;
int numThreads;

/** RAII object to keep track of number of running worker threads */
class ThreadCounter
{
public:
WorkQueue &wq;
ThreadCounter(WorkQueue &w): wq(w)
{
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads += 1;
}
~ThreadCounter()
{
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads -= 1;
wq.cond.notify_all();
}
};

public:
WorkQueue(size_t _maxDepth) : running(true),
maxDepth(_maxDepth),
numThreads(0)
maxDepth(_maxDepth)
{
}
/** Precondition: worker threads have all stopped
* (call WaitExit)
/** Precondition: worker threads have all stopped (they have been joined).
*/
~WorkQueue()
{
Expand All @@ -119,7 +102,6 @@ class WorkQueue
/** Thread function */
void Run()
{
ThreadCounter count(*this);
while (true) {
std::unique_ptr<WorkItem> i;
{
Expand All @@ -141,14 +123,6 @@ class WorkQueue
running = false;
cond.notify_all();
}
/** Wait for worker threads to exit */
void WaitExit()
{
std::unique_lock<std::mutex> lock(cs);
while (numThreads > 0){
cond.wait(lock);
}
}
};

struct HTTPPathHandler
Expand Down Expand Up @@ -450,20 +424,17 @@ bool UpdateHTTPServerLogging(bool enable) {
}

std::thread threadHTTP;
std::future<bool> threadResult;
static std::vector<std::thread> g_thread_http_workers;

bool StartHTTPServer()
{
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
std::packaged_task<bool(event_base*, evhttp*)> task(ThreadHTTP);
threadResult = task.get_future();
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);
threadHTTP = std::thread(ThreadHTTP, eventBase, eventHTTP);

for (int i = 0; i < rpcThreads; i++) {
std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
rpc_worker.detach();
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);
}
return true;
}
Expand All @@ -472,10 +443,6 @@ void InterruptHTTPServer()
{
LogPrint(BCLog::HTTP, "Interrupting HTTP server\n");
if (eventHTTP) {
// Unlisten sockets
for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
// Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
}
Expand All @@ -488,27 +455,21 @@ void StopHTTPServer()
LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
if (workQueue) {
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
#ifndef WIN32
// ToDo: Disabling WaitExit() for Windows platforms is an ugly workaround for the wallet not
// closing during a repair-restart. It doesn't hurt, though, because threadHTTP.timed_join
// below takes care of this and sends a loopbreak.
workQueue->WaitExit();
#endif
for (auto& thread: g_thread_http_workers) {
thread.join();
}
g_thread_http_workers.clear();
delete workQueue;
workQueue = nullptr;
}
// Unlisten sockets, these are what make the event loop running, which means
// that after this and all connections are closed the event loop will quit.
for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
boundSockets.clear();
if (eventBase) {
LogPrint(BCLog::HTTP, "Waiting for HTTP event thread to exit\n");
// Give event loop a few seconds to exit (to send back last RPC responses), then break it
// Before this was solved with event_base_loopexit, but that didn't work as expected in
// at least libevent 2.0.21 and always introduced a delay. In libevent
// master that appears to be solved, so in the future that solution
// could be used again (if desirable).
// (see discussion in https://github.com/bitcoin/bitcoin/pull/6990)
if (threadResult.valid() && threadResult.wait_for(std::chrono::milliseconds(2000)) == std::future_status::timeout) {
LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n");
event_base_loopbreak(eventBase);
}
threadHTTP.join();
}
if (eventHTTP) {
Expand Down Expand Up @@ -613,6 +574,9 @@ void HTTPRequest::WriteHeader(const std::string& hdr, const std::string& value)
void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
{
assert(!replySent && req);
if (ShutdownRequested()) {
WriteHeader("Connection", "close");
}
// Send event to main http thread to send reply message
struct evbuffer* evb = evhttp_request_get_output_buffer(req);
assert(evb);
Expand Down
1 change: 1 addition & 0 deletions src/rpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "echojson", 7, "arg7" },
{ "echojson", 8, "arg8" },
{ "echojson", 9, "arg9" },
{ "stop", 0, "wait" },
};

class CRPCConvertTable
Expand Down
8 changes: 7 additions & 1 deletion src/rpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,19 @@ UniValue help(const JSONRPCRequest& jsonRequest)
UniValue stop(const JSONRPCRequest& jsonRequest)
{
// Accept the deprecated and ignored 'detach' boolean argument
// Also accept the hidden 'wait' integer argument (milliseconds)
// For instance, 'stop 1000' makes the call wait 1 second before returning
// to the client (intended for testing)
if (jsonRequest.fHelp || jsonRequest.params.size() > 1)
throw std::runtime_error(
"stop\n"
"\nStop Dash Core server.");
// Event loop will exit after current HTTP requests have been handled, so
// this reply will get back to the client.
StartShutdown();
if (jsonRequest.params[0].isNum()) {
MilliSleep(jsonRequest.params[0].get_int());
}
return "Dash Core server stopping";
}

Expand Down Expand Up @@ -327,7 +333,7 @@ static const CRPCCommand vRPCCommands[] =
// --------------------- ------------------------ ----------------------- ------ ----------
/* Overall control/query calls */
{ "control", "help", &help, true, {"command"} },
{ "control", "stop", &stop, true, {} },
{ "control", "stop", &stop, true, {"wait"} },
{ "control", "uptime", &uptime, true, {} },
};

Expand Down
28 changes: 28 additions & 0 deletions test/functional/feature_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
# Copyright (c) 2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test bitcoind shutdown."""

from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, get_rpc_proxy
from threading import Thread

def test_long_call(node):
block = node.waitfornewblock()
assert_equal(block['height'], 0)

class ShutdownTest(BitcoinTestFramework):

def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1

def run_test(self):
node = get_rpc_proxy(self.nodes[0].url, 1, timeout=600, coveragedir=self.nodes[0].coverage_dir)
Thread(target=test_long_call, args=(node,)).start()
# wait 1 second to ensure event loop waits for current connections to close
self.stop_node(0, wait=1000)

if __name__ == '__main__':
ShutdownTest().main()
8 changes: 4 additions & 4 deletions test/functional/test_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,16 +280,16 @@ def start_nodes(self, extra_args=None, stderr=None):
for node in self.nodes:
coverage.write_all_rpc_commands(self.options.coveragedir, node.rpc)

def stop_node(self, i):
def stop_node(self, i, wait=0):
"""Stop a dashd test node"""
self.nodes[i].stop_node()
self.nodes[i].stop_node(wait=wait)
self.nodes[i].wait_until_stopped()

def stop_nodes(self):
def stop_nodes(self, wait=0):
"""Stop multiple dashd test nodes"""
for node in self.nodes:
# Issue RPC to stop nodes
node.stop_node()
node.stop_node(wait=wait)

for node in self.nodes:
# Wait for nodes to stop
Expand Down
4 changes: 2 additions & 2 deletions test/functional/test_framework/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ def get_wallet_rpc(self, wallet_name):
wallet_path = "wallet/%s" % wallet_name
return self.rpc / wallet_path

def stop_node(self):
def stop_node(self, wait=0):
"""Stop the node."""
if not self.running:
return
self.log.debug("Stopping node")
try:
self.stop()
self.stop(wait=wait)
except http.client.CannotSendRequest:
self.log.exception("Unable to stop node.")
del self.p2ps[:]
Expand Down
1 change: 1 addition & 0 deletions test/functional/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
'resendwallettransactions.py',
'minchainwork.py',
'p2p-acceptblock.py', # NOTE: needs dash_hash to pass
'feature_shutdown.py',
]

EXTENDED_SCRIPTS = [
Expand Down