Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion lib_test/test_sanity.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ let server =
Server.respond ~status:`OK ~body:(Body.of_string_list chunk_body) ();
Server.respond ~status:`OK ~body:(Body.of_string "") ();
(* not modified *)
Server.respond ~status:`Not_modified ~body:Body.empty ()
Server.respond ~status:`Not_modified ~body:Body.empty ();
(* pipelined_interleave *)
Server.respond_string ~status:`OK ~body:"one" ();
Server.respond_string ~status:`OK ~body:"two" ();
Server.respond_string ~status:`OK ~body:"three" ();
]
|> List.map const
|> response_sequence
Expand Down Expand Up @@ -63,10 +67,37 @@ let ts =
Transfer.Unknown (Header.get_transfer_encoding headers);
body |> Body.is_empty >|= fun is_empty ->
assert_bool "No body returned when not modified" is_empty in
let pipelined_interleave () =
let r n =
let uri = Uri.with_query' uri ["test", (string_of_int n)] in
(Request.make uri, Body.empty) in
let (reqs, push) = Lwt_stream.create () in
push (Some (r 1));
push (Some (r 2));
Client.callv uri reqs >>= fun resps ->
let resps = Lwt_stream.map_s (fun (_, b) -> Body.to_string b) resps in
Lwt_stream.fold (fun b i ->
Lwt_log.ign_info_f "Request %i\n" i;
begin match i with
| 0 -> assert_equal b "one"
| 1 ->
assert_equal b "two";
Lwt_log.ign_info "Sending extra request";
push (Some (r 3))
| 2 ->
assert_equal b "three";
push None;
| x -> assert_failure ("Test failed with " ^ string_of_int x)
end;
succ i
) resps 0 >|= fun l ->
assert_equal l 3
in
[ "sanity test", t
; "empty chunk test", empty_chunk
; "pipelined chunk test", pipelined_chunk
; "no body when response is not modified", not_modified_has_no_body
; "pipelined with interleaving requests", pipelined_interleave
]
end

Expand Down
33 changes: 15 additions & 18 deletions lwt-core/cohttp_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -206,32 +206,29 @@ module Make_client
let callv ?(ctx=default_ctx) uri reqs =
Net.connect_uri ~ctx uri >>= fun (conn, ic, oc) ->
(* Serialise the requests out to the wire *)
Lwt_stream.fold_s (fun (req,body) meths ->
let meth_stream = Lwt_stream.map_s (fun (req,body) ->
Request.write (fun writer ->
Cohttp_lwt_body.write_body (Request.write_body writer) body
) req oc >>= fun () ->
return ((Request.meth req)::meths)
) reqs [] >>= fun meths ->
return (Request.meth req)
) reqs in
(* Read the responses. For each response, ensure that the previous
response has consumed the body before continuing to the next
response because HTTP/1.1-pipelining cannot be interleaved. *)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems a little wrong. Does it mean HTTP/1.1 pipelining cannot be out-of-order? It is called pipelining...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 06/29/2015 04:08 PM, David Sheets wrote:

In lwt-core/cohttp_lwt.ml #379 (comment):

   Request.write (fun writer ->
     Cohttp_lwt_body.write_body (Request.write_body writer) body
   ) req oc >>= fun () ->
  •  return ((Request.meth req)::meths)
    
  • ) reqs [] >>= fun meths ->
  •  return (Request.meth req)
    
  • ) reqs in
    (* Read the responses. For each response, ensure that the previous
    response has consumed the body before continuing to the next
    response because HTTP/1.1-pipelining cannot be interleaved. *)

This comment seems a little wrong. Does it mean HTTP/1.1 pipelining cannot be /out-of-order/? It is called /pipelining/...

The server is free to execute the requests in whatever order as long as it ensures that the replies are sent in the same order
as the requests: https://tools.ietf.org/html/rfc7230#page-54

There is nothing special in HTTP/1.1 pipelining that would allow clients to match responses with requests[1],
hence the pipelineing blacklists you see in tools like curl to deal with buggy servers that send replies out-of-order.

FWIW Firefox has pipelineing disabled by default, and chromium is about to remove it too: https://bugzilla.mozilla.org/show_bug.cgi?id=264354
Still I think it'd be useful for cohttp in situations where you know the server works correctly, in other situations you can still use persistence by specifying pipeline depth of 0.

[1] there was (a now expired) draft to add a header Assoc-Req: https://tools.ietf.org/html/rfc7230#page-54

let meth_stream = Lwt_stream.of_list (List.rev meths) in
let read_m = Lwt_mutex.create () in
let last_body = ref None in
let resps = Lwt_stream.from (fun () ->
let closefn () = Lwt_mutex.unlock read_m in
Lwt_stream.get meth_stream >>= function
| None -> return_none
| Some meth ->
begin match !last_body with None -> return_unit | Some body ->
Cohttp_lwt_body.drain_body body
end >>= fun () ->
Lwt_mutex.with_lock read_m (fun () -> read_response ~closefn ic oc meth)
>|= (fun ((_,body) as x) ->
last_body := Some body;
Some x
)
) in
let closefn () = Lwt_mutex.unlock read_m in
let resps = Lwt_stream.map_s (fun meth ->
begin match !last_body with
| None -> return_unit
| Some body -> Cohttp_lwt_body.drain_body body
end >>= fun () ->
Lwt_mutex.with_lock read_m (fun () -> read_response ~closefn ic oc meth)
>|= (fun ((_,body) as x) ->
last_body := Some body;
x
)
) meth_stream in
Lwt_stream.on_terminate resps (fun () -> Net.close ic oc);
return resps
end
Expand Down