diff --git a/lib/async/http/body/finishable.rb b/lib/async/http/body/finishable.rb new file mode 100644 index 0000000..cfdf996 --- /dev/null +++ b/lib/async/http/body/finishable.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2019-2023, by Samuel Williams. + +require 'protocol/http/body/wrapper' +require 'async/variable' + +module Async + module HTTP + module Body + class Finishable < ::Protocol::HTTP::Body::Wrapper + def initialize(body) + super(body) + + @closed = Async::Variable.new + @error = nil + end + + def close(error = nil) + unless @closed.resolved? + @error = error + @closed.value = true + end + + super + end + + def wait + @closed.wait + end + + def inspect + "#<#{self.class} closed=#{@closed} error=#{@error}> | #{super}" + end + end + end + end +end diff --git a/lib/async/http/client.rb b/lib/async/http/client.rb index bbaeb18..53468a7 100755 --- a/lib/async/http/client.rb +++ b/lib/async/http/client.rb @@ -188,6 +188,9 @@ def make_response(request, connection) # The connection won't be released until the body is completely read/released. ::Protocol::HTTP::Body::Completable.wrap(response) do + # TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool. + + # Release the connection back into the pool: @pool.release(connection) end diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 51d32ca..71c1c51 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -46,6 +46,11 @@ def each(task: Task.current) task.annotate("Reading #{self.version} requests for #{self.class}.") while request = next_request + if body = request.body + finishable = Body::Finishable.new(body) + request.body = finishable + end + response = yield(request, self) version = request.version body = response&.body @@ -102,23 +107,24 @@ def each(task: Task.current) head = request.head? # Same as above: - request = nil unless request.body + request = nil response = nil write_body(version, body, head, trailer) end end - # We are done with the body, you shouldn't need to call close on it: + # We are done with the body: body = nil else # If the request failed to generate a response, it was an internal server error: write_response(@version, 500, {}) write_body(version, nil) + + request&.finish end - # Gracefully finish reading the request body if it was not already done so. - request&.each{} + finishable&.wait # This ensures we yield at least once every iteration of the loop and allow other fibers to execute. task.yield diff --git a/test/async/http/protocol/streaming.rb b/test/async/http/protocol/streaming.rb new file mode 100644 index 0000000..1b85715 --- /dev/null +++ b/test/async/http/protocol/streaming.rb @@ -0,0 +1,142 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require "async/http/protocol/http" +require "protocol/http/body/streamable" +require "sus/fixtures/async/http" + +AnEchoServer = Sus::Shared("an echo server") do + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + output = ::Protocol::HTTP::Body::Writable.new + + Async do + stream = ::Protocol::HTTP::Body::Stream.new(request.body, output) + + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, output] + end + end + + it "should echo the request body" do + chunks = ["Hello,", "World!"] + response_chunks = Queue.new + + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + Console.debug(self, "Writing chunk:", chunk: chunk) + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + response_chunks.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +AnEchoClient = Sus::Shared("an echo client") do + let(:chunks) {["Hello,", "World!"]} + let(:response_chunks) {Queue.new} + + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + output = ::Protocol::HTTP::Body::Writable.new + + Async do + stream = ::Protocol::HTTP::Body::Stream.new(request.body, output) + + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, output] + end + end + + it "should echo the response body" do + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +[Async::HTTP::Protocol::HTTP1].each do |protocol| + describe protocol do + include Sus::Fixtures::Async::HTTP::ServerContext + + let(:protocol) {subject} + + it_behaves_like AnEchoServer + it_behaves_like AnEchoClient + end +end