diff --git a/cohttp-async/src/client.ml b/cohttp-async/src/client.ml index 8afe996af7..96d0f127fa 100644 --- a/cohttp-async/src/client.ml +++ b/cohttp-async/src/client.ml @@ -108,41 +108,62 @@ let request ?ssl_ctx ?uri ?(body=`Empty) req = raise e end +module Connection = struct + type t' = + { ic : Reader.t + ; oc : Writer.t } + + (* we can't send concurrent requests over HTTP/1 *) + type t = t' Sequencer.t + + let connect ?ssl_ctx uri = + Net.connect_uri ?ssl_ctx uri + >>| fun (ic, oc) -> + let t = + { ic ; oc } + |> Sequencer.create ~continue_on_error:false + in + Throttle.at_kill t (fun { ic ; oc } -> + Deferred.both (Writer.close oc) (Reader.close ic) + >>| fun ((), ()) -> ()); + (Deferred.any [ Writer.consumer_left oc ; Reader.close_finished ic ] + >>| fun () -> + Throttle.kill t) + |> don't_wait_for; + t + + let close t = + Throttle.kill t; + Throttle.cleaned t + + let is_closed t = + Throttle.is_dead t + + let request ?(body=Body.empty) t req = + let res = Ivar.create () in + Throttle.enqueue t (fun { ic ; oc } -> + Request.write (fun writer -> + Body_raw.write_body Request.write_body body writer) req oc + >>= fun () -> + read_response ic + >>= fun (resp, body) -> + Ivar.fill res (resp, `Pipe body); + (* block starting any more requests until the consumer has finished reading this request *) + Pipe.closed body) + |> don't_wait_for; + Ivar.read res +end + let callv ?ssl_ctx uri reqs = - let reqs_c = ref 0 in - let resp_c = ref 0 in - Net.connect_uri ?ssl_ctx uri >>= fun (ic, oc) -> - try_with (fun () -> - reqs - |> Pipe.iter ~f:(fun (req, body) -> - Int.incr reqs_c; - Request.write (fun w -> Body_raw.write_body Request.write_body body w) - req oc) - |> don't_wait_for; - let last_body_drained = ref Deferred.unit in - let responses = Reader.read_all ic (fun ic -> - !last_body_drained >>= fun () -> - if Pipe.is_closed reqs && (!resp_c >= !reqs_c) then - return `Eof - else - ic |> read_response >>| fun (resp, body) -> - Int.incr resp_c; - last_body_drained := Pipe.closed body; - `Ok (resp, `Pipe body) - ) in - don't_wait_for ( - Pipe.closed reqs >>= fun () -> - Pipe.closed responses >>= fun () -> - Writer.close oc - ); - return responses) - >>= begin function - | Ok x -> return x - | Error e -> - don't_wait_for (Reader.close ic); - don't_wait_for (Writer.close oc); - raise e - end + Connection.connect ?ssl_ctx uri + >>| fun connection -> + let responses = + Pipe.map' ~max_queue_length:1 reqs ~f:(fun reqs -> + Deferred.Queue.map reqs ~f:(fun (req, body) -> + Connection.request ~body connection req)) + in + (Pipe.closed responses >>= fun () -> Connection.close connection) |> don't_wait_for; + responses let call ?ssl_ctx ?headers ?(chunked=false) ?(body=`Empty) meth uri = (* Create a request, then make the request. Figure out an appropriate diff --git a/cohttp-async/src/client.mli b/cohttp-async/src/client.mli index d74fc253db..fd8ad68e81 100644 --- a/cohttp-async/src/client.mli +++ b/cohttp-async/src/client.mli @@ -28,6 +28,26 @@ val call : Uri.t -> (Cohttp.Response.t * Body.t) Async_kernel.Deferred.t + +module Connection : sig + type t + + val connect : + ?ssl_ctx:Conduit_async_ssl.context -> + Uri.t -> + t Async_kernel.Deferred.t + + val close : t -> unit Async_kernel.Deferred.t + + val is_closed : t -> bool + + val request : + ?body: Body.t -> + t -> + Cohttp.Request.t -> + (Cohttp.Response.t * Body.t) Async_kernel.Deferred.t +end + val callv : ?ssl_ctx:Conduit_async_ssl.context -> Uri.t ->