Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Update/Reload without downtime #149

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.take_over_another_server(path)`
- It starts a new manager server that holds all UDP/TCP sockets of the existing manager.
- It first receives the sockets from the existing manager, then starts the new manager, and finally stops the existing one.
- This means that another process can take over the UDP/TCP sockets without downtime.
- It is not supported on Windows and raises `NotImplementedError` there.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
2 changes: 1 addition & 1 deletion lib/serverengine/multi_worker_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def reload
# Main loop of the server: calls keepalive_workers on every iteration and
# calls wait_tick between iterations until a break condition is met.
def run
while true
# Value returned by keepalive_workers; by its name, the number of workers
# that are alive or restarting.
num_alive_or_restarting = keepalive_workers
# Leave the loop as soon as that number is 0.
break if num_alive_or_restarting == 0
# NOTE(review): the line above already exits whenever the count is 0, so the
# additional @num_workers check on the next line can never change the
# outcome. On its own it would keep the loop running while @num_workers is 0
# — confirm which of the two conditions is intended.
break if num_alive_or_restarting == 0 && @num_workers != 0
wait_tick
end
end
Expand Down
15 changes: 11 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,18 @@ def self.open(path = nil)
end
end

def initialize(path)
# Creates a manager server for +path+ without starting it (start: false),
# then calls its #take_over_another_server before handing it back.
#
# Raises NotImplementedError when ServerEngine.windows? is true.
# Returns the new server instance.
def self.take_over_another_server(path)
  if ServerEngine.windows?
    raise NotImplementedError, "Not supported on Windows."
  end

  new(path, start: false).tap do |successor|
    successor.take_over_another_server
  end
end

# Sets up empty TCP/UDP socket tables and a mutex, then records the server path.
#
# path  - value passed to start_server, or stored as-is by the last assignment
#         when start is false.
# start - when true (the default) the last assignment stores the value
#         returned by start_server(path).
def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
# NOTE(review): this unconditional call runs start_server even when start is
# false, and start_server is called a second time on the next line when start
# is true — confirm that only the conditional assignment below is intended.
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
Expand Down Expand Up @@ -159,9 +166,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
126 changes: 86 additions & 40 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,63 @@ def recv_udp(family, peer, sent)
end

module ServerModule
# Binds a UNIX domain socket at +path+ and spawns the accept loop.
#
# The socket is created under umask 0077 to protect it from other users; the
# previous umask is restored afterwards. Every accepted connection is handed
# to process_peer (which calls send_socket) in a thread of its own.
#
# Returns the absolute form of +path+, so that a client can still connect to
# it after changing its working directory.
def start_server(path)
  absolute_path = File.expand_path(path)

  begin
    previous_umask = File.umask(0077)
    @server = UNIXServer.new(absolute_path)
  ensure
    File.umask(previous_umask)
  end

  @thread = Thread.new do
    begin
      while (connection = @server.accept)
        Thread.new(connection) { |peer| process_peer(peer) }
      end
    rescue => error
      # Errors raised once the server has been closed are not reported.
      ServerEngine.dump_uncaught_error(error) unless @server.closed?
    end
  end

  absolute_path
end

# Takes over the listening sockets of the manager server running at @path and
# then replaces that server.
#
# Steps:
# 1. Connect to the existing server and request its TCP, then its UDP
#    listening sockets, one index at a time, until it answers nil.
# 2. Remove the old socket file and start this server on the same path.
# 3. Send :stop_with_socket_alive to the existing server.
#
# Raises the exception reported by the existing server when a request fails.
def take_over_another_server
  another_server = UNIXSocket.new(@path)
  begin
    [
      [:get_listening_tcp, @tcp_sockets, TCPServer],
      [:get_listening_udp, @udp_sockets, UDPSocket],
    ].each do |request, sockets, socket_class|
      idx = 0
      while true
        SocketManager.send_peer(another_server, [Process.pid, request, idx])
        key = SocketManager.recv_peer(another_server)
        break if key.nil?
        # A failed request is answered with the exception object itself;
        # surface it instead of using it as a key and blocking in recv_io
        # for a descriptor that never arrives.
        raise key if key.is_a?(Exception)
        sockets[key] = another_server.recv_io(socket_class)
        idx += 1
      end
    end

    FileUtils.rm_f(@path)
    # start_server returns the absolute path; keep it so that #path stays
    # usable by clients even after the working directory changes.
    @path = start_server(@path)

    SocketManager.send_peer(another_server, [Process.pid, :stop_with_socket_alive])
  ensure
    another_server.close
  end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -76,33 +133,6 @@ def listen_udp_new(bind_ip, port)
UDPSocket.for_fd(usock.fileno)
end

# Binds a UNIX domain socket at the given path and starts a thread that
# accepts peers; each accepted peer is passed to process_peer in a new thread.
# Returns the expanded (absolute) path.
def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
# Restore the umask saved above whether or not creating the server succeeded.
File.umask(old_umask)
end

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
# Errors raised after the server has been closed are not reported.
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand All @@ -111,19 +141,35 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
# Handles one request received from a peer connection.
#
# peer   - connection to answer on (SocketManager.send_peer / peer.send_io)
# pid    - pid sent by the requester (not used here)
# method - request name:
#   :listen_tcp, :listen_udp
#     opts is [bind, port]; answers nil, then sends the listening socket.
#   :get_listening_tcp, :get_listening_udp
#     opts is [idx]; answers with the key of the idx-th registered socket and
#     then sends that socket, or answers only nil when there is no such entry.
#   :stop_with_socket_alive
#     empties both socket tables, then calls stop_server.
#
# Raises ArgumentError for any other method.
def send_socket(peer, pid, method, *opts)
  case method
  when :listen_tcp, :listen_udp
    bind, port = opts
    sock = method == :listen_tcp ? listen_tcp(bind, port) : listen_udp(bind, port)
    SocketManager.send_peer(peer, nil)
    peer.send_io sock
  when :get_listening_tcp, :get_listening_udp
    idx, = opts
    sockets = method == :get_listening_tcp ? @tcp_sockets : @udp_sockets
    # Take key and socket from a single snapshot so they always belong to the
    # same entry, instead of indexing keys and values separately.
    key, sock = sockets.to_a[idx]
    SocketManager.send_peer(peer, key)
    peer.send_io(sock) if key
  when :stop_with_socket_alive
    # Empty the tables first so that stop_server finds no sockets to close.
    @tcp_sockets.clear
    @udp_sockets.clear
    stop_server
  else
    raise ArgumentError, "Unknown method: #{method.inspect}"
  end
end
end

Expand Down
40 changes: 20 additions & 20 deletions lib/serverengine/socket_manager_win.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ def recv_udp(family, peer, sent)
end

module ServerModule
# Starts the manager server on a TCP port of 127.0.0.1 and spawns the accept
# loop; every accepted connection is handed to process_peer (which calls
# send_socket) in a thread of its own.
#
# addr - port to listen on. Passing nil or 0 lets an available port be
#        selected automatically, which takes care of choosing a free one.
#
# Returns the port number reported by the listening server.
def start_server(addr)
  # However, we should consider using NamedPipe instead of TCPServer.
  @server = TCPServer.new("127.0.0.1", addr)

  @thread = Thread.new do
    begin
      while (connection = @server.accept)
        Thread.new(connection) { |peer| process_peer(peer) }
      end
    rescue => error
      # Errors raised once the server has been closed are not reported.
      ServerEngine.dump_uncaught_error(error) unless @server.closed?
    end
  end

  _family, listening_port, = @server.addr
  listening_port
end

private

TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true]
Expand Down Expand Up @@ -107,26 +127,6 @@ def htons(h)
[h].pack("S").unpack("n")[0]
end

# Starts a TCPServer on 127.0.0.1 at the given port and a thread that accepts
# peers; each accepted peer is passed to process_peer in a new thread.
# Returns the port number reported by the server (@server.addr[1]).
def start_server(addr)
# We need to take care about selecting an available port.
# By passing `nil` or `0` as `addr`, an available port is automatically selected.
# However, we should consider using NamedPipe instead of TCPServer.
@server = TCPServer.new("127.0.0.1", addr)
@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
# Errors raised after the server has been closed are not reported.
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return @server.addr[1]
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand Down
Loading