Skip to content

Commit

Permalink
Wait for input to be consumed before continuing.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 15, 2024
1 parent f750834 commit 97382e1
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 4 deletions.
39 changes: 39 additions & 0 deletions lib/async/http/body/finishable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.

require 'protocol/http/body/wrapper'
require 'async/variable'

module Async
module HTTP
module Body
class Finishable < ::Protocol::HTTP::Body::Wrapper
def initialize(body)
super(body)

@closed = Async::Variable.new
@error = nil
end

def close(error = nil)
unless @closed.resolved?
@error = error
@closed.value = true
end

super
end

def wait
@closed.wait
end

def inspect
"#<#{self.class} closed=#{@closed} error=#{@error}> | #{super}"
end
end
end
end
end
3 changes: 3 additions & 0 deletions lib/async/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ def make_response(request, connection)

# The connection won't be released until the body is completely read/released.
::Protocol::HTTP::Body::Completable.wrap(response) do
# TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool.

# Release the connection back into the pool:
@pool.release(connection)
end

Expand Down
14 changes: 10 additions & 4 deletions lib/async/http/protocol/http1/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def each(task: Task.current)
task.annotate("Reading #{self.version} requests for #{self.class}.")

while request = next_request
if body = request.body
finishable = Body::Finishable.new(body)
request.body = finishable
end

response = yield(request, self)
version = request.version
body = response&.body
Expand Down Expand Up @@ -102,23 +107,24 @@ def each(task: Task.current)
head = request.head?

# Same as above:
request = nil unless request.body
request = nil
response = nil

write_body(version, body, head, trailer)
end
end

# We are done with the body, you shouldn't need to call close on it:
# We are done with the body:
body = nil
else
# If the request failed to generate a response, it was an internal server error:
write_response(@version, 500, {})
write_body(version, nil)

request&.finish
end

# Gracefully finish reading the request body if it was not already done so.
request&.each{}
finishable&.wait

# This ensures we yield at least once every iteration of the loop and allow other fibers to execute.
task.yield
Expand Down
142 changes: 142 additions & 0 deletions test/async/http/protocol/streaming.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "async/http/protocol/http"
require "protocol/http/body/streamable"
require "sus/fixtures/async/http"

AnEchoServer = Sus::Shared("an echo server") do
let(:app) do
::Protocol::HTTP::Middleware.for do |request|
output = ::Protocol::HTTP::Body::Writable.new

Async do
stream = ::Protocol::HTTP::Body::Stream.new(request.body, output)

Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, output]
end
end

it "should echo the request body" do
chunks = ["Hello,", "World!"]
response_chunks = Queue.new

output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
Console.debug(self, "Writing chunk:", chunk: chunk)
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
response_chunks.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

AnEchoClient = Sus::Shared("an echo client") do
let(:chunks) {["Hello,", "World!"]}
let(:response_chunks) {Queue.new}

let(:app) do
::Protocol::HTTP::Middleware.for do |request|
output = ::Protocol::HTTP::Body::Writable.new

Async do
stream = ::Protocol::HTTP::Body::Stream.new(request.body, output)

Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, output]
end
end

it "should echo the response body" do
output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

[Async::HTTP::Protocol::HTTP1].each do |protocol|
describe protocol do
include Sus::Fixtures::Async::HTTP::ServerContext

let(:protocol) {subject}

it_behaves_like AnEchoServer
it_behaves_like AnEchoClient
end
end

0 comments on commit 97382e1

Please sign in to comment.