diff --git a/CHANGES b/CHANGES index 9e4c14fd2c..4b664bc065 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,8 @@ -0.17.2 (trunk): +0.17.3 (trunk): +* Add Cohttp_async.Client.callv. Allows for making requests while reusing an + HTTP conncetion (#344) + +0.17.2 (2015-05-24): * Remove dependency on the Lwt Camlp4 syntax extension (#334). * Add `make github` target to push documentation to GitHub Pages (#338 from Jyotsna Prakash). diff --git a/async/cohttp_async.ml b/async/cohttp_async.ml index 50dedf5cfb..c2949098aa 100644 --- a/async/cohttp_async.ml +++ b/async/cohttp_async.ml @@ -81,7 +81,7 @@ let pipe_of_body read_chunk ic = | Done -> return (`Finished ()) ) in don't_wait_for (finished >>| fun () -> Pipe.close wr); - rd + (rd, finished) module Body = struct module B = Cohttp.Body @@ -156,25 +156,60 @@ end module Client = struct + let read_request ic = + Response.read ic >>| function + | `Eof -> failwith "Connection closed by remote host" + | `Invalid reason -> failwith reason + | `Ok res -> + (* Build a response pipe for the body *) + let reader = Response.make_body_reader res ic in + let (rd, finished_read) = + pipe_of_body (fun ic -> Response.read_body_chunk reader) ic in + (res, `Pipe rd, finished_read) + let request ?interrupt ?(body=`Empty) req = (* Connect to the remote side *) Net.connect_uri ?interrupt req.Request.uri >>= fun (ic,oc) -> Request.write (fun writer -> Body.write Request.write_body body writer) req oc >>= fun () -> - Response.read ic - >>| function - | `Eof -> failwith "Connection closed by remote host" - | `Invalid reason -> failwith reason - | `Ok res -> - (* Build a response pipe for the body *) - let reader = Response.make_body_reader res ic in - let rd = pipe_of_body (fun ic -> Response.read_body_chunk reader) ic in - don't_wait_for ( - Pipe.closed rd >>= fun () -> - Deferred.all_ignore [Reader.close ic; Writer.close oc] - ); - res, `Pipe rd + read_request ic >>| fun (resp, body, body_finished) -> + don't_wait_for ( + body_finished >>= fun () -> + Deferred.all_ignore [Reader.close ic; Writer.close oc]); + (resp, body) + + let callv ?interrupt uri reqs = + let reqs_c = ref 0 in + let resp_c = ref 0 in + Net.connect_uri ?interrupt uri >>| fun (ic, oc) -> + reqs + |> Pipe.iter ~f:(fun (req, body) -> + incr reqs_c; + Request.write (fun writer -> Body.write Request.write_body body writer) + req oc) + |> don't_wait_for; + let last_body = ref None in + let responses = Reader.read_all ic (fun ic -> + let last_body_drained = + match !last_body with + | None -> Deferred.unit + | Some b -> b in + last_body_drained >>= fun () -> + if Pipe.is_closed reqs && (!resp_c >= !reqs_c) + then return `Eof + else + ic |> read_request >>| fun (resp, body, body_finished) -> + incr resp_c; + last_body := Some body_finished; + `Ok (resp, body) + ) in + don't_wait_for ( + Pipe.closed reqs >>= fun () -> + Pipe.closed responses >>= fun () -> + Writer.close oc + ); + responses let call ?interrupt ?headers ?(chunked=false) ?(body=`Empty) meth uri = (* Create a request, then make the request. @@ -242,7 +277,8 @@ module Server = struct | `No | `Unknown -> `Empty | `Yes -> (* Create a Pipe for the body *) let reader = Request.make_body_reader req rd in - `Pipe (pipe_of_body (fun ic -> Request.read_body_chunk reader) rd) + let (p, _) = pipe_of_body (fun ic -> Request.read_body_chunk reader) rd in + `Pipe p let handle_client handle_request sock rd wr = let last_body_pipe_drained = ref (Ivar.create ()) in diff --git a/async/cohttp_async.mli b/async/cohttp_async.mli index 7f29c265c2..463dedb537 100644 --- a/async/cohttp_async.mli +++ b/async/cohttp_async.mli @@ -71,6 +71,12 @@ module Client : sig Uri.t -> (Response.t * Body.t) Deferred.t + val callv : + ?interrupt:unit Deferred.t -> + Uri.t -> + (Request.t * Body.t) Pipe.Reader.t -> + (Response.t * Body.t) Pipe.Reader.t Deferred.t + (** Send an HTTP GET request *) val get : ?interrupt:unit Deferred.t -> diff --git a/lib_test/test_async_integration.ml b/lib_test/test_async_integration.ml index 5fa0963ab6..09b7345eb8 100644 --- a/lib_test/test_async_integration.ml +++ b/lib_test/test_async_integration.ml @@ -9,9 +9,13 @@ let chunk_body = ["one"; ""; " "; "bar"; ""] let large_string = String.make (Int.pow 2 16) 'A' +let response_bodies = [ "Testing" + ; "Foo bar" ] + +let ok s = Server.respond `OK ~body:(Body.of_string s) + let server = - response_sequence [ - (* empty_chunk *) + [ (* empty_chunk *) const @@ Server.respond `OK ~body:(Body.of_string_list chunk_body); (* large response *) const @@ Server.respond_with_string large_string; @@ -19,7 +23,10 @@ let server = (fun _ body -> body |> Body.to_string >>| String.length >>= fun len -> Server.respond_with_string (Int.to_string len)) - ] + ] @ (* pipelined_chunk *) + (response_bodies |> List.map ~f:(Fn.compose const ok)) + |> response_sequence + let ts = test_server_s server begin fun uri -> @@ -37,9 +44,28 @@ let ts = >>= fun (_, body) -> body |> Body.to_string >>| fun s -> assert_equal (String.length large_string) (Int.of_string s) in + let pipelined_chunk () = + let printer x = x in + let reqs = [ + Request.make ~meth:`POST uri, (Body.of_string "foo"); + Request.make ~meth:`POST uri, (Body.of_string "bar"); + ] in + let body_q = response_bodies |> Queue.of_list in + reqs + |> Pipe.of_list + |> Client.callv uri >>= fun responses -> responses + |> Pipe.to_list + >>= fun resps -> resps + |> Deferred.List.iter ~f:(fun (resp, body) -> + let expected_body = body_q |> Queue.dequeue_exn in + body |> Body.to_string >>| fun body -> + assert_equal ~printer expected_body body + ) + in [ "empty chunk test", empty_chunk ; "large response", large_response ; "large request", large_request + ; "pipelined chunk test", pipelined_chunk ] end