Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 52 additions & 38 deletions cohttp-lwt/src/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,25 @@ module Make

type ctx = Net.ctx

let read_response ~closefn ic _oc meth =
Response.read ic >>= begin function
| `Invalid reason ->
Lwt.fail (Failure ("Failed to read response: " ^ reason))
| `Eof -> Lwt.fail (Failure "Client connection was closed")
| `Ok res -> begin
let has_body = match meth with
| `HEAD -> `No
| _ -> Response.has_body res
in
match has_body with
| `Yes | `Unknown ->
let reader = Response.make_body_reader res ic in
let stream = Body.create_stream Response.read_body_chunk reader in
let closefn = closefn in
Lwt.on_success (Lwt_stream.closed stream) closefn;
let gcfn _st = closefn () in
Gc.finalise gcfn stream;
let body = Body.of_stream stream in
Lwt.return (res, body)
| `No -> closefn (); Lwt.return (res, `Empty)
end
end
|> fun t ->
Lwt.on_cancel t closefn;
Lwt.on_failure t (fun _exn -> closefn ());
t
let read_body ~closefn ic res =
match Response.has_body res with
| `Yes | `Unknown ->
let reader = Response.make_body_reader res ic in
let stream = Body.create_stream Response.read_body_chunk reader in
let body = Body.of_stream stream in
let closed = ref false in
(* Lwt.on_success registers a callback in the stream.
* The GC will still be able to collect stream. *)
Lwt.on_success (Lwt_stream.closed stream)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that if we fail due to an exception or maybe a timeout that cancels the thread, this will automatically leak the stream?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I initially suspected because I thougt Lwt.on_success will hold a reference on stream. But this is not true. It is the other way round. stream holds a callback to the Lwt.on_success machinery. So the GC will be able to collect stream.

Copy link
Copy Markdown
Collaborator

@mseri mseri Oct 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, thanks. This explains why while testing I was not noticing any regression.
Can you add a comment about it on this line?

(fun () -> closed := true; closefn ());
(* finalise could run in a thread different from the lwt main thread.
* You may therefore not call into Lwt from a finaliser. *)
Gc.finalise_last
(fun () -> if not !closed then
prerr_endline "Cohttp_lwt: body not consumed - leaking stream!")
stream;
body
| `No -> closefn (); `Empty

let is_meth_chunked = function
| `HEAD -> false
Expand Down Expand Up @@ -70,7 +62,22 @@ module Make
Body.write_body (Request.write_body writer) buf) req oc
in
sent >>= fun () ->
read_response ~closefn ic oc meth
Response.read ic >>= begin function
| `Invalid reason ->
Lwt.fail (Failure ("Failed to read response: " ^ reason))
| `Eof ->
Lwt.fail (Failure "Server closed connection prematurely.")
| `Ok res ->
match meth with
| `HEAD ->
closefn () ;
Lwt.return (res, `Empty)
| _ ->
let body = read_body ~closefn ic res in
Lwt.return (res, body) end |> fun t ->
Lwt.on_cancel t closefn ;
Lwt.on_failure t (fun _exn -> closefn ()) ;
t

(* The HEAD should not have a response body *)
let head ?ctx ?headers uri =
Expand Down Expand Up @@ -106,18 +113,25 @@ module Make
response has consumed the body before continuing to the next
response because HTTP/1.1-pipelining cannot be interleaved. *)
let read_m = Lwt_mutex.create () in
let last_body = ref None in
let closefn () = Lwt_mutex.unlock read_m in
let resps = Lwt_stream.map_s (fun meth ->
begin match !last_body with
| None -> Lwt.return_unit
| Some body -> Body.drain_body body
end >>= fun () ->
Lwt_mutex.with_lock read_m (fun () -> read_response ~closefn ic oc meth)
>|= (fun ((_,body) as x) ->
last_body := Some body;
x
)
Lwt_mutex.with_lock read_m begin fun () ->
Response.read ic >>= begin function
| `Invalid reason ->
Lwt.fail (Failure ("Failed to read response: " ^ reason))
| `Eof ->
Lwt.fail (Failure "Server closed connection prematurely.")
| `Ok res ->
match meth with
| `HEAD ->
closefn () ;
Lwt.return (res, `Empty)
| _ ->
let body = read_body ~closefn ic res in
Lwt.return (res, body) end |> fun t ->
Lwt.on_cancel t closefn ;
Lwt.on_failure t (fun _exn -> closefn ()) ; t
end
) meth_stream in
Lwt.on_success (Lwt_stream.closed resps) (fun () -> Net.close ic oc);
Lwt.return resps
Expand Down