Skip to content

Commit

Permalink
Streaming tests. (#182)
Browse files Browse the repository at this point in the history
* Wait for input to be consumed before continuing.

* Update dependencies.
  • Loading branch information
ioquatix authored Sep 16, 2024
1 parent 1c4b5ab commit 2f3180a
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 5 deletions.
2 changes: 1 addition & 1 deletion async-http.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "async-pool", "~> 0.7"
spec.add_dependency "io-endpoint", "~> 0.11"
spec.add_dependency "io-stream", "~> 0.4"
spec.add_dependency "protocol-http", "~> 0.34"
spec.add_dependency "protocol-http", "~> 0.35"
spec.add_dependency "protocol-http1", "~> 0.20"
spec.add_dependency "protocol-http2", "~> 0.18"
spec.add_dependency "traces", ">= 0.10"
Expand Down
39 changes: 39 additions & 0 deletions lib/async/http/body/finishable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.

require 'protocol/http/body/wrapper'
require 'async/variable'

module Async
module HTTP
module Body
class Finishable < ::Protocol::HTTP::Body::Wrapper
def initialize(body)
super(body)

@closed = Async::Variable.new
@error = nil
end

def close(error = nil)
unless @closed.resolved?
@error = error
@closed.value = true
end

super
end

def wait
@closed.wait
end

def inspect
"#<#{self.class} closed=#{@closed} error=#{@error}> | #{super}"
end
end
end
end
end
3 changes: 3 additions & 0 deletions lib/async/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ def make_response(request, connection)

# The connection won't be released until the body is completely read/released.
::Protocol::HTTP::Body::Completable.wrap(response) do
# TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool.

# Release the connection back into the pool:
@pool.release(connection)
end

Expand Down
16 changes: 12 additions & 4 deletions lib/async/http/protocol/http1/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# Copyright, 2024, by Anton Zhuravsky.

require_relative 'connection'
require_relative '../../body/finishable'

require 'console/event/failure'

module Async
Expand Down Expand Up @@ -46,6 +48,11 @@ def each(task: Task.current)
task.annotate("Reading #{self.version} requests for #{self.class}.")

while request = next_request
if body = request.body
finishable = Body::Finishable.new(body)
request.body = finishable
end

response = yield(request, self)
version = request.version
body = response&.body
Expand Down Expand Up @@ -102,23 +109,24 @@ def each(task: Task.current)
head = request.head?

# Same as above:
request = nil unless request.body
request = nil
response = nil

write_body(version, body, head, trailer)
end
end

# We are done with the body, you shouldn't need to call close on it:
# We are done with the body:
body = nil
else
# If the request failed to generate a response, it was an internal server error:
write_response(@version, 500, {})
write_body(version, nil)

request&.finish
end

# Gracefully finish reading the request body if it was not already done so.
request&.each{}
finishable&.wait

# This ensures we yield at least once every iteration of the loop and allow other fibers to execute.
task.yield
Expand Down
2 changes: 2 additions & 0 deletions test/async/http/middleware/location_redirector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
with '301' do
let(:app) do
Protocol::HTTP::Middleware.for do |request|
request.finish # TODO: request.discard - or some default handling?

case request.path
when '/home'
Protocol::HTTP::Response[301, {'location' => '/'}, []]
Expand Down
142 changes: 142 additions & 0 deletions test/protocol/http/body/stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "async/http/protocol/http"
require "protocol/http/body/streamable"
require "sus/fixtures/async/http"

AnEchoServer = Sus::Shared("an echo server") do
let(:app) do
::Protocol::HTTP::Middleware.for do |request|
output = ::Protocol::HTTP::Body::Writable.new

Async do
stream = ::Protocol::HTTP::Body::Stream.new(request.body, output)

Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, output]
end
end

it "should echo the request body" do
chunks = ["Hello,", "World!"]
response_chunks = Queue.new

output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
Console.debug(self, "Writing chunk:", chunk: chunk)
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
response_chunks.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

AnEchoClient = Sus::Shared("an echo client") do
let(:chunks) {["Hello,", "World!"]}
let(:response_chunks) {Queue.new}

let(:app) do
::Protocol::HTTP::Middleware.for do |request|
output = ::Protocol::HTTP::Body::Writable.new

Async do
stream = ::Protocol::HTTP::Body::Stream.new(request.body, output)

Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, output]
end
end

it "should echo the response body" do
output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol|
describe protocol, unique: protocol.name do
include Sus::Fixtures::Async::HTTP::ServerContext

let(:protocol) {subject}

it_behaves_like AnEchoServer
it_behaves_like AnEchoClient
end
end
134 changes: 134 additions & 0 deletions test/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "async/http/protocol/http"
require "protocol/http/body/streamable"
require "sus/fixtures/async/http"

AnEchoServer = Sus::Shared("an echo server") do
let(:app) do
::Protocol::HTTP::Middleware.for do |request|
streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream|
Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, streamable]
end
end

it "should echo the request body" do
chunks = ["Hello,", "World!"]
response_chunks = Queue.new

output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
Console.debug(self, "Writing chunk:", chunk: chunk)
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
response_chunks.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

AnEchoClient = Sus::Shared("an echo client") do
let(:chunks) {["Hello,", "World!"]}
let(:response_chunks) {Queue.new}

let(:app) do
::Protocol::HTTP::Middleware.for do |request|
streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream|
Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, streamable]
end
end

it "should echo the response body" do
output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol|
describe protocol, unique: protocol.name do
include Sus::Fixtures::Async::HTTP::ServerContext

let(:protocol) {subject}

it_behaves_like AnEchoServer
it_behaves_like AnEchoClient
end
end

0 comments on commit 2f3180a

Please sign in to comment.