This repository has been archived by the owner on Aug 29, 2024. It is now read-only.

example on stop accept #19

Open
matti opened this issue Jan 26, 2019 · 26 comments
@matti commented Jan 26, 2019

How do I stop accepting connections?

In my server I want to have shutdown behavior that gracefully handles all existing connections, but does not accept new ones.

@ioquatix (Member) commented Jan 26, 2019

Good question

@ioquatix (Member)

Graceful shutdown isn't too complicated. It could be done with a timeout around the children tasks, though you'd need to manage the accept loop a bit more explicitly: on shutdown, wait for all children tasks to complete; after the timeout, invoke stop on them; then exit yourself.

@ioquatix (Member)

Here is a rough idea of how to do it:

require 'async'
require 'async/io'

Async.logger.info!

GRACE_TIME = 5

def graceful_server(endpoint, condition = nil, &block)
	Async do |parent_task|
		server_tasks = []
		
		Async.logger.info "Binding to #{endpoint}..."
		
		endpoint.each do |endpoint|
			server = endpoint.bind
			server.listen(Socket::SOMAXCONN)
			
			Async.logger.info "Accepting connections from #{server}..."
			server_tasks << parent_task.async do
				server.accept_each(task: parent_task, &block)
			ensure
				server.close
			end
		end
		
		if condition
			Async.logger.info "Waiting on #{condition}..."
			condition.wait
			
			Async.logger.info("Task tree"){|buffer| parent_task.print_hierarchy(buffer)}
			
			Async.logger.info "Stopping #{server_tasks.count} accept loops..."
			server_tasks.each(&:stop)
			
			children = parent_task.children
			
			Async.logger.info "Stopping #{children.count} connections..."
			
			if children.any?
				Async.logger.warn("Waiting for #{children.count} connections...")
				
				parent_task.sleep GRACE_TIME
			end
			
			children = parent_task.children
			
			if children.any?
				Async.logger.warn("Stopping #{children.count} connections...")
				
				children.each(&:stop)
			end
		end
	end
end

def echo_server(endpoint)
	condition = Async do |task|
		task.sleep 60 while true
	end
	
	graceful_server(endpoint, condition) do |client, task:|
		# This is an asynchronous block within the current reactor:
		while data = client.read(512)
			# This produces out-of-order responses.
			task.sleep(rand * 0.01)
		
			client.write(data.reverse)
		end
	ensure
		client.close
	end
	
	return condition
end

def echo_client(endpoint, data)
	Async do |task|
		endpoint.connect do |peer|
			10.times do
				Async.logger.info "Client #{data}: sleeping"
				task.sleep 2
				
				result = peer.write(data)
				message = peer.read(512)
				
				Async.logger.info "Sent #{data}, got response: #{message}"
			end
		end
	end
end

Async do |task|
	endpoint = Async::IO::Endpoint.tcp('0.0.0.0', 9000)
	
	Async.logger.info "Starting server..."
	server = echo_server(endpoint)
	
	Async.logger.info "Clients connecting..."
	1.times.collect do |i|
		echo_client(endpoint, "Hello World #{i}")
	end
	
	task.sleep 5
	
	Async.logger.info "Stopping server..."
	server.stop
end

Async.logger.info "Finished..."

Honestly, it seems pretty complicated, but I guess graceful shutdown isn't that trivial.

@ioquatix (Member)

It should work to put a timeout around the tasks, but I think there is a bug that needs to be fixed.

@ioquatix (Member)

				parent_task.with_timeout(GRACE_TIME) do
					children.each(&:wait)
				rescue Async::TimeoutError
					Async.logger.warn("Could not terminate child connections...")
				end

Task#wait is not timeout safe. :( doh.

ioquatix self-assigned this Jan 26, 2019
@ioquatix (Member)

Okay, this is fixed in socketry/async@0b63195

I'll need to test a bit more.

@ioquatix (Member)

Okay, this implementation works with async 1.15.1:

require 'async'
require 'async/io'

Async.logger.info!

GRACE_TIME = 5

def graceful_server(endpoint, condition = nil, &block)
	Async do |parent_task|
		server_tasks = []
		
		Async.logger.info "Binding to #{endpoint}..."
		
		endpoint.each do |endpoint|
			server = endpoint.bind
			server.listen(Socket::SOMAXCONN)
			
			Async.logger.info "Accepting connections from #{server}..."
			server_tasks << parent_task.async do
				server.accept_each(task: parent_task, &block)
			ensure
				server.close
			end
		end
		
		if condition
			Async.logger.info "Waiting on #{condition}..."
			condition.wait
			
			Async.logger.info("Task tree"){|buffer| parent_task.print_hierarchy(buffer)}
			
			Async.logger.info "Stopping #{server_tasks.count} accept loops..."
			server_tasks.each(&:stop)
			
			children = parent_task.children
			
			Async.logger.info "Stopping #{children.count} connections..."
			
			if children.any?
				Async.logger.warn("Waiting for #{children.count} connections...")
				
				parent_task.with_timeout(GRACE_TIME) do
					children.each(&:wait)
				rescue Async::TimeoutError
					Async.logger.warn("Could not terminate child connections...")
				end
			end
			
			children = parent_task.children
			
			if children.any?
				Async.logger.warn("Stopping #{children.count} connections...")
				
				children.each(&:stop)
			end
		end
	end
end

def echo_server(endpoint)
	condition = Async do |task|
		task.sleep 60 while true
	end
	
	graceful_server(endpoint, condition) do |client, task:|
		# This is an asynchronous block within the current reactor:
		while data = client.read(512)
			# This produces out-of-order responses.
			task.sleep(rand * 0.01)
		
			client.write(data.reverse)
		end
	ensure
		client.close
	end
	
	return condition
end

def echo_client(endpoint, data)
	Async do |task|
		endpoint.connect do |peer|
			10.times do
				Async.logger.info "Client #{data}: sleeping"
				task.sleep 2
				
				result = peer.write(data)
				message = peer.read(512)
				
				Async.logger.info "Sent #{data}, got response: #{message}"
			end
		end
	end
end

Async do |task|
	endpoint = Async::IO::Endpoint.tcp('0.0.0.0', 9000)
	
	Async.logger.info "Starting server..."
	server = echo_server(endpoint)
	
	Async.logger.info "Clients connecting..."
	1.times.collect do |i|
		echo_client(endpoint, "Hello World #{i}")
	end
	
	task.sleep 5
	
	Async.logger.info "Stopping server..."
	server.stop
end

Async.logger.info "Finished..."

@ioquatix (Member)

I think you probably also want to tell the server tasks to signal the clients that shutdown is in progress...

To do this, you'll need some broadcast message. It will ultimately depend on how you implement your connection state... there is nothing wrong with making an array of connections and then just connections.each{|connection| connection.write("Goodbye!")} before waiting on the children tasks.
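The broadcast idea above can be sketched in plain Ruby, using StringIO objects as stand-ins for connection sockets (the connections array is a hypothetical registry; only the "Goodbye!" broadcast comes from the comment above):

```ruby
require 'stringio'

# Hypothetical connection registry: a real server would add each accepted
# socket here and remove it when the connection closes.
connections = []

# Stand-ins for three live client connections:
3.times { connections << StringIO.new }

# Before waiting on the children tasks, broadcast that shutdown is in progress:
connections.each { |connection| connection.write("Goodbye!") }

connections.each { |connection| puts connection.string }
```

In a real server the registry would be populated inside the accept block, and entries removed in its ensure clause.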

@ioquatix (Member)

One more thing to keep in mind: if you have a graceful shutdown, you need to handle the situations where the graceful shutdown fails in some way, and invoke #stop on everything.
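That two-phase escalation (wait for a graceful finish, then force-stop whatever remains) can be sketched with plain Ruby threads standing in for connection tasks; the worker names and GRACE_TIME value are illustrative, not part of the async API:

```ruby
GRACE_TIME = 1

# Stand-ins for connection tasks: one finishes on its own, one never would.
fast  = Thread.new { sleep 0.1 }
stuck = Thread.new { sleep }

workers = [fast, stuck]

# Phase 1: give every worker a shared deadline to finish gracefully.
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + GRACE_TIME
workers.each do |worker|
  remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
  worker.join([remaining, 0].max)
end

# Phase 2: the graceful shutdown failed for anything still alive, so stop it.
workers.select(&:alive?).each(&:kill)
workers.each(&:join)
```

The same shape appears in the graceful_server example above: with_timeout around the waits, then a final children.each(&:stop) for whatever is left.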

@matti (Author) commented Jan 26, 2019

wow, thanks!

When are there multiple server_tasks? To me it looks like there is just one server_task:

			server_tasks << parent_task.async do
				server.accept_each(task: parent_task, &block)
			ensure
				server.close
			end

@ioquatix (Member)

Bind to localhost instead of 0.0.0.0 and you'll get both IPv4 and IPv6.
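The reason one endpoint can yield several server tasks is that name resolution can return multiple addresses; a quick stdlib check (no async required):

```ruby
require 'socket'

# 'localhost' commonly resolves to both 127.0.0.1 (IPv4) and ::1 (IPv6),
# so binding to it produces one listening socket -- and one accept loop --
# per address, which is when server_tasks has more than one element.
Addrinfo.getaddrinfo('localhost', 9000, nil, :STREAM).each do |ai|
  puts "#{ai.ip_address} (#{ai.ipv6? ? 'IPv6' : 'IPv4'})"
end
```

How many addresses you actually get depends on the system's resolver configuration.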

@matti (Author) commented Jan 26, 2019

What about threads (Async::Container)? I tried to follow the falcon source with SharedEndpoint, but I can't work out how to do it without server.accept_each, which returns the task...

@ioquatix (Member)

How would threads solve the problem? I think they would only create more problems.

@ioquatix (Member)

Sorry for the terse reply.

So, now that I have a few more minutes to spare:

Generally speaking, threads and processes are for scaling your application across multiple processors and computers.

They shouldn't be used for implementing functionality, at least not in this situation.

Graceful shutdown is going to be directly part of the protocol, e.g. how websocket and http/2 can send close and goaway respectively. The connection is still open at the OS level, but essentially closed from the application's point of view. How you negotiate this will depend on what you are doing. For example, a web server might be servicing a long-running request. Should you wait until the request is finished before terminating? Yes, if you are feeling friendly. What happens if you wait 30 minutes and the user is still connected? Maybe you get a bit more frustrated and terminate the connection more forcefully. All these steps, short of calling close(fd), are client/server protocol concerns.

Introducing threads won't solve this problem. Threading is only tangentially related (it's about resource management) and doesn't address the client/server protocol concerns, e.g. the grace period, knowing when the server can shut down immediately, etc. Some of those concerns are addressed in the example code above, but generally speaking, it's not an easy problem to solve.

In the first instance, perhaps you can describe what you should do to tell the remote end that the server is shutting down. Do you send a message?

@matti (Author) commented Jan 27, 2019

Uhm, I should have explained a bit more what I want to do :)

So I used your approach to write a TCP reverse proxy which just proxies requests through to a webserver (nginx). Connection draining works great and I even added reuse_port to the mix to see if I can a) drain connections and exit and b) replace the proxy with another instance without clients noticing. Great success!

Then I did some fast benchmarking: if I ran wrk directly against nginx I got "6000r/s", and when I ran it against my proxy, I got "3000r/s". I think that performance is acceptable for my use case, but I was curious what would happen if I added threads. So: I don't want to use threads to implement some weird connection draining, but simply to see how it would affect the performance.

So I took a look at falcon which uses Async::Container and SharedEndpoint - but I can't come up with a way to use accept_each for the SharedEndpoint - only accept - and with accept I don't get server_tasks that I could stop for connection draining.

My threaded implementation seemed to perform better than "3000r/s" though, but I really need connection draining (e.g. to stop accepting new connections) - I tried calling .close and .stop on everything, but new connections were still being accepted while the server was draining (the draining itself worked).

So my question is: how do I use .accept_each with SharedEndpoint, which seems to be required for the threaded approach? It was also getting late while I tried to refactor my code, so I might have missed something obvious...

And as always, thank you so much for your answers and time. Socketry rules.

@ioquatix (Member)

I see, that makes sense. It's late so I'll take a look tomorrow, but do you think you can share your code?

@ioquatix (Member)

You should be able to get more than 3000 req/s - what OS are you testing on?

@matti (Author) commented Jan 27, 2019

I made a mistake by coding directly in a private codebase... But my code follows your example very closely.

I'll see if I can extract a version of it, but basically it's your example plus an attempt to use a falcon-style threaded setup...

@matti (Author) commented Jan 27, 2019

I have no need to use "falcon style" - it was just the only example where I could see this in action.

I guess my question could also be reworded as "how to do connection draining with falcon", since it looks to me like you cannot, because falcon uses the async-http server, which uses accept and not accept_each.

@ioquatix (Member)

Is this just a straight socket proxy or are you proxying HTTP requests?

@matti (Author) commented Jan 27, 2019

I did the benchmarking on OSX, but I think it's safe to ignore all numbers - it was a quick try.

@ioquatix (Member)

On OSX, nio4r uses IO.select, which is not that efficient. Try Linux.

> I guess my question could also be reworded as "how to do connection draining with falcon", since to me it looks like you can not because falcon uses async-http-server which uses accept and not accept_each.

The process model of falcon is really limited at the moment. It's the next thing to sort out.

For example, if a process/thread crashes, it won't get restarted.

Connection draining/graceful shutdown is also on the cards. It's required for rolling restarts which I hope to support soon in a generic way.

@matti (Author) commented Jan 27, 2019

Proxying HTTP requests, but at the TCP level (so basically your example, with tiny modifications to make another connection instead of echoing).

@matti (Author) commented Jan 27, 2019

I might be optimistic, but to me it looks like draining would be a "simple" thing to do in Falcon if only it would stop accepting new connections? I just don't know how :)

Okay - I'll try Linux - and as I said this need for threads is just to see what happens. And to understand async-* APIs. I'm happy with just one thread/process for now.

@matti (Author) commented Jan 27, 2019

Figured out why the performance was bad: I had peer.read(512) when the message (the "Welcome to nginx" HTML page) is 850 bytes, so I changed that to peer.read(1024). With 512, the requests/s was around 500; with 1024 it's 3900 (!)

So my quick comparison:

            Proxy   Nginx
    wrk     3900    5500
    hey     3800    4400

where Proxy is the async-based Ruby proxy and Nginx means a direct connection without the proxy.
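The effect of the fixed-size read can be reproduced with a stdlib socket pair. (Note that async-io's read has partial-read semantics, so the details differ from buffered IO#read; this sketch just illustrates why a single 512-byte read truncates an 850-byte message, while reading in a loop until EOF does not.)

```ruby
require 'socket'

reader, writer = UNIXSocket.pair

# An 850-byte response, like the default "Welcome to nginx" page:
writer.write("x" * 850)
writer.close

# A single read(512) can only ever return the first 512 bytes:
first = reader.read(512)
puts first.bytesize  # 512

# Looping until EOF collects the rest of the message:
rest = []
while data = reader.read(512)
  rest << data
end

puts first.bytesize + rest.join.bytesize  # 850
```

With a live proxy connection there is no EOF mid-stream, of course, so in practice you either need to know the message length (e.g. from Content-Length) or keep relaying chunks as they arrive.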

Pushed some code to: https://github.com/matti/socketry-proxy

@ioquatix (Member)

Nice, thanks! I will take a look.
