From 2f3180acad2a5b1362c02bc1f98f66e8382eafa7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 17 Sep 2024 10:08:24 +1200 Subject: [PATCH] Streaming tests. (#182) * Wait for input to be consumed before continuing. * Update dependencies. --- async-http.gemspec | 2 +- lib/async/http/body/finishable.rb | 39 +++++ lib/async/http/client.rb | 3 + lib/async/http/protocol/http1/server.rb | 16 +- .../http/middleware/location_redirector.rb | 2 + test/protocol/http/body/stream.rb | 142 ++++++++++++++++++ test/protocol/http/body/streamable.rb | 134 +++++++++++++++++ 7 files changed, 333 insertions(+), 5 deletions(-) create mode 100644 lib/async/http/body/finishable.rb create mode 100644 test/protocol/http/body/stream.rb create mode 100644 test/protocol/http/body/streamable.rb diff --git a/async-http.gemspec b/async-http.gemspec index e39e376..c316537 100644 --- a/async-http.gemspec +++ b/async-http.gemspec @@ -28,7 +28,7 @@ Gem::Specification.new do |spec| spec.add_dependency "async-pool", "~> 0.7" spec.add_dependency "io-endpoint", "~> 0.11" spec.add_dependency "io-stream", "~> 0.4" - spec.add_dependency "protocol-http", "~> 0.34" + spec.add_dependency "protocol-http", "~> 0.35" spec.add_dependency "protocol-http1", "~> 0.20" spec.add_dependency "protocol-http2", "~> 0.18" spec.add_dependency "traces", ">= 0.10" diff --git a/lib/async/http/body/finishable.rb b/lib/async/http/body/finishable.rb new file mode 100644 index 0000000..cfdf996 --- /dev/null +++ b/lib/async/http/body/finishable.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2019-2023, by Samuel Williams. + +require 'protocol/http/body/wrapper' +require 'async/variable' + +module Async + module HTTP + module Body + class Finishable < ::Protocol::HTTP::Body::Wrapper + def initialize(body) + super(body) + + @closed = Async::Variable.new + @error = nil + end + + def close(error = nil) + unless @closed.resolved? + @error = error + @closed.value = true + end + + super + end + + def wait + @closed.wait + end + + def inspect + "#<#{self.class} closed=#{@closed} error=#{@error}> | #{super}" + end + end + end + end +end diff --git a/lib/async/http/client.rb b/lib/async/http/client.rb index bbaeb18..53468a7 100755 --- a/lib/async/http/client.rb +++ b/lib/async/http/client.rb @@ -188,6 +188,9 @@ def make_response(request, connection) # The connection won't be released until the body is completely read/released. ::Protocol::HTTP::Body::Completable.wrap(response) do + # TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool. + + # Release the connection back into the pool: @pool.release(connection) end diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 51d32ca..2451363 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -7,6 +7,8 @@ # Copyright, 2024, by Anton Zhuravsky. require_relative 'connection' +require_relative '../../body/finishable' + require 'console/event/failure' module Async @@ -46,6 +48,11 @@ def each(task: Task.current) task.annotate("Reading #{self.version} requests for #{self.class}.") while request = next_request + if body = request.body + finishable = Body::Finishable.new(body) + request.body = finishable + end + response = yield(request, self) version = request.version body = response&.body @@ -102,23 +109,24 @@ def each(task: Task.current) head = request.head? # Same as above: - request = nil unless request.body + request = nil response = nil write_body(version, body, head, trailer) end end - # We are done with the body, you shouldn't need to call close on it: + # We are done with the body: body = nil else # If the request failed to generate a response, it was an internal server error: write_response(@version, 500, {}) write_body(version, nil) + + request&.finish end - # Gracefully finish reading the request body if it was not already done so. - request&.each{} + finishable&.wait # This ensures we yield at least once every iteration of the loop and allow other fibers to execute. task.yield diff --git a/test/async/http/middleware/location_redirector.rb b/test/async/http/middleware/location_redirector.rb index 1efb8ee..9a950b9 100644 --- a/test/async/http/middleware/location_redirector.rb +++ b/test/async/http/middleware/location_redirector.rb @@ -18,6 +18,8 @@ with '301' do let(:app) do Protocol::HTTP::Middleware.for do |request| + request.finish # TODO: request.discard - or some default handling? + case request.path when '/home' Protocol::HTTP::Response[301, {'location' => '/'}, []] diff --git a/test/protocol/http/body/stream.rb b/test/protocol/http/body/stream.rb new file mode 100644 index 0000000..430a1ee --- /dev/null +++ b/test/protocol/http/body/stream.rb @@ -0,0 +1,142 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require "async/http/protocol/http" +require "protocol/http/body/streamable" +require "sus/fixtures/async/http" + +AnEchoServer = Sus::Shared("an echo server") do + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + output = ::Protocol::HTTP::Body::Writable.new + + Async do + stream = ::Protocol::HTTP::Body::Stream.new(request.body, output) + + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, output] + end + end + + it "should echo the request body" do + chunks = ["Hello,", "World!"] + response_chunks = Queue.new + + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + Console.debug(self, "Writing chunk:", chunk: chunk) + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + response_chunks.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +AnEchoClient = Sus::Shared("an echo client") do + let(:chunks) {["Hello,", "World!"]} + let(:response_chunks) {Queue.new} + + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + output = ::Protocol::HTTP::Body::Writable.new + + Async do + stream = ::Protocol::HTTP::Body::Stream.new(request.body, output) + + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, output] + end + end + + it "should echo the response body" do + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol| + describe protocol, unique: protocol.name do + include Sus::Fixtures::Async::HTTP::ServerContext + + let(:protocol) {subject} + + it_behaves_like AnEchoServer + it_behaves_like AnEchoClient + end +end diff --git a/test/protocol/http/body/streamable.rb b/test/protocol/http/body/streamable.rb new file mode 100644 index 0000000..17f2cef --- /dev/null +++ b/test/protocol/http/body/streamable.rb @@ -0,0 +1,134 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require "async/http/protocol/http" +require "protocol/http/body/streamable" +require "sus/fixtures/async/http" + +AnEchoServer = Sus::Shared("an echo server") do + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, streamable] + end + end + + it "should echo the request body" do + chunks = ["Hello,", "World!"] + response_chunks = Queue.new + + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + Console.debug(self, "Writing chunk:", chunk: chunk) + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + response_chunks.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +AnEchoClient = Sus::Shared("an echo client") do + let(:chunks) {["Hello,", "World!"]} + let(:response_chunks) {Queue.new} + + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, streamable] + end + end + + it "should echo the response body" do + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol| + describe protocol, unique: protocol.name do + include Sus::Fixtures::Async::HTTP::ServerContext + + let(:protocol) {subject} + + it_behaves_like AnEchoServer + it_behaves_like AnEchoClient + end +end