diff --git a/src/Messages.jl b/src/Messages.jl index 25c188eea..6baa35835 100644 --- a/src/Messages.jl +++ b/src/Messages.jl @@ -177,6 +177,8 @@ Represents a HTTP Request Message. - `response`, the `Response` to this `Request` +- `txcount`, number of times this `Request` has been sent (see RetryRequest.jl). + - `parent`, the `Response` (if any) that led to this request (e.g. in the case of a redirect). [RFC7230 6.4](https://tools.ietf.org/html/rfc7231#section-6.4) @@ -188,6 +190,7 @@ mutable struct Request <: Message headers::Headers body::Vector{UInt8} response::Response + txcount::Int parent end @@ -201,6 +204,7 @@ function Request(method::String, target, headers=[], body=UInt8[]; mkheaders(headers), bytes(body), Response(0), + 0, parent) r.response.request = r return r diff --git a/src/RetryRequest.jl b/src/RetryRequest.jl index aec644c55..32416cf58 100644 --- a/src/RetryRequest.jl +++ b/src/RetryRequest.jl @@ -54,7 +54,7 @@ isrecoverable(e, req, retry_non_idempotent) = isrecoverable(e) && !(req.body === body_was_streamed) && !(req.response.body === body_was_streamed) && - (retry_non_idempotent || isidempotent(req)) + (retry_non_idempotent || req.txcount == 0 || isidempotent(req)) # "MUST NOT automatically retry a request with a non-idempotent method" # https://tools.ietf.org/html/rfc7230#section-6.3.1 diff --git a/src/StreamRequest.jl b/src/StreamRequest.jl index 28221e978..e8b7874e2 100644 --- a/src/StreamRequest.jl +++ b/src/StreamRequest.jl @@ -41,13 +41,19 @@ function request(::Type{StreamLayer}, io::IO, request::Request, body; end end + if !isidempotent(request) + # Wait for pipelined reads to complete + # before sending non-idempotent request body. + startread(io) + end + aborted = false try @sync begin + if iofunction == nothing @async writebody(http, request, body) - yield() startread(http) readbody(http, response, response_stream) else @@ -61,9 +67,10 @@ function request(::Type{StreamLayer}, io::IO, request::Request, body; end catch e - if aborted && - e isa CompositeException && - (ex = first(e.exceptions).ex; isioerror(ex)) + if e isa CompositeException + e = first(e.exceptions).ex + end + if aborted && isioerror(e) @debug 1 "⚠️ $(response.status) abort exception excpeted: $ex" else rethrow(e) @@ -88,6 +95,8 @@ function writebody(http::Stream, req::Request, body) write(http, req.body) end + req.txcount += 1 + if isidempotent(req) closewrite(http) else diff --git a/src/Streams.jl b/src/Streams.jl index 98b3dc666..529159cd0 100644 --- a/src/Streams.jl +++ b/src/Streams.jl @@ -139,7 +139,9 @@ IOExtras.isreadable(http::Stream) = isreadable(http.stream) function IOExtras.startread(http::Stream) - startread(http.stream) + if !isreadable(http.stream) + startread(http.stream) + end readheaders(http.stream, http.message) handle_continue(http) @@ -160,7 +162,6 @@ function handle_continue(http::Stream{Response}) @debug 1 "✅ Continue: $(http.stream)" readheaders(http.stream, http.message) end - end function handle_continue(http::Stream{Request})