Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
0.17.2 (trunk):
0.17.3 (trunk):
* Add Cohttp_async.Client.callv. Allows for making requests while reusing an
HTTP conncetion (#344)

0.17.2 (2015-05-24):
* Remove dependency on the Lwt Camlp4 syntax extension (#334).
* Add `make github` target to push documentation to GitHub Pages
(#338 from Jyotsna Prakash).
Expand Down
66 changes: 51 additions & 15 deletions async/cohttp_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ let pipe_of_body read_chunk ic =
| Done -> return (`Finished ())
) in
don't_wait_for (finished >>| fun () -> Pipe.close wr);
rd
(rd, finished)

module Body = struct
module B = Cohttp.Body
Expand Down Expand Up @@ -156,25 +156,60 @@ end

module Client = struct

let read_request ic =
Response.read ic >>| function
| `Eof -> failwith "Connection closed by remote host"
| `Invalid reason -> failwith reason
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the Async convention here to raise Failure or is there some other more descriptive exception? I'm not sure what more idiomatic Async code does.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that defining an Invalid_response exception would be fine.

But this PR doesn't introduce any kind of new error handling. This function was simply extracted out of request to be reusable in callv.

However, you remind of a good point. I think that both async's and lwt's callv shouldn't raise at all because it might be possible that raising on a bad response could possibly make it impossible for the user to retrieve the previous good responses (although the connection should be terminated regardless)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rgrinberg Yes that's fine. Would your thought be that another PR would introduce a new error handling mechanism? As you point out the current error handling strategy is not very useful in the wild. It seems that one wants a (Response.t * Body.t) Or_error.t Pipe.Reader.t Deferred.t instead which would allow one to figure out what is going on with each response.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @trevorsummerssmith I think that's a better return type. However the connection will probably be terminated after the first error. If the body or response is corrupted or unreadable somehow it probably doesn't make much sense to attempt reading the next response. So you will get at most one error.

Just making sure that's clear.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is clear, and seems to me to be the correct behavior. That allows one to process, and make decisions about retries.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't think that my previous characterization of the issue is correct. The current implementation makes it impossible to read all responses that were successfully written to the pipe.

Some care will be necessary however because an exception might be thrown before the corresponding response is actually processed in the pipe. But you should be able to catch it and iterate over the pipe regardless.

| `Ok res ->
(* Build a response pipe for the body *)
let reader = Response.make_body_reader res ic in
let (rd, finished_read) =
pipe_of_body (fun ic -> Response.read_body_chunk reader) ic in
(res, `Pipe rd, finished_read)

let request ?interrupt ?(body=`Empty) req =
(* Connect to the remote side *)
Net.connect_uri ?interrupt req.Request.uri
>>= fun (ic,oc) ->
Request.write (fun writer -> Body.write Request.write_body body writer) req oc
>>= fun () ->
Response.read ic
>>| function
| `Eof -> failwith "Connection closed by remote host"
| `Invalid reason -> failwith reason
| `Ok res ->
(* Build a response pipe for the body *)
let reader = Response.make_body_reader res ic in
let rd = pipe_of_body (fun ic -> Response.read_body_chunk reader) ic in
don't_wait_for (
Pipe.closed rd >>= fun () ->
Deferred.all_ignore [Reader.close ic; Writer.close oc]
);
res, `Pipe rd
read_request ic >>| fun (resp, body, body_finished) ->
don't_wait_for (
body_finished >>= fun () ->
Deferred.all_ignore [Reader.close ic; Writer.close oc]);
(resp, body)

let callv ?interrupt uri reqs =
let reqs_c = ref 0 in
let resp_c = ref 0 in
Net.connect_uri ?interrupt uri >>| fun (ic, oc) ->
reqs
|> Pipe.iter ~f:(fun (req, body) ->
incr reqs_c;
Request.write (fun writer -> Body.write Request.write_body body writer)
req oc)
|> don't_wait_for;
let last_body = ref None in
let responses = Reader.read_all ic (fun ic ->
let last_body_drained =
match !last_body with
| None -> Deferred.unit
| Some b -> b in
last_body_drained >>= fun () ->
if Pipe.is_closed reqs && (!resp_c >= !reqs_c)
then return `Eof
else
ic |> read_request >>| fun (resp, body, body_finished) ->
incr resp_c;
last_body := Some body_finished;
`Ok (resp, body)
) in
don't_wait_for (
Pipe.closed reqs >>= fun () ->
Pipe.closed responses >>= fun () ->
Writer.close oc
);
responses

let call ?interrupt ?headers ?(chunked=false) ?(body=`Empty) meth uri =
(* Create a request, then make the request.
Expand Down Expand Up @@ -242,7 +277,8 @@ module Server = struct
| `No | `Unknown -> `Empty
| `Yes -> (* Create a Pipe for the body *)
let reader = Request.make_body_reader req rd in
`Pipe (pipe_of_body (fun ic -> Request.read_body_chunk reader) rd)
let (p, _) = pipe_of_body (fun ic -> Request.read_body_chunk reader) rd in
`Pipe p

let handle_client handle_request sock rd wr =
let last_body_pipe_drained = ref (Ivar.create ()) in
Expand Down
6 changes: 6 additions & 0 deletions async/cohttp_async.mli
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ module Client : sig
Uri.t ->
(Response.t * Body.t) Deferred.t

val callv :
?interrupt:unit Deferred.t ->
Uri.t ->
(Request.t * Body.t) Pipe.Reader.t ->
(Response.t * Body.t) Pipe.Reader.t Deferred.t

(** Send an HTTP GET request *)
val get :
?interrupt:unit Deferred.t ->
Expand Down
32 changes: 29 additions & 3 deletions lib_test/test_async_integration.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,24 @@ let chunk_body = ["one"; ""; " "; "bar"; ""]

let large_string = String.make (Int.pow 2 16) 'A'

let response_bodies = [ "Testing"
; "Foo bar" ]

let ok s = Server.respond `OK ~body:(Body.of_string s)

let server =
response_sequence [
(* empty_chunk *)
[ (* empty_chunk *)
const @@ Server.respond `OK ~body:(Body.of_string_list chunk_body);
(* large response *)
const @@ Server.respond_with_string large_string;
(* large request *)
(fun _ body ->
body |> Body.to_string >>| String.length >>= fun len ->
Server.respond_with_string (Int.to_string len))
]
] @ (* pipelined_chunk *)
(response_bodies |> List.map ~f:(Fn.compose const ok))
|> response_sequence


let ts =
test_server_s server begin fun uri ->
Expand All @@ -37,9 +44,28 @@ let ts =
>>= fun (_, body) ->
body |> Body.to_string >>| fun s ->
assert_equal (String.length large_string) (Int.of_string s) in
let pipelined_chunk () =
let printer x = x in
let reqs = [
Request.make ~meth:`POST uri, (Body.of_string "foo");
Request.make ~meth:`POST uri, (Body.of_string "bar");
] in
let body_q = response_bodies |> Queue.of_list in
reqs
|> Pipe.of_list
|> Client.callv uri >>= fun responses -> responses
|> Pipe.to_list
>>= fun resps -> resps
|> Deferred.List.iter ~f:(fun (resp, body) ->
let expected_body = body_q |> Queue.dequeue_exn in
body |> Body.to_string >>| fun body ->
assert_equal ~printer expected_body body
)
in
[ "empty chunk test", empty_chunk
; "large response", large_response
; "large request", large_request
; "pipelined chunk test", pipelined_chunk
]
end

Expand Down