Implement graceful shutdown #236

Open · wants to merge 11 commits into base: master
8 changes: 7 additions & 1 deletion bjoern.py
@@ -1,3 +1,4 @@
import errno
import os
import socket
import _bjoern
@@ -98,5 +99,10 @@ def run(*args, **kwargs):
filename = sock.getsockname()
if filename[0] != '\0':
os.unlink(sock.getsockname())
sock.close()
try:
sock.close()
except OSError as e:
# the socket should already be closed from server.c
if e.errno != errno.EBADF:
raise
_default_instance = None
10 changes: 10 additions & 0 deletions bjoern/_bjoernmodule.c
@@ -11,6 +11,10 @@ static PyObject*
run(PyObject* self, PyObject* args)
{
ServerInfo info;
#ifdef WANT_GRACEFUL_SHUTDOWN
info.active_connections = 0;
Owner:

Is there a built-in way in libev to handle connection draining so that we don't have to keep a counter? I'm worried about counting bugs.

Contributor Author:

Unfortunately no - going through the documentation, I can't see any way to do that. In case it helps, I went through a couple of iterations of the counting code and this was by far the simplest one I came up with: counting connections as "active" once http-parser tells us a message has begun, and then no longer counting them once we're done sending the response.

I contemplated doing something like storing all of the requests in an array/linked list/hash map on the ThreadInfo struct, and then watching for that to either drop to zero entries, or for none of the existing entries to be handling a request. That would eliminate counting bugs, but it would also be significantly more complex, as we'd need to handle growing the array occasionally if there's an influx of requests. I think we'd also need to store some additional state on the Request struct for whether or not it's currently handling a request. Given the complexity, that's why I decided to go for the simpler approach :)

That said though, I think it's worth doing some testing on dropping connections halfway through requests at various points in the lifecycle to see how the counting holds up. I'll go back and do that, and report on my findings :)
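For reference, a condensed, non-literal map of where the counter is touched in the diff as it now stands (including the aborted-connection handling mentioned in the follow-up below); these lines are scattered across request.c and server.c:

```c
#ifdef WANT_GRACEFUL_SHUTDOWN
/* request.c, on_message_begin(): http-parser has seen the start of a request,
 * so this connection now counts as active. */
REQUEST->server_info->active_connections++;

/* server.c, ev_io_on_write(): the response has been fully sent ("done") or the
 * write was aborted, so the connection no longer counts as active. */
request->server_info->active_connections--;

/* server.c, ev_io_on_read(): an aborted read (the client went away
 * mid-request) also has to give the count back, otherwise shutdown would wait
 * for a connection that no longer exists. */
request->server_info->active_connections--;
#endif
```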

Contributor Author:

My testing found something - aborted connections weren't being accounted for. I've fixed that up and added some tests for it as well.

Owner:

Why not implement counting as follows:

  • A successful accept increments the counter
  • A close decrements the counter

So, move to ev_io_on_requests and close_connection?
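
For illustration only, a minimal sketch of that alternative - counting whole connections rather than in-flight requests - reusing the existing ev_io_on_requests/close_connection functions and the active_connections field. This is not what the PR implements, and the elided parts stand in for the existing bodies of those functions:

```c
#ifdef WANT_GRACEFUL_SHUTDOWN
/* Hypothetical variant: one increment per successfully accepted client... */
static void
ev_io_on_requests(struct ev_loop* mainloop, ev_io* watcher, const int events)
{
    int client_fd;
    struct sockaddr_in sockaddr;
    socklen_t addrlen = sizeof(sockaddr);

    while ((client_fd = accept(watcher->fd, (struct sockaddr*)&sockaddr, &addrlen)) > 0) {
        THREAD_INFO->server_info->active_connections++;
        /* ... existing non-blocking setup, Request_new(), ev_io_start() ... */
    }
}

/* ... and one decrement when the connection is torn down, keep-alive or not. */
static void
close_connection(struct ev_loop* mainloop, Request* request)
{
    request->server_info->active_connections--;
    /* ... existing cleanup: stop the watcher, close the fd, free the Request ... */
}
#endif
```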

Contributor Author:

We could do that! That would certainly be simpler. The downside to doing that is that any persistent connections that aren't being actively used will hold the event loop open until the timeout is reached. Is that acceptable?

info.shutting_down = false;
#endif

PyObject* socket;

@@ -154,6 +158,12 @@ PyMODINIT_FUNC INIT_BJOERN(void)
PyDict_SetItemString(features, "has_statsd_tags", Py_False);
#endif

#ifdef WANT_GRACEFUL_SHUTDOWN
PyDict_SetItemString(features, "has_graceful_shutdown", Py_True);
#else
PyDict_SetItemString(features, "has_graceful_shutdown", Py_False);
#endif

#if PY_MAJOR_VERSION >= 3
PyObject* bjoern_module = PyModule_Create(&module);
if (bjoern_module == NULL) {
4 changes: 4 additions & 0 deletions bjoern/request.c
@@ -112,6 +112,10 @@ on_message_begin(http_parser* parser)
{
assert(PARSER->field == NULL);
REQUEST->headers = PyDict_New();
#ifdef WANT_GRACEFUL_SHUTDOWN
REQUEST->server_info->active_connections++;
DBG("active connections=%d", REQUEST->server_info->active_connections);
#endif
return 0;
}

168 changes: 132 additions & 36 deletions bjoern/server.c
@@ -49,12 +49,19 @@ typedef enum _rw_state write_state;
typedef struct {
ServerInfo* server_info;
ev_io accept_watcher;
#ifdef WANT_SIGINT_HANDLING
ev_signal sigint_watcher;
#endif
#ifdef WANT_GRACEFUL_SHUTDOWN
ev_signal sigterm_watcher;
#endif
} ThreadInfo;

typedef void ev_io_callback(struct ev_loop*, ev_io*, const int);

#ifdef WANT_SIGINT_HANDLING
typedef void ev_signal_callback(struct ev_loop*, ev_signal*, const int);

#ifdef WANT_SIGINT_HANDLING
static ev_signal_callback ev_signal_on_sigint;
#endif

@@ -64,7 +71,14 @@ static ev_timer_callback ev_timer_ontick;
ev_timer timeout_watcher;
#endif

static ev_io_callback ev_io_on_request;
#ifdef WANT_GRACEFUL_SHUTDOWN
static ev_signal_callback ev_signal_on_sigterm;
typedef void ev_shutdown_callback(struct ev_loop*, ev_timer*, const int);
static ev_shutdown_callback ev_shutdown_ontick, ev_shutdown_timeout_ontick;
ev_timer shutdown_watcher, shutdown_timeout_watcher;
#endif

static ev_io_callback ev_io_on_requests;
static ev_io_callback ev_io_on_read;
static ev_io_callback ev_io_on_write;
static write_state on_write_sendfile(struct ev_loop*, Request*);
@@ -74,6 +88,9 @@ static bool do_sendfile(Request*);
static bool handle_nonzero_errno(Request*);
static void close_connection(struct ev_loop*, Request*);

#define THREAD_INFO ((ThreadInfo*)ev_userdata(mainloop))



void server_run(ServerInfo* server_info)
{
@@ -84,13 +101,17 @@ void server_run(ServerInfo* server_info)

ev_set_userdata(mainloop, &thread_info);

ev_io_init(&thread_info.accept_watcher, ev_io_on_request, server_info->sockfd, EV_READ);
ev_io_init(&thread_info.accept_watcher, ev_io_on_requests, server_info->sockfd, EV_READ);
ev_io_start(mainloop, &thread_info.accept_watcher);

#ifdef WANT_SIGINT_HANDLING
ev_signal sigint_watcher;
ev_signal_init(&sigint_watcher, ev_signal_on_sigint, SIGINT);
ev_signal_start(mainloop, &sigint_watcher);
ev_signal_init(&thread_info.sigint_watcher, ev_signal_on_sigint, SIGINT);
ev_signal_start(mainloop, &thread_info.sigint_watcher);
#endif

#ifdef WANT_GRACEFUL_SHUTDOWN
ev_signal_init(&thread_info.sigterm_watcher, ev_signal_on_sigterm, SIGTERM);
ev_signal_start(mainloop, &thread_info.sigterm_watcher);
#endif

#ifdef WANT_SIGNAL_HANDLING
@@ -117,20 +138,54 @@ pyerr_set_interrupt(struct ev_loop* mainloop, struct ev_cleanup* watcher, const
static void
ev_signal_on_sigint(struct ev_loop* mainloop, ev_signal* watcher, const int events)
{
#if defined(WANT_SIGNAL_HANDLING) || defined(WANT_GRACEFUL_SHUTDOWN)
ev_timer_stop(mainloop, &timeout_watcher);
#endif

/* Clean up and shut down this thread.
* (Shuts down the Python interpreter if this is the main thread) */
ev_cleanup* cleanup_watcher = malloc(sizeof(ev_cleanup));
ev_cleanup_init(cleanup_watcher, pyerr_set_interrupt);
ev_cleanup_start(mainloop, cleanup_watcher);

ev_io_stop(mainloop, &((ThreadInfo*)ev_userdata(mainloop))->accept_watcher);
ev_io_stop(mainloop, &THREAD_INFO->accept_watcher);
ev_signal_stop(mainloop, watcher);
#ifdef WANT_SIGNAL_HANDLING
ev_timer_stop(mainloop, &timeout_watcher);
#ifdef WANT_GRACEFUL_SHUTDOWN
ev_signal_stop(mainloop, &THREAD_INFO->sigterm_watcher);
#endif
}
#endif

#ifdef WANT_GRACEFUL_SHUTDOWN
static void
ev_signal_on_sigterm(struct ev_loop* mainloop, ev_signal* watcher, const int events)
{

ev_io_stop(mainloop, &THREAD_INFO->accept_watcher);
// Drain the accept queue before we close to try and minimize connection resets
ev_io_on_requests(mainloop, &THREAD_INFO->accept_watcher, 0);
// Close the socket now to ensure no more clients can talk to us
close(THREAD_INFO->server_info->sockfd);

ev_signal_stop(mainloop, watcher);
ev_timer_stop(mainloop, &timeout_watcher);

// don't shutdown immediately, but start a timer to check if we can shutdown in 100ms.
Owner:

Do we still need this if we change the counting to what I suggested above? It feels like this timeout stuff is unnecessarily complicated.

Contributor Author:

We would still need it, yes - without this there's nothing checking whether there are any active connections, so it would always take 30 seconds to shut down (i.e. the other timeout would be reached).

Contributor Author:

Thinking a little bit more on this, I could remove the ev_shutdown_timeout_ontick function and associated timer if I did the following:

  • When SIGTERM is received, store ev_now(loop) + 30 in an ev_tstamp on the server
  • Make ev_shutdown_ontick check if shutdown_time <= ev_now(loop), and use that to stop the loop.

That feels a bit simpler to me than the timeout juggling it's currently doing.
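
A rough sketch of that single-timer idea, for illustration only; the shutdown_deadline field is hypothetical (it would be an ev_tstamp on ServerInfo) and the elided parts stand in for the existing handler bodies:

```c
#ifdef WANT_GRACEFUL_SHUTDOWN
/* Hypothetical variant: remember an absolute deadline on SIGTERM and let the
 * existing periodic check enforce it, instead of running a second timer. */
static void
ev_signal_on_sigterm(struct ev_loop* mainloop, ev_signal* watcher, const int events)
{
    /* ... existing accept-queue draining, close(sockfd), shutting_down = true ... */
    THREAD_INFO->server_info->shutdown_deadline = ev_now(mainloop) + SHUTDOWN_TIMEOUT;
    ev_timer_init(&shutdown_watcher, ev_shutdown_ontick, 0., SHUTDOWN_CHECK_INTERVAL);
    ev_timer_start(mainloop, &shutdown_watcher);
}

static void
ev_shutdown_ontick(struct ev_loop* mainloop, ev_timer* watcher, const int events)
{
    ServerInfo* info = THREAD_INFO->server_info;
    bool timed_out = info->shutdown_deadline <= ev_now(mainloop);

    if (info->active_connections == 0 || timed_out) {
        ev_timer_stop(mainloop, &shutdown_watcher);
        if (timed_out) {
            /* Past the deadline: stop the loop outright, as the current
             * ev_shutdown_timeout_ontick does. */
            ev_break(mainloop, EVBREAK_ALL);
        }
        /* ... otherwise stop the remaining watchers and let the loop drain ... */
    }
}
#endif
```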

// That gives any of the new connections we recently accepted a good chance to start
// sending us data so we can mark them active.

DBG("SIGTERM received, %d active connections", THREAD_INFO->server_info->active_connections);
// stop processing any more keep-alives
THREAD_INFO->server_info->shutting_down = true;
ev_timer_init(&shutdown_watcher, ev_shutdown_ontick, 0., SHUTDOWN_CHECK_INTERVAL);
ev_timer_init(&shutdown_timeout_watcher, ev_shutdown_timeout_ontick, SHUTDOWN_TIMEOUT, 0.);
ev_timer_start(mainloop, &shutdown_watcher);
ev_timer_start(mainloop, &shutdown_timeout_watcher);
ev_set_priority(&shutdown_watcher, EV_MINPRI);
ev_set_priority(&shutdown_timeout_watcher, EV_MINPRI);
#endif
}

#ifdef WANT_SIGNAL_HANDLING
static void
ev_timer_ontick(struct ev_loop* mainloop, ev_timer* watcher, const int events)
@@ -141,46 +196,76 @@ ev_timer_ontick(struct ev_loop* mainloop, ev_timer* watcher, const int events)
}
#endif

#ifdef WANT_GRACEFUL_SHUTDOWN
static void
ev_shutdown_ontick(struct ev_loop* mainloop, ev_timer* watcher, const int events)
{
DBG("ev_shutdown_ontick %d", THREAD_INFO->server_info->active_connections);

if (THREAD_INFO->server_info->active_connections == 0) {
DBG("No more active connections, shutting down");

ev_timer_stop(mainloop, &shutdown_watcher);
ev_timer_stop(mainloop, &shutdown_timeout_watcher);

#ifdef WANT_SIGINT_HANDLING
ev_signal_stop(mainloop, &THREAD_INFO->sigint_watcher);
#endif
} else {
DBG("Still have active connections %d", THREAD_INFO->server_info->active_connections);
}
}

static void
ev_shutdown_timeout_ontick(struct ev_loop* mainloop, ev_timer* watcher, const int events)
{
GIL_LOCK(0);
ev_timer_stop(mainloop, &shutdown_watcher);
ev_timer_stop(mainloop, &shutdown_timeout_watcher);
// break because we've exceeded the timeout and want to just stop where we are,
// regardless of where the other watchers are
ev_break(mainloop, EVBREAK_ALL);
DBG("Shutdown took too long, %d active connections", THREAD_INFO->server_info->active_connections);
GIL_UNLOCK(0);
}
#endif

static void
ev_io_on_request(struct ev_loop* mainloop, ev_io* watcher, const int events)
ev_io_on_requests(struct ev_loop* mainloop, ev_io* watcher, const int events)
{
int client_fd;
struct sockaddr_in sockaddr;
socklen_t addrlen;

addrlen = sizeof(struct sockaddr_in);
client_fd = accept(watcher->fd, (struct sockaddr*)&sockaddr, &addrlen);
if(client_fd < 0) {
DBG("Could not accept() client: errno %d", errno);
STATSD_INCREMENT("conn.accept.error");
return;
}

int flags = fcntl(client_fd, F_GETFL, 0);
if(fcntl(client_fd, F_SETFL, (flags < 0 ? 0 : flags) | O_NONBLOCK) == -1) {
STATSD_INCREMENT("conn.accept.error");
DBG("Could not set_nonblocking() client %d: errno %d", client_fd, errno);
return;
}

GIL_LOCK(0);

Request* request = Request_new(
((ThreadInfo*)ev_userdata(mainloop))->server_info,
client_fd,
inet_ntoa(sockaddr.sin_addr)
);
while ((client_fd = accept(watcher->fd, (struct sockaddr*)&sockaddr, &addrlen)) > 0) {
DBG("Accepted %d", client_fd);
int flags = fcntl(client_fd, F_GETFL, 0);
if(fcntl(client_fd, F_SETFL, (flags < 0 ? 0 : flags) | O_NONBLOCK) == -1) {
STATSD_INCREMENT("conn.accept.error");
DBG("Could not set_nonblocking() client %d: errno %d", client_fd, errno);
return;
}

GIL_UNLOCK(0);
Request* request = Request_new(
((ThreadInfo*)ev_userdata(mainloop))->server_info,
client_fd,
inet_ntoa(sockaddr.sin_addr)
);

STATSD_INCREMENT("conn.accept.success");
STATSD_INCREMENT("conn.accept.success");

DBG_REQ(request, "Accepted client %s:%d on fd %d",
inet_ntoa(sockaddr.sin_addr), ntohs(sockaddr.sin_port), client_fd);
DBG_REQ(request, "Accepted client %s:%d on fd %d",
inet_ntoa(sockaddr.sin_addr), ntohs(sockaddr.sin_port), client_fd);

ev_io_init(&request->ev_watcher, &ev_io_on_read,
client_fd, EV_READ);
ev_io_start(mainloop, &request->ev_watcher);
ev_io_init(&request->ev_watcher, &ev_io_on_read,
client_fd, EV_READ);
ev_io_start(mainloop, &request->ev_watcher);
}

GIL_UNLOCK(0);
}


@@ -289,6 +374,9 @@ ev_io_on_read(struct ev_loop* mainloop, ev_io* watcher, const int events)
STATSD_INCREMENT("req.done");
break;
case aborted:
#ifdef WANT_GRACEFUL_SHUTDOWN
request->server_info->active_connections--;
#endif
close_connection(mainloop, request);
STATSD_INCREMENT("req.aborted");
break;
@@ -337,7 +425,12 @@ ev_io_on_write(struct ev_loop* mainloop, ev_io* watcher, const int events)
break;
case done:
STATSD_INCREMENT("resp.done");
#ifdef WANT_GRACEFUL_SHUTDOWN
request->server_info->active_connections--;
if(request->state.keep_alive && !request->server_info->shutting_down) {
#else
if(request->state.keep_alive) {
#endif
DBG_REQ(request, "done, keep-alive");
STATSD_INCREMENT("resp.done.keepalive");
ev_io_stop(mainloop, &request->ev_watcher);
@@ -360,6 +453,9 @@ ev_io_on_write(struct ev_loop* mainloop, ev_io* watcher, const int events)
start_reading(mainloop, request);
break;
case aborted:
#ifdef WANT_GRACEFUL_SHUTDOWN
request->server_info->active_connections--;
#endif
/* Response was aborted due to an error. We can't do anything graceful here
* because at least one chunk is already sent... just close the connection. */
close_connection(mainloop, request);
4 changes: 4 additions & 0 deletions bjoern/server.h
@@ -16,6 +16,10 @@ typedef struct {
char* statsd_tags;
# endif
#endif
#ifdef WANT_GRACEFUL_SHUTDOWN
int active_connections;
unsigned shutting_down : 1;
#endif
} ServerInfo;

void server_run(ServerInfo*);
12 changes: 11 additions & 1 deletion setup.py
@@ -3,12 +3,22 @@
from setuptools import setup, Extension

WANT_SIGINT_HANDLING = os.environ.get('BJOERN_WANT_SIGINT_HANDLING', True)
WANT_GRACEFUL_SHUTDOWN = os.environ.get('BJOERN_WANT_GRACEFUL_SHUTDOWN', True)
WANT_SIGNAL_HANDLING = os.environ.get('BJOERN_WANT_SIGNAL_HANDLING', True)
SIGNAL_CHECK_INTERVAL = os.environ.get('BJOERN_SIGNAL_CHECK_INTERVAL', '0.1')
SHUTDOWN_CHECK_INTERVAL = os.environ.get('BJOERN_SHUTDOWN_CHECK_INTERVAL', '0.1')
SHUTDOWN_TIMEOUT = os.environ.get('BJOERN_SHUTDOWN_TIMEOUT', '30.0')
WANT_STATSD = os.environ.get('BJOERN_WANT_STATSD', False)
WANT_STATSD_TAGS = os.environ.get('BJOERN_WANT_STATSD_TAGS', False)

compile_flags = [('SIGNAL_CHECK_INTERVAL', SIGNAL_CHECK_INTERVAL)]
compile_flags = [
('SIGNAL_CHECK_INTERVAL', SIGNAL_CHECK_INTERVAL),
("SHUTDOWN_CHECK_INTERVAL", SHUTDOWN_CHECK_INTERVAL),
("SHUTDOWN_TIMEOUT", SHUTDOWN_TIMEOUT),
]

if WANT_GRACEFUL_SHUTDOWN:
compile_flags.append(('WANT_GRACEFUL_SHUTDOWN', 'yes'))
if WANT_SIGNAL_HANDLING:
compile_flags.append(('WANT_SIGNAL_HANDLING', 'yes'))
if WANT_SIGINT_HANDLING:
46 changes: 46 additions & 0 deletions tests/graceful-shutdown-aborted-request.py
@@ -0,0 +1,46 @@
import os
import signal
import socket
import time
import threading
import http.client

import bjoern

HOST = ('127.0.0.1', 9000)

requests_completed = 0


def application(environ, start_response):
start_response('200 ok', [])
yield b"chunk1"
yield b"chunk2"
yield b"chunk3"

global requests_completed
requests_completed += 1


def requester():
# wait just a little to give the server a chance
time.sleep(0.1)
with socket.socket() as sock:
sock.connect(HOST)
# create an active connection by sending a request and delay receiving it
sock.sendall(b"GET /sleep HTTP/1.1\r\n")

# create a signal for shutdown
os.kill(os.getpid(), signal.SIGTERM)


threading.Thread(target=requester).start()
start = time.time()
bjoern.run(application, *HOST)

# we should have zero completed requests
assert requests_completed == 0, requests_completed

# we expect aborted connections to be accounted for
print(time.time() - start)
assert time.time() - start < 30