diff --git a/lib_test/test_sanity.ml b/lib_test/test_sanity.ml index 1a3c7861de..726c6dd048 100644 --- a/lib_test/test_sanity.ml +++ b/lib_test/test_sanity.ml @@ -20,7 +20,11 @@ let server = Server.respond ~status:`OK ~body:(Body.of_string_list chunk_body) (); Server.respond ~status:`OK ~body:(Body.of_string "") (); (* not modified *) - Server.respond ~status:`Not_modified ~body:Body.empty () + Server.respond ~status:`Not_modified ~body:Body.empty (); + (* pipelined_interleave *) + Server.respond_string ~status:`OK ~body:"one" (); + Server.respond_string ~status:`OK ~body:"two" (); + Server.respond_string ~status:`OK ~body:"three" (); ] |> List.map const |> response_sequence @@ -63,10 +67,37 @@ let ts = Transfer.Unknown (Header.get_transfer_encoding headers); body |> Body.is_empty >|= fun is_empty -> assert_bool "No body returned when not modified" is_empty in + let pipelined_interleave () = + let r n = + let uri = Uri.with_query' uri ["test", (string_of_int n)] in + (Request.make uri, Body.empty) in + let (reqs, push) = Lwt_stream.create () in + push (Some (r 1)); + push (Some (r 2)); + Client.callv uri reqs >>= fun resps -> + let resps = Lwt_stream.map_s (fun (_, b) -> Body.to_string b) resps in + Lwt_stream.fold (fun b i -> + Lwt_log.ign_info_f "Request %i\n" i; + begin match i with + | 0 -> assert_equal b "one" + | 1 -> + assert_equal b "two"; + Lwt_log.ign_info "Sending extra request"; + push (Some (r 3)) + | 2 -> + assert_equal b "three"; + push None; + | x -> assert_failure ("Test failed with " ^ string_of_int x) + end; + succ i + ) resps 0 >|= fun l -> + assert_equal l 3 + in [ "sanity test", t ; "empty chunk test", empty_chunk ; "pipelined chunk test", pipelined_chunk ; "no body when response is not modified", not_modified_has_no_body + ; "pipelined with interleaving requests", pipelined_interleave ] end diff --git a/lwt-core/cohttp_lwt.ml b/lwt-core/cohttp_lwt.ml index 1d07a291b7..7cb12e9704 100644 --- a/lwt-core/cohttp_lwt.ml +++ b/lwt-core/cohttp_lwt.ml @@ -206,32 +206,29 @@ module Make_client let callv ?(ctx=default_ctx) uri reqs = Net.connect_uri ~ctx uri >>= fun (conn, ic, oc) -> (* Serialise the requests out to the wire *) - Lwt_stream.fold_s (fun (req,body) meths -> + let meth_stream = Lwt_stream.map_s (fun (req,body) -> Request.write (fun writer -> Cohttp_lwt_body.write_body (Request.write_body writer) body ) req oc >>= fun () -> - return ((Request.meth req)::meths) - ) reqs [] >>= fun meths -> + return (Request.meth req) + ) reqs in (* Read the responses. For each response, ensure that the previous response has consumed the body before continuing to the next response because HTTP/1.1-pipelining cannot be interleaved. *) - let meth_stream = Lwt_stream.of_list (List.rev meths) in let read_m = Lwt_mutex.create () in let last_body = ref None in - let resps = Lwt_stream.from (fun () -> - let closefn () = Lwt_mutex.unlock read_m in - Lwt_stream.get meth_stream >>= function - | None -> return_none - | Some meth -> - begin match !last_body with None -> return_unit | Some body -> - Cohttp_lwt_body.drain_body body - end >>= fun () -> - Lwt_mutex.with_lock read_m (fun () -> read_response ~closefn ic oc meth) - >|= (fun ((_,body) as x) -> - last_body := Some body; - Some x - ) - ) in + let closefn () = Lwt_mutex.unlock read_m in + let resps = Lwt_stream.map_s (fun meth -> + begin match !last_body with + | None -> return_unit + | Some body -> Cohttp_lwt_body.drain_body body + end >>= fun () -> + Lwt_mutex.with_lock read_m (fun () -> read_response ~closefn ic oc meth) + >|= (fun ((_,body) as x) -> + last_body := Some body; + x + ) + ) meth_stream in Lwt_stream.on_terminate resps (fun () -> Net.close ic oc); return resps end