From 62fba89c857fc4ff3783bb72dff9c70e92afd6e1 Mon Sep 17 00:00:00 2001 From: Christopher Zimmermann Date: Sun, 24 May 2020 11:25:10 +0200 Subject: [PATCH 1/8] partly inline Client.read_response this means read_response becomes read_body. It will allow callv to try reading a response header without access to the request method. --- cohttp-lwt/src/client.ml | 68 +++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index 982626f814..a5ff790edf 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -12,28 +12,18 @@ module Make type ctx = Net.ctx - let read_response ~closefn ic _oc meth = - Response.read ic >>= begin function - | `Invalid reason -> - Lwt.fail (Failure ("Failed to read response: " ^ reason)) - | `Eof -> Lwt.fail (Failure "Client connection was closed") - | `Ok res -> begin - let has_body = match meth with - | `HEAD -> `No - | _ -> Response.has_body res - in - match has_body with - | `Yes | `Unknown -> - let reader = Response.make_body_reader res ic in - let stream = Body.create_stream Response.read_body_chunk reader in - let closefn = closefn in - Lwt.on_success (Lwt_stream.closed stream) closefn; - let gcfn _st = closefn () in - Gc.finalise gcfn stream; - let body = Body.of_stream stream in - Lwt.return (res, body) - | `No -> closefn (); Lwt.return (res, `Empty) - end + let read_body ~closefn ic res = + begin + match Response.has_body res with + | `Yes | `Unknown -> + let reader = Response.make_body_reader res ic in + let stream = Body.create_stream Response.read_body_chunk reader in + Lwt.on_success (Lwt_stream.closed stream) closefn; + let gcfn st = closefn () in + Gc.finalise gcfn stream; + let body = Body.of_stream stream in + Lwt.return body + | `No -> closefn (); Lwt.return `Empty end |> fun t -> Lwt.on_cancel t closefn; @@ -70,7 +60,18 @@ module Make Body.write_body (Request.write_body writer) buf) req oc in sent >>= fun () -> - read_response ~closefn ic oc meth + Response.read ic >>= function + | `Invalid reason -> + Lwt.fail (Failure ("Failed to read response: " ^ reason)) + | `Eof -> + Lwt.fail (Failure "Server closed connection prematurely.") + | `Ok res -> + match meth with + | `HEAD -> + Lwt.return (res, `Empty) + | _ -> + read_body ~closefn ic res >>= fun body -> + Lwt.return (res, body) (* The HEAD should not have a response body *) let head ?ctx ?headers uri = @@ -113,11 +114,22 @@ module Make | None -> Lwt.return_unit | Some body -> Body.drain_body body end >>= fun () -> - Lwt_mutex.with_lock read_m (fun () -> read_response ~closefn ic oc meth) - >|= (fun ((_,body) as x) -> - last_body := Some body; - x - ) + Lwt_mutex.with_lock read_m begin fun () -> + Response.read ic >>= function + | `Invalid reason -> + Lwt.fail (Failure ("Failed to read response: " ^ reason)) + | `Eof -> + Lwt.fail (Failure "Server closed connection prematurely.") + | `Ok res -> + match meth with + | `HEAD -> + last_body := None; + Lwt.return (res, `Empty) + | _ -> + read_body ~closefn ic res >>= fun body -> + last_body := Some body; + Lwt.return (res, body) + end ) meth_stream in Lwt.on_success (Lwt_stream.closed resps) (fun () -> Net.close ic oc); Lwt.return resps From 79d42875441972d65abd604e811156ce94a383be Mon Sep 17 00:00:00 2001 From: Christopher Zimmermann Date: Wed, 3 Jun 2020 18:41:59 +0200 Subject: [PATCH 2/8] read_body does not need to wrap result in Lwt.t --- cohttp-lwt/src/client.ml | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index a5ff790edf..3a6bf203e5 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -13,22 +13,16 @@ module Make type ctx = Net.ctx let read_body ~closefn ic res = - begin - match Response.has_body res with - | `Yes | `Unknown -> - let reader = Response.make_body_reader res ic in - let stream = Body.create_stream Response.read_body_chunk reader in - Lwt.on_success (Lwt_stream.closed stream) closefn; - let gcfn st = closefn () in - Gc.finalise gcfn stream; - let body = Body.of_stream stream in - Lwt.return body - | `No -> closefn (); Lwt.return `Empty - end - |> fun t -> - Lwt.on_cancel t closefn; - Lwt.on_failure t (fun _exn -> closefn ()); - t + match Response.has_body res with + | `Yes | `Unknown -> + let reader = Response.make_body_reader res ic in + let stream = Body.create_stream Response.read_body_chunk reader in + Lwt.on_success (Lwt_stream.closed stream) closefn; + let gcfn st = closefn () in + Gc.finalise gcfn stream; + let body = Body.of_stream stream in + body + | `No -> closefn (); `Empty let is_meth_chunked = function | `HEAD -> false @@ -70,7 +64,7 @@ module Make | `HEAD -> Lwt.return (res, `Empty) | _ -> - read_body ~closefn ic res >>= fun body -> + let body = read_body ~closefn ic res in Lwt.return (res, body) (* The HEAD should not have a response body *) @@ -126,7 +120,7 @@ module Make last_body := None; Lwt.return (res, `Empty) | _ -> - read_body ~closefn ic res >>= fun body -> + let body = read_body ~closefn ic res in last_body := Some body; Lwt.return (res, body) end From 1329d4de49120dbfd105441b7a22daa66e49620a Mon Sep 17 00:00:00 2001 From: Christopher Zimmermann Date: Fri, 5 Jun 2020 07:50:40 +0200 Subject: [PATCH 3/8] untangle finalising maze in read_body This commit addresses two problems: - for closed streams closefn will be called twice via the two finalisers - In the callv case, all bodys need to be drained In the finaliser the body is drained, and thereby the stream closed, so that closefn will be called once via Lwt.on_success (closed stream). --- cohttp-lwt/src/client.ml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index 3a6bf203e5..89607c229e 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -17,10 +17,11 @@ module Make | `Yes | `Unknown -> let reader = Response.make_body_reader res ic in let stream = Body.create_stream Response.read_body_chunk reader in - Lwt.on_success (Lwt_stream.closed stream) closefn; - let gcfn st = closefn () in - Gc.finalise gcfn stream; let body = Body.of_stream stream in + Lwt.on_success (Lwt_stream.closed stream) closefn; + Gc.finalise + (fun body -> Lwt.async (fun () -> Body.drain_body body)) + stream; body | `No -> closefn (); `Empty From a3c8a35a2736f48379d24868a808fd678cd11608 Mon Sep 17 00:00:00 2001 From: Christopher Zimmermann Date: Fri, 5 Jun 2020 10:38:17 +0200 Subject: [PATCH 4/8] no need to drain the body in callv anymore --- cohttp-lwt/src/client.ml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index 89607c229e..411ca64573 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -102,13 +102,8 @@ module Make response has consumed the body before continuing to the next response because HTTP/1.1-pipelining cannot be interleaved. *) let read_m = Lwt_mutex.create () in - let last_body = ref None in let closefn () = Lwt_mutex.unlock read_m in let resps = Lwt_stream.map_s (fun meth -> - begin match !last_body with - | None -> Lwt.return_unit - | Some body -> Body.drain_body body - end >>= fun () -> Lwt_mutex.with_lock read_m begin fun () -> Response.read ic >>= function | `Invalid reason -> @@ -118,11 +113,9 @@ module Make | `Ok res -> match meth with | `HEAD -> - last_body := None; Lwt.return (res, `Empty) | _ -> let body = read_body ~closefn ic res in - last_body := Some body; Lwt.return (res, body) end ) meth_stream in From a218e37c941a6d438e9a3a0fe568b35cde4ca1b3 Mon Sep 17 00:00:00 2001 From: Christopher Zimmermann Date: Fri, 5 Jun 2020 23:54:21 +0200 Subject: [PATCH 5/8] Don't use Gc.finalise for closing fd or consuming stream It may run from any thread, which forbids calling into Lwt from it. The best alternative I can currently think of is just issueing a warning in case the stream is not closed yet when stream is collected. Not even callin Log.warn is permitted, since it also uses Lwt. --- cohttp-lwt/src/client.ml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index 411ca64573..55caadd99d 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -18,9 +18,14 @@ module Make let reader = Response.make_body_reader res ic in let stream = Body.create_stream Response.read_body_chunk reader in let body = Body.of_stream stream in - Lwt.on_success (Lwt_stream.closed stream) closefn; - Gc.finalise - (fun body -> Lwt.async (fun () -> Body.drain_body body)) + let closed = ref false in + Lwt.on_success (Lwt_stream.closed stream) + (fun () -> closed := true; closefn ()); + (* finalise could run in a thread different from the lwt main thread. + * You may therefore not call into Lwt from a finaliser. *) + Gc.finalise_last + (fun () -> if not !closed then + prerr_endline "Cohttp_lwt: body not consumed - leaking stream!") stream; body | `No -> closefn (); `Empty From feb2775ca7cc448db327a7e0881b4ea64b6e3857 Mon Sep 17 00:00:00 2001 From: Christopher Zimmermann Date: Sun, 25 Oct 2020 18:14:23 +0100 Subject: [PATCH 6/8] add comment about Lwt.on_success an GC --- cohttp-lwt/src/client.ml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index 55caadd99d..74222152ef 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -19,6 +19,8 @@ module Make let stream = Body.create_stream Response.read_body_chunk reader in let body = Body.of_stream stream in let closed = ref false in + (* Lwt.on_success registers a callback in the stream. + * The GC will still be able to collect stream. *) Lwt.on_success (Lwt_stream.closed stream) (fun () -> closed := true; closefn ()); (* finalise could run in a thread different from the lwt main thread. From 384fa05459dec7efd52b0d181392519b5299cf7b Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Tue, 27 Oct 2020 16:11:23 +0100 Subject: [PATCH 7/8] Add closefn where it is needed --- cohttp-lwt/src/client.ml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index 74222152ef..349618672d 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -64,12 +64,15 @@ module Make sent >>= fun () -> Response.read ic >>= function | `Invalid reason -> + closefn () ; Lwt.fail (Failure ("Failed to read response: " ^ reason)) | `Eof -> + closefn () ; Lwt.fail (Failure "Server closed connection prematurely.") | `Ok res -> match meth with | `HEAD -> + closefn () ; Lwt.return (res, `Empty) | _ -> let body = read_body ~closefn ic res in @@ -114,12 +117,15 @@ module Make Lwt_mutex.with_lock read_m begin fun () -> Response.read ic >>= function | `Invalid reason -> + closefn () ; Lwt.fail (Failure ("Failed to read response: " ^ reason)) | `Eof -> + closefn () ; Lwt.fail (Failure "Server closed connection prematurely.") | `Ok res -> match meth with | `HEAD -> + closefn () ; Lwt.return (res, `Empty) | _ -> let body = read_body ~closefn ic res in From 68c74dbe3a434a97509108ad13ebc07f7e16be9e Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Tue, 27 Oct 2020 16:32:06 +0100 Subject: [PATCH 8/8] Re-use Lwt.on_cancel/on_failure on our flow to properly close the connection --- cohttp-lwt/src/client.ml | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cohttp-lwt/src/client.ml b/cohttp-lwt/src/client.ml index 349618672d..8f6770f8e2 100644 --- a/cohttp-lwt/src/client.ml +++ b/cohttp-lwt/src/client.ml @@ -62,12 +62,10 @@ module Make Body.write_body (Request.write_body writer) buf) req oc in sent >>= fun () -> - Response.read ic >>= function + Response.read ic >>= begin function | `Invalid reason -> - closefn () ; Lwt.fail (Failure ("Failed to read response: " ^ reason)) | `Eof -> - closefn () ; Lwt.fail (Failure "Server closed connection prematurely.") | `Ok res -> match meth with @@ -76,7 +74,10 @@ module Make Lwt.return (res, `Empty) | _ -> let body = read_body ~closefn ic res in - Lwt.return (res, body) + Lwt.return (res, body) end |> fun t -> + Lwt.on_cancel t closefn ; + Lwt.on_failure t (fun _exn -> closefn ()) ; + t (* The HEAD should not have a response body *) let head ?ctx ?headers uri = @@ -115,12 +116,10 @@ module Make let closefn () = Lwt_mutex.unlock read_m in let resps = Lwt_stream.map_s (fun meth -> Lwt_mutex.with_lock read_m begin fun () -> - Response.read ic >>= function + Response.read ic >>= begin function | `Invalid reason -> - closefn () ; Lwt.fail (Failure ("Failed to read response: " ^ reason)) | `Eof -> - closefn () ; Lwt.fail (Failure "Server closed connection prematurely.") | `Ok res -> match meth with @@ -129,7 +128,9 @@ module Make Lwt.return (res, `Empty) | _ -> let body = read_body ~closefn ic res in - Lwt.return (res, body) + Lwt.return (res, body) end |> fun t -> + Lwt.on_cancel t closefn ; + Lwt.on_failure t (fun _exn -> closefn ()) ; t end ) meth_stream in Lwt.on_success (Lwt_stream.closed resps) (fun () -> Net.close ic oc);