diff --git a/async-http.gemspec b/async-http.gemspec index 18ffb04..396b1b2 100644 --- a/async-http.gemspec +++ b/async-http.gemspec @@ -28,8 +28,8 @@ Gem::Specification.new do |spec| spec.add_dependency "async-pool", "~> 0.7" spec.add_dependency "io-endpoint", "~> 0.11" spec.add_dependency "io-stream", "~> 0.4" - spec.add_dependency "protocol-http", "~> 0.36" - spec.add_dependency "protocol-http1", "~> 0.23" + spec.add_dependency "protocol-http", "~> 0.37" + spec.add_dependency "protocol-http1", "~> 0.24" spec.add_dependency "protocol-http2", "~> 0.18" spec.add_dependency "traces", ">= 0.10" end diff --git a/fixtures/async/http/a_protocol.rb b/fixtures/async/http/a_protocol.rb index 5d3870c..5248333 100644 --- a/fixtures/async/http/a_protocol.rb +++ b/fixtures/async/http/a_protocol.rb @@ -14,6 +14,7 @@ require 'tempfile' require 'protocol/http/body/file' +require "protocol/http/body/buffered" require 'sus/fixtures/async/http' @@ -100,7 +101,7 @@ module HTTP end with 'buffered body' do - let(:body) {Async::HTTP::Body::Buffered.new(["Hello World"])} + let(:body) {::Protocol::HTTP::Body::Buffered.new(["Hello World"])} let(:response) {::Protocol::HTTP::Response[200, {}, body]} let(:app) do @@ -410,7 +411,7 @@ module HTTP end with 'body with incorrect length' do - let(:bad_body) {Async::HTTP::Body::Buffered.new(["Borked"], 10)} + let(:bad_body) {::Protocol::HTTP::Body::Buffered.new(["Borked"], 10)} let(:app) do ::Protocol::HTTP::Middleware.for do |request| diff --git a/lib/async/http/client.rb b/lib/async/http/client.rb index 26bfc8c..7c2c8a8 100755 --- a/lib/async/http/client.rb +++ b/lib/async/http/client.rb @@ -187,18 +187,7 @@ def call(request) def make_response(request, connection) response = request.call(connection) - if body = request.body - finishable = Body::Finishable.new(body) - end - - # The connection won't be released until the body is completely read/released. - ::Protocol::HTTP::Body::Completable.wrap(response) do - # TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool. - finishable&.wait - - # Release the connection back into the pool: - @pool.release(connection) - end + response.pool = @pool return response end diff --git a/lib/async/http/protocol/http1/client.rb b/lib/async/http/protocol/http1/client.rb index c7fa99b..c34eef0 100644 --- a/lib/async/http/protocol/http1/client.rb +++ b/lib/async/http/protocol/http1/client.rb @@ -10,11 +10,25 @@ module HTTP module Protocol module HTTP1 class Client < Connection + def initialize(...) + super + + @pool = nil + end + + attr_accessor :pool + + def closed! + super + + if pool = @pool + @pool = nil + pool.release(self) + end + end + # Used by the client to send requests to the remote server. def call(request, task: Task.current) - # We need to keep track of connections which are not in the initial "ready" state. - @ready = false - Console.logger.debug(self) {"#{request.method} #{request.path} #{request.headers.inspect}"} # Mark the start of the trailers: @@ -54,12 +68,11 @@ def call(request, task: Task.current) end response = Response.read(self, request) - @ready = true return response rescue # This will ensure that #reusable? returns false. - @stream.close + self.close raise end diff --git a/lib/async/http/protocol/http1/connection.rb b/lib/async/http/protocol/http1/connection.rb index 23a3ed6..0f88c15 100755 --- a/lib/async/http/protocol/http1/connection.rb +++ b/lib/async/http/protocol/http1/connection.rb @@ -16,12 +16,11 @@ class Connection < ::Protocol::HTTP1::Connection def initialize(stream, version) super(stream) - @ready = true @version = version end def to_s - "\#<#{self.class} negotiated #{@version}, currently #{@ready ? 'ready' : 'in-use'}>" + "\#<#{self.class} negotiated #{@version}, #{@state}>" end def as_json(...) @@ -62,11 +61,11 @@ def concurrency # Can we use this connection to make requests? def viable? - @ready && @stream&.readable? + self.idle? && @stream&.readable? end def reusable? - @ready && @persistent && @stream && !@stream.closed? + @persistent && @stream && !@stream.closed? end end end diff --git a/lib/async/http/protocol/http1/response.rb b/lib/async/http/protocol/http1/response.rb index a7a8c6f..08afc35 100644 --- a/lib/async/http/protocol/http1/response.rb +++ b/lib/async/http/protocol/http1/response.rb @@ -39,6 +39,14 @@ def initialize(connection, version, status, reason, headers, body) super(version, status, headers, body, protocol) end + def pool=(pool) + if @connection.idle? or @connection.closed? + pool.release(@connection) + else + @connection.pool = pool + end + end + def connection @connection end diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 2451363..b30e0c6 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -22,7 +22,7 @@ def fail_request(status) write_body(@version, nil) rescue => error # At this point, there is very little we can do to recover: - Console::Event::Failure.for(error).emit(self, "Failed to write failure response.", severity: :debug) + Console::Event::Failure.for(error).emit(self, "Failed to write failure response!", severity: :debug) end def next_request @@ -37,7 +37,7 @@ def next_request end return request - rescue ::Protocol::HTTP1::BadRequest + rescue ::Protocol::HTTP1::BadRequest => error fail_request(400) # Conceivably we could retry here, but we don't really know how bad the error is, so it's better to just fail: raise diff --git a/lib/async/http/protocol/http2/response.rb b/lib/async/http/protocol/http2/response.rb index 5d79713..923b5c0 100644 --- a/lib/async/http/protocol/http2/response.rb +++ b/lib/async/http/protocol/http2/response.rb @@ -137,6 +137,16 @@ def initialize(stream) attr :stream attr :request + def pool=(pool) + # If we are already closed, the stream can be released now: + if @stream.closed? + pool.release(@stream.connection) + else + # Otherwise, we will release the stream when it is closed: + @stream.pool = pool + end + end + def connection @stream.connection end diff --git a/lib/async/http/protocol/http2/stream.rb b/lib/async/http/protocol/http2/stream.rb index 60f3fad..c9a98ee 100644 --- a/lib/async/http/protocol/http2/stream.rb +++ b/lib/async/http/protocol/http2/stream.rb @@ -20,6 +20,8 @@ def initialize(*) @headers = nil + @pool = nil + # Input buffer, reading request body, or response body (receive_data): @length = nil @input = nil @@ -30,6 +32,8 @@ def initialize(*) attr_accessor :headers + attr_accessor :pool + attr :input def add_header(key, value) @@ -158,6 +162,10 @@ def closed(error) @output = nil end + if pool = @pool and @connection + pool.release(@connection) + end + return self end end diff --git a/lib/async/http/proxy.rb b/lib/async/http/proxy.rb index 338031b..d1d7bee 100644 --- a/lib/async/http/proxy.rb +++ b/lib/async/http/proxy.rb @@ -96,6 +96,7 @@ def connect(&block) end else # This ensures we don't leave a response dangling: + input.close response.close raise ConnectFailure, response diff --git a/test/async/http/protocol/http11.rb b/test/async/http/protocol/http11.rb index 3fa8db7..c8bec4b 100755 --- a/test/async/http/protocol/http11.rb +++ b/test/async/http/protocol/http11.rb @@ -21,7 +21,7 @@ response = client.get("/") connection = response.connection - expect(connection.as_json).to be == "#" + expect(connection.as_json).to be =~ /Async::HTTP::Protocol::HTTP1::Client negotiated HTTP/ ensure response&.close end @@ -109,7 +109,7 @@ def around end with 'full hijack with empty response' do - let(:body) {Async::HTTP::Body::Buffered.new([], 0)} + let(:body) {::Protocol::HTTP::Body::Buffered.new([], 0)} let(:app) do ::Protocol::HTTP::Middleware.for do |request| diff --git a/test/async/http/proxy.rb b/test/async/http/proxy.rb index 7b37ba3..0d493a3 100644 --- a/test/async/http/proxy.rb +++ b/test/async/http/proxy.rb @@ -42,6 +42,8 @@ stream.close_read stream.write(chunk) + stream.close_write + ensure stream.close end end